Merge pull request #334 from hashicorp/f-plugin-rpc

Plugin connections multiplexed on one process
This commit is contained in:
Mitchell Hashimoto 2014-09-28 11:56:36 -07:00
commit 92a0872eb0
37 changed files with 646 additions and 221 deletions

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/providers/aws"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(aws.ResourceProvider))
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return new(aws.ResourceProvider)
},
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/providers/cloudflare"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(cloudflare.ResourceProvider))
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return new(cloudflare.ResourceProvider)
},
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/providers/consul"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(consul.ResourceProvider))
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return new(consul.ResourceProvider)
},
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/providers/digitalocean"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(digitalocean.ResourceProvider))
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return new(digitalocean.ResourceProvider)
},
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/providers/dnsimple"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(dnsimple.ResourceProvider))
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return new(dnsimple.ResourceProvider)
},
})
}

View File

@ -6,5 +6,7 @@ import (
)
func main() {
plugin.Serve(google.Provider())
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: google.Provider,
})
}

View File

@ -6,5 +6,7 @@ import (
)
func main() {
plugin.Serve(heroku.Provider())
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: heroku.Provider,
})
}

View File

@ -6,5 +6,7 @@ import (
)
func main() {
plugin.Serve(mailgun.Provider())
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: mailgun.Provider,
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/provisioners/file"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(file.ResourceProvisioner))
plugin.Serve(&plugin.ServeOpts{
ProvisionerFunc: func() terraform.ResourceProvisioner {
return new(file.ResourceProvisioner)
},
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/provisioners/local-exec"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(localexec.ResourceProvisioner))
plugin.Serve(&plugin.ServeOpts{
ProvisionerFunc: func() terraform.ResourceProvisioner {
return new(localexec.ResourceProvisioner)
},
})
}

View File

@ -3,8 +3,13 @@ package main
import (
"github.com/hashicorp/terraform/builtin/provisioners/remote-exec"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/terraform"
)
func main() {
plugin.Serve(new(remoteexec.ResourceProvisioner))
plugin.Serve(&plugin.ServeOpts{
ProvisionerFunc: func() terraform.ResourceProvisioner {
return new(remoteexec.ResourceProvisioner)
},
})
}

View File

@ -2,10 +2,11 @@ package google
import (
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
)
// Provider returns a terraform.ResourceProvider.
func Provider() *schema.Provider {
func Provider() terraform.ResourceProvider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"account_file": &schema.Schema{

View File

@ -298,7 +298,7 @@ func resourceFirewall(
var targetTags []string
if v := d.Get("target_tags").(*schema.Set); v.Len() > 0 {
targetTags = make([]string, v.Len())
for i, v:= range v.List() {
for i, v := range v.List() {
targetTags[i] = v.(string)
}
}

View File

@ -4,11 +4,12 @@ import (
"log"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
"github.com/mitchellh/mapstructure"
)
// Provider returns a terraform.ResourceProvider.
func Provider() *schema.Provider {
func Provider() terraform.ResourceProvider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"email": &schema.Schema{

View File

@ -4,11 +4,12 @@ import (
"log"
"github.com/hashicorp/terraform/helper/schema"
"github.com/hashicorp/terraform/terraform"
"github.com/mitchellh/mapstructure"
)
// Provider returns a terraform.ResourceProvider.
func Provider() *schema.Provider {
func Provider() terraform.ResourceProvider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"api_key": &schema.Schema{

View File

@ -108,4 +108,3 @@ func TestGet_update(t *testing.T) {
t.Fatalf("doesn't look like get: %s", output)
}
}

View File

@ -11,7 +11,6 @@ import (
"github.com/hashicorp/hcl"
"github.com/hashicorp/terraform/plugin"
"github.com/hashicorp/terraform/rpc"
"github.com/hashicorp/terraform/terraform"
"github.com/mitchellh/osext"
)
@ -197,29 +196,21 @@ func (c *Config) ProviderFactories() map[string]terraform.ResourceProviderFactor
}
func (c *Config) providerFactory(path string) terraform.ResourceProviderFactory {
return func() (terraform.ResourceProvider, error) {
// Build the plugin client configuration and init the plugin
var config plugin.ClientConfig
config.Cmd = pluginCmd(path)
config.Managed = true
client := plugin.NewClient(&config)
// Request the RPC client and service name from the client
return func() (terraform.ResourceProvider, error) {
// Request the RPC client so we can get the provider
// so we can build the actual RPC-implemented provider.
rpcClient, err := client.Client()
if err != nil {
return nil, err
}
service, err := client.Service()
if err != nil {
return nil, err
}
return &rpc.ResourceProvider{
Client: rpcClient,
Name: service,
}, nil
return rpcClient.ResourceProvider()
}
}
@ -236,29 +227,19 @@ func (c *Config) ProvisionerFactories() map[string]terraform.ResourceProvisioner
}
func (c *Config) provisionerFactory(path string) terraform.ResourceProvisionerFactory {
return func() (terraform.ResourceProvisioner, error) {
// Build the plugin client configuration and init the plugin
var config plugin.ClientConfig
config.Cmd = pluginCmd(path)
config.Managed = true
client := plugin.NewClient(&config)
// Request the RPC client and service name from the client
// so we can build the actual RPC-implemented provider.
return func() (terraform.ResourceProvisioner, error) {
rpcClient, err := client.Client()
if err != nil {
return nil, err
}
service, err := client.Service()
if err != nil {
return nil, err
}
return &rpc.ResourceProvisioner{
Client: rpcClient,
Name: service,
}, nil
return rpcClient.ResourceProvisioner()
}
}

View File

@ -59,7 +59,7 @@ func (d *GitHubDetector) detectSSH(src string) (string, bool, error) {
u.Scheme = "ssh"
u.User = url.User("git")
u.Host = "github.com"
u.Path = src[idx+1:qidx]
u.Path = src[idx+1 : qidx]
if qidx < len(src) {
q, err := url.ParseQuery(src[qidx+1:])
if err != nil {
@ -69,5 +69,5 @@ func (d *GitHubDetector) detectSSH(src string) (string, bool, error) {
u.RawQuery = q.Encode()
}
return "git::"+u.String(), true, nil
return "git::" + u.String(), true, nil
}

View File

@ -39,7 +39,7 @@ func (g *HgGetter) Get(dst string, u *url.URL) error {
}
}
if err:= g.pull(dst, u); err != nil {
if err := g.pull(dst, u); err != nil {
return err
}

View File

@ -34,7 +34,7 @@ func TestGet_file(t *testing.T) {
func TestGet_fileForced(t *testing.T) {
dst := tempDir(t)
u := testModule("basic")
u = "file::"+u
u = "file::" + u
if err := Get(dst, u); err != nil {
t.Fatalf("err: %s", err)

View File

@ -2,9 +2,9 @@ package module
import (
"bufio"
"path/filepath"
"bytes"
"fmt"
"path/filepath"
"strings"
"sync"

View File

@ -8,7 +8,6 @@ import (
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"os/exec"
"path/filepath"
@ -16,6 +15,8 @@ import (
"sync"
"time"
"unicode"
tfrpc "github.com/hashicorp/terraform/rpc"
)
// If this is true, then the "unexpected EOF" panic will not be
@ -35,8 +36,7 @@ type Client struct {
doneLogging chan struct{}
l sync.Mutex
address net.Addr
service string
client *rpc.Client
client *tfrpc.Client
}
// ClientConfig is the configuration used to initialize a new
@ -124,7 +124,7 @@ func NewClient(config *ClientConfig) (c *Client) {
// Client returns an RPC client for the plugin.
//
// Subsequent calls to this will return the same RPC client.
func (c *Client) Client() (*rpc.Client, error) {
func (c *Client) Client() (*tfrpc.Client, error) {
addr, err := c.Start()
if err != nil {
return nil, err
@ -137,17 +137,11 @@ func (c *Client) Client() (*rpc.Client, error) {
return c.client, nil
}
conn, err := net.Dial(addr.Network(), addr.String())
c.client, err = tfrpc.Dial(addr.Network(), addr.String())
if err != nil {
return nil, err
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
// Make sure to set keep alive so that the connection doesn't die
tcpConn.SetKeepAlive(true)
}
c.client = rpc.NewClient(conn)
return c.client, nil
}
@ -177,15 +171,6 @@ func (c *Client) Kill() {
<-c.doneLogging
}
// Service returns the name of the service to use.
func (c *Client) Service() (string, error) {
if _, err := c.Start(); err != nil {
return "", err
}
return c.service, nil
}
// Starts the underlying subprocess, communicating with it to negotiate
// a port for RPC connections, and returning the address to connect via RPC.
//
@ -306,8 +291,8 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Trim the line and split by "|" in order to get the parts of
// the output.
line := strings.TrimSpace(string(lineBytes))
parts := strings.SplitN(line, "|", 4)
if len(parts) < 4 {
parts := strings.SplitN(line, "|", 3)
if len(parts) < 3 {
err = fmt.Errorf("Unrecognized remote plugin message: %s", line)
return
}
@ -327,9 +312,6 @@ func (c *Client) Start() (addr net.Addr, err error) {
default:
err = fmt.Errorf("Unknown address type: %s", parts[1])
}
// Grab the services
c.service = parts[3]
}
c.address = addr

View File

@ -28,14 +28,6 @@ func TestClient(t *testing.T) {
t.Fatalf("bad: %#v", addr)
}
service, err := c.Service()
if err != nil {
t.Fatalf("err: %s", err)
}
if service != "foo" {
t.Fatalf("bad: %#v", service)
}
// Test that it exits properly if killed
c.Kill()

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
tfrpc "github.com/hashicorp/terraform/rpc"
"github.com/hashicorp/terraform/terraform"
)
@ -52,34 +53,31 @@ func TestHelperProcess(*testing.T) {
cmd, args := args[0], args[1:]
switch cmd {
case "bad-version":
fmt.Printf("%s1|tcp|:1234|foo\n", APIVersion)
fmt.Printf("%s1|tcp|:1234\n", APIVersion)
<-make(chan int)
case "resource-provider":
err := Serve(new(terraform.MockResourceProvider))
if err != nil {
log.Printf("[ERR] %s", err)
os.Exit(1)
}
Serve(&ServeOpts{
ProviderFunc: testProviderFixed(new(terraform.MockResourceProvider)),
})
case "resource-provisioner":
err := Serve(new(terraform.MockResourceProvisioner))
if err != nil {
log.Printf("[ERR] %s", err)
os.Exit(1)
}
Serve(&ServeOpts{
ProvisionerFunc: testProvisionerFixed(
new(terraform.MockResourceProvisioner)),
})
case "invalid-rpc-address":
fmt.Println("lolinvalid")
case "mock":
fmt.Printf("%s|tcp|:1234|foo\n", APIVersion)
fmt.Printf("%s|tcp|:1234\n", APIVersion)
<-make(chan int)
case "start-timeout":
time.Sleep(1 * time.Minute)
os.Exit(1)
case "stderr":
fmt.Printf("%s|tcp|:1234|foo\n", APIVersion)
fmt.Printf("%s|tcp|:1234\n", APIVersion)
log.Println("HELLO")
log.Println("WORLD")
case "stdin":
fmt.Printf("%s|tcp|:1234|foo\n", APIVersion)
fmt.Printf("%s|tcp|:1234\n", APIVersion)
data := make([]byte, 5)
if _, err := os.Stdin.Read(data); err != nil {
log.Printf("stdin read error: %s", err)
@ -96,3 +94,15 @@ func TestHelperProcess(*testing.T) {
os.Exit(2)
}
}
func testProviderFixed(p terraform.ResourceProvider) tfrpc.ProviderFunc {
return func() terraform.ResourceProvider {
return p
}
}
func testProvisionerFixed(p terraform.ResourceProvisioner) tfrpc.ProvisionerFunc {
return func() terraform.ResourceProvisioner {
return p
}
}

View File

@ -1,35 +0,0 @@
package plugin
import (
"os/exec"
tfrpc "github.com/hashicorp/terraform/rpc"
"github.com/hashicorp/terraform/terraform"
)
// ResourceProviderFactory returns a Terraform ResourceProviderFactory
// that executes a plugin and connects to it.
func ResourceProviderFactory(cmd *exec.Cmd) terraform.ResourceProviderFactory {
return func() (terraform.ResourceProvider, error) {
config := &ClientConfig{
Cmd: cmd,
Managed: true,
}
client := NewClient(config)
rpcClient, err := client.Client()
if err != nil {
return nil, err
}
rpcName, err := client.Service()
if err != nil {
return nil, err
}
return &tfrpc.ResourceProvider{
Client: rpcClient,
Name: rpcName,
}, nil
}
}

View File

@ -12,12 +12,4 @@ func TestResourceProvider(t *testing.T) {
if err != nil {
t.Fatalf("should not have error: %s", err)
}
service, err := c.Service()
if err != nil {
t.Fatalf("err: %s", err)
}
if service == "" {
t.Fatal("service should not be blank")
}
}

View File

@ -1,35 +0,0 @@
package plugin
import (
"os/exec"
tfrpc "github.com/hashicorp/terraform/rpc"
"github.com/hashicorp/terraform/terraform"
)
// ResourceProvisionerFactory returns a Terraform ResourceProvisionerFactory
// that executes a plugin and connects to it.
func ResourceProvisionerFactory(cmd *exec.Cmd) terraform.ResourceProvisionerFactory {
return func() (terraform.ResourceProvisioner, error) {
config := &ClientConfig{
Cmd: cmd,
Managed: true,
}
client := NewClient(config)
rpcClient, err := client.Client()
if err != nil {
return nil, err
}
rpcName, err := client.Service()
if err != nil {
return nil, err
}
return &tfrpc.ResourceProvisioner{
Client: rpcClient,
Name: rpcName,
}, nil
}
}

View File

@ -12,12 +12,4 @@ func TestResourceProvisioner(t *testing.T) {
if err != nil {
t.Fatalf("should not have error: %s", err)
}
service, err := c.Service()
if err != nil {
t.Fatalf("err: %s", err)
}
if service == "" {
t.Fatal("service should not be blank")
}
}

View File

@ -6,7 +6,6 @@ import (
"io/ioutil"
"log"
"net"
"net/rpc"
"os"
"os/signal"
"runtime"
@ -27,7 +26,17 @@ const APIVersion = "2"
const MagicCookieKey = "TF_PLUGIN_MAGIC_COOKIE"
const MagicCookieValue = "d602bf8f470bc67ca7faa0386276bbdd4330efaf76d1a219cb4d6991ca9872b2"
func Serve(svc interface{}) error {
// ServeOpts configures what sorts of plugins are served.
type ServeOpts struct {
ProviderFunc tfrpc.ProviderFunc
ProvisionerFunc tfrpc.ProvisionerFunc
}
// Serve serves the plugins given by ServeOpts.
//
// Serve doesn't return until the plugin is done being executed. Any
// errors will be outputted to the log.
func Serve(opts *ServeOpts) {
// First check the cookie
if os.Getenv(MagicCookieKey) != MagicCookieValue {
fmt.Fprintf(os.Stderr,
@ -37,40 +46,30 @@ func Serve(svc interface{}) error {
os.Exit(1)
}
// Create the server to serve our interface
server := rpc.NewServer()
// Register the service
name, err := tfrpc.Register(server, svc)
if err != nil {
return err
}
// Register a listener so we can accept a connection
listener, err := serverListener()
if err != nil {
return err
log.Printf("[ERR] plugin init: %s", err)
return
}
defer listener.Close()
// Output the address and service name to stdout
// Create the RPC server to dispense
server := &tfrpc.Server{
ProviderFunc: opts.ProviderFunc,
ProvisionerFunc: opts.ProvisionerFunc,
}
// Output the address and service name to stdout so that Terraform
// core can bring it up.
log.Printf("Plugin address: %s %s\n",
listener.Addr().Network(), listener.Addr().String())
fmt.Printf("%s|%s|%s|%s\n",
fmt.Printf("%s|%s|%s\n",
APIVersion,
listener.Addr().Network(),
listener.Addr().String(),
name)
listener.Addr().String())
os.Stdout.Sync()
// Accept a connection
log.Println("Waiting for connection...")
conn, err := listener.Accept()
if err != nil {
log.Printf("Error accepting connection: %s\n", err.Error())
return err
}
// Eat the interrupts
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
@ -85,10 +84,8 @@ func Serve(svc interface{}) error {
}
}()
// Serve a single connection
log.Println("Serving a plugin connection...")
server.ServeConn(conn)
return nil
// Serve
server.Accept(listener)
}
func serverListener() (net.Listener, error) {

106
rpc/client.go Normal file
View File

@ -0,0 +1,106 @@
package rpc
import (
"io"
"net"
"net/rpc"
"github.com/hashicorp/terraform/terraform"
"github.com/hashicorp/yamux"
)
// Client connects to a Server in order to request plugin implementations
// for Terraform.
type Client struct {
broker *muxBroker
control *rpc.Client
}
// Dial opens a connection to a Terraform RPC server and returns a client.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
if tcpConn, ok := conn.(*net.TCPConn); ok {
// Make sure to set keep alive so that the connection doesn't die
tcpConn.SetKeepAlive(true)
}
return NewClient(conn)
}
// NewClient creates a client from an already-open connection-like value.
// Dial is typically used instead.
func NewClient(conn io.ReadWriteCloser) (*Client, error) {
// Create the yamux client so we can multiplex
mux, err := yamux.Client(conn, nil)
if err != nil {
conn.Close()
return nil, err
}
// Connect to the control stream.
control, err := mux.Open()
if err != nil {
mux.Close()
return nil, err
}
// Create the broker and start it up
broker := newMuxBroker(mux)
go broker.Run()
// Build the client using our broker and control channel.
return &Client{
broker: broker,
control: rpc.NewClient(control),
}, nil
}
// Close closes the connection. The client is no longer usable after this
// is called.
func (c *Client) Close() error {
if err := c.control.Close(); err != nil {
return err
}
return c.broker.Close()
}
func (c *Client) ResourceProvider() (terraform.ResourceProvider, error) {
var id uint32
if err := c.control.Call(
"Dispenser.ResourceProvider", new(interface{}), &id); err != nil {
return nil, err
}
conn, err := c.broker.Dial(id)
if err != nil {
return nil, err
}
return &ResourceProvider{
Client: rpc.NewClient(conn),
Name: "ResourceProvider",
}, nil
}
func (c *Client) ResourceProvisioner() (terraform.ResourceProvisioner, error) {
var id uint32
if err := c.control.Call(
"Dispenser.ResourceProvisioner", new(interface{}), &id); err != nil {
return nil, err
}
conn, err := c.broker.Dial(id)
if err != nil {
return nil, err
}
return &ResourceProvisioner{
Client: rpc.NewClient(conn),
Name: "ResourceProvisioner",
}, nil
}

75
rpc/client_test.go Normal file
View File

@ -0,0 +1,75 @@
package rpc
import (
"reflect"
"testing"
"github.com/hashicorp/terraform/terraform"
)
func TestClient_ResourceProvider(t *testing.T) {
clientConn, serverConn := testConn(t)
p := new(terraform.MockResourceProvider)
server := &Server{ProviderFunc: testProviderFixed(p)}
go server.ServeConn(serverConn)
client, err := NewClient(clientConn)
if err != nil {
t.Fatalf("err: %s", err)
}
defer client.Close()
provider, err := client.ResourceProvider()
if err != nil {
t.Fatalf("err: %s", err)
}
// Configure
config := &terraform.ResourceConfig{
Raw: map[string]interface{}{"foo": "bar"},
}
e := provider.Configure(config)
if !p.ConfigureCalled {
t.Fatal("configure should be called")
}
if !reflect.DeepEqual(p.ConfigureConfig, config) {
t.Fatalf("bad: %#v", p.ConfigureConfig)
}
if e != nil {
t.Fatalf("bad: %#v", e)
}
}
func TestClient_ResourceProvisioner(t *testing.T) {
clientConn, serverConn := testConn(t)
p := new(terraform.MockResourceProvisioner)
server := &Server{ProvisionerFunc: testProvisionerFixed(p)}
go server.ServeConn(serverConn)
client, err := NewClient(clientConn)
if err != nil {
t.Fatalf("err: %s", err)
}
defer client.Close()
provisioner, err := client.ResourceProvisioner()
if err != nil {
t.Fatalf("err: %s", err)
}
// Apply
state := &terraform.InstanceState{}
conf := &terraform.ResourceConfig{}
err = provisioner.Apply(state, conf)
if !p.ApplyCalled {
t.Fatal("apply should be called")
}
if !reflect.DeepEqual(p.ApplyConfig, conf) {
t.Fatalf("bad: %#v", p.ApplyConfig)
}
if err != nil {
t.Fatalf("bad: %#v", err)
}
}

172
rpc/mux_broker.go Normal file
View File

@ -0,0 +1,172 @@
package rpc
import (
"encoding/binary"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/yamux"
)
// muxBroker is responsible for brokering multiplexed connections by unique ID.
//
// This allows a plugin to request a channel with a specific ID to connect to
// or accept a connection from, and the broker handles the details of
// holding these channels open while they're being negotiated.
type muxBroker struct {
nextId uint32
session *yamux.Session
streams map[uint32]*muxBrokerPending
sync.Mutex
}
type muxBrokerPending struct {
ch chan net.Conn
doneCh chan struct{}
}
func newMuxBroker(s *yamux.Session) *muxBroker {
return &muxBroker{
session: s,
streams: make(map[uint32]*muxBrokerPending),
}
}
// Accept accepts a connection by ID.
//
// This should not be called multiple times with the same ID at one time.
func (m *muxBroker) Accept(id uint32) (net.Conn, error) {
var c net.Conn
p := m.getStream(id)
select {
case c = <-p.ch:
close(p.doneCh)
case <-time.After(5 * time.Second):
m.Lock()
defer m.Unlock()
delete(m.streams, id)
return nil, fmt.Errorf("timeout waiting for accept")
}
// Ack our connection
if err := binary.Write(c, binary.LittleEndian, id); err != nil {
c.Close()
return nil, err
}
return c, nil
}
// Close closes the connection and all sub-connections.
func (m *muxBroker) Close() error {
return m.session.Close()
}
// Dial opens a connection by ID.
func (m *muxBroker) Dial(id uint32) (net.Conn, error) {
// Open the stream
stream, err := m.session.OpenStream()
if err != nil {
return nil, err
}
// Write the stream ID onto the wire.
if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
stream.Close()
return nil, err
}
// Read the ack that we connected. Then we're off!
var ack uint32
if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
stream.Close()
return nil, err
}
if ack != id {
stream.Close()
return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
}
return stream, nil
}
// NextId returns a unique ID to use next.
func (m *muxBroker) NextId() uint32 {
return atomic.AddUint32(&m.nextId, 1)
}
// Run starts the brokering and should be executed in a goroutine, since it
// blocks forever, or until the session closes.
func (m *muxBroker) Run() {
for {
stream, err := m.session.AcceptStream()
if err != nil {
// Once we receive an error, just exit
break
}
// Read the stream ID from the stream
var id uint32
if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
stream.Close()
continue
}
// Initialize the waiter
p := m.getStream(id)
select {
case p.ch <- stream:
default:
}
// Wait for a timeout
go m.timeoutWait(id, p)
}
}
func (m *muxBroker) getStream(id uint32) *muxBrokerPending {
m.Lock()
defer m.Unlock()
p, ok := m.streams[id]
if ok {
return p
}
m.streams[id] = &muxBrokerPending{
ch: make(chan net.Conn, 1),
doneCh: make(chan struct{}),
}
return m.streams[id]
}
func (m *muxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
// Wait for the stream to either be picked up and connected, or
// for a timeout.
timeout := false
select {
case <-p.doneCh:
case <-time.After(5 * time.Second):
timeout = true
}
m.Lock()
defer m.Unlock()
// Delete the stream so no one else can grab it
delete(m.streams, id)
// If we timed out, then check if we have a channel in the buffer,
// and if so, close it.
if timeout {
select {
case s := <-p.ch:
s.Close()
}
}
}

View File

@ -4,6 +4,8 @@ import (
"net"
"net/rpc"
"testing"
"github.com/hashicorp/terraform/terraform"
)
func testConn(t *testing.T) (net.Conn, net.Conn) {
@ -43,3 +45,15 @@ func testClientServer(t *testing.T) (*rpc.Client, *rpc.Server) {
return client, server
}
func testProviderFixed(p terraform.ResourceProvider) ProviderFunc {
return func() terraform.ResourceProvider {
return p
}
}
func testProvisionerFixed(p terraform.ResourceProvisioner) ProvisionerFunc {
return func() terraform.ResourceProvisioner {
return p
}
}

135
rpc/server.go Normal file
View File

@ -0,0 +1,135 @@
package rpc
import (
"io"
"log"
"net"
"net/rpc"
"github.com/hashicorp/terraform/terraform"
"github.com/hashicorp/yamux"
)
// Server listens for network connections and then dispenses interface
// implementations for Terraform over net/rpc.
type Server struct {
ProviderFunc ProviderFunc
ProvisionerFunc ProvisionerFunc
}
// ProviderFunc creates terraform.ResourceProviders when they're requested
// from the server.
type ProviderFunc func() terraform.ResourceProvider
// ProvisionerFunc creates terraform.ResourceProvisioners when they're requested
// from the server.
type ProvisionerFunc func() terraform.ResourceProvisioner
// Accept accepts connections on a listener and serves requests for
// each incoming connection. Accept blocks; the caller typically invokes
// it in a go statement.
func (s *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Printf("[ERR] plugin server: %s", err)
return
}
go s.ServeConn(conn)
}
}
// ServeConn runs a single connection.
//
// ServeConn blocks, serving the connection until the client hangs up.
func (s *Server) ServeConn(conn io.ReadWriteCloser) {
// First create the yamux server to wrap this connection
mux, err := yamux.Server(conn, nil)
if err != nil {
conn.Close()
log.Printf("[ERR] plugin: %s", err)
return
}
// Accept the control connection
control, err := mux.Accept()
if err != nil {
mux.Close()
log.Printf("[ERR] plugin: %s", err)
return
}
// Create the broker and start it up
broker := newMuxBroker(mux)
go broker.Run()
// Use the control connection to build the dispenser and serve the
// connection.
server := rpc.NewServer()
server.RegisterName("Dispenser", &dispenseServer{
ProviderFunc: s.ProviderFunc,
ProvisionerFunc: s.ProvisionerFunc,
broker: broker,
})
server.ServeConn(control)
}
// dispenseServer dispenses variousinterface implementations for Terraform.
type dispenseServer struct {
ProviderFunc ProviderFunc
ProvisionerFunc ProvisionerFunc
broker *muxBroker
}
func (d *dispenseServer) ResourceProvider(
args interface{}, response *uint32) error {
id := d.broker.NextId()
*response = id
go func() {
conn, err := d.broker.Accept(id)
if err != nil {
log.Printf("[ERR] Plugin dispense: %s", err)
return
}
d.serve(conn, "ResourceProvider", &ResourceProviderServer{
Provider: d.ProviderFunc(),
})
}()
return nil
}
func (d *dispenseServer) ResourceProvisioner(
args interface{}, response *uint32) error {
id := d.broker.NextId()
*response = id
go func() {
conn, err := d.broker.Accept(id)
if err != nil {
log.Printf("[ERR] Plugin dispense: %s", err)
return
}
d.serve(conn, "ResourceProvisioner", &ResourceProvisionerServer{
Provisioner: d.ProvisionerFunc(),
})
}()
return nil
}
func (d *dispenseServer) serve(conn io.ReadWriteCloser, name string, v interface{}) {
server := rpc.NewServer()
if err := server.RegisterName(name, v); err != nil {
log.Printf("[ERR] Plugin dispense: %s", err)
return
}
server.ServeConn(conn)
}

View File

@ -3,10 +3,10 @@ package terraform
import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
"testing"
"sort"
)
func TestContextGraph(t *testing.T) {

View File

@ -59,7 +59,6 @@ func (s *State) Children(path []string) []*ModuleState {
return result
}
// AddModule adds the module with the given path to the state.
//
// This should be the preferred method to add module states since it