diff --git a/builtin/bins/provider-aws/main.go b/builtin/bins/provider-aws/main.go index 813b68b3d..5ff7a4ab2 100644 --- a/builtin/bins/provider-aws/main.go +++ b/builtin/bins/provider-aws/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/providers/aws" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(aws.ResourceProvider)) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: func() terraform.ResourceProvider { + return new(aws.ResourceProvider) + }, + }) } diff --git a/builtin/bins/provider-cloudflare/main.go b/builtin/bins/provider-cloudflare/main.go index 3edd2456f..c81c552e7 100644 --- a/builtin/bins/provider-cloudflare/main.go +++ b/builtin/bins/provider-cloudflare/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/providers/cloudflare" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(cloudflare.ResourceProvider)) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: func() terraform.ResourceProvider { + return new(cloudflare.ResourceProvider) + }, + }) } diff --git a/builtin/bins/provider-consul/main.go b/builtin/bins/provider-consul/main.go index 1f0b52a2a..44a8b8e39 100644 --- a/builtin/bins/provider-consul/main.go +++ b/builtin/bins/provider-consul/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/providers/consul" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(consul.ResourceProvider)) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: func() terraform.ResourceProvider { + return new(consul.ResourceProvider) + }, + }) } diff --git a/builtin/bins/provider-digitalocean/main.go b/builtin/bins/provider-digitalocean/main.go index 3a4d2c46c..86d2acf7a 100644 --- a/builtin/bins/provider-digitalocean/main.go +++ b/builtin/bins/provider-digitalocean/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/providers/digitalocean" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(digitalocean.ResourceProvider)) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: func() terraform.ResourceProvider { + return new(digitalocean.ResourceProvider) + }, + }) } diff --git a/builtin/bins/provider-dnsimple/main.go b/builtin/bins/provider-dnsimple/main.go index 44860d71b..2c578ace2 100644 --- a/builtin/bins/provider-dnsimple/main.go +++ b/builtin/bins/provider-dnsimple/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/providers/dnsimple" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(dnsimple.ResourceProvider)) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: func() terraform.ResourceProvider { + return new(dnsimple.ResourceProvider) + }, + }) } diff --git a/builtin/bins/provider-google/main.go b/builtin/bins/provider-google/main.go index 469488805..2772ce1fc 100644 --- a/builtin/bins/provider-google/main.go +++ b/builtin/bins/provider-google/main.go @@ -6,5 +6,7 @@ import ( ) func main() { - plugin.Serve(google.Provider()) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: google.Provider, + }) } diff --git a/builtin/bins/provider-heroku/main.go b/builtin/bins/provider-heroku/main.go index 92e18cca9..03b13523c 100644 --- a/builtin/bins/provider-heroku/main.go +++ b/builtin/bins/provider-heroku/main.go @@ -6,5 +6,7 @@ import ( ) func main() { - plugin.Serve(heroku.Provider()) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: heroku.Provider, + }) } diff --git a/builtin/bins/provider-mailgun/main.go b/builtin/bins/provider-mailgun/main.go index 768604feb..6d7caa3da 100644 --- a/builtin/bins/provider-mailgun/main.go +++ b/builtin/bins/provider-mailgun/main.go @@ -6,5 +6,7 @@ import ( ) func main() { - plugin.Serve(mailgun.Provider()) + plugin.Serve(&plugin.ServeOpts{ + ProviderFunc: mailgun.Provider, + }) } diff --git a/builtin/bins/provisioner-file/main.go b/builtin/bins/provisioner-file/main.go index 6b6747803..592ff53a6 100644 --- a/builtin/bins/provisioner-file/main.go +++ b/builtin/bins/provisioner-file/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/provisioners/file" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(file.ResourceProvisioner)) + plugin.Serve(&plugin.ServeOpts{ + ProvisionerFunc: func() terraform.ResourceProvisioner { + return new(file.ResourceProvisioner) + }, + }) } diff --git a/builtin/bins/provisioner-local-exec/main.go b/builtin/bins/provisioner-local-exec/main.go index eb697e57b..87a70c6ce 100644 --- a/builtin/bins/provisioner-local-exec/main.go +++ b/builtin/bins/provisioner-local-exec/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/provisioners/local-exec" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(localexec.ResourceProvisioner)) + plugin.Serve(&plugin.ServeOpts{ + ProvisionerFunc: func() terraform.ResourceProvisioner { + return new(localexec.ResourceProvisioner) + }, + }) } diff --git a/builtin/bins/provisioner-remote-exec/main.go b/builtin/bins/provisioner-remote-exec/main.go index b7b2c53b4..e9874cbbe 100644 --- a/builtin/bins/provisioner-remote-exec/main.go +++ b/builtin/bins/provisioner-remote-exec/main.go @@ -3,8 +3,13 @@ package main import ( "github.com/hashicorp/terraform/builtin/provisioners/remote-exec" "github.com/hashicorp/terraform/plugin" + "github.com/hashicorp/terraform/terraform" ) func main() { - plugin.Serve(new(remoteexec.ResourceProvisioner)) + plugin.Serve(&plugin.ServeOpts{ + ProvisionerFunc: func() terraform.ResourceProvisioner { + return new(remoteexec.ResourceProvisioner) + }, + }) } diff --git a/builtin/providers/aws/resource_aws_instance.go b/builtin/providers/aws/resource_aws_instance.go index 84918d225..988335e21 100644 --- a/builtin/providers/aws/resource_aws_instance.go +++ b/builtin/providers/aws/resource_aws_instance.go @@ -118,7 +118,7 @@ func resourceAwsInstance() *schema.Resource { Optional: true, }, "iam_instance_profile": &schema.Schema{ - Type: schema.TypeString, + Type: schema.TypeString, ForceNew: true, Optional: true, }, diff --git a/builtin/providers/google/provider.go b/builtin/providers/google/provider.go index 5fbba686a..593b8559b 100644 --- a/builtin/providers/google/provider.go +++ b/builtin/providers/google/provider.go @@ -2,10 +2,11 @@ package google import ( "github.com/hashicorp/terraform/helper/schema" + "github.com/hashicorp/terraform/terraform" ) // Provider returns a terraform.ResourceProvider. -func Provider() *schema.Provider { +func Provider() terraform.ResourceProvider { return &schema.Provider{ Schema: map[string]*schema.Schema{ "account_file": &schema.Schema{ diff --git a/builtin/providers/google/resource_compute_firewall.go b/builtin/providers/google/resource_compute_firewall.go index c7d32517f..dfd020cc4 100644 --- a/builtin/providers/google/resource_compute_firewall.go +++ b/builtin/providers/google/resource_compute_firewall.go @@ -76,7 +76,7 @@ func resourceComputeFirewall() *schema.Resource { "target_tags": &schema.Schema{ Type: schema.TypeSet, Optional: true, - Elem: &schema.Schema{Type: schema.TypeString}, + Elem: &schema.Schema{Type: schema.TypeString}, Set: func(v interface{}) int { return hashcode.String(v.(string)) }, @@ -298,7 +298,7 @@ func resourceFirewall( var targetTags []string if v := d.Get("target_tags").(*schema.Set); v.Len() > 0 { targetTags = make([]string, v.Len()) - for i, v:= range v.List() { + for i, v := range v.List() { targetTags[i] = v.(string) } } diff --git a/builtin/providers/heroku/provider.go b/builtin/providers/heroku/provider.go index f1cf6085c..dabd9cda5 100644 --- a/builtin/providers/heroku/provider.go +++ b/builtin/providers/heroku/provider.go @@ -4,11 +4,12 @@ import ( "log" "github.com/hashicorp/terraform/helper/schema" + "github.com/hashicorp/terraform/terraform" "github.com/mitchellh/mapstructure" ) // Provider returns a terraform.ResourceProvider. -func Provider() *schema.Provider { +func Provider() terraform.ResourceProvider { return &schema.Provider{ Schema: map[string]*schema.Schema{ "email": &schema.Schema{ diff --git a/builtin/providers/mailgun/provider.go b/builtin/providers/mailgun/provider.go index 25d29b343..833e682ad 100644 --- a/builtin/providers/mailgun/provider.go +++ b/builtin/providers/mailgun/provider.go @@ -4,11 +4,12 @@ import ( "log" "github.com/hashicorp/terraform/helper/schema" + "github.com/hashicorp/terraform/terraform" "github.com/mitchellh/mapstructure" ) // Provider returns a terraform.ResourceProvider. -func Provider() *schema.Provider { +func Provider() terraform.ResourceProvider { return &schema.Provider{ Schema: map[string]*schema.Schema{ "api_key": &schema.Schema{ diff --git a/command/get_test.go b/command/get_test.go index cce03611c..e706a7c2a 100644 --- a/command/get_test.go +++ b/command/get_test.go @@ -108,4 +108,3 @@ func TestGet_update(t *testing.T) { t.Fatalf("doesn't look like get: %s", output) } } - diff --git a/config.go b/config.go index 37c302bee..7260f05a2 100644 --- a/config.go +++ b/config.go @@ -11,7 +11,6 @@ import ( "github.com/hashicorp/hcl" "github.com/hashicorp/terraform/plugin" - "github.com/hashicorp/terraform/rpc" "github.com/hashicorp/terraform/terraform" "github.com/mitchellh/osext" ) @@ -197,29 +196,21 @@ func (c *Config) ProviderFactories() map[string]terraform.ResourceProviderFactor } func (c *Config) providerFactory(path string) terraform.ResourceProviderFactory { - return func() (terraform.ResourceProvider, error) { - // Build the plugin client configuration and init the plugin - var config plugin.ClientConfig - config.Cmd = pluginCmd(path) - config.Managed = true - client := plugin.NewClient(&config) + // Build the plugin client configuration and init the plugin + var config plugin.ClientConfig + config.Cmd = pluginCmd(path) + config.Managed = true + client := plugin.NewClient(&config) - // Request the RPC client and service name from the client + return func() (terraform.ResourceProvider, error) { + // Request the RPC client so we can get the provider // so we can build the actual RPC-implemented provider. rpcClient, err := client.Client() if err != nil { return nil, err } - service, err := client.Service() - if err != nil { - return nil, err - } - - return &rpc.ResourceProvider{ - Client: rpcClient, - Name: service, - }, nil + return rpcClient.ResourceProvider() } } @@ -236,29 +227,19 @@ func (c *Config) ProvisionerFactories() map[string]terraform.ResourceProvisioner } func (c *Config) provisionerFactory(path string) terraform.ResourceProvisionerFactory { - return func() (terraform.ResourceProvisioner, error) { - // Build the plugin client configuration and init the plugin - var config plugin.ClientConfig - config.Cmd = pluginCmd(path) - config.Managed = true - client := plugin.NewClient(&config) + // Build the plugin client configuration and init the plugin + var config plugin.ClientConfig + config.Cmd = pluginCmd(path) + config.Managed = true + client := plugin.NewClient(&config) - // Request the RPC client and service name from the client - // so we can build the actual RPC-implemented provider. + return func() (terraform.ResourceProvisioner, error) { rpcClient, err := client.Client() if err != nil { return nil, err } - service, err := client.Service() - if err != nil { - return nil, err - } - - return &rpc.ResourceProvisioner{ - Client: rpcClient, - Name: service, - }, nil + return rpcClient.ResourceProvisioner() } } diff --git a/config/module/detect_github.go b/config/module/detect_github.go index 7e7deb653..c4a4e89f0 100644 --- a/config/module/detect_github.go +++ b/config/module/detect_github.go @@ -59,7 +59,7 @@ func (d *GitHubDetector) detectSSH(src string) (string, bool, error) { u.Scheme = "ssh" u.User = url.User("git") u.Host = "github.com" - u.Path = src[idx+1:qidx] + u.Path = src[idx+1 : qidx] if qidx < len(src) { q, err := url.ParseQuery(src[qidx+1:]) if err != nil { @@ -69,5 +69,5 @@ func (d *GitHubDetector) detectSSH(src string) (string, bool, error) { u.RawQuery = q.Encode() } - return "git::"+u.String(), true, nil + return "git::" + u.String(), true, nil } diff --git a/config/module/get_hg.go b/config/module/get_hg.go index 19e4abd5a..a979eacfd 100644 --- a/config/module/get_hg.go +++ b/config/module/get_hg.go @@ -39,7 +39,7 @@ func (g *HgGetter) Get(dst string, u *url.URL) error { } } - if err:= g.pull(dst, u); err != nil { + if err := g.pull(dst, u); err != nil { return err } diff --git a/config/module/get_test.go b/config/module/get_test.go index cf34f1ae8..831780214 100644 --- a/config/module/get_test.go +++ b/config/module/get_test.go @@ -34,7 +34,7 @@ func TestGet_file(t *testing.T) { func TestGet_fileForced(t *testing.T) { dst := tempDir(t) u := testModule("basic") - u = "file::"+u + u = "file::" + u if err := Get(dst, u); err != nil { t.Fatalf("err: %s", err) @@ -76,7 +76,7 @@ func TestGetCopy_file(t *testing.T) { func TestGetDirSubdir(t *testing.T) { cases := []struct { - Input string + Input string Dir, Sub string }{ { diff --git a/config/module/tree.go b/config/module/tree.go index b7cfc5108..bb2afc16e 100644 --- a/config/module/tree.go +++ b/config/module/tree.go @@ -2,9 +2,9 @@ package module import ( "bufio" - "path/filepath" "bytes" "fmt" + "path/filepath" "strings" "sync" diff --git a/plugin/client.go b/plugin/client.go index 7589092ef..be54526c7 100644 --- a/plugin/client.go +++ b/plugin/client.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "log" "net" - "net/rpc" "os" "os/exec" "path/filepath" @@ -16,6 +15,8 @@ import ( "sync" "time" "unicode" + + tfrpc "github.com/hashicorp/terraform/rpc" ) // If this is true, then the "unexpected EOF" panic will not be @@ -35,8 +36,7 @@ type Client struct { doneLogging chan struct{} l sync.Mutex address net.Addr - service string - client *rpc.Client + client *tfrpc.Client } // ClientConfig is the configuration used to initialize a new @@ -124,7 +124,7 @@ func NewClient(config *ClientConfig) (c *Client) { // Client returns an RPC client for the plugin. // // Subsequent calls to this will return the same RPC client. -func (c *Client) Client() (*rpc.Client, error) { +func (c *Client) Client() (*tfrpc.Client, error) { addr, err := c.Start() if err != nil { return nil, err @@ -137,17 +137,11 @@ func (c *Client) Client() (*rpc.Client, error) { return c.client, nil } - conn, err := net.Dial(addr.Network(), addr.String()) + c.client, err = tfrpc.Dial(addr.Network(), addr.String()) if err != nil { return nil, err } - if tcpConn, ok := conn.(*net.TCPConn); ok { - // Make sure to set keep alive so that the connection doesn't die - tcpConn.SetKeepAlive(true) - } - - c.client = rpc.NewClient(conn) return c.client, nil } @@ -177,15 +171,6 @@ func (c *Client) Kill() { <-c.doneLogging } -// Service returns the name of the service to use. -func (c *Client) Service() (string, error) { - if _, err := c.Start(); err != nil { - return "", err - } - - return c.service, nil -} - // Starts the underlying subprocess, communicating with it to negotiate // a port for RPC connections, and returning the address to connect via RPC. // @@ -306,8 +291,8 @@ func (c *Client) Start() (addr net.Addr, err error) { // Trim the line and split by "|" in order to get the parts of // the output. line := strings.TrimSpace(string(lineBytes)) - parts := strings.SplitN(line, "|", 4) - if len(parts) < 4 { + parts := strings.SplitN(line, "|", 3) + if len(parts) < 3 { err = fmt.Errorf("Unrecognized remote plugin message: %s", line) return } @@ -327,9 +312,6 @@ func (c *Client) Start() (addr net.Addr, err error) { default: err = fmt.Errorf("Unknown address type: %s", parts[1]) } - - // Grab the services - c.service = parts[3] } c.address = addr diff --git a/plugin/client_test.go b/plugin/client_test.go index 9b3486e9a..d558b4912 100644 --- a/plugin/client_test.go +++ b/plugin/client_test.go @@ -28,14 +28,6 @@ func TestClient(t *testing.T) { t.Fatalf("bad: %#v", addr) } - service, err := c.Service() - if err != nil { - t.Fatalf("err: %s", err) - } - if service != "foo" { - t.Fatalf("bad: %#v", service) - } - // Test that it exits properly if killed c.Kill() diff --git a/plugin/plugin_test.go b/plugin/plugin_test.go index 7e5dcfcb1..d395837c3 100644 --- a/plugin/plugin_test.go +++ b/plugin/plugin_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + tfrpc "github.com/hashicorp/terraform/rpc" "github.com/hashicorp/terraform/terraform" ) @@ -52,34 +53,31 @@ func TestHelperProcess(*testing.T) { cmd, args := args[0], args[1:] switch cmd { case "bad-version": - fmt.Printf("%s1|tcp|:1234|foo\n", APIVersion) + fmt.Printf("%s1|tcp|:1234\n", APIVersion) <-make(chan int) case "resource-provider": - err := Serve(new(terraform.MockResourceProvider)) - if err != nil { - log.Printf("[ERR] %s", err) - os.Exit(1) - } + Serve(&ServeOpts{ + ProviderFunc: testProviderFixed(new(terraform.MockResourceProvider)), + }) case "resource-provisioner": - err := Serve(new(terraform.MockResourceProvisioner)) - if err != nil { - log.Printf("[ERR] %s", err) - os.Exit(1) - } + Serve(&ServeOpts{ + ProvisionerFunc: testProvisionerFixed( + new(terraform.MockResourceProvisioner)), + }) case "invalid-rpc-address": fmt.Println("lolinvalid") case "mock": - fmt.Printf("%s|tcp|:1234|foo\n", APIVersion) + fmt.Printf("%s|tcp|:1234\n", APIVersion) <-make(chan int) case "start-timeout": time.Sleep(1 * time.Minute) os.Exit(1) case "stderr": - fmt.Printf("%s|tcp|:1234|foo\n", APIVersion) + fmt.Printf("%s|tcp|:1234\n", APIVersion) log.Println("HELLO") log.Println("WORLD") case "stdin": - fmt.Printf("%s|tcp|:1234|foo\n", APIVersion) + fmt.Printf("%s|tcp|:1234\n", APIVersion) data := make([]byte, 5) if _, err := os.Stdin.Read(data); err != nil { log.Printf("stdin read error: %s", err) @@ -96,3 +94,15 @@ func TestHelperProcess(*testing.T) { os.Exit(2) } } + +func testProviderFixed(p terraform.ResourceProvider) tfrpc.ProviderFunc { + return func() terraform.ResourceProvider { + return p + } +} + +func testProvisionerFixed(p terraform.ResourceProvisioner) tfrpc.ProvisionerFunc { + return func() terraform.ResourceProvisioner { + return p + } +} diff --git a/plugin/resource_provider.go b/plugin/resource_provider.go deleted file mode 100644 index f4109ce97..000000000 --- a/plugin/resource_provider.go +++ /dev/null @@ -1,35 +0,0 @@ -package plugin - -import ( - "os/exec" - - tfrpc "github.com/hashicorp/terraform/rpc" - "github.com/hashicorp/terraform/terraform" -) - -// ResourceProviderFactory returns a Terraform ResourceProviderFactory -// that executes a plugin and connects to it. -func ResourceProviderFactory(cmd *exec.Cmd) terraform.ResourceProviderFactory { - return func() (terraform.ResourceProvider, error) { - config := &ClientConfig{ - Cmd: cmd, - Managed: true, - } - - client := NewClient(config) - rpcClient, err := client.Client() - if err != nil { - return nil, err - } - - rpcName, err := client.Service() - if err != nil { - return nil, err - } - - return &tfrpc.ResourceProvider{ - Client: rpcClient, - Name: rpcName, - }, nil - } -} diff --git a/plugin/resource_provider_test.go b/plugin/resource_provider_test.go index 805a079db..41cbb8191 100644 --- a/plugin/resource_provider_test.go +++ b/plugin/resource_provider_test.go @@ -12,12 +12,4 @@ func TestResourceProvider(t *testing.T) { if err != nil { t.Fatalf("should not have error: %s", err) } - - service, err := c.Service() - if err != nil { - t.Fatalf("err: %s", err) - } - if service == "" { - t.Fatal("service should not be blank") - } } diff --git a/plugin/resource_provisioner.go b/plugin/resource_provisioner.go deleted file mode 100644 index 6d8fd39db..000000000 --- a/plugin/resource_provisioner.go +++ /dev/null @@ -1,35 +0,0 @@ -package plugin - -import ( - "os/exec" - - tfrpc "github.com/hashicorp/terraform/rpc" - "github.com/hashicorp/terraform/terraform" -) - -// ResourceProvisionerFactory returns a Terraform ResourceProvisionerFactory -// that executes a plugin and connects to it. -func ResourceProvisionerFactory(cmd *exec.Cmd) terraform.ResourceProvisionerFactory { - return func() (terraform.ResourceProvisioner, error) { - config := &ClientConfig{ - Cmd: cmd, - Managed: true, - } - - client := NewClient(config) - rpcClient, err := client.Client() - if err != nil { - return nil, err - } - - rpcName, err := client.Service() - if err != nil { - return nil, err - } - - return &tfrpc.ResourceProvisioner{ - Client: rpcClient, - Name: rpcName, - }, nil - } -} diff --git a/plugin/resource_provisioner_test.go b/plugin/resource_provisioner_test.go index 2ca37c7d9..e0920b4af 100644 --- a/plugin/resource_provisioner_test.go +++ b/plugin/resource_provisioner_test.go @@ -12,12 +12,4 @@ func TestResourceProvisioner(t *testing.T) { if err != nil { t.Fatalf("should not have error: %s", err) } - - service, err := c.Service() - if err != nil { - t.Fatalf("err: %s", err) - } - if service == "" { - t.Fatal("service should not be blank") - } } diff --git a/plugin/server.go b/plugin/server.go index 984c1f968..3daa8a3de 100644 --- a/plugin/server.go +++ b/plugin/server.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "log" "net" - "net/rpc" "os" "os/signal" "runtime" @@ -27,7 +26,17 @@ const APIVersion = "2" const MagicCookieKey = "TF_PLUGIN_MAGIC_COOKIE" const MagicCookieValue = "d602bf8f470bc67ca7faa0386276bbdd4330efaf76d1a219cb4d6991ca9872b2" -func Serve(svc interface{}) error { +// ServeOpts configures what sorts of plugins are served. +type ServeOpts struct { + ProviderFunc tfrpc.ProviderFunc + ProvisionerFunc tfrpc.ProvisionerFunc +} + +// Serve serves the plugins given by ServeOpts. +// +// Serve doesn't return until the plugin is done being executed. Any +// errors will be outputted to the log. +func Serve(opts *ServeOpts) { // First check the cookie if os.Getenv(MagicCookieKey) != MagicCookieValue { fmt.Fprintf(os.Stderr, @@ -37,40 +46,30 @@ func Serve(svc interface{}) error { os.Exit(1) } - // Create the server to serve our interface - server := rpc.NewServer() - - // Register the service - name, err := tfrpc.Register(server, svc) - if err != nil { - return err - } - // Register a listener so we can accept a connection listener, err := serverListener() if err != nil { - return err + log.Printf("[ERR] plugin init: %s", err) + return } defer listener.Close() - // Output the address and service name to stdout + // Create the RPC server to dispense + server := &tfrpc.Server{ + ProviderFunc: opts.ProviderFunc, + ProvisionerFunc: opts.ProvisionerFunc, + } + + // Output the address and service name to stdout so that Terraform + // core can bring it up. log.Printf("Plugin address: %s %s\n", listener.Addr().Network(), listener.Addr().String()) - fmt.Printf("%s|%s|%s|%s\n", + fmt.Printf("%s|%s|%s\n", APIVersion, listener.Addr().Network(), - listener.Addr().String(), - name) + listener.Addr().String()) os.Stdout.Sync() - // Accept a connection - log.Println("Waiting for connection...") - conn, err := listener.Accept() - if err != nil { - log.Printf("Error accepting connection: %s\n", err.Error()) - return err - } - // Eat the interrupts ch := make(chan os.Signal, 1) signal.Notify(ch, os.Interrupt) @@ -85,10 +84,8 @@ func Serve(svc interface{}) error { } }() - // Serve a single connection - log.Println("Serving a plugin connection...") - server.ServeConn(conn) - return nil + // Serve + server.Accept(listener) } func serverListener() (net.Listener, error) { diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 000000000..f39494f93 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,106 @@ +package rpc + +import ( + "io" + "net" + "net/rpc" + + "github.com/hashicorp/terraform/terraform" + "github.com/hashicorp/yamux" +) + +// Client connects to a Server in order to request plugin implementations +// for Terraform. +type Client struct { + broker *muxBroker + control *rpc.Client +} + +// Dial opens a connection to a Terraform RPC server and returns a client. +func Dial(network, address string) (*Client, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + + if tcpConn, ok := conn.(*net.TCPConn); ok { + // Make sure to set keep alive so that the connection doesn't die + tcpConn.SetKeepAlive(true) + } + + return NewClient(conn) +} + +// NewClient creates a client from an already-open connection-like value. +// Dial is typically used instead. +func NewClient(conn io.ReadWriteCloser) (*Client, error) { + // Create the yamux client so we can multiplex + mux, err := yamux.Client(conn, nil) + if err != nil { + conn.Close() + return nil, err + } + + // Connect to the control stream. + control, err := mux.Open() + if err != nil { + mux.Close() + return nil, err + } + + // Create the broker and start it up + broker := newMuxBroker(mux) + go broker.Run() + + // Build the client using our broker and control channel. + return &Client{ + broker: broker, + control: rpc.NewClient(control), + }, nil +} + +// Close closes the connection. The client is no longer usable after this +// is called. +func (c *Client) Close() error { + if err := c.control.Close(); err != nil { + return err + } + + return c.broker.Close() +} + +func (c *Client) ResourceProvider() (terraform.ResourceProvider, error) { + var id uint32 + if err := c.control.Call( + "Dispenser.ResourceProvider", new(interface{}), &id); err != nil { + return nil, err + } + + conn, err := c.broker.Dial(id) + if err != nil { + return nil, err + } + + return &ResourceProvider{ + Client: rpc.NewClient(conn), + Name: "ResourceProvider", + }, nil +} + +func (c *Client) ResourceProvisioner() (terraform.ResourceProvisioner, error) { + var id uint32 + if err := c.control.Call( + "Dispenser.ResourceProvisioner", new(interface{}), &id); err != nil { + return nil, err + } + + conn, err := c.broker.Dial(id) + if err != nil { + return nil, err + } + + return &ResourceProvisioner{ + Client: rpc.NewClient(conn), + Name: "ResourceProvisioner", + }, nil +} diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 000000000..c4479cfd1 --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,75 @@ +package rpc + +import ( + "reflect" + "testing" + + "github.com/hashicorp/terraform/terraform" +) + +func TestClient_ResourceProvider(t *testing.T) { + clientConn, serverConn := testConn(t) + + p := new(terraform.MockResourceProvider) + server := &Server{ProviderFunc: testProviderFixed(p)} + go server.ServeConn(serverConn) + + client, err := NewClient(clientConn) + if err != nil { + t.Fatalf("err: %s", err) + } + defer client.Close() + + provider, err := client.ResourceProvider() + if err != nil { + t.Fatalf("err: %s", err) + } + + // Configure + config := &terraform.ResourceConfig{ + Raw: map[string]interface{}{"foo": "bar"}, + } + e := provider.Configure(config) + if !p.ConfigureCalled { + t.Fatal("configure should be called") + } + if !reflect.DeepEqual(p.ConfigureConfig, config) { + t.Fatalf("bad: %#v", p.ConfigureConfig) + } + if e != nil { + t.Fatalf("bad: %#v", e) + } +} + +func TestClient_ResourceProvisioner(t *testing.T) { + clientConn, serverConn := testConn(t) + + p := new(terraform.MockResourceProvisioner) + server := &Server{ProvisionerFunc: testProvisionerFixed(p)} + go server.ServeConn(serverConn) + + client, err := NewClient(clientConn) + if err != nil { + t.Fatalf("err: %s", err) + } + defer client.Close() + + provisioner, err := client.ResourceProvisioner() + if err != nil { + t.Fatalf("err: %s", err) + } + + // Apply + state := &terraform.InstanceState{} + conf := &terraform.ResourceConfig{} + err = provisioner.Apply(state, conf) + if !p.ApplyCalled { + t.Fatal("apply should be called") + } + if !reflect.DeepEqual(p.ApplyConfig, conf) { + t.Fatalf("bad: %#v", p.ApplyConfig) + } + if err != nil { + t.Fatalf("bad: %#v", err) + } +} diff --git a/rpc/mux_broker.go b/rpc/mux_broker.go new file mode 100644 index 000000000..639902a82 --- /dev/null +++ b/rpc/mux_broker.go @@ -0,0 +1,172 @@ +package rpc + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/yamux" +) + +// muxBroker is responsible for brokering multiplexed connections by unique ID. +// +// This allows a plugin to request a channel with a specific ID to connect to +// or accept a connection from, and the broker handles the details of +// holding these channels open while they're being negotiated. +type muxBroker struct { + nextId uint32 + session *yamux.Session + streams map[uint32]*muxBrokerPending + + sync.Mutex +} + +type muxBrokerPending struct { + ch chan net.Conn + doneCh chan struct{} +} + +func newMuxBroker(s *yamux.Session) *muxBroker { + return &muxBroker{ + session: s, + streams: make(map[uint32]*muxBrokerPending), + } +} + +// Accept accepts a connection by ID. +// +// This should not be called multiple times with the same ID at one time. +func (m *muxBroker) Accept(id uint32) (net.Conn, error) { + var c net.Conn + p := m.getStream(id) + select { + case c = <-p.ch: + close(p.doneCh) + case <-time.After(5 * time.Second): + m.Lock() + defer m.Unlock() + delete(m.streams, id) + + return nil, fmt.Errorf("timeout waiting for accept") + } + + // Ack our connection + if err := binary.Write(c, binary.LittleEndian, id); err != nil { + c.Close() + return nil, err + } + + return c, nil +} + +// Close closes the connection and all sub-connections. +func (m *muxBroker) Close() error { + return m.session.Close() +} + +// Dial opens a connection by ID. +func (m *muxBroker) Dial(id uint32) (net.Conn, error) { + // Open the stream + stream, err := m.session.OpenStream() + if err != nil { + return nil, err + } + + // Write the stream ID onto the wire. + if err := binary.Write(stream, binary.LittleEndian, id); err != nil { + stream.Close() + return nil, err + } + + // Read the ack that we connected. Then we're off! + var ack uint32 + if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil { + stream.Close() + return nil, err + } + if ack != id { + stream.Close() + return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id) + } + + return stream, nil +} + +// NextId returns a unique ID to use next. +func (m *muxBroker) NextId() uint32 { + return atomic.AddUint32(&m.nextId, 1) +} + +// Run starts the brokering and should be executed in a goroutine, since it +// blocks forever, or until the session closes. +func (m *muxBroker) Run() { + for { + stream, err := m.session.AcceptStream() + if err != nil { + // Once we receive an error, just exit + break + } + + // Read the stream ID from the stream + var id uint32 + if err := binary.Read(stream, binary.LittleEndian, &id); err != nil { + stream.Close() + continue + } + + // Initialize the waiter + p := m.getStream(id) + select { + case p.ch <- stream: + default: + } + + // Wait for a timeout + go m.timeoutWait(id, p) + } +} + +func (m *muxBroker) getStream(id uint32) *muxBrokerPending { + m.Lock() + defer m.Unlock() + + p, ok := m.streams[id] + if ok { + return p + } + + m.streams[id] = &muxBrokerPending{ + ch: make(chan net.Conn, 1), + doneCh: make(chan struct{}), + } + return m.streams[id] +} + +func (m *muxBroker) timeoutWait(id uint32, p *muxBrokerPending) { + // Wait for the stream to either be picked up and connected, or + // for a timeout. + timeout := false + select { + case <-p.doneCh: + case <-time.After(5 * time.Second): + timeout = true + } + + m.Lock() + defer m.Unlock() + + // Delete the stream so no one else can grab it + delete(m.streams, id) + + // If we timed out, then check if we have a channel in the buffer, + // and if so, close it. + if timeout { + select { + case s := <-p.ch: + s.Close() + } + } +} diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 80f3aeaef..d8550d84a 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -4,6 +4,8 @@ import ( "net" "net/rpc" "testing" + + "github.com/hashicorp/terraform/terraform" ) func testConn(t *testing.T) (net.Conn, net.Conn) { @@ -43,3 +45,15 @@ func testClientServer(t *testing.T) (*rpc.Client, *rpc.Server) { return client, server } + +func testProviderFixed(p terraform.ResourceProvider) ProviderFunc { + return func() terraform.ResourceProvider { + return p + } +} + +func testProvisionerFixed(p terraform.ResourceProvisioner) ProvisionerFunc { + return func() terraform.ResourceProvisioner { + return p + } +} diff --git a/rpc/server.go b/rpc/server.go new file mode 100644 index 000000000..0ad92366c --- /dev/null +++ b/rpc/server.go @@ -0,0 +1,135 @@ +package rpc + +import ( + "io" + "log" + "net" + "net/rpc" + + "github.com/hashicorp/terraform/terraform" + "github.com/hashicorp/yamux" +) + +// Server listens for network connections and then dispenses interface +// implementations for Terraform over net/rpc. +type Server struct { + ProviderFunc ProviderFunc + ProvisionerFunc ProvisionerFunc +} + +// ProviderFunc creates terraform.ResourceProviders when they're requested +// from the server. +type ProviderFunc func() terraform.ResourceProvider + +// ProvisionerFunc creates terraform.ResourceProvisioners when they're requested +// from the server. +type ProvisionerFunc func() terraform.ResourceProvisioner + +// Accept accepts connections on a listener and serves requests for +// each incoming connection. Accept blocks; the caller typically invokes +// it in a go statement. +func (s *Server) Accept(lis net.Listener) { + for { + conn, err := lis.Accept() + if err != nil { + log.Printf("[ERR] plugin server: %s", err) + return + } + + go s.ServeConn(conn) + } +} + +// ServeConn runs a single connection. +// +// ServeConn blocks, serving the connection until the client hangs up. +func (s *Server) ServeConn(conn io.ReadWriteCloser) { + // First create the yamux server to wrap this connection + mux, err := yamux.Server(conn, nil) + if err != nil { + conn.Close() + log.Printf("[ERR] plugin: %s", err) + return + } + + // Accept the control connection + control, err := mux.Accept() + if err != nil { + mux.Close() + log.Printf("[ERR] plugin: %s", err) + return + } + + // Create the broker and start it up + broker := newMuxBroker(mux) + go broker.Run() + + // Use the control connection to build the dispenser and serve the + // connection. + server := rpc.NewServer() + server.RegisterName("Dispenser", &dispenseServer{ + ProviderFunc: s.ProviderFunc, + ProvisionerFunc: s.ProvisionerFunc, + + broker: broker, + }) + server.ServeConn(control) +} + +// dispenseServer dispenses variousinterface implementations for Terraform. +type dispenseServer struct { + ProviderFunc ProviderFunc + ProvisionerFunc ProvisionerFunc + + broker *muxBroker +} + +func (d *dispenseServer) ResourceProvider( + args interface{}, response *uint32) error { + id := d.broker.NextId() + *response = id + + go func() { + conn, err := d.broker.Accept(id) + if err != nil { + log.Printf("[ERR] Plugin dispense: %s", err) + return + } + + d.serve(conn, "ResourceProvider", &ResourceProviderServer{ + Provider: d.ProviderFunc(), + }) + }() + + return nil +} + +func (d *dispenseServer) ResourceProvisioner( + args interface{}, response *uint32) error { + id := d.broker.NextId() + *response = id + + go func() { + conn, err := d.broker.Accept(id) + if err != nil { + log.Printf("[ERR] Plugin dispense: %s", err) + return + } + + d.serve(conn, "ResourceProvisioner", &ResourceProvisionerServer{ + Provisioner: d.ProvisionerFunc(), + }) + }() + + return nil +} + +func (d *dispenseServer) serve(conn io.ReadWriteCloser, name string, v interface{}) { + server := rpc.NewServer() + if err := server.RegisterName(name, v); err != nil { + log.Printf("[ERR] Plugin dispense: %s", err) + return + } + + server.ServeConn(conn) +} diff --git a/terraform/context_test.go b/terraform/context_test.go index 25cb94e74..67144b2df 100644 --- a/terraform/context_test.go +++ b/terraform/context_test.go @@ -3,10 +3,10 @@ package terraform import ( "fmt" "reflect" + "sort" "strings" "sync" "testing" - "sort" ) func TestContextGraph(t *testing.T) { diff --git a/terraform/state.go b/terraform/state.go index 51dec770e..f71a34289 100644 --- a/terraform/state.go +++ b/terraform/state.go @@ -59,7 +59,6 @@ func (s *State) Children(path []string) []*ModuleState { return result } - // AddModule adds the module with the given path to the state. // // This should be the preferred method to add module states since it