update vendor from go.mod

This commit is contained in:
James Bardin 2018-12-05 14:27:05 -05:00
parent bf71b89a78
commit 1178c799b6
22 changed files with 721 additions and 223 deletions

View File

@ -109,7 +109,7 @@ high-level steps that must be done. Examples are available in the
1. Choose the interface(s) you want to expose for plugins.
2. For each interface, implement an implementation of that interface
that communicates over a `net/rpc` connection or other a
that communicates over a `net/rpc` connection or over a
[gRPC](http://www.grpc.io) connection or both. You'll have to implement
both a client and server implementation.
@ -150,19 +150,19 @@ user experience.
When we started using plugins (late 2012, early 2013), plugins over RPC
were the only option since Go didn't support dynamic library loading. Today,
Go still doesn't support dynamic library loading, but they do intend to.
Since 2012, our plugin system has stabilized from millions of users using it,
and has many benefits we've come to value greatly.
Go supports the [plugin](https://golang.org/pkg/plugin/) standard library with
a number of limitations. Since 2012, our plugin system has stabilized
from tens of millions of users using it, and has many benefits we've come to
value greatly.
For example, we intend to use this plugin system in
[Vault](https://www.vaultproject.io), and dynamic library loading will
simply never be acceptable in Vault for security reasons. That is an extreme
For example, we use this plugin system in
[Vault](https://www.vaultproject.io) where dynamic library loading is
not acceptable for security reasons. That is an extreme
example, but we believe our library system has more upsides than downsides
over dynamic library loading and since we've had it built and tested for years,
we'll likely continue to use it.
we'll continue to use it.
Shared libraries have one major advantage over our system which is much
higher performance. In real world scenarios across our various tools,
we've never required any more performance out of our plugin system and it
has seen very high throughput, so this isn't a concern for us at the moment.

View File

@ -5,6 +5,8 @@ import (
"context"
"crypto/subtle"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"hash"
@ -170,6 +172,29 @@ type ClientConfig struct {
// Logger is the logger that the client will used. If none is provided,
// it will default to hclog's default logger.
Logger hclog.Logger
// AutoMTLS has the client and server automatically negotiate mTLS for
// transport authentication. This ensures that only the original client will
// be allowed to connect to the server, and all other connections will be
// rejected. The client will also refuse to connect to any server that isn't
// the original instance started by the client.
//
// In this mode of operation, the client generates a one-time use tls
// certificate, sends the public x.509 certificate to the new server, and
// the server generates a one-time use tls certificate, and sends the public
// x.509 certificate back to the client. These are used to authenticate all
// rpc connections between the client and server.
//
// Setting AutoMTLS to true implies that the server must support the
// protocol, and correctly negotiate the tls certificates, or a connection
// failure will result.
//
// The client should not set TLSConfig, nor should the server set a
// TLSProvider, because AutoMTLS implies that a new certificate and tls
// configuration will be generated at startup.
//
// You cannot Reattach to a server with this option enabled.
AutoMTLS bool
}
// ReattachConfig is used to configure a client to reattach to an
@ -400,11 +425,9 @@ func (c *Client) Kill() {
if graceful {
select {
case <-doneCh:
// FIXME: this is never reached under normal circumstances, because
// the plugin process is never signaled to exit. We can reach this
// if the child process exited abnormally before the Kill call.
c.logger.Debug("plugin exited")
return
case <-time.After(250 * time.Millisecond):
case <-time.After(1500 * time.Millisecond):
c.logger.Warn("plugin failed to exit gracefully")
}
}
@ -527,15 +550,19 @@ func (c *Client) Start() (addr net.Addr, err error) {
fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
}
stdout_r, stdout_w := io.Pipe()
stderr_r, stderr_w := io.Pipe()
cmd := c.config.Cmd
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)
cmd.Stdin = os.Stdin
cmd.Stderr = stderr_w
cmd.Stdout = stdout_w
cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
if c.config.SecureConfig != nil {
if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
@ -545,6 +572,29 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}
// Setup a temporary certificate for client/server mtls, and send the public
// certificate to the plugin.
if c.config.AutoMTLS {
c.logger.Info("configuring client automatic mTLS")
certPEM, keyPEM, err := generateCert()
if err != nil {
c.logger.Error("failed to generate client certificate", "error", err)
return nil, err
}
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
c.logger.Error("failed to parse client certificate", "error", err)
return nil, err
}
cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM))
c.config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: "localhost",
}
}
c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
err = cmd.Start()
if err != nil {
@ -571,20 +621,20 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Start goroutine to wait for process to exit
exitCh := make(chan struct{})
go func() {
// Make sure we close the write end of our stderr/stdout so
// that the readers send EOF properly.
defer stderr_w.Close()
defer stdout_w.Close()
// ensure the context is cancelled when we're done
defer ctxCancel()
// get the cmd info early, since the process information will be removed
// in Kill.
pid := c.process.Pid
path := cmd.Path
// Wait for the command to end.
err := cmd.Wait()
debugMsgArgs := []interface{}{
"path", cmd.Path,
"pid", c.process.Pid,
"path", path,
"pid", pid,
}
if err != nil {
debugMsgArgs = append(debugMsgArgs,
@ -605,32 +655,25 @@ func (c *Client) Start() (addr net.Addr, err error) {
}()
// Start goroutine that logs the stderr
go c.logStderr(stderr_r)
go c.logStderr(cmdStderr)
// Start a goroutine that is going to be reading the lines
// out of stdout
linesCh := make(chan []byte)
linesCh := make(chan string)
go func() {
defer close(linesCh)
buf := bufio.NewReader(stdout_r)
for {
line, err := buf.ReadBytes('\n')
if line != nil {
linesCh <- line
}
if err == io.EOF {
return
}
scanner := bufio.NewScanner(cmdStdout)
for scanner.Scan() {
linesCh <- scanner.Text()
}
}()
// Make sure after we exit we read the lines from stdout forever
// so they don't block since it is an io.Pipe
// so they don't block since it is a pipe.
defer func() {
go func() {
for _ = range linesCh {
for range linesCh {
}
}()
}()
@ -645,10 +688,10 @@ func (c *Client) Start() (addr net.Addr, err error) {
err = errors.New("timeout while waiting for plugin to start")
case <-exitCh:
err = errors.New("plugin exited before we could connect")
case lineBytes := <-linesCh:
case line := <-linesCh:
// Trim the line and split by "|" in order to get the parts of
// the output.
line := strings.TrimSpace(string(lineBytes))
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "|", 6)
if len(parts) < 4 {
err = fmt.Errorf(
@ -718,12 +761,42 @@ func (c *Client) Start() (addr net.Addr, err error) {
return addr, err
}
// See if we have a TLS certificate from the server.
// Checking if the length is > 50 rules out catching the unused "extra"
// data returned from some older implementations.
if len(parts) >= 6 && len(parts[5]) > 50 {
err := c.loadServerCert(parts[5])
if err != nil {
return nil, fmt.Errorf("error parsing server cert: %s", err)
}
}
}
c.address = addr
return
}
// loadServerCert is used by AutoMTLS to read an x.509 cert returned by the
// server, and load it as the RootCA for the client TLSConfig.
func (c *Client) loadServerCert(cert string) error {
certPool := x509.NewCertPool()
asn1, err := base64.RawStdEncoding.DecodeString(cert)
if err != nil {
return err
}
x509Cert, err := x509.ParseCertificate([]byte(asn1))
if err != nil {
return err
}
certPool.AddCert(x509Cert)
c.config.TLSConfig.RootCAs = certPool
return nil
}
// checkProtoVersion returns the negotiated version and PluginSet.
// This returns an error if the server returned an incompatible protocol
// version, or an invalid handshake response.
@ -829,43 +902,40 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
}
func (c *Client) logStderr(r io.Reader) {
bufR := bufio.NewReader(r)
defer close(c.doneLogging)
scanner := bufio.NewScanner(r)
l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
for {
line, err := bufR.ReadString('\n')
if line != "" {
c.config.Stderr.Write([]byte(line))
line = strings.TrimRightFunc(line, unicode.IsSpace)
for scanner.Scan() {
line := scanner.Text()
c.config.Stderr.Write([]byte(line + "\n"))
line = strings.TrimRightFunc(line, unicode.IsSpace)
entry, err := parseJSON(line)
// If output is not JSON format, print directly to Debug
if err != nil {
l.Debug(line)
} else {
out := flattenKVPairs(entry.KVPairs)
entry, err := parseJSON(line)
// If output is not JSON format, print directly to Debug
if err != nil {
l.Debug(line)
} else {
out := flattenKVPairs(entry.KVPairs)
out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
switch hclog.LevelFromString(entry.Level) {
case hclog.Trace:
l.Trace(entry.Message, out...)
case hclog.Debug:
l.Debug(entry.Message, out...)
case hclog.Info:
l.Info(entry.Message, out...)
case hclog.Warn:
l.Warn(entry.Message, out...)
case hclog.Error:
l.Error(entry.Message, out...)
}
out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
switch hclog.LevelFromString(entry.Level) {
case hclog.Trace:
l.Trace(entry.Message, out...)
case hclog.Debug:
l.Debug(entry.Message, out...)
case hclog.Info:
l.Info(entry.Message, out...)
case hclog.Warn:
l.Warn(entry.Message, out...)
case hclog.Error:
l.Error(entry.Message, out...)
}
}
if err == io.EOF {
break
}
}
// Flag that we've completed logging for others
close(c.doneLogging)
if err := scanner.Err(); err != nil {
l.Error("reading plugin stderr", "error", err)
}
}

13
vendor/github.com/hashicorp/go-plugin/go.mod generated vendored Normal file
View File

@ -0,0 +1,13 @@
module github.com/hashicorp/go-plugin
require (
github.com/golang/protobuf v1.2.0
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77
github.com/oklog/run v1.0.0
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
golang.org/x/text v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
google.golang.org/grpc v1.14.0
)

18
vendor/github.com/hashicorp/go-plugin/go.sum generated vendored Normal file
View File

@ -0,0 +1,18 @@
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd h1:rNuUHR+CvK1IS89MMtcF0EpcVMZtjKfPRp4MEmt/aTs=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb h1:b5rjCoWHc7eqmAS4/qyk21ZsHyb6Mxv/jykxvNTkU4M=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 h1:7GoSOOW2jpsfkntVKaS2rAr1TJqfcxotyaUcuxoZSzg=
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.14.0 h1:ArxJuB1NWfPY6r9Gp9gqwplT0Ge7nqv9msgu03lHLmo=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=

View File

@ -11,6 +11,8 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-plugin/internal/proto"
"github.com/oklog/run"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -19,14 +21,14 @@ import (
// streamer interface is used in the broker to send/receive connection
// information.
type streamer interface {
Send(*ConnInfo) error
Recv() (*ConnInfo, error)
Send(*proto.ConnInfo) error
Recv() (*proto.ConnInfo, error)
Close()
}
// sendErr is used to pass errors back during a send.
type sendErr struct {
i *ConnInfo
i *proto.ConnInfo
ch chan error
}
@ -38,7 +40,7 @@ type gRPCBrokerServer struct {
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
recv chan *ConnInfo
recv chan *proto.ConnInfo
// quit closes down the stream.
quit chan struct{}
@ -50,7 +52,7 @@ type gRPCBrokerServer struct {
func newGRPCBrokerServer() *gRPCBrokerServer {
return &gRPCBrokerServer{
send: make(chan *sendErr),
recv: make(chan *ConnInfo),
recv: make(chan *proto.ConnInfo),
quit: make(chan struct{}),
}
}
@ -58,7 +60,7 @@ func newGRPCBrokerServer() *gRPCBrokerServer {
// StartStream implements the GRPCBrokerServer interface and will block until
// the quit channel is closed or the context reports Done. The stream will pass
// connection information to/from the client.
func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
func (s *gRPCBrokerServer) StartStream(stream proto.GRPCBroker_StartStreamServer) error {
doneCh := stream.Context().Done()
defer s.Close()
@ -97,7 +99,7 @@ func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) erro
// Send is used by the GRPCBroker to pass connection information into the stream
// to the client.
func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
func (s *gRPCBrokerServer) Send(i *proto.ConnInfo) error {
ch := make(chan error)
defer close(ch)
@ -115,7 +117,7 @@ func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the client from the stream to the broker.
func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
func (s *gRPCBrokerServer) Recv() (*proto.ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
@ -136,13 +138,13 @@ func (s *gRPCBrokerServer) Close() {
// streamer interfaces.
type gRPCBrokerClientImpl struct {
// client is the underlying GRPC client used to make calls to the server.
client GRPCBrokerClient
client proto.GRPCBrokerClient
// send is used to send connection info to the gRPC stream.
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
recv chan *ConnInfo
recv chan *proto.ConnInfo
// quit closes down the stream.
quit chan struct{}
@ -153,9 +155,9 @@ type gRPCBrokerClientImpl struct {
func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
return &gRPCBrokerClientImpl{
client: NewGRPCBrokerClient(conn),
client: proto.NewGRPCBrokerClient(conn),
send: make(chan *sendErr),
recv: make(chan *ConnInfo),
recv: make(chan *proto.ConnInfo),
quit: make(chan struct{}),
}
}
@ -207,7 +209,7 @@ func (s *gRPCBrokerClientImpl) StartStream() error {
// Send is used by the GRPCBroker to pass connection information into the stream
// to the plugin.
func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
func (s *gRPCBrokerClientImpl) Send(i *proto.ConnInfo) error {
ch := make(chan error)
defer close(ch)
@ -225,7 +227,7 @@ func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the plugin to the broker.
func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
func (s *gRPCBrokerClientImpl) Recv() (*proto.ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
@ -266,7 +268,7 @@ type GRPCBroker struct {
}
type gRPCBrokerPending struct {
ch chan *ConnInfo
ch chan *proto.ConnInfo
doneCh chan struct{}
}
@ -288,7 +290,7 @@ func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
return nil, err
}
err = b.streamer.Send(&ConnInfo{
err = b.streamer.Send(&proto.ConnInfo{
ServiceId: id,
Network: listener.Addr().Network(),
Address: listener.Addr().String(),
@ -363,7 +365,7 @@ func (b *GRPCBroker) Close() error {
// Dial opens a connection by ID.
func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
var c *ConnInfo
var c *proto.ConnInfo
// Open the stream
p := b.getStream(id)
@ -433,7 +435,7 @@ func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
}
m.streams[id] = &gRPCBrokerPending{
ch: make(chan *ConnInfo, 1),
ch: make(chan *proto.ConnInfo, 1),
doneCh: make(chan struct{}),
}
return m.streams[id]

View File

@ -6,6 +6,7 @@ import (
"net"
"time"
"github.com/hashicorp/go-plugin/internal/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -16,12 +17,9 @@ func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn,
// Build dialing options.
opts := make([]grpc.DialOption, 0, 5)
// We use a custom dialer so that we can connect over unix domain sockets
// We use a custom dialer so that we can connect over unix domain sockets.
opts = append(opts, grpc.WithDialer(dialer))
// go-plugin expects to block the connection
opts = append(opts, grpc.WithBlock())
// Fail right away
opts = append(opts, grpc.FailOnNonTempDialError(true))
@ -58,12 +56,15 @@ func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
go broker.Run()
go brokerGRPCClient.StartStream()
return &GRPCClient{
Conn: conn,
Plugins: c.config.Plugins,
doneCtx: doneCtx,
broker: broker,
}, nil
cl := &GRPCClient{
Conn: conn,
Plugins: c.config.Plugins,
doneCtx: doneCtx,
broker: broker,
controller: proto.NewGRPCControllerClient(conn),
}
return cl, nil
}
// GRPCClient connects to a GRPCServer over gRPC to dispense plugin types.
@ -73,11 +74,14 @@ type GRPCClient struct {
doneCtx context.Context
broker *GRPCBroker
controller proto.GRPCControllerClient
}
// ClientProtocol impl.
func (c *GRPCClient) Close() error {
c.broker.Close()
c.controller.Shutdown(c.doneCtx, &proto.Empty{})
return c.Conn.Close()
}

View File

@ -0,0 +1,23 @@
package plugin
import (
"context"
"github.com/hashicorp/go-plugin/internal/proto"
)
// GRPCControllerServer handles shutdown calls to terminate the server when the
// plugin client is closed.
type grpcControllerServer struct {
server *GRPCServer
}
// Shutdown stops the grpc server. It first will attempt a graceful stop, then a
// full stop on the server.
func (s *grpcControllerServer) Shutdown(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
resp := &proto.Empty{}
// TODO: figure out why GracefullStop doesn't work.
s.server.Stop()
return resp, nil
}

View File

@ -8,6 +8,8 @@ import (
"io"
"net"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
@ -52,6 +54,8 @@ type GRPCServer struct {
config GRPCServerConfig
server *grpc.Server
broker *GRPCBroker
logger hclog.Logger
}
// ServerProtocol impl.
@ -71,10 +75,16 @@ func (s *GRPCServer) Init() error {
// Register the broker service
brokerServer := newGRPCBrokerServer()
RegisterGRPCBrokerServer(s.server, brokerServer)
proto.RegisterGRPCBrokerServer(s.server, brokerServer)
s.broker = newGRPCBroker(brokerServer, s.TLS)
go s.broker.Run()
// Register the controller
controllerServer := &grpcControllerServer{
server: s,
}
proto.RegisterGRPCControllerServer(s.server, controllerServer)
// Register all our plugins onto the gRPC server.
for k, raw := range s.Plugins {
p, ok := raw.(GRPCPlugin)
@ -83,7 +93,7 @@ func (s *GRPCServer) Init() error {
}
if err := p.GRPCServer(s.broker, s.server); err != nil {
return fmt.Errorf("error registring %q: %s", k, err)
return fmt.Errorf("error registering %q: %s", k, err)
}
}
@ -117,11 +127,11 @@ func (s *GRPCServer) Config() string {
}
func (s *GRPCServer) Serve(lis net.Listener) {
// Start serving in a goroutine
go s.server.Serve(lis)
// Wait until graceful completion
<-s.DoneCh
defer close(s.DoneCh)
err := s.server.Serve(lis)
if err != nil {
s.logger.Error("grpc server", "error", err)
}
}
// GRPCServerConfig is the extra configuration passed along for consumers

View File

@ -0,0 +1,3 @@
//go:generate protoc -I ./ ./grpc_broker.proto ./grpc_controller.proto --go_out=plugins=grpc:.
package proto

View File

@ -1,24 +1,14 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc_broker.proto
/*
Package plugin is a generated protocol buffer package.
It is generated from these files:
grpc_broker.proto
It has these top-level messages:
ConnInfo
*/
package plugin
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -33,15 +23,38 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type ConnInfo struct {
ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId" json:"service_id,omitempty"`
Network string `protobuf:"bytes,2,opt,name=network" json:"network,omitempty"`
Address string `protobuf:"bytes,3,opt,name=address" json:"address,omitempty"`
ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
Network string `protobuf:"bytes,2,opt,name=network,proto3" json:"network,omitempty"`
Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ConnInfo) Reset() { *m = ConnInfo{} }
func (m *ConnInfo) String() string { return proto.CompactTextString(m) }
func (*ConnInfo) ProtoMessage() {}
func (*ConnInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *ConnInfo) Reset() { *m = ConnInfo{} }
func (m *ConnInfo) String() string { return proto.CompactTextString(m) }
func (*ConnInfo) ProtoMessage() {}
func (*ConnInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_802e9beed3ec3b28, []int{0}
}
func (m *ConnInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ConnInfo.Unmarshal(m, b)
}
func (m *ConnInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ConnInfo.Marshal(b, m, deterministic)
}
func (m *ConnInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_ConnInfo.Merge(m, src)
}
func (m *ConnInfo) XXX_Size() int {
return xxx_messageInfo_ConnInfo.Size(m)
}
func (m *ConnInfo) XXX_DiscardUnknown() {
xxx_messageInfo_ConnInfo.DiscardUnknown(m)
}
var xxx_messageInfo_ConnInfo proto.InternalMessageInfo
func (m *ConnInfo) GetServiceId() uint32 {
if m != nil {
@ -65,7 +78,24 @@ func (m *ConnInfo) GetAddress() string {
}
func init() {
proto.RegisterType((*ConnInfo)(nil), "plugin.ConnInfo")
proto.RegisterType((*ConnInfo)(nil), "proto.ConnInfo")
}
func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor_802e9beed3ec3b28) }
var fileDescriptor_802e9beed3ec3b28 = []byte{
// 164 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05,
0x53, 0x4a, 0xb1, 0x5c, 0x1c, 0xce, 0xf9, 0x79, 0x79, 0x9e, 0x79, 0x69, 0xf9, 0x42, 0xb2, 0x5c,
0x5c, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0xf1, 0x99, 0x29, 0x12, 0x8c, 0x0a, 0x8c, 0x1a,
0xbc, 0x41, 0x9c, 0x50, 0x11, 0xcf, 0x14, 0x21, 0x09, 0x2e, 0xf6, 0xbc, 0xd4, 0x92, 0xf2, 0xfc,
0xa2, 0x6c, 0x09, 0x26, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x18, 0x17, 0x24, 0x93, 0x98, 0x92, 0x52,
0x94, 0x5a, 0x5c, 0x2c, 0xc1, 0x0c, 0x91, 0x81, 0x72, 0x8d, 0x1c, 0xb9, 0xb8, 0xdc, 0x83, 0x02,
0x9c, 0x9d, 0xc0, 0x36, 0x0b, 0x19, 0x73, 0x71, 0x07, 0x97, 0x24, 0x16, 0x95, 0x04, 0x97, 0x14,
0xa5, 0x26, 0xe6, 0x0a, 0xf1, 0x43, 0x9c, 0xa2, 0x07, 0x73, 0x80, 0x14, 0xba, 0x80, 0x06, 0xa3,
0x01, 0x63, 0x12, 0x1b, 0x58, 0xcc, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x7a, 0xda, 0xd5, 0x84,
0xc4, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -76,8 +106,9 @@ var _ grpc.ClientConn
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for GRPCBroker service
// GRPCBrokerClient is the client API for GRPCBroker service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GRPCBrokerClient interface {
StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error)
}
@ -91,7 +122,7 @@ func NewGRPCBrokerClient(cc *grpc.ClientConn) GRPCBrokerClient {
}
func (c *gRPCBrokerClient) StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error) {
stream, err := grpc.NewClientStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], c.cc, "/plugin.GRPCBroker/StartStream", opts...)
stream, err := c.cc.NewStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], "/proto.GRPCBroker/StartStream", opts...)
if err != nil {
return nil, err
}
@ -121,8 +152,7 @@ func (x *gRPCBrokerStartStreamClient) Recv() (*ConnInfo, error) {
return m, nil
}
// Server API for GRPCBroker service
// GRPCBrokerServer is the server API for GRPCBroker service.
type GRPCBrokerServer interface {
StartStream(GRPCBroker_StartStreamServer) error
}
@ -158,7 +188,7 @@ func (x *gRPCBrokerStartStreamServer) Recv() (*ConnInfo, error) {
}
var _GRPCBroker_serviceDesc = grpc.ServiceDesc{
ServiceName: "plugin.GRPCBroker",
ServiceName: "proto.GRPCBroker",
HandlerType: (*GRPCBrokerServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
@ -171,20 +201,3 @@ var _GRPCBroker_serviceDesc = grpc.ServiceDesc{
},
Metadata: "grpc_broker.proto",
}
func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 170 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b,
0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x53, 0x8a, 0xe5, 0xe2, 0x70, 0xce, 0xcf, 0xcb, 0xf3, 0xcc, 0x4b,
0xcb, 0x17, 0x92, 0xe5, 0xe2, 0x2a, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x8d, 0xcf, 0x4c, 0x91,
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0d, 0xe2, 0x84, 0x8a, 0x78, 0xa6, 0x08, 0x49, 0x70, 0xb1, 0xe7,
0xa5, 0x96, 0x94, 0xe7, 0x17, 0x65, 0x4b, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x20,
0x99, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2, 0x62, 0x09, 0x66, 0x88, 0x0c, 0x94, 0x6b, 0xe4, 0xcc,
0xc5, 0xe5, 0x1e, 0x14, 0xe0, 0xec, 0x04, 0xb6, 0x5a, 0xc8, 0x94, 0x8b, 0x3b, 0xb8, 0x24, 0xb1,
0xa8, 0x24, 0xb8, 0xa4, 0x28, 0x35, 0x31, 0x57, 0x48, 0x40, 0x0f, 0xe2, 0x08, 0x3d, 0x98, 0x0b,
0xa4, 0x30, 0x44, 0x34, 0x18, 0x0d, 0x18, 0x93, 0xd8, 0xc0, 0x4e, 0x36, 0x06, 0x04, 0x00, 0x00,
0xff, 0xff, 0x7b, 0x5d, 0xfb, 0xe1, 0xc7, 0x00, 0x00, 0x00,
}

View File

@ -1,5 +1,5 @@
syntax = "proto3";
package plugin;
package proto;
message ConnInfo {
uint32 service_id = 1;

View File

@ -0,0 +1,143 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc_controller.proto
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Empty struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) {
return fileDescriptor_23c2c7e42feab570, []int{0}
}
func (m *Empty) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Empty.Unmarshal(m, b)
}
func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Empty.Marshal(b, m, deterministic)
}
func (m *Empty) XXX_Merge(src proto.Message) {
xxx_messageInfo_Empty.Merge(m, src)
}
func (m *Empty) XXX_Size() int {
return xxx_messageInfo_Empty.Size(m)
}
func (m *Empty) XXX_DiscardUnknown() {
xxx_messageInfo_Empty.DiscardUnknown(m)
}
var xxx_messageInfo_Empty proto.InternalMessageInfo
func init() {
proto.RegisterType((*Empty)(nil), "proto.Empty")
}
func init() { proto.RegisterFile("grpc_controller.proto", fileDescriptor_23c2c7e42feab570) }
var fileDescriptor_23c2c7e42feab570 = []byte{
// 97 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4d, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0xce, 0xcf, 0x2b, 0x29, 0xca, 0xcf, 0xc9, 0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f,
0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xec, 0x5c, 0xac, 0xae, 0xb9, 0x05, 0x25, 0x95, 0x46, 0x16,
0x5c, 0x7c, 0xee, 0x41, 0x01, 0xce, 0xce, 0x70, 0x75, 0x42, 0x6a, 0x5c, 0x1c, 0xc1, 0x19, 0xa5,
0x25, 0x29, 0xf9, 0xe5, 0x79, 0x42, 0x3c, 0x10, 0x5d, 0x7a, 0x60, 0xb5, 0x52, 0x28, 0xbc, 0x24,
0x36, 0x30, 0xc7, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x69, 0xa1, 0xad, 0x79, 0x69, 0x00, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// GRPCControllerClient is the client API for GRPCController service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GRPCControllerClient interface {
Shutdown(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error)
}
type gRPCControllerClient struct {
cc *grpc.ClientConn
}
func NewGRPCControllerClient(cc *grpc.ClientConn) GRPCControllerClient {
return &gRPCControllerClient{cc}
}
func (c *gRPCControllerClient) Shutdown(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.GRPCController/Shutdown", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// GRPCControllerServer is the server API for GRPCController service.
type GRPCControllerServer interface {
Shutdown(context.Context, *Empty) (*Empty, error)
}
func RegisterGRPCControllerServer(s *grpc.Server, srv GRPCControllerServer) {
s.RegisterService(&_GRPCController_serviceDesc, srv)
}
func _GRPCController_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GRPCControllerServer).Shutdown(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.GRPCController/Shutdown",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GRPCControllerServer).Shutdown(ctx, req.(*Empty))
}
return interceptor(ctx, in, info, handler)
}
var _GRPCController_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.GRPCController",
HandlerType: (*GRPCControllerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Shutdown",
Handler: _GRPCController_Shutdown_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "grpc_controller.proto",
}

View File

@ -0,0 +1,10 @@
syntax = "proto3";
package proto;
message Empty {
}
// The GRPCController is responsible for telling the plugin server to shutdown.
service GRPCController {
rpc Shutdown(Empty) returns (Empty);
}

73
vendor/github.com/hashicorp/go-plugin/mtls.go generated vendored Normal file
View File

@ -0,0 +1,73 @@
package plugin
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"time"
)
// generateCert generates a temporary certificate for plugin authentication. The
// certificate and private key are returns in PEM format.
func generateCert() (cert []byte, privateKey []byte, err error) {
key, err := ecdsa.GenerateKey(elliptic.P521(), rand.Reader)
if err != nil {
return nil, nil, err
}
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
sn, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
return nil, nil, err
}
host := "localhost"
template := &x509.Certificate{
Subject: pkix.Name{
CommonName: host,
Organization: []string{"HashiCorp"},
},
DNSNames: []string{host},
ExtKeyUsage: []x509.ExtKeyUsage{
x509.ExtKeyUsageClientAuth,
x509.ExtKeyUsageServerAuth,
},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageKeyAgreement | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
SerialNumber: sn,
NotBefore: time.Now().Add(-30 * time.Second),
NotAfter: time.Now().Add(262980 * time.Hour),
IsCA: true,
}
der, err := x509.CreateCertificate(rand.Reader, template, template, key.Public(), key)
if err != nil {
return nil, nil, err
}
var certOut bytes.Buffer
if err := pem.Encode(&certOut, &pem.Block{Type: "CERTIFICATE", Bytes: der}); err != nil {
return nil, nil, err
}
keyBytes, err := x509.MarshalECPrivateKey(key)
if err != nil {
return nil, nil, err
}
var keyOut bytes.Buffer
if err := pem.Encode(&keyOut, &pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes}); err != nil {
return nil, nil, err
}
cert = certOut.Bytes()
privateKey = keyOut.Bytes()
return cert, privateKey, nil
}

View File

@ -2,6 +2,7 @@ package plugin
import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
@ -242,6 +243,41 @@ func Serve(opts *ServeConfig) {
}
}
var serverCert string
clientCert := os.Getenv("PLUGIN_CLIENT_CERT")
// If the client is configured using AutoMTLS, the certificate will be here,
// and we need to generate our own in response.
if tlsConfig == nil && clientCert != "" {
logger.Info("configuring server automatic mTLS")
clientCertPool := x509.NewCertPool()
if !clientCertPool.AppendCertsFromPEM([]byte(clientCert)) {
logger.Error("client cert provided but failed to parse", "cert", clientCert)
}
certPEM, keyPEM, err := generateCert()
if err != nil {
logger.Error("failed to generate client certificate", "error", err)
panic(err)
}
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
logger.Error("failed to parse client certificate", "error", err)
panic(err)
}
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: clientCertPool,
MinVersion: tls.VersionTLS12,
}
// We send back the raw leaf cert data for the client rather than the
// PEM, since the protocol can't handle newlines.
serverCert = base64.RawStdEncoding.EncodeToString(cert.Certificate[0])
}
// Create the channel to tell us when we're done
doneCh := make(chan struct{})
@ -272,6 +308,7 @@ func Serve(opts *ServeConfig) {
Stdout: stdout_r,
Stderr: stderr_r,
DoneCh: doneCh,
logger: logger,
}
default:
@ -284,25 +321,16 @@ func Serve(opts *ServeConfig) {
return
}
// Build the extra configuration
extra := ""
if v := server.Config(); v != "" {
extra = base64.StdEncoding.EncodeToString([]byte(v))
}
if extra != "" {
extra = "|" + extra
}
logger.Debug("plugin address", "network", listener.Addr().Network(), "address", listener.Addr().String())
// Output the address and service name to stdout so that the client can bring it up.
fmt.Printf("%d|%d|%s|%s|%s%s\n",
fmt.Printf("%d|%d|%s|%s|%s|%s\n",
CoreProtocolVersion,
protoVersion,
listener.Addr().Network(),
listener.Addr().String(),
protoType,
extra)
serverCert)
os.Stdout.Sync()
// Eat the interrupts

View File

@ -7,6 +7,8 @@ import (
"net"
"net/rpc"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/proto"
"github.com/mitchellh/go-testing-interface"
"google.golang.org/grpc"
)
@ -140,9 +142,11 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
// Start up the server
server := &GRPCServer{
Plugins: ps,
DoneCh: make(chan struct{}),
Server: DefaultGRPCServer,
Stdout: new(bytes.Buffer),
Stderr: new(bytes.Buffer),
logger: hclog.Default(),
}
if err := server.Init(); err != nil {
t.Fatalf("err: %s", err)
@ -165,10 +169,11 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
// Create the client
client := &GRPCClient{
Conn: conn,
Plugins: ps,
broker: broker,
doneCtx: context.Background(),
Conn: conn,
Plugins: ps,
broker: broker,
doneCtx: context.Background(),
controller: proto.NewGRPCControllerClient(conn),
}
return client, server

View File

@ -123,6 +123,12 @@ func (s *Session) IsClosed() bool {
}
}
// CloseChan returns a read-only channel which is closed as
// soon as the session is closed.
func (s *Session) CloseChan() <-chan struct{} {
return s.shutdownCh
}
// NumStreams returns the number of currently open streams
func (s *Session) NumStreams() int {
s.streamLock.Lock()
@ -303,8 +309,10 @@ func (s *Session) keepalive() {
case <-time.After(s.config.KeepAliveInterval):
_, err := s.Ping()
if err != nil {
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
s.exitErr(ErrKeepAliveTimeout)
if err != ErrSessionShutdown {
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
s.exitErr(ErrKeepAliveTimeout)
}
return
}
case <-s.shutdownCh:
@ -323,8 +331,17 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error {
// potential shutdown. Since there's the expectation that sends can happen
// in a timely manner, we enforce the connection write timeout here.
func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
defer timer.Stop()
t := timerPool.Get()
timer := t.(*time.Timer)
timer.Reset(s.config.ConnectionWriteTimeout)
defer func() {
timer.Stop()
select {
case <-timer.C:
default:
}
timerPool.Put(t)
}()
ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
select {
@ -349,8 +366,17 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
// the send happens right here, we enforce the connection write timeout if we
// can't queue the header to be sent.
func (s *Session) sendNoWait(hdr header) error {
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
defer timer.Stop()
t := timerPool.Get()
timer := t.(*time.Timer)
timer.Reset(s.config.ConnectionWriteTimeout)
defer func() {
timer.Stop()
select {
case <-timer.C:
default:
}
timerPool.Put(t)
}()
select {
case s.sendCh <- sendReady{Hdr: hdr}:
@ -408,11 +434,20 @@ func (s *Session) recv() {
}
}
// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
var (
handlers = []func(*Session, header) error{
typeData: (*Session).handleStreamMessage,
typeWindowUpdate: (*Session).handleStreamMessage,
typePing: (*Session).handlePing,
typeGoAway: (*Session).handleGoAway,
}
)
// recvLoop continues to receive data until a fatal error is encountered
func (s *Session) recvLoop() error {
defer close(s.recvDoneCh)
hdr := header(make([]byte, headerSize))
var handler func(header) error
for {
// Read the header
if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
@ -428,22 +463,12 @@ func (s *Session) recvLoop() error {
return ErrInvalidVersion
}
// Switch on the type
switch hdr.MsgType() {
case typeData:
handler = s.handleStreamMessage
case typeWindowUpdate:
handler = s.handleStreamMessage
case typeGoAway:
handler = s.handleGoAway
case typePing:
handler = s.handlePing
default:
mt := hdr.MsgType()
if mt < typeData || mt > typeGoAway {
return ErrInvalidMsgType
}
// Invoke the handler
if err := handler(hdr); err != nil {
if err := handlers[mt](s, hdr); err != nil {
return err
}
}

View File

@ -47,8 +47,8 @@ type Stream struct {
recvNotifyCh chan struct{}
sendNotifyCh chan struct{}
readDeadline time.Time
writeDeadline time.Time
readDeadline atomic.Value // time.Time
writeDeadline atomic.Value // time.Time
}
// newStream is used to construct a new stream within
@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
}
s.readDeadline.Store(time.Time{})
s.writeDeadline.Store(time.Time{})
return s
}
@ -122,8 +124,9 @@ START:
WAIT:
var timeout <-chan time.Time
var timer *time.Timer
if !s.readDeadline.IsZero() {
delay := s.readDeadline.Sub(time.Now())
readDeadline := s.readDeadline.Load().(time.Time)
if !readDeadline.IsZero() {
delay := readDeadline.Sub(time.Now())
timer = time.NewTimer(delay)
timeout = timer.C
}
@ -188,7 +191,7 @@ START:
// Send the header
s.sendHdr.encode(typeData, flags, s.id, max)
if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
return 0, err
}
@ -200,8 +203,9 @@ START:
WAIT:
var timeout <-chan time.Time
if !s.writeDeadline.IsZero() {
delay := s.writeDeadline.Sub(time.Now())
writeDeadline := s.writeDeadline.Load().(time.Time)
if !writeDeadline.IsZero() {
delay := writeDeadline.Sub(time.Now())
timeout = time.After(delay)
}
select {
@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error {
// Determine the delta update
max := s.session.config.MaxStreamWindowSize
delta := max - atomic.LoadUint32(&s.recvWindow)
var bufLen uint32
s.recvLock.Lock()
if s.recvBuf != nil {
bufLen = uint32(s.recvBuf.Len())
}
delta := (max - bufLen) - s.recvWindow
// Determine the flags if any
flags := s.sendFlags()
// Check if we can omit the update
if delta < (max/2) && flags == 0 {
s.recvLock.Unlock()
return nil
}
// Update our window
atomic.AddUint32(&s.recvWindow, delta)
s.recvWindow += delta
s.recvLock.Unlock()
// Send the header
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
if length == 0 {
return nil
}
if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
return ErrRecvWindowExceeded
}
// Wrap in a limited reader
conn = &io.LimitedReader{R: conn, N: int64(length)}
// Copy into buffer
s.recvLock.Lock()
if length > s.recvWindow {
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
return ErrRecvWindowExceeded
}
if s.recvBuf == nil {
// Allocate the receive buffer just-in-time to fit the full data frame.
// This way we can read in the whole packet without further allocations.
@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
}
// Decrement the receive window
atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
s.recvWindow -= length
s.recvLock.Unlock()
// Unblock any readers
@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error {
// SetReadDeadline sets the deadline for future Read calls.
func (s *Stream) SetReadDeadline(t time.Time) error {
s.readDeadline = t
s.readDeadline.Store(t)
return nil
}
// SetWriteDeadline sets the deadline for future Write calls
func (s *Stream) SetWriteDeadline(t time.Time) error {
s.writeDeadline = t
s.writeDeadline.Store(t)
return nil
}

View File

@ -1,5 +1,20 @@
package yamux
import (
"sync"
"time"
)
var (
timerPool = &sync.Pool{
New: func() interface{} {
timer := time.NewTimer(time.Hour * 1e6)
timer.Stop()
return timer
},
}
)
// asyncSendErr is used to try an async send of an error
func asyncSendErr(ch chan error, err error) {
if ch == nil {

View File

@ -2,6 +2,7 @@ language: go
go:
- 1.8
- 1.x
- tip
script:

View File

@ -19,14 +19,19 @@ import (
type T interface {
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Fail()
FailNow()
Failed() bool
Helper()
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Log(args ...interface{})
Logf(format string, args ...interface{})
Name() string
Skip(args ...interface{})
SkipNow()
Skipf(format string, args ...interface{})
Skipped() bool
Helper()
}
// RuntimeT implements T and can be instantiated and run at runtime to
@ -34,7 +39,8 @@ type T interface {
// for calls to Fatal. For calls to Error, you'll have to check the errors
// list to determine whether to exit yourself.
type RuntimeT struct {
failed bool
skipped bool
failed bool
}
func (t *RuntimeT) Error(args ...interface{}) {
@ -43,20 +49,10 @@ func (t *RuntimeT) Error(args ...interface{}) {
}
func (t *RuntimeT) Errorf(format string, args ...interface{}) {
log.Println(fmt.Sprintf(format, args...))
log.Printf(format, args...)
t.Fail()
}
func (t *RuntimeT) Fatal(args ...interface{}) {
log.Println(fmt.Sprintln(args...))
t.FailNow()
}
func (t *RuntimeT) Fatalf(format string, args ...interface{}) {
log.Println(fmt.Sprintf(format, args...))
t.FailNow()
}
func (t *RuntimeT) Fail() {
t.failed = true
}
@ -69,7 +65,15 @@ func (t *RuntimeT) Failed() bool {
return t.failed
}
func (t *RuntimeT) Helper() {}
func (t *RuntimeT) Fatal(args ...interface{}) {
log.Print(args...)
t.FailNow()
}
func (t *RuntimeT) Fatalf(format string, args ...interface{}) {
log.Printf(format, args...)
t.FailNow()
}
func (t *RuntimeT) Log(args ...interface{}) {
log.Println(fmt.Sprintln(args...))
@ -78,3 +82,27 @@ func (t *RuntimeT) Log(args ...interface{}) {
func (t *RuntimeT) Logf(format string, args ...interface{}) {
log.Println(fmt.Sprintf(format, args...))
}
func (t *RuntimeT) Name() string {
return ""
}
func (t *RuntimeT) Skip(args ...interface{}) {
log.Print(args...)
t.SkipNow()
}
func (t *RuntimeT) SkipNow() {
t.skipped = true
}
func (t *RuntimeT) Skipf(format string, args ...interface{}) {
log.Printf(format, args...)
t.SkipNow()
}
func (t *RuntimeT) Skipped() bool {
return t.skipped
}
func (t *RuntimeT) Helper() {}

7
vendor/modules.txt vendored
View File

@ -316,8 +316,9 @@ github.com/hashicorp/go-getter/helper/url
github.com/hashicorp/go-hclog
# github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-multierror
# github.com/hashicorp/go-plugin v0.0.0-20181002195811-1faddcf740b6
# github.com/hashicorp/go-plugin v0.0.0-20181205205220-20341d70f4ff
github.com/hashicorp/go-plugin
github.com/hashicorp/go-plugin/internal/proto
# github.com/hashicorp/go-retryablehttp v0.5.0
github.com/hashicorp/go-retryablehttp
# github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90
@ -368,7 +369,7 @@ github.com/hashicorp/serf/coordinate
github.com/hashicorp/vault/helper/pgpkeys
github.com/hashicorp/vault/helper/jsonutil
github.com/hashicorp/vault/helper/compressutil
# github.com/hashicorp/yamux v0.0.0-20160720233140-d1caa6c97c9f
# github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb
github.com/hashicorp/yamux
# github.com/jen20/awspolicyequivalence v0.0.0-20170831201602-3d48364a137a
github.com/jen20/awspolicyequivalence
@ -420,7 +421,7 @@ github.com/mitchellh/copystructure
github.com/mitchellh/go-homedir
# github.com/mitchellh/go-linereader v0.0.0-20141013185533-07bab5fdd958
github.com/mitchellh/go-linereader
# github.com/mitchellh/go-testing-interface v0.0.0-20170730050907-9a441910b168
# github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77
github.com/mitchellh/go-testing-interface
# github.com/mitchellh/go-wordwrap v1.0.0
github.com/mitchellh/go-wordwrap