update go-plugin with proto negotiation

This commit is contained in:
James Bardin 2018-08-14 18:26:12 -04:00 committed by Martin Atkins
parent a3403f2766
commit eeb606425d
4 changed files with 207 additions and 46 deletions

View File

@ -10,7 +10,6 @@ import (
"hash"
"io"
"io/ioutil"
"log"
"net"
"os"
"os/exec"
@ -71,16 +70,23 @@ var (
//
// See NewClient and ClientConfig for using a Client.
type Client struct {
config *ClientConfig
exited bool
doneLogging chan struct{}
l sync.Mutex
address net.Addr
process *os.Process
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
config *ClientConfig
exited bool
doneLogging chan struct{}
l sync.Mutex
address net.Addr
process *os.Process
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
negotiatedVersion int
}
// NegotiatedVersion returns the protocol version negotiated with the server.
// This is only valid after Start() is called.
func (c *Client) NegotiatedVersion() int {
return c.negotiatedVersion
}
// ClientConfig is the configuration used to initialize a new
@ -91,7 +97,13 @@ type ClientConfig struct {
HandshakeConfig
// Plugins are the plugins that can be consumed.
Plugins map[string]Plugin
// The implied version of this PluginSet is the Handshake.ProtocolVersion.
Plugins PluginSet
// VersionedPlugins is a map of PluginSets for specific protocol versions.
// These can be used to negotiate a compatible version between client and
// server. If this is set, Handshake.ProtocolVersion is not required.
VersionedPlugins map[int]PluginSet
// One of the following must be set, but not both.
//
@ -234,7 +246,6 @@ func CleanupClients() {
}
managedClientsLock.Unlock()
log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
wg.Wait()
}
@ -479,10 +490,30 @@ func (c *Client) Start() (addr net.Addr, err error) {
return c.address, nil
}
if c.config.VersionedPlugins == nil {
c.config.VersionedPlugins = make(map[int]PluginSet)
}
// handle all plugins as versioned, using the handshake config as the default.
version := int(c.config.ProtocolVersion)
// Make sure we're not overwriting a real version 0. If ProtocolVersion was
// non-zero, then we have to just assume the user made sure that
// VersionedPlugins doesn't conflict.
if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil {
c.config.VersionedPlugins[version] = c.config.Plugins
}
var versionStrings []string
for v := range c.config.VersionedPlugins {
versionStrings = append(versionStrings, strconv.Itoa(v))
}
env := []string{
fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
}
stdout_r, stdout_w := io.Pipe()
@ -624,20 +655,18 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}
// Parse the protocol version
var protocol int64
protocol, err = strconv.ParseInt(parts[1], 10, 0)
// Test the API version
version, pluginSet, err := c.checkProtoVersion(parts[1])
if err != nil {
err = fmt.Errorf("Error parsing protocol version: %s", err)
return
return addr, err
}
// Test the API version
if uint(protocol) != c.config.ProtocolVersion {
err = fmt.Errorf("Incompatible API version with plugin. "+
"Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion)
return
}
// set the Plugins value to the compatible set, so the version
// doesn't need to be passed through to the ClientProtocol
// implementation.
c.config.Plugins = pluginSet
c.negotiatedVersion = version
c.logger.Debug("using plugin", "version", version)
switch parts[2] {
case "tcp":
@ -665,7 +694,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
if !found {
err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
c.protocol, c.config.AllowedProtocols)
return
return addr, err
}
}
@ -674,6 +703,33 @@ func (c *Client) Start() (addr net.Addr, err error) {
return
}
// checkProtoVersion returns the negotiated version and PluginSet.
// This returns an error if the server returned an incompatible protocol
// version, or an invalid handshake response.
func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) {
serverVersion, err := strconv.Atoi(protoVersion)
if err != nil {
return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err)
}
// record these for the error message
var clientVersions []int
// all versions, including the legacy ProtocolVersion have been added to
// the versions set
for version, plugins := range c.config.VersionedPlugins {
clientVersions = append(clientVersions, version)
if serverVersion != version {
continue
}
return version, plugins, nil
}
return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+
"Plugin version: %d, Client versions: %d", serverVersion, clientVersions)
}
// ReattachConfig returns the information that must be provided to NewClient
// to reattach to the plugin process that this client started. This is
// useful for plugins that detach from their parent process.
@ -753,14 +809,14 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
func (c *Client) logStderr(r io.Reader) {
bufR := bufio.NewReader(r)
l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
for {
line, err := bufR.ReadString('\n')
if line != "" {
c.config.Stderr.Write([]byte(line))
line = strings.TrimRightFunc(line, unicode.IsSpace)
l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
entry, err := parseJSON(line)
// If output is not JSON format, print directly to Debug
if err != nil {
@ -768,7 +824,7 @@ func (c *Client) logStderr(r io.Reader) {
} else {
out := flattenKVPairs(entry.KVPairs)
l = l.With("timestamp", entry.Timestamp.Format(hclog.TimeFormat))
out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
switch hclog.LevelFromString(entry.Level) {
case hclog.Trace:
l.Trace(entry.Message, out...)

View File

@ -11,7 +11,9 @@ import (
"os"
"os/signal"
"runtime"
"sort"
"strconv"
"strings"
"sync/atomic"
"github.com/hashicorp/go-hclog"
@ -36,6 +38,8 @@ type HandshakeConfig struct {
// ProtocolVersion is the version that clients must match on to
// agree they can communicate. This should match the ProtocolVersion
// set on ClientConfig when using a plugin.
// This field is not required if VersionedPlugins are being used in the
// Client or Server configurations.
ProtocolVersion uint
// MagicCookieKey and value are used as a very basic verification
@ -46,6 +50,10 @@ type HandshakeConfig struct {
MagicCookieValue string
}
// PluginSet is a set of plugins provided to be registered in the plugin
// server.
type PluginSet map[string]Plugin
// ServeConfig configures what sorts of plugins are served.
type ServeConfig struct {
// HandshakeConfig is the configuration that must match clients.
@ -55,7 +63,13 @@ type ServeConfig struct {
TLSProvider func() (*tls.Config, error)
// Plugins are the plugins that are served.
Plugins map[string]Plugin
// The implied version of this PluginSet is the Handshake.ProtocolVersion.
Plugins PluginSet
// VersionedPlugins is a map of PluginSets for specific protocol versions.
// These can be used to negotiate a compatible version between client and
// server. If this is set, Handshake.ProtocolVersion is not required.
VersionedPlugins map[int]PluginSet
// GRPCServer should be non-nil to enable serving the plugins over
// gRPC. This is a function to create the server when needed with the
@ -72,14 +86,80 @@ type ServeConfig struct {
Logger hclog.Logger
}
// Protocol returns the protocol that this server should speak.
func (c *ServeConfig) Protocol() Protocol {
result := ProtocolNetRPC
if c.GRPCServer != nil {
result = ProtocolGRPC
// protocolVersion determines the protocol version and plugin set to be used by
// the server. In the event that there is no suitable version, the last version
// in the config is returned leaving the client to report the incompatibility.
func protocolVersion(opts *ServeConfig) (int, Protocol, PluginSet) {
protoVersion := int(opts.ProtocolVersion)
pluginSet := opts.Plugins
protoType := ProtocolNetRPC
// check if the client sent a list of acceptable versions
var clientVersions []int
if vs := os.Getenv("PLUGIN_PROTOCOL_VERSIONS"); vs != "" {
for _, s := range strings.Split(vs, ",") {
v, err := strconv.Atoi(s)
if err != nil {
fmt.Fprintf(os.Stderr, "server sent invalid plugin version %q", s)
continue
}
clientVersions = append(clientVersions, v)
}
}
return result
// we want to iterate in reverse order, to ensure we match the newest
// compatible plugin version.
sort.Sort(sort.Reverse(sort.IntSlice(clientVersions)))
// set the old un-versioned fields as if they were versioned plugins
if opts.VersionedPlugins == nil {
opts.VersionedPlugins = make(map[int]PluginSet)
}
if pluginSet != nil {
opts.VersionedPlugins[protoVersion] = pluginSet
}
// sort the version to make sure we match the latest first
var versions []int
for v := range opts.VersionedPlugins {
versions = append(versions, v)
}
sort.Sort(sort.Reverse(sort.IntSlice(versions)))
// see if we have multiple versions of Plugins to choose from
for _, version := range versions {
// record each version, since we guarantee that this returns valid
// values even if they are not a protocol match.
protoVersion = version
pluginSet = opts.VersionedPlugins[version]
// all plugins in a set must use the same transport, so check the first
// for the protocol type
for _, p := range pluginSet {
switch p.(type) {
case GRPCPlugin:
protoType = ProtocolGRPC
default:
protoType = ProtocolNetRPC
}
break
}
for _, clientVersion := range clientVersions {
if clientVersion == protoVersion {
return protoVersion, protoType, pluginSet
}
}
}
// Return the lowest version as the fallback.
// Since we iterated over all the versions in reverse order above, these
// values are from the lowest version number plugins (which may be from
// a combination of the Handshake.ProtocolVersion and ServeConfig.Plugins
// fields). This allows serving the oldest version of our plugins to a
// legacy client that did not send a PLUGIN_PROTOCOL_VERSIONS list.
return protoVersion, protoType, pluginSet
}
// Serve serves the plugins given by ServeConfig.
@ -107,6 +187,10 @@ func Serve(opts *ServeConfig) {
os.Exit(1)
}
// negotiate the version and plugins
// start with default version in the handshake config
protoVersion, protoType, pluginSet := protocolVersion(opts)
// Logging goes to the original stderr
log.SetOutput(os.Stderr)
@ -160,7 +244,7 @@ func Serve(opts *ServeConfig) {
// Build the server type
var server ServerProtocol
switch opts.Protocol() {
switch protoType {
case ProtocolNetRPC:
// If we have a TLS configuration then we wrap the listener
// ourselves and do it at that level.
@ -170,7 +254,7 @@ func Serve(opts *ServeConfig) {
// Create the RPC server to dispense
server = &RPCServer{
Plugins: opts.Plugins,
Plugins: pluginSet,
Stdout: stdout_r,
Stderr: stderr_r,
DoneCh: doneCh,
@ -179,7 +263,7 @@ func Serve(opts *ServeConfig) {
case ProtocolGRPC:
// Create the gRPC server
server = &GRPCServer{
Plugins: opts.Plugins,
Plugins: pluginSet,
Server: opts.GRPCServer,
TLS: tlsConfig,
Stdout: stdout_r,
@ -188,7 +272,7 @@ func Serve(opts *ServeConfig) {
}
default:
panic("unknown server protocol: " + opts.Protocol())
panic("unknown server protocol: " + protoType)
}
// Initialize the servers
@ -208,13 +292,13 @@ func Serve(opts *ServeConfig) {
logger.Debug("plugin address", "network", listener.Addr().Network(), "address", listener.Addr().String())
// Output the address and service name to stdout so that core can bring it up.
// Output the address and service name to stdout so that the client can bring it up.
fmt.Printf("%d|%d|%s|%s|%s%s\n",
CoreProtocolVersion,
opts.ProtocolVersion,
protoVersion,
listener.Addr().Network(),
listener.Addr().String(),
opts.Protocol(),
protoType,
extra)
os.Stdout.Sync()

View File

@ -3,6 +3,7 @@ package plugin
import (
"bytes"
"context"
"io"
"net"
"net/rpc"
@ -10,6 +11,18 @@ import (
"google.golang.org/grpc"
)
// TestOptions allows specifying options that can affect the behavior of the
// test functions
type TestOptions struct {
//ServerStdout causes the given value to be used in place of a blank buffer
//for RPCServer's Stdout
ServerStdout io.ReadCloser
//ServerStderr causes the given value to be used in place of a blank buffer
//for RPCServer's Stderr
ServerStderr io.ReadCloser
}
// The testing file contains test helpers that you can use outside of
// this package for making it easier to test plugins themselves.
@ -61,12 +74,20 @@ func TestRPCConn(t testing.T) (*rpc.Client, *rpc.Server) {
// TestPluginRPCConn returns a plugin RPC client and server that are connected
// together and configured.
func TestPluginRPCConn(t testing.T, ps map[string]Plugin) (*RPCClient, *RPCServer) {
func TestPluginRPCConn(t testing.T, ps map[string]Plugin, opts *TestOptions) (*RPCClient, *RPCServer) {
// Create two net.Conns we can use to shuttle our control connection
clientConn, serverConn := TestConn(t)
// Start up the server
server := &RPCServer{Plugins: ps, Stdout: new(bytes.Buffer), Stderr: new(bytes.Buffer)}
if opts != nil {
if opts.ServerStdout != nil {
server.Stdout = opts.ServerStdout
}
if opts.ServerStderr != nil {
server.Stderr = opts.ServerStderr
}
}
go server.ServeConn(serverConn)
// Connect the client to the server

6
vendor/vendor.json vendored
View File

@ -1802,10 +1802,10 @@
"revision": "d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5"
},
{
"checksumSHA1": "y3op+t01flBlSBKlzUNqH5d4XHQ=",
"checksumSHA1": "8X6scmZ/Xfkme6wM7KPWs6vBvsY=",
"path": "github.com/hashicorp/go-plugin",
"revision": "e53f54cbf51efde642d4711313e829a1ff0c236d",
"revisionTime": "2018-01-25T19:04:38Z",
"revision": "a4620f9913d19f03a6bf19b2f304daaaf83ea130",
"revisionTime": "2018-08-14T22:25:01Z",
"version": "master",
"versionExact": "master"
},