update the go-plugin package
This contains some new functionality related to grpc.
This commit is contained in:
parent
99c60f5525
commit
18c2ad1cfc
|
@ -76,7 +76,7 @@ must be properly secured to protect this configuration.
|
|||
## Architecture
|
||||
|
||||
The HashiCorp plugin system works by launching subprocesses and communicating
|
||||
over RPC (using standard `net/rpc` or [gRPC](http://www.grpc.io). A single
|
||||
over RPC (using standard `net/rpc` or [gRPC](http://www.grpc.io)). A single
|
||||
connection is made between any plugin and the host process. For net/rpc-based
|
||||
plugins, we use a [connection multiplexing](https://github.com/hashicorp/yamux)
|
||||
library to multiplex any other connections on top. For gRPC-based plugins,
|
||||
|
|
|
@ -2,6 +2,7 @@ package plugin
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/subtle"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
|
@ -79,6 +80,7 @@ type Client struct {
|
|||
client ClientProtocol
|
||||
protocol Protocol
|
||||
logger hclog.Logger
|
||||
doneCtx context.Context
|
||||
}
|
||||
|
||||
// ClientConfig is the configuration used to initialize a new
|
||||
|
@ -310,7 +312,7 @@ func (c *Client) Client() (ClientProtocol, error) {
|
|||
c.client, err = newRPCClient(c)
|
||||
|
||||
case ProtocolGRPC:
|
||||
c.client, err = newGRPCClient(c)
|
||||
c.client, err = newGRPCClient(c.doneCtx, c)
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
|
||||
|
@ -423,6 +425,9 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|||
|
||||
// Create the logging channel for when we kill
|
||||
c.doneLogging = make(chan struct{})
|
||||
// Create a context for when we kill
|
||||
var ctxCancel context.CancelFunc
|
||||
c.doneCtx, ctxCancel = context.WithCancel(context.Background())
|
||||
|
||||
if c.config.Reattach != nil {
|
||||
// Verify the process still exists. If not, then it is an error
|
||||
|
@ -457,6 +462,9 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|||
|
||||
// Close the logging channel since that doesn't work on reattach
|
||||
close(c.doneLogging)
|
||||
|
||||
// Cancel the context
|
||||
ctxCancel()
|
||||
}(p.Pid)
|
||||
|
||||
// Set the address and process
|
||||
|
@ -535,6 +543,9 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|||
// Mark that we exited
|
||||
close(exitCh)
|
||||
|
||||
// Cancel the context, marking that we exited
|
||||
ctxCancel()
|
||||
|
||||
// Set that we exited, which takes a lock
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
|
@ -606,7 +617,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|||
|
||||
if int(coreProtocol) != CoreProtocolVersion {
|
||||
err = fmt.Errorf("Incompatible core API version with plugin. "+
|
||||
"Plugin version: %s, Ours: %d\n\n"+
|
||||
"Plugin version: %s, Core version: %d\n\n"+
|
||||
"To fix this, the plugin usually only needs to be recompiled.\n"+
|
||||
"Please report this to the plugin author.", parts[0], CoreProtocolVersion)
|
||||
return
|
||||
|
@ -624,7 +635,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
|
|||
// Test the API version
|
||||
if uint(protocol) != c.config.ProtocolVersion {
|
||||
err = fmt.Errorf("Incompatible API version with plugin. "+
|
||||
"Plugin version: %s, Ours: %d", parts[1], c.config.ProtocolVersion)
|
||||
"Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -707,18 +718,29 @@ func (c *Client) Protocol() Protocol {
|
|||
return c.protocol
|
||||
}
|
||||
|
||||
func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
|
||||
return func(_ string, _ time.Duration) (net.Conn, error) {
|
||||
// Connect to the client
|
||||
conn, err := net.Dial(addr.Network(), addr.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
// Make sure to set keep alive so that the connection doesn't die
|
||||
tcpConn.SetKeepAlive(true)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
|
||||
// dialer is compatible with grpc.WithDialer and creates the connection
|
||||
// to the plugin.
|
||||
func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
|
||||
// Connect to the client
|
||||
conn, err := net.Dial(c.address.Network(), c.address.String())
|
||||
conn, err := netAddrDialer(c.address)("", timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
// Make sure to set keep alive so that the connection doesn't die
|
||||
tcpConn.SetKeepAlive(true)
|
||||
}
|
||||
|
||||
// If we have a TLS config we wrap our connection. We only do this
|
||||
// for net/rpc since gRPC uses its own mechanism for TLS.
|
||||
|
|
|
@ -0,0 +1,455 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/run"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
// streamer interface is used in the broker to send/receive connection
|
||||
// information.
|
||||
type streamer interface {
|
||||
Send(*ConnInfo) error
|
||||
Recv() (*ConnInfo, error)
|
||||
Close()
|
||||
}
|
||||
|
||||
// sendErr is used to pass errors back during a send.
|
||||
type sendErr struct {
|
||||
i *ConnInfo
|
||||
ch chan error
|
||||
}
|
||||
|
||||
// gRPCBrokerServer is used by the plugin to start a stream and to send
|
||||
// connection information to/from the plugin. Implements GRPCBrokerServer and
|
||||
// streamer interfaces.
|
||||
type gRPCBrokerServer struct {
|
||||
// send is used to send connection info to the gRPC stream.
|
||||
send chan *sendErr
|
||||
|
||||
// recv is used to receive connection info from the gRPC stream.
|
||||
recv chan *ConnInfo
|
||||
|
||||
// quit closes down the stream.
|
||||
quit chan struct{}
|
||||
|
||||
// o is used to ensure we close the quit channel only once.
|
||||
o sync.Once
|
||||
}
|
||||
|
||||
func newGRPCBrokerServer() *gRPCBrokerServer {
|
||||
return &gRPCBrokerServer{
|
||||
send: make(chan *sendErr),
|
||||
recv: make(chan *ConnInfo),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// StartStream implements the GRPCBrokerServer interface and will block until
|
||||
// the quit channel is closed or the context reports Done. The stream will pass
|
||||
// connection information to/from the client.
|
||||
func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
|
||||
doneCh := stream.Context().Done()
|
||||
defer s.Close()
|
||||
|
||||
// Proccess send stream
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-s.quit:
|
||||
return
|
||||
case se := <-s.send:
|
||||
err := stream.Send(se.i)
|
||||
se.ch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Process receive stream
|
||||
for {
|
||||
i, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-doneCh:
|
||||
return nil
|
||||
case <-s.quit:
|
||||
return nil
|
||||
case s.recv <- i:
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send is used by the GRPCBroker to pass connection information into the stream
|
||||
// to the client.
|
||||
func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
|
||||
ch := make(chan error)
|
||||
defer close(ch)
|
||||
|
||||
select {
|
||||
case <-s.quit:
|
||||
return errors.New("broker closed")
|
||||
case s.send <- &sendErr{
|
||||
i: i,
|
||||
ch: ch,
|
||||
}:
|
||||
}
|
||||
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// Recv is used by the GRPCBroker to pass connection information that has been
|
||||
// sent from the client from the stream to the broker.
|
||||
func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
|
||||
select {
|
||||
case <-s.quit:
|
||||
return nil, errors.New("broker closed")
|
||||
case i := <-s.recv:
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the quit channel, shutting down the stream.
|
||||
func (s *gRPCBrokerServer) Close() {
|
||||
s.o.Do(func() {
|
||||
close(s.quit)
|
||||
})
|
||||
}
|
||||
|
||||
// gRPCBrokerClientImpl is used by the client to start a stream and to send
|
||||
// connection information to/from the client. Implements GRPCBrokerClient and
|
||||
// streamer interfaces.
|
||||
type gRPCBrokerClientImpl struct {
|
||||
// client is the underlying GRPC client used to make calls to the server.
|
||||
client GRPCBrokerClient
|
||||
|
||||
// send is used to send connection info to the gRPC stream.
|
||||
send chan *sendErr
|
||||
|
||||
// recv is used to receive connection info from the gRPC stream.
|
||||
recv chan *ConnInfo
|
||||
|
||||
// quit closes down the stream.
|
||||
quit chan struct{}
|
||||
|
||||
// o is used to ensure we close the quit channel only once.
|
||||
o sync.Once
|
||||
}
|
||||
|
||||
func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
|
||||
return &gRPCBrokerClientImpl{
|
||||
client: NewGRPCBrokerClient(conn),
|
||||
send: make(chan *sendErr),
|
||||
recv: make(chan *ConnInfo),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// StartStream implements the GRPCBrokerClient interface and will block until
|
||||
// the quit channel is closed or the context reports Done. The stream will pass
|
||||
// connection information to/from the plugin.
|
||||
func (s *gRPCBrokerClientImpl) StartStream() error {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
defer s.Close()
|
||||
|
||||
stream, err := s.client.StartStream(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
doneCh := stream.Context().Done()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-s.quit:
|
||||
return
|
||||
case se := <-s.send:
|
||||
err := stream.Send(se.i)
|
||||
se.ch <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
i, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-doneCh:
|
||||
return nil
|
||||
case <-s.quit:
|
||||
return nil
|
||||
case s.recv <- i:
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send is used by the GRPCBroker to pass connection information into the stream
|
||||
// to the plugin.
|
||||
func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
|
||||
ch := make(chan error)
|
||||
defer close(ch)
|
||||
|
||||
select {
|
||||
case <-s.quit:
|
||||
return errors.New("broker closed")
|
||||
case s.send <- &sendErr{
|
||||
i: i,
|
||||
ch: ch,
|
||||
}:
|
||||
}
|
||||
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// Recv is used by the GRPCBroker to pass connection information that has been
|
||||
// sent from the plugin to the broker.
|
||||
func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
|
||||
select {
|
||||
case <-s.quit:
|
||||
return nil, errors.New("broker closed")
|
||||
case i := <-s.recv:
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the quit channel, shutting down the stream.
|
||||
func (s *gRPCBrokerClientImpl) Close() {
|
||||
s.o.Do(func() {
|
||||
close(s.quit)
|
||||
})
|
||||
}
|
||||
|
||||
// GRPCBroker is responsible for brokering connections by unique ID.
|
||||
//
|
||||
// It is used by plugins to create multiple gRPC connections and data
|
||||
// streams between the plugin process and the host process.
|
||||
//
|
||||
// This allows a plugin to request a channel with a specific ID to connect to
|
||||
// or accept a connection from, and the broker handles the details of
|
||||
// holding these channels open while they're being negotiated.
|
||||
//
|
||||
// The Plugin interface has access to these for both Server and Client.
|
||||
// The broker can be used by either (optionally) to reserve and connect to
|
||||
// new streams. This is useful for complex args and return values,
|
||||
// or anything else you might need a data stream for.
|
||||
type GRPCBroker struct {
|
||||
nextId uint32
|
||||
streamer streamer
|
||||
streams map[uint32]*gRPCBrokerPending
|
||||
tls *tls.Config
|
||||
doneCh chan struct{}
|
||||
o sync.Once
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type gRPCBrokerPending struct {
|
||||
ch chan *ConnInfo
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
|
||||
return &GRPCBroker{
|
||||
streamer: s,
|
||||
streams: make(map[uint32]*gRPCBrokerPending),
|
||||
tls: tls,
|
||||
doneCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Accept accepts a connection by ID.
|
||||
//
|
||||
// This should not be called multiple times with the same ID at one time.
|
||||
func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
|
||||
listener, err := serverListener()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = b.streamer.Send(&ConnInfo{
|
||||
ServiceId: id,
|
||||
Network: listener.Addr().Network(),
|
||||
Address: listener.Addr().String(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
// AcceptAndServe is used to accept a specific stream ID and immediately
|
||||
// serve a gRPC server on that stream ID. This is used to easily serve
|
||||
// complex arguments. Each AcceptAndServe call opens a new listener socket and
|
||||
// sends the connection info down the stream to the dialer. Since a new
|
||||
// connection is opened every call, these calls should be used sparingly.
|
||||
// Multiple gRPC server implementations can be registered to a single
|
||||
// AcceptAndServe call.
|
||||
func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
|
||||
listener, err := b.Accept(id)
|
||||
if err != nil {
|
||||
log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
|
||||
return
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
var opts []grpc.ServerOption
|
||||
if b.tls != nil {
|
||||
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
|
||||
}
|
||||
|
||||
server := s(opts)
|
||||
|
||||
// Here we use a run group to close this goroutine if the server is shutdown
|
||||
// or the broker is shutdown.
|
||||
var g run.Group
|
||||
{
|
||||
// Serve on the listener, if shutting down call GracefulStop.
|
||||
g.Add(func() error {
|
||||
return server.Serve(listener)
|
||||
}, func(err error) {
|
||||
server.GracefulStop()
|
||||
})
|
||||
}
|
||||
{
|
||||
// block on the closeCh or the doneCh. If we are shutting down close the
|
||||
// closeCh.
|
||||
closeCh := make(chan struct{})
|
||||
g.Add(func() error {
|
||||
select {
|
||||
case <-b.doneCh:
|
||||
case <-closeCh:
|
||||
}
|
||||
return nil
|
||||
}, func(err error) {
|
||||
close(closeCh)
|
||||
})
|
||||
}
|
||||
|
||||
// Block until we are done
|
||||
g.Run()
|
||||
}
|
||||
|
||||
// Close closes the stream and all servers.
|
||||
func (b *GRPCBroker) Close() error {
|
||||
b.streamer.Close()
|
||||
b.o.Do(func() {
|
||||
close(b.doneCh)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dial opens a connection by ID.
|
||||
func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
|
||||
var c *ConnInfo
|
||||
|
||||
// Open the stream
|
||||
p := b.getStream(id)
|
||||
select {
|
||||
case c = <-p.ch:
|
||||
close(p.doneCh)
|
||||
case <-time.After(5 * time.Second):
|
||||
return nil, fmt.Errorf("timeout waiting for connection info")
|
||||
}
|
||||
|
||||
var addr net.Addr
|
||||
switch c.Network {
|
||||
case "tcp":
|
||||
addr, err = net.ResolveTCPAddr("tcp", c.Address)
|
||||
case "unix":
|
||||
addr, err = net.ResolveUnixAddr("unix", c.Address)
|
||||
default:
|
||||
err = fmt.Errorf("Unknown address type: %s", c.Address)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dialGRPCConn(b.tls, netAddrDialer(addr))
|
||||
}
|
||||
|
||||
// NextId returns a unique ID to use next.
|
||||
//
|
||||
// It is possible for very long-running plugin hosts to wrap this value,
|
||||
// though it would require a very large amount of calls. In practice
|
||||
// we've never seen it happen.
|
||||
func (m *GRPCBroker) NextId() uint32 {
|
||||
return atomic.AddUint32(&m.nextId, 1)
|
||||
}
|
||||
|
||||
// Run starts the brokering and should be executed in a goroutine, since it
|
||||
// blocks forever, or until the session closes.
|
||||
//
|
||||
// Uses of GRPCBroker never need to call this. It is called internally by
|
||||
// the plugin host/client.
|
||||
func (m *GRPCBroker) Run() {
|
||||
for {
|
||||
stream, err := m.streamer.Recv()
|
||||
if err != nil {
|
||||
// Once we receive an error, just exit
|
||||
break
|
||||
}
|
||||
|
||||
// Initialize the waiter
|
||||
p := m.getStream(stream.ServiceId)
|
||||
select {
|
||||
case p.ch <- stream:
|
||||
default:
|
||||
}
|
||||
|
||||
go m.timeoutWait(stream.ServiceId, p)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
p, ok := m.streams[id]
|
||||
if ok {
|
||||
return p
|
||||
}
|
||||
|
||||
m.streams[id] = &gRPCBrokerPending{
|
||||
ch: make(chan *ConnInfo, 1),
|
||||
doneCh: make(chan struct{}),
|
||||
}
|
||||
return m.streams[id]
|
||||
}
|
||||
|
||||
func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
|
||||
// Wait for the stream to either be picked up and connected, or
|
||||
// for a timeout.
|
||||
select {
|
||||
case <-p.doneCh:
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Delete the stream so no one else can grab it
|
||||
delete(m.streams, id)
|
||||
}
|
|
@ -0,0 +1,190 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// source: grpc_broker.proto
|
||||
|
||||
/*
|
||||
Package plugin is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
grpc_broker.proto
|
||||
|
||||
It has these top-level messages:
|
||||
ConnInfo
|
||||
*/
|
||||
package plugin
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import (
|
||||
context "golang.org/x/net/context"
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
type ConnInfo struct {
|
||||
ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId" json:"service_id,omitempty"`
|
||||
Network string `protobuf:"bytes,2,opt,name=network" json:"network,omitempty"`
|
||||
Address string `protobuf:"bytes,3,opt,name=address" json:"address,omitempty"`
|
||||
}
|
||||
|
||||
func (m *ConnInfo) Reset() { *m = ConnInfo{} }
|
||||
func (m *ConnInfo) String() string { return proto.CompactTextString(m) }
|
||||
func (*ConnInfo) ProtoMessage() {}
|
||||
func (*ConnInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
|
||||
|
||||
func (m *ConnInfo) GetServiceId() uint32 {
|
||||
if m != nil {
|
||||
return m.ServiceId
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ConnInfo) GetNetwork() string {
|
||||
if m != nil {
|
||||
return m.Network
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *ConnInfo) GetAddress() string {
|
||||
if m != nil {
|
||||
return m.Address
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*ConnInfo)(nil), "plugin.ConnInfo")
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ context.Context
|
||||
var _ grpc.ClientConn
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
const _ = grpc.SupportPackageIsVersion4
|
||||
|
||||
// Client API for GRPCBroker service
|
||||
|
||||
type GRPCBrokerClient interface {
|
||||
StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error)
|
||||
}
|
||||
|
||||
type gRPCBrokerClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewGRPCBrokerClient(cc *grpc.ClientConn) GRPCBrokerClient {
|
||||
return &gRPCBrokerClient{cc}
|
||||
}
|
||||
|
||||
func (c *gRPCBrokerClient) StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], c.cc, "/plugin.GRPCBroker/StartStream", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &gRPCBrokerStartStreamClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type GRPCBroker_StartStreamClient interface {
|
||||
Send(*ConnInfo) error
|
||||
Recv() (*ConnInfo, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type gRPCBrokerStartStreamClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *gRPCBrokerStartStreamClient) Send(m *ConnInfo) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *gRPCBrokerStartStreamClient) Recv() (*ConnInfo, error) {
|
||||
m := new(ConnInfo)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Server API for GRPCBroker service
|
||||
|
||||
type GRPCBrokerServer interface {
|
||||
StartStream(GRPCBroker_StartStreamServer) error
|
||||
}
|
||||
|
||||
func RegisterGRPCBrokerServer(s *grpc.Server, srv GRPCBrokerServer) {
|
||||
s.RegisterService(&_GRPCBroker_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _GRPCBroker_StartStream_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(GRPCBrokerServer).StartStream(&gRPCBrokerStartStreamServer{stream})
|
||||
}
|
||||
|
||||
type GRPCBroker_StartStreamServer interface {
|
||||
Send(*ConnInfo) error
|
||||
Recv() (*ConnInfo, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type gRPCBrokerStartStreamServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *gRPCBrokerStartStreamServer) Send(m *ConnInfo) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *gRPCBrokerStartStreamServer) Recv() (*ConnInfo, error) {
|
||||
m := new(ConnInfo)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
var _GRPCBroker_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "plugin.GRPCBroker",
|
||||
HandlerType: (*GRPCBrokerServer)(nil),
|
||||
Methods: []grpc.MethodDesc{},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "StartStream",
|
||||
Handler: _GRPCBroker_StartStream_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "grpc_broker.proto",
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 170 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48,
|
||||
0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b,
|
||||
0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x53, 0x8a, 0xe5, 0xe2, 0x70, 0xce, 0xcf, 0xcb, 0xf3, 0xcc, 0x4b,
|
||||
0xcb, 0x17, 0x92, 0xe5, 0xe2, 0x2a, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x8d, 0xcf, 0x4c, 0x91,
|
||||
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0d, 0xe2, 0x84, 0x8a, 0x78, 0xa6, 0x08, 0x49, 0x70, 0xb1, 0xe7,
|
||||
0xa5, 0x96, 0x94, 0xe7, 0x17, 0x65, 0x4b, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x20,
|
||||
0x99, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2, 0x62, 0x09, 0x66, 0x88, 0x0c, 0x94, 0x6b, 0xe4, 0xcc,
|
||||
0xc5, 0xe5, 0x1e, 0x14, 0xe0, 0xec, 0x04, 0xb6, 0x5a, 0xc8, 0x94, 0x8b, 0x3b, 0xb8, 0x24, 0xb1,
|
||||
0xa8, 0x24, 0xb8, 0xa4, 0x28, 0x35, 0x31, 0x57, 0x48, 0x40, 0x0f, 0xe2, 0x08, 0x3d, 0x98, 0x0b,
|
||||
0xa4, 0x30, 0x44, 0x34, 0x18, 0x0d, 0x18, 0x93, 0xd8, 0xc0, 0x4e, 0x36, 0x06, 0x04, 0x00, 0x00,
|
||||
0xff, 0xff, 0x7b, 0x5d, 0xfb, 0xe1, 0xc7, 0x00, 0x00, 0x00,
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
syntax = "proto3";
|
||||
package plugin;
|
||||
|
||||
message ConnInfo {
|
||||
uint32 service_id = 1;
|
||||
string network = 2;
|
||||
string address = 3;
|
||||
}
|
||||
|
||||
service GRPCBroker {
|
||||
rpc StartStream(stream ConnInfo) returns (stream ConnInfo);
|
||||
}
|
||||
|
||||
|
|
@ -1,7 +1,10 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -9,14 +12,12 @@ import (
|
|||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
// newGRPCClient creates a new GRPCClient. The Client argument is expected
|
||||
// to be successfully started already with a lock held.
|
||||
func newGRPCClient(c *Client) (*GRPCClient, error) {
|
||||
func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
|
||||
// Build dialing options.
|
||||
opts := make([]grpc.DialOption, 0, 5)
|
||||
|
||||
// We use a custom dialer so that we can connect over unix domain sockets
|
||||
opts = append(opts, grpc.WithDialer(c.dialer))
|
||||
opts = append(opts, grpc.WithDialer(dialer))
|
||||
|
||||
// go-plugin expects to block the connection
|
||||
opts = append(opts, grpc.WithBlock())
|
||||
|
@ -26,11 +27,11 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
|
|||
|
||||
// If we have no TLS configuration set, we need to explicitly tell grpc
|
||||
// that we're connecting with an insecure connection.
|
||||
if c.config.TLSConfig == nil {
|
||||
if tls == nil {
|
||||
opts = append(opts, grpc.WithInsecure())
|
||||
} else {
|
||||
opts = append(opts, grpc.WithTransportCredentials(
|
||||
credentials.NewTLS(c.config.TLSConfig)))
|
||||
credentials.NewTLS(tls)))
|
||||
}
|
||||
|
||||
// Connect. Note the first parameter is unused because we use a custom
|
||||
|
@ -40,9 +41,28 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// newGRPCClient creates a new GRPCClient. The Client argument is expected
|
||||
// to be successfully started already with a lock held.
|
||||
func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
|
||||
conn, err := dialGRPCConn(c.config.TLSConfig, c.dialer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start the broker.
|
||||
brokerGRPCClient := newGRPCBrokerClient(conn)
|
||||
broker := newGRPCBroker(brokerGRPCClient, c.config.TLSConfig)
|
||||
go broker.Run()
|
||||
go brokerGRPCClient.StartStream()
|
||||
|
||||
return &GRPCClient{
|
||||
Conn: conn,
|
||||
Plugins: c.config.Plugins,
|
||||
doneCtx: doneCtx,
|
||||
broker: broker,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -50,10 +70,14 @@ func newGRPCClient(c *Client) (*GRPCClient, error) {
|
|||
type GRPCClient struct {
|
||||
Conn *grpc.ClientConn
|
||||
Plugins map[string]Plugin
|
||||
|
||||
doneCtx context.Context
|
||||
broker *GRPCBroker
|
||||
}
|
||||
|
||||
// ClientProtocol impl.
|
||||
func (c *GRPCClient) Close() error {
|
||||
c.broker.Close()
|
||||
return c.Conn.Close()
|
||||
}
|
||||
|
||||
|
@ -69,7 +93,7 @@ func (c *GRPCClient) Dispense(name string) (interface{}, error) {
|
|||
return nil, fmt.Errorf("plugin %q doesn't support gRPC", name)
|
||||
}
|
||||
|
||||
return p.GRPCClient(c.Conn)
|
||||
return p.GRPCClient(c.doneCtx, c.broker, c.Conn)
|
||||
}
|
||||
|
||||
// ClientProtocol impl.
|
||||
|
|
|
@ -51,6 +51,7 @@ type GRPCServer struct {
|
|||
|
||||
config GRPCServerConfig
|
||||
server *grpc.Server
|
||||
broker *GRPCBroker
|
||||
}
|
||||
|
||||
// ServerProtocol impl.
|
||||
|
@ -68,14 +69,20 @@ func (s *GRPCServer) Init() error {
|
|||
GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
|
||||
grpc_health_v1.RegisterHealthServer(s.server, healthCheck)
|
||||
|
||||
// Register the broker service
|
||||
brokerServer := newGRPCBrokerServer()
|
||||
RegisterGRPCBrokerServer(s.server, brokerServer)
|
||||
s.broker = newGRPCBroker(brokerServer, s.TLS)
|
||||
go s.broker.Run()
|
||||
|
||||
// Register all our plugins onto the gRPC server.
|
||||
for k, raw := range s.Plugins {
|
||||
p, ok := raw.(GRPCPlugin)
|
||||
if !ok {
|
||||
return fmt.Errorf("%q is not a GRPC-compatibile plugin", k)
|
||||
return fmt.Errorf("%q is not a GRPC-compatible plugin", k)
|
||||
}
|
||||
|
||||
if err := p.GRPCServer(s.server); err != nil {
|
||||
if err := p.GRPCServer(s.broker, s.server); err != nil {
|
||||
return fmt.Errorf("error registring %q: %s", k, err)
|
||||
}
|
||||
}
|
||||
|
@ -83,6 +90,16 @@ func (s *GRPCServer) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Stop calls Stop on the underlying grpc.Server
|
||||
func (s *GRPCServer) Stop() {
|
||||
s.server.Stop()
|
||||
}
|
||||
|
||||
// GracefulStop calls GracefulStop on the underlying grpc.Server
|
||||
func (s *GRPCServer) GracefulStop() {
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
|
||||
// Config is the GRPCServerConfig encoded as JSON then base64.
|
||||
func (s *GRPCServer) Config() string {
|
||||
// Create a buffer that will contain our final contents
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/rpc"
|
||||
|
||||
|
@ -33,11 +34,12 @@ type GRPCPlugin interface {
|
|||
// GRPCServer should register this plugin for serving with the
|
||||
// given GRPCServer. Unlike Plugin.Server, this is only called once
|
||||
// since gRPC plugins serve singletons.
|
||||
GRPCServer(*grpc.Server) error
|
||||
GRPCServer(*GRPCBroker, *grpc.Server) error
|
||||
|
||||
// GRPCClient should return the interface implementation for the plugin
|
||||
// you're serving via gRPC.
|
||||
GRPCClient(*grpc.ClientConn) (interface{}, error)
|
||||
// you're serving via gRPC. The provided context will be canceled by
|
||||
// go-plugin in the event of the plugin process exiting.
|
||||
GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error)
|
||||
}
|
||||
|
||||
// NetRPCUnsupportedPlugin implements Plugin but returns errors for the
|
||||
|
|
|
@ -66,6 +66,10 @@ type ServeConfig struct {
|
|||
// the gRPC health checking service. This is not optional since go-plugin
|
||||
// relies on this to implement Ping().
|
||||
GRPCServer func([]grpc.ServerOption) *grpc.Server
|
||||
|
||||
// Logger is used to pass a logger into the server. If none is provided the
|
||||
// server will create a default logger.
|
||||
Logger hclog.Logger
|
||||
}
|
||||
|
||||
// Protocol returns the protocol that this server should speak.
|
||||
|
@ -106,12 +110,15 @@ func Serve(opts *ServeConfig) {
|
|||
// Logging goes to the original stderr
|
||||
log.SetOutput(os.Stderr)
|
||||
|
||||
// internal logger to os.Stderr
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Level: hclog.Trace,
|
||||
Output: os.Stderr,
|
||||
JSONFormat: true,
|
||||
})
|
||||
logger := opts.Logger
|
||||
if logger == nil {
|
||||
// internal logger to os.Stderr
|
||||
logger = hclog.New(&hclog.LoggerOptions{
|
||||
Level: hclog.Trace,
|
||||
Output: os.Stderr,
|
||||
JSONFormat: true,
|
||||
})
|
||||
}
|
||||
|
||||
// Create our new stdout, stderr files. These will override our built-in
|
||||
// stdout/stderr so that it works across the stream boundary.
|
||||
|
|
|
@ -2,6 +2,7 @@ package plugin
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net"
|
||||
"net/rpc"
|
||||
|
||||
|
@ -77,6 +78,35 @@ func TestPluginRPCConn(t testing.T, ps map[string]Plugin) (*RPCClient, *RPCServe
|
|||
return client, server
|
||||
}
|
||||
|
||||
// TestGRPCConn returns a gRPC client conn and grpc server that are connected
|
||||
// together and configured. The register function is used to register services
|
||||
// prior to the Serve call. This is used to test gRPC connections.
|
||||
func TestGRPCConn(t testing.T, register func(*grpc.Server)) (*grpc.ClientConn, *grpc.Server) {
|
||||
// Create a listener
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
server := grpc.NewServer()
|
||||
register(server)
|
||||
go server.Serve(l)
|
||||
|
||||
// Connect to the server
|
||||
conn, err := grpc.Dial(
|
||||
l.Addr().String(),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithInsecure())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Connection successful, close the listener
|
||||
l.Close()
|
||||
|
||||
return conn, server
|
||||
}
|
||||
|
||||
// TestPluginGRPCConn returns a plugin gRPC client and server that are connected
|
||||
// together and configured. This is used to test gRPC connections.
|
||||
func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCServer) {
|
||||
|
@ -107,13 +137,17 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Connection successful, close the listener
|
||||
l.Close()
|
||||
brokerGRPCClient := newGRPCBrokerClient(conn)
|
||||
broker := newGRPCBroker(brokerGRPCClient, nil)
|
||||
go broker.Run()
|
||||
go brokerGRPCClient.StartStream()
|
||||
|
||||
// Create the client
|
||||
client := &GRPCClient{
|
||||
Conn: conn,
|
||||
Plugins: ps,
|
||||
broker: broker,
|
||||
doneCtx: context.Background(),
|
||||
}
|
||||
|
||||
return client, server
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -0,0 +1,73 @@
|
|||
# run
|
||||
|
||||
[![GoDoc](https://godoc.org/github.com/oklog/run?status.svg)](https://godoc.org/github.com/oklog/run)
|
||||
[![Build Status](https://travis-ci.org/oklog/run.svg?branch=master)](https://travis-ci.org/oklog/run)
|
||||
[![Go Report Card](https://goreportcard.com/badge/github.com/oklog/run)](https://goreportcard.com/report/github.com/oklog/run)
|
||||
[![Apache 2 licensed](https://img.shields.io/badge/license-Apache2-blue.svg)](https://raw.githubusercontent.com/oklog/run/master/LICENSE)
|
||||
|
||||
run.Group is a universal mechanism to manage goroutine lifecycles.
|
||||
|
||||
Create a zero-value run.Group, and then add actors to it. Actors are defined as
|
||||
a pair of functions: an **execute** function, which should run synchronously;
|
||||
and an **interrupt** function, which, when invoked, should cause the execute
|
||||
function to return. Finally, invoke Run, which blocks until the first actor
|
||||
returns. This general-purpose API allows callers to model pretty much any
|
||||
runnable task, and achieve well-defined lifecycle semantics for the group.
|
||||
|
||||
run.Group was written to manage component lifecycles in func main for
|
||||
[OK Log](https://github.com/oklog/oklog).
|
||||
But it's useful in any circumstance where you need to orchestrate multiple
|
||||
goroutines as a unit whole.
|
||||
[Click here](https://www.youtube.com/watch?v=LHe1Cb_Ud_M&t=15m45s) to see a
|
||||
video of a talk where run.Group is described.
|
||||
|
||||
## Examples
|
||||
|
||||
### context.Context
|
||||
|
||||
```go
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
g.Add(func() error {
|
||||
return myProcess(ctx, ...)
|
||||
}, func(error) {
|
||||
cancel()
|
||||
})
|
||||
```
|
||||
|
||||
### net.Listener
|
||||
|
||||
```go
|
||||
ln, _ := net.Listen("tcp", ":8080")
|
||||
g.Add(func() error {
|
||||
return http.Serve(ln, nil)
|
||||
}, func(error) {
|
||||
ln.Close()
|
||||
})
|
||||
```
|
||||
|
||||
### io.ReadCloser
|
||||
|
||||
```go
|
||||
var conn io.ReadCloser = ...
|
||||
g.Add(func() error {
|
||||
s := bufio.NewScanner(conn)
|
||||
for s.Scan() {
|
||||
println(s.Text())
|
||||
}
|
||||
return s.Err()
|
||||
}, func(error) {
|
||||
conn.Close()
|
||||
})
|
||||
```
|
||||
|
||||
## Comparisons
|
||||
|
||||
Package run is somewhat similar to package
|
||||
[errgroup](https://godoc.org/golang.org/x/sync/errgroup),
|
||||
except it doesn't require actor goroutines to understand context semantics.
|
||||
|
||||
It's somewhat similar to package
|
||||
[tomb.v1](https://godoc.org/gopkg.in/tomb.v1) or
|
||||
[tomb.v2](https://godoc.org/gopkg.in/tomb.v2),
|
||||
except it has a much smaller API surface, delegating e.g. staged shutdown of
|
||||
goroutines to the caller.
|
|
@ -0,0 +1,62 @@
|
|||
// Package run implements an actor-runner with deterministic teardown. It is
|
||||
// somewhat similar to package errgroup, except it does not require actor
|
||||
// goroutines to understand context semantics. This makes it suitable for use in
|
||||
// more circumstances; for example, goroutines which are handling connections
|
||||
// from net.Listeners, or scanning input from a closable io.Reader.
|
||||
package run
|
||||
|
||||
// Group collects actors (functions) and runs them concurrently.
|
||||
// When one actor (function) returns, all actors are interrupted.
|
||||
// The zero value of a Group is useful.
|
||||
type Group struct {
|
||||
actors []actor
|
||||
}
|
||||
|
||||
// Add an actor (function) to the group. Each actor must be pre-emptable by an
|
||||
// interrupt function. That is, if interrupt is invoked, execute should return.
|
||||
// Also, it must be safe to call interrupt even after execute has returned.
|
||||
//
|
||||
// The first actor (function) to return interrupts all running actors.
|
||||
// The error is passed to the interrupt functions, and is returned by Run.
|
||||
func (g *Group) Add(execute func() error, interrupt func(error)) {
|
||||
g.actors = append(g.actors, actor{execute, interrupt})
|
||||
}
|
||||
|
||||
// Run all actors (functions) concurrently.
|
||||
// When the first actor returns, all others are interrupted.
|
||||
// Run only returns when all actors have exited.
|
||||
// Run returns the error returned by the first exiting actor.
|
||||
func (g *Group) Run() error {
|
||||
if len(g.actors) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run each actor.
|
||||
errors := make(chan error, len(g.actors))
|
||||
for _, a := range g.actors {
|
||||
go func(a actor) {
|
||||
errors <- a.execute()
|
||||
}(a)
|
||||
}
|
||||
|
||||
// Wait for the first actor to stop.
|
||||
err := <-errors
|
||||
|
||||
// Signal all actors to stop.
|
||||
for _, a := range g.actors {
|
||||
a.interrupt(err)
|
||||
}
|
||||
|
||||
// Wait for all actors to stop.
|
||||
for i := 1; i < cap(errors); i++ {
|
||||
<-errors
|
||||
}
|
||||
|
||||
// Return the original error.
|
||||
return err
|
||||
}
|
||||
|
||||
type actor struct {
|
||||
execute func() error
|
||||
interrupt func(error)
|
||||
}
|
|
@ -1572,10 +1572,12 @@
|
|||
"revision": "d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "R6me0jVmcT/OPo80Fe0qo5fRwHc=",
|
||||
"checksumSHA1": "y3op+t01flBlSBKlzUNqH5d4XHQ=",
|
||||
"path": "github.com/hashicorp/go-plugin",
|
||||
"revision": "a5174f84d7f8ff00fb07ab4ef1f380d32eee0e63",
|
||||
"revisionTime": "2017-08-16T15:18:19Z"
|
||||
"revision": "e53f54cbf51efde642d4711313e829a1ff0c236d",
|
||||
"revisionTime": "2018-01-25T19:04:38Z",
|
||||
"version": "master",
|
||||
"versionExact": "master"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "ErJHGU6AVPZM9yoY/xV11TwSjQs=",
|
||||
|
@ -2068,6 +2070,12 @@
|
|||
"revision": "179d4d0c4d8d407a32af483c2354df1d2c91e6c3",
|
||||
"revisionTime": "2013-12-21T20:05:32Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "Sfxv8SV6j8m6YD+hwvlMJjq2zfg=",
|
||||
"path": "github.com/oklog/run",
|
||||
"revision": "4dadeb3030eda0273a12382bb2348ffc7c9d1a39",
|
||||
"revisionTime": "2017-11-14T00:29:35Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "dIMJD0AZwSwmuOuTaGgqWZkzuPU=",
|
||||
"path": "github.com/packer-community/winrmcp",
|
||||
|
|
Loading…
Reference in New Issue