vendor: update go-plugin

This fixes some hanging issues seen by TF users, see the relevant
commits in go-plugin.
This commit is contained in:
Mitchell Hashimoto 2017-02-17 08:32:21 -08:00
parent 7fa5731ad5
commit 6cc5123d31
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
3 changed files with 64 additions and 14 deletions

View File

@ -28,6 +28,7 @@ var Killed uint32 = 0
// This is a slice of the "managed" clients which are cleaned up when // This is a slice of the "managed" clients which are cleaned up when
// calling Cleanup // calling Cleanup
var managedClients = make([]*Client, 0, 5) var managedClients = make([]*Client, 0, 5)
var managedClientsLock sync.Mutex
// Error types // Error types
var ( var (
@ -130,6 +131,7 @@ func CleanupClients() {
// Kill all the managed clients in parallel and use a WaitGroup // Kill all the managed clients in parallel and use a WaitGroup
// to wait for them all to finish up. // to wait for them all to finish up.
var wg sync.WaitGroup var wg sync.WaitGroup
managedClientsLock.Lock()
for _, client := range managedClients { for _, client := range managedClients {
wg.Add(1) wg.Add(1)
@ -138,6 +140,7 @@ func CleanupClients() {
wg.Done() wg.Done()
}(client) }(client)
} }
managedClientsLock.Unlock()
log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...") log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
wg.Wait() wg.Wait()
@ -173,7 +176,9 @@ func NewClient(config *ClientConfig) (c *Client) {
c = &Client{config: config} c = &Client{config: config}
if config.Managed { if config.Managed {
managedClientsLock.Lock()
managedClients = append(managedClients, c) managedClients = append(managedClients, c)
managedClientsLock.Unlock()
} }
return return
@ -239,23 +244,58 @@ func (c *Client) Exited() bool {
// //
// This method can safely be called multiple times. // This method can safely be called multiple times.
func (c *Client) Kill() { func (c *Client) Kill() {
if c.process == nil { // Grab a lock to read some private fields.
c.l.Lock()
process := c.process
addr := c.address
doneCh := c.doneLogging
c.l.Unlock()
// If there is no process, we never started anything. Nothing to kill.
if process == nil {
return return
} }
// Close the client to cleanly exit the process // We need to check for address here. It is possible that the plugin
client, err := c.Client() // started (process != nil) but has no address (addr == nil) if the
if err == nil { // plugin failed at startup. If we do have an address, we need to close
err = client.Close() // the plugin net connections.
} graceful := false
if err != nil { if addr != nil {
// If something went wrong somewhere gracefully quitting the // Close the client to cleanly exit the process.
// plugin, we just force kill it. client, err := c.Client()
c.process.Kill() if err == nil {
err = client.Close()
// If there is no error, then we attempt to wait for a graceful
// exit. If there was an error, we assume that graceful cleanup
// won't happen and just force kill.
graceful = err == nil
if err != nil {
// If there was an error just log it. We're going to force
// kill in a moment anyways.
log.Printf(
"[WARN] plugin: error closing client during Kill: %s", err)
}
}
} }
// If we're attempting a graceful exit, then we wait for a short period
// of time to allow that to happen. To wait for this we just wait on the
// doneCh which would be closed if the process exits.
if graceful {
select {
case <-doneCh:
return
case <-time.After(250 * time.Millisecond):
}
}
// If graceful exiting failed, just kill it
process.Kill()
// Wait for the client to finish logging so we have a complete log // Wait for the client to finish logging so we have a complete log
<-c.doneLogging <-doneCh
} }
// Starts the underlying subprocess, communicating with it to negotiate // Starts the underlying subprocess, communicating with it to negotiate

View File

@ -7,12 +7,16 @@ import (
"log" "log"
"net" "net"
"net/rpc" "net/rpc"
"sync"
"github.com/hashicorp/yamux" "github.com/hashicorp/yamux"
) )
// RPCServer listens for network connections and then dispenses interface // RPCServer listens for network connections and then dispenses interface
// implementations over net/rpc. // implementations over net/rpc.
//
// After setting the fields below, they shouldn't be read again directly
// from the structure which may be reading/writing them concurrently.
type RPCServer struct { type RPCServer struct {
Plugins map[string]Plugin Plugins map[string]Plugin
@ -26,6 +30,8 @@ type RPCServer struct {
// DoneCh should be set to a non-nil channel that will be closed // DoneCh should be set to a non-nil channel that will be closed
// when the control requests the RPC server to end. // when the control requests the RPC server to end.
DoneCh chan<- struct{} DoneCh chan<- struct{}
lock sync.Mutex
} }
// Accept accepts connections on a listener and serves requests for // Accept accepts connections on a listener and serves requests for
@ -102,8 +108,12 @@ func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
// doneCh to close which is listened to by the main process to cleanly // doneCh to close which is listened to by the main process to cleanly
// exit. // exit.
func (s *RPCServer) done() { func (s *RPCServer) done() {
s.lock.Lock()
defer s.lock.Unlock()
if s.DoneCh != nil { if s.DoneCh != nil {
close(s.DoneCh) close(s.DoneCh)
s.DoneCh = nil
} }
} }

6
vendor/vendor.json vendored
View File

@ -1680,10 +1680,10 @@
"revision": "d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5" "revision": "d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5"
}, },
{ {
"checksumSHA1": "Jh6jdEjDeajnpEG8xlrLrnYw210=", "checksumSHA1": "b0nQutPMJHeUmz4SjpreotAo6Yk=",
"path": "github.com/hashicorp/go-plugin", "path": "github.com/hashicorp/go-plugin",
"revision": "8cf118f7a2f0c7ef1c82f66d4f6ac77c7e27dc12", "revision": "f72692aebca2008343a9deb06ddb4b17f7051c15",
"revisionTime": "2016-06-08T02:21:58Z" "revisionTime": "2017-02-17T16:27:05Z"
}, },
{ {
"checksumSHA1": "GBDE1KDl/7c5hlRPYRZ7+C0WQ0g=", "checksumSHA1": "GBDE1KDl/7c5hlRPYRZ7+C0WQ0g=",