From eeb606425dc892b5eeef90a9bd3916913ae98857 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Tue, 14 Aug 2018 18:26:12 -0400 Subject: [PATCH] update go-plugin with proto negotiation --- .../github.com/hashicorp/go-plugin/client.go | 112 +++++++++++++----- .../github.com/hashicorp/go-plugin/server.go | 112 +++++++++++++++--- .../github.com/hashicorp/go-plugin/testing.go | 23 +++- vendor/vendor.json | 6 +- 4 files changed, 207 insertions(+), 46 deletions(-) diff --git a/vendor/github.com/hashicorp/go-plugin/client.go b/vendor/github.com/hashicorp/go-plugin/client.go index b3e3b78ea..1528305d9 100644 --- a/vendor/github.com/hashicorp/go-plugin/client.go +++ b/vendor/github.com/hashicorp/go-plugin/client.go @@ -10,7 +10,6 @@ import ( "hash" "io" "io/ioutil" - "log" "net" "os" "os/exec" @@ -71,16 +70,23 @@ var ( // // See NewClient and ClientConfig for using a Client. type Client struct { - config *ClientConfig - exited bool - doneLogging chan struct{} - l sync.Mutex - address net.Addr - process *os.Process - client ClientProtocol - protocol Protocol - logger hclog.Logger - doneCtx context.Context + config *ClientConfig + exited bool + doneLogging chan struct{} + l sync.Mutex + address net.Addr + process *os.Process + client ClientProtocol + protocol Protocol + logger hclog.Logger + doneCtx context.Context + negotiatedVersion int +} + +// NegotiatedVersion returns the protocol version negotiated with the server. +// This is only valid after Start() is called. +func (c *Client) NegotiatedVersion() int { + return c.negotiatedVersion } // ClientConfig is the configuration used to initialize a new @@ -91,7 +97,13 @@ type ClientConfig struct { HandshakeConfig // Plugins are the plugins that can be consumed. - Plugins map[string]Plugin + // The implied version of this PluginSet is the Handshake.ProtocolVersion. + Plugins PluginSet + + // VersionedPlugins is a map of PluginSets for specific protocol versions. + // These can be used to negotiate a compatible version between client and + // server. If this is set, Handshake.ProtocolVersion is not required. + VersionedPlugins map[int]PluginSet // One of the following must be set, but not both. // @@ -234,7 +246,6 @@ func CleanupClients() { } managedClientsLock.Unlock() - log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...") wg.Wait() } @@ -479,10 +490,30 @@ func (c *Client) Start() (addr net.Addr, err error) { return c.address, nil } + if c.config.VersionedPlugins == nil { + c.config.VersionedPlugins = make(map[int]PluginSet) + } + + // handle all plugins as versioned, using the handshake config as the default. + version := int(c.config.ProtocolVersion) + + // Make sure we're not overwriting a real version 0. If ProtocolVersion was + // non-zero, then we have to just assume the user made sure that + // VersionedPlugins doesn't conflict. + if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil { + c.config.VersionedPlugins[version] = c.config.Plugins + } + + var versionStrings []string + for v := range c.config.VersionedPlugins { + versionStrings = append(versionStrings, strconv.Itoa(v)) + } + env := []string{ fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), + fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")), } stdout_r, stdout_w := io.Pipe() @@ -624,20 +655,18 @@ func (c *Client) Start() (addr net.Addr, err error) { } } - // Parse the protocol version - var protocol int64 - protocol, err = strconv.ParseInt(parts[1], 10, 0) + // Test the API version + version, pluginSet, err := c.checkProtoVersion(parts[1]) if err != nil { - err = fmt.Errorf("Error parsing protocol version: %s", err) - return + return addr, err } - // Test the API version - if uint(protocol) != c.config.ProtocolVersion { - err = fmt.Errorf("Incompatible API version with plugin. "+ - "Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion) - return - } + // set the Plugins value to the compatible set, so the version + // doesn't need to be passed through to the ClientProtocol + // implementation. + c.config.Plugins = pluginSet + c.negotiatedVersion = version + c.logger.Debug("using plugin", "version", version) switch parts[2] { case "tcp": @@ -665,7 +694,7 @@ func (c *Client) Start() (addr net.Addr, err error) { if !found { err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", c.protocol, c.config.AllowedProtocols) - return + return addr, err } } @@ -674,6 +703,33 @@ func (c *Client) Start() (addr net.Addr, err error) { return } +// checkProtoVersion returns the negotiated version and PluginSet. +// This returns an error if the server returned an incompatible protocol +// version, or an invalid handshake response. +func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) { + serverVersion, err := strconv.Atoi(protoVersion) + if err != nil { + return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err) + } + + // record these for the error message + var clientVersions []int + + // all versions, including the legacy ProtocolVersion have been added to + // the versions set + for version, plugins := range c.config.VersionedPlugins { + clientVersions = append(clientVersions, version) + + if serverVersion != version { + continue + } + return version, plugins, nil + } + + return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+ + "Plugin version: %d, Client versions: %d", serverVersion, clientVersions) +} + // ReattachConfig returns the information that must be provided to NewClient // to reattach to the plugin process that this client started. This is // useful for plugins that detach from their parent process. @@ -753,14 +809,14 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { func (c *Client) logStderr(r io.Reader) { bufR := bufio.NewReader(r) + l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) + for { line, err := bufR.ReadString('\n') if line != "" { c.config.Stderr.Write([]byte(line)) line = strings.TrimRightFunc(line, unicode.IsSpace) - l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) - entry, err := parseJSON(line) // If output is not JSON format, print directly to Debug if err != nil { @@ -768,7 +824,7 @@ func (c *Client) logStderr(r io.Reader) { } else { out := flattenKVPairs(entry.KVPairs) - l = l.With("timestamp", entry.Timestamp.Format(hclog.TimeFormat)) + out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat)) switch hclog.LevelFromString(entry.Level) { case hclog.Trace: l.Trace(entry.Message, out...) diff --git a/vendor/github.com/hashicorp/go-plugin/server.go b/vendor/github.com/hashicorp/go-plugin/server.go index 1e808b99e..6de90485f 100644 --- a/vendor/github.com/hashicorp/go-plugin/server.go +++ b/vendor/github.com/hashicorp/go-plugin/server.go @@ -11,7 +11,9 @@ import ( "os" "os/signal" "runtime" + "sort" "strconv" + "strings" "sync/atomic" "github.com/hashicorp/go-hclog" @@ -36,6 +38,8 @@ type HandshakeConfig struct { // ProtocolVersion is the version that clients must match on to // agree they can communicate. This should match the ProtocolVersion // set on ClientConfig when using a plugin. + // This field is not required if VersionedPlugins are being used in the + // Client or Server configurations. ProtocolVersion uint // MagicCookieKey and value are used as a very basic verification @@ -46,6 +50,10 @@ type HandshakeConfig struct { MagicCookieValue string } +// PluginSet is a set of plugins provided to be registered in the plugin +// server. +type PluginSet map[string]Plugin + // ServeConfig configures what sorts of plugins are served. type ServeConfig struct { // HandshakeConfig is the configuration that must match clients. @@ -55,7 +63,13 @@ type ServeConfig struct { TLSProvider func() (*tls.Config, error) // Plugins are the plugins that are served. - Plugins map[string]Plugin + // The implied version of this PluginSet is the Handshake.ProtocolVersion. + Plugins PluginSet + + // VersionedPlugins is a map of PluginSets for specific protocol versions. + // These can be used to negotiate a compatible version between client and + // server. If this is set, Handshake.ProtocolVersion is not required. + VersionedPlugins map[int]PluginSet // GRPCServer should be non-nil to enable serving the plugins over // gRPC. This is a function to create the server when needed with the @@ -72,14 +86,80 @@ type ServeConfig struct { Logger hclog.Logger } -// Protocol returns the protocol that this server should speak. -func (c *ServeConfig) Protocol() Protocol { - result := ProtocolNetRPC - if c.GRPCServer != nil { - result = ProtocolGRPC +// protocolVersion determines the protocol version and plugin set to be used by +// the server. In the event that there is no suitable version, the last version +// in the config is returned leaving the client to report the incompatibility. +func protocolVersion(opts *ServeConfig) (int, Protocol, PluginSet) { + protoVersion := int(opts.ProtocolVersion) + pluginSet := opts.Plugins + protoType := ProtocolNetRPC + // check if the client sent a list of acceptable versions + var clientVersions []int + if vs := os.Getenv("PLUGIN_PROTOCOL_VERSIONS"); vs != "" { + for _, s := range strings.Split(vs, ",") { + v, err := strconv.Atoi(s) + if err != nil { + fmt.Fprintf(os.Stderr, "server sent invalid plugin version %q", s) + continue + } + clientVersions = append(clientVersions, v) + } } - return result + // we want to iterate in reverse order, to ensure we match the newest + // compatible plugin version. + sort.Sort(sort.Reverse(sort.IntSlice(clientVersions))) + + // set the old un-versioned fields as if they were versioned plugins + if opts.VersionedPlugins == nil { + opts.VersionedPlugins = make(map[int]PluginSet) + } + + if pluginSet != nil { + opts.VersionedPlugins[protoVersion] = pluginSet + } + + // sort the version to make sure we match the latest first + var versions []int + for v := range opts.VersionedPlugins { + versions = append(versions, v) + } + + sort.Sort(sort.Reverse(sort.IntSlice(versions))) + + // see if we have multiple versions of Plugins to choose from + for _, version := range versions { + // record each version, since we guarantee that this returns valid + // values even if they are not a protocol match. + protoVersion = version + pluginSet = opts.VersionedPlugins[version] + + // all plugins in a set must use the same transport, so check the first + // for the protocol type + for _, p := range pluginSet { + switch p.(type) { + case GRPCPlugin: + protoType = ProtocolGRPC + default: + protoType = ProtocolNetRPC + } + break + } + + for _, clientVersion := range clientVersions { + if clientVersion == protoVersion { + return protoVersion, protoType, pluginSet + } + } + } + + // Return the lowest version as the fallback. + // Since we iterated over all the versions in reverse order above, these + // values are from the lowest version number plugins (which may be from + // a combination of the Handshake.ProtocolVersion and ServeConfig.Plugins + // fields). This allows serving the oldest version of our plugins to a + // legacy client that did not send a PLUGIN_PROTOCOL_VERSIONS list. + return protoVersion, protoType, pluginSet } // Serve serves the plugins given by ServeConfig. @@ -107,6 +187,10 @@ func Serve(opts *ServeConfig) { os.Exit(1) } + // negotiate the version and plugins + // start with default version in the handshake config + protoVersion, protoType, pluginSet := protocolVersion(opts) + // Logging goes to the original stderr log.SetOutput(os.Stderr) @@ -160,7 +244,7 @@ func Serve(opts *ServeConfig) { // Build the server type var server ServerProtocol - switch opts.Protocol() { + switch protoType { case ProtocolNetRPC: // If we have a TLS configuration then we wrap the listener // ourselves and do it at that level. @@ -170,7 +254,7 @@ func Serve(opts *ServeConfig) { // Create the RPC server to dispense server = &RPCServer{ - Plugins: opts.Plugins, + Plugins: pluginSet, Stdout: stdout_r, Stderr: stderr_r, DoneCh: doneCh, @@ -179,7 +263,7 @@ func Serve(opts *ServeConfig) { case ProtocolGRPC: // Create the gRPC server server = &GRPCServer{ - Plugins: opts.Plugins, + Plugins: pluginSet, Server: opts.GRPCServer, TLS: tlsConfig, Stdout: stdout_r, @@ -188,7 +272,7 @@ func Serve(opts *ServeConfig) { } default: - panic("unknown server protocol: " + opts.Protocol()) + panic("unknown server protocol: " + protoType) } // Initialize the servers @@ -208,13 +292,13 @@ func Serve(opts *ServeConfig) { logger.Debug("plugin address", "network", listener.Addr().Network(), "address", listener.Addr().String()) - // Output the address and service name to stdout so that core can bring it up. + // Output the address and service name to stdout so that the client can bring it up. fmt.Printf("%d|%d|%s|%s|%s%s\n", CoreProtocolVersion, - opts.ProtocolVersion, + protoVersion, listener.Addr().Network(), listener.Addr().String(), - opts.Protocol(), + protoType, extra) os.Stdout.Sync() diff --git a/vendor/github.com/hashicorp/go-plugin/testing.go b/vendor/github.com/hashicorp/go-plugin/testing.go index df29593e1..2f541d968 100644 --- a/vendor/github.com/hashicorp/go-plugin/testing.go +++ b/vendor/github.com/hashicorp/go-plugin/testing.go @@ -3,6 +3,7 @@ package plugin import ( "bytes" "context" + "io" "net" "net/rpc" @@ -10,6 +11,18 @@ import ( "google.golang.org/grpc" ) +// TestOptions allows specifying options that can affect the behavior of the +// test functions +type TestOptions struct { + //ServerStdout causes the given value to be used in place of a blank buffer + //for RPCServer's Stdout + ServerStdout io.ReadCloser + + //ServerStderr causes the given value to be used in place of a blank buffer + //for RPCServer's Stderr + ServerStderr io.ReadCloser +} + // The testing file contains test helpers that you can use outside of // this package for making it easier to test plugins themselves. @@ -61,12 +74,20 @@ func TestRPCConn(t testing.T) (*rpc.Client, *rpc.Server) { // TestPluginRPCConn returns a plugin RPC client and server that are connected // together and configured. -func TestPluginRPCConn(t testing.T, ps map[string]Plugin) (*RPCClient, *RPCServer) { +func TestPluginRPCConn(t testing.T, ps map[string]Plugin, opts *TestOptions) (*RPCClient, *RPCServer) { // Create two net.Conns we can use to shuttle our control connection clientConn, serverConn := TestConn(t) // Start up the server server := &RPCServer{Plugins: ps, Stdout: new(bytes.Buffer), Stderr: new(bytes.Buffer)} + if opts != nil { + if opts.ServerStdout != nil { + server.Stdout = opts.ServerStdout + } + if opts.ServerStderr != nil { + server.Stderr = opts.ServerStderr + } + } go server.ServeConn(serverConn) // Connect the client to the server diff --git a/vendor/vendor.json b/vendor/vendor.json index 432dbdd6f..c419b48e4 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1802,10 +1802,10 @@ "revision": "d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5" }, { - "checksumSHA1": "y3op+t01flBlSBKlzUNqH5d4XHQ=", + "checksumSHA1": "8X6scmZ/Xfkme6wM7KPWs6vBvsY=", "path": "github.com/hashicorp/go-plugin", - "revision": "e53f54cbf51efde642d4711313e829a1ff0c236d", - "revisionTime": "2018-01-25T19:04:38Z", + "revision": "a4620f9913d19f03a6bf19b2f304daaaf83ea130", + "revisionTime": "2018-08-14T22:25:01Z", "version": "master", "versionExact": "master" },