terraform: Context.Stop

This commit is contained in:
Mitchell Hashimoto 2014-07-03 11:27:30 -07:00
parent 603ee36d92
commit 4f5f3a5502
5 changed files with 109 additions and 726 deletions

View File

@ -23,6 +23,11 @@ type Context struct {
state *State
providers map[string]ResourceProviderFactory
variables map[string]string
l sync.Mutex
cond *sync.Cond
runCh <-chan struct{}
sh *stopHook
}
// ContextOpts are the user-creatable configuration structure to create
@ -42,13 +47,24 @@ type ContextOpts struct {
// not be mutated in any way, since the pointers are copied, not the values
// themselves.
func NewContext(opts *ContextOpts) *Context {
sh := new(stopHook)
// Copy all the hooks and add our stop hook. We don't append directly
// to the Config so that we're not modifying that in-place.
hooks := make([]Hook, len(opts.Hooks)+1)
copy(hooks, opts.Hooks)
hooks[len(opts.Hooks)] = sh
return &Context{
config: opts.Config,
diff: opts.Diff,
hooks: opts.Hooks,
hooks: hooks,
state: opts.State,
providers: opts.Providers,
variables: opts.Variables,
cond: sync.NewCond(new(sync.Mutex)),
sh: sh,
}
}
@ -58,6 +74,9 @@ func NewContext(opts *ContextOpts) *Context {
// In addition to returning the resulting state, this context is updated
// with the latest state.
func (c *Context) Apply() (*State, error) {
v := c.acquireRun()
defer c.releaseRun(v)
g, err := Graph(&GraphOpts{
Config: c.config,
Diff: c.diff,
@ -138,6 +157,27 @@ func (c *Context) Refresh() (*State, error) {
return s, err
}
// Stop stops the running task.
//
// Stop will block until the task completes.
func (c *Context) Stop() {
c.l.Lock()
ch := c.runCh
// If we aren't running, then just return
if ch == nil {
c.l.Unlock()
return
}
// Tell the hook we want to stop
c.sh.Stop()
// Wait for us to stop
c.l.Unlock()
<-ch
}
// Validate validates the configuration and returns any warnings or errors.
func (c *Context) Validate() ([]string, []error) {
var rerr *multierror.Error
@ -160,6 +200,32 @@ func (c *Context) Validate() ([]string, []error) {
return nil, errs
}
func (c *Context) acquireRun() chan<- struct{} {
c.l.Lock()
defer c.l.Unlock()
// Wait for no channel to exist
for c.runCh != nil {
c.l.Unlock()
ch := c.runCh
<-ch
c.l.Lock()
}
ch := make(chan struct{})
c.runCh = ch
return ch
}
func (c *Context) releaseRun(ch chan<- struct{}) {
c.l.Lock()
defer c.l.Unlock()
close(ch)
c.runCh = nil
c.sh.Reset()
}
func (c *Context) applyWalkFn(result *State) depgraph.WalkFunc {
var l sync.Mutex

View File

@ -84,30 +84,28 @@ func TestContextApply(t *testing.T) {
}
}
/*
func TestContextApply_cancel(t *testing.T) {
stopped := false
stopCh := make(chan struct{})
stopReplyCh := make(chan struct{})
rpAWS := new(MockResourceProvider)
rpAWS.ResourcesReturn = []ResourceType{
ResourceType{Name: "aws_instance"},
}
rpAWS.DiffFn = func(*ResourceState, *ResourceConfig) (*ResourceDiff, error) {
return &ResourceDiff{
Attributes: map[string]*ResourceAttrDiff{
"num": &ResourceAttrDiff{
New: "bar",
},
},
}, nil
}
rpAWS.ApplyFn = func(*ResourceState, *ResourceDiff) (*ResourceState, error) {
c := testConfig(t, "apply-cancel")
p := testProvider("aws")
ctx := testContext(t, &ContextOpts{
Config: c,
Providers: map[string]ResourceProviderFactory{
"aws": testProviderFuncFixed(p),
},
})
p.ApplyFn = func(*ResourceState, *ResourceDiff) (*ResourceState, error) {
if !stopped {
stopped = true
close(stopCh)
<-stopReplyCh
go ctx.Stop()
for {
if ctx.sh.Stopped() {
break
}
}
}
return &ResourceState{
@ -117,23 +115,24 @@ func TestContextApply_cancel(t *testing.T) {
},
}, nil
}
p.DiffFn = func(*ResourceState, *ResourceConfig) (*ResourceDiff, error) {
return &ResourceDiff{
Attributes: map[string]*ResourceAttrDiff{
"num": &ResourceAttrDiff{
New: "bar",
},
},
}, nil
}
c := testConfig(t, "apply-cancel")
tf := testTerraform2(t, &Config{
Providers: map[string]ResourceProviderFactory{
"aws": testProviderFuncFixed(rpAWS),
},
})
p, err := tf.Plan(&PlanOpts{Config: c})
if err != nil {
if _, err := ctx.Plan(nil); err != nil {
t.Fatalf("err: %s", err)
}
// Start the Apply in a goroutine
stateCh := make(chan *State)
go func() {
state, err := tf.Apply(p)
state, err := ctx.Apply()
if err != nil {
panic(err)
}
@ -141,18 +140,6 @@ func TestContextApply_cancel(t *testing.T) {
stateCh <- state
}()
// Start a goroutine so we can inject exactly when we stop
s := tf.stopHook.ref()
go func() {
defer tf.stopHook.unref(s)
<-tf.stopHook.ch
close(stopReplyCh)
tf.stopHook.stoppedCh <- struct{}{}
}()
<-stopCh
tf.Stop()
state := <-stateCh
if len(state.Resources) != 1 {
@ -165,7 +152,6 @@ func TestContextApply_cancel(t *testing.T) {
t.Fatalf("bad: \n%s", actual)
}
}
*/
func TestContextApply_compute(t *testing.T) {
c := testConfig(t, "apply-compute")

View File

@ -1,24 +1,13 @@
package terraform
import (
"sync"
"sync/atomic"
)
// stopHook is a private Hook implementation that Terraform uses to
// signal when to stop or cancel actions.
type stopHook struct {
sync.Mutex
// This should be incremented for every thing that can be stopped.
// When this is zero, a stopper can assume that everything is properly
// stopped.
count int
// This channel should be closed when it is time to stop
ch chan struct{}
serial int
stoppedCh chan<- struct{}
stop uint32
}
func (h *stopHook) PreApply(string, *ResourceState, *ResourceDiff) (HookAction, error) {
@ -46,34 +35,22 @@ func (h *stopHook) PostRefresh(string, *ResourceState) (HookAction, error) {
}
func (h *stopHook) hook() (HookAction, error) {
select {
case <-h.ch:
h.stoppedCh <- struct{}{}
if h.Stopped() {
return HookActionHalt, nil
default:
return HookActionContinue, nil
}
return HookActionContinue, nil
}
// reset should be called within the lock context
func (h *stopHook) reset() {
h.ch = make(chan struct{})
h.count = 0
h.serial += 1
h.stoppedCh = nil
func (h *stopHook) Reset() {
atomic.StoreUint32(&h.stop, 0)
}
func (h *stopHook) ref() int {
h.Lock()
defer h.Unlock()
h.count++
return h.serial
func (h *stopHook) Stop() {
atomic.StoreUint32(&h.stop, 1)
}
func (h *stopHook) unref(s int) {
h.Lock()
defer h.Unlock()
if h.serial == s {
h.count--
}
func (h *stopHook) Stopped() bool {
return atomic.LoadUint32(&h.stop) == 1
}

View File

@ -1,16 +1,5 @@
package terraform
import (
"fmt"
"log"
"sync"
"sync/atomic"
"github.com/hashicorp/terraform/config"
"github.com/hashicorp/terraform/depgraph"
"github.com/hashicorp/terraform/helper/multierror"
)
// Terraform is the primary structure that is used to interact with
// Terraform from code, and can perform operations such as returning
// all resources, a resource tree, a specific resource, etc.
@ -31,6 +20,7 @@ type Config struct {
Providers map[string]ResourceProviderFactory
}
/*
// New creates a new Terraform structure, initializes resource providers
// for the given configuration, etc.
//
@ -56,28 +46,7 @@ func New(c *Config) (*Terraform, error) {
}, nil
}
func (t *Terraform) Apply(p *Plan) (*State, error) {
// Increase the count on the stop hook so we know when to stop
serial := t.stopHook.ref()
defer t.stopHook.unref(serial)
// Make sure we're working with a plan that doesn't have null pointers
// everywhere, and is instead just empty otherwise.
p.init()
g, err := Graph(&GraphOpts{
Config: p.Config,
Diff: p.Diff,
Providers: t.providers,
State: p.State,
})
if err != nil {
return nil, err
}
return t.apply(g, p)
}
/*
// Stop stops all running tasks (applies, plans, refreshes).
//
// This will block until all running tasks are stopped. While Stop is
@ -106,351 +75,4 @@ func (t *Terraform) Stop() {
// Success, everything stopped, reset everything
t.stopHook.reset()
}
func (t *Terraform) Plan(opts *PlanOpts) (*Plan, error) {
// Increase the count on the stop hook so we know when to stop
serial := t.stopHook.ref()
defer t.stopHook.unref(serial)
g, err := Graph(&GraphOpts{
Config: opts.Config,
Providers: t.providers,
State: opts.State,
})
if err != nil {
return nil, err
}
return t.plan(g, opts)
}
// Refresh goes through all the resources in the state and refreshes them
// to their latest status.
func (t *Terraform) Refresh(c *config.Config, s *State) (*State, error) {
// Increase the count on the stop hook so we know when to stop
serial := t.stopHook.ref()
defer t.stopHook.unref(serial)
g, err := Graph(&GraphOpts{
Config: c,
Providers: t.providers,
State: s,
})
if err != nil {
return s, err
}
return t.refresh(g)
}
func (t *Terraform) apply(
g *depgraph.Graph,
p *Plan) (*State, error) {
// Create our result. Make sure we preserve the prior states
s := new(State)
s.init()
for k, v := range p.State.Resources {
s.Resources[k] = v
}
err := g.Walk(t.applyWalkFn(s, p))
return s, err
}
func (t *Terraform) plan(g *depgraph.Graph, opts *PlanOpts) (*Plan, error) {
p := &Plan{
Config: opts.Config,
Vars: opts.Vars,
State: opts.State,
}
err := g.Walk(t.planWalkFn(p, opts))
return p, err
}
func (t *Terraform) refresh(g *depgraph.Graph) (*State, error) {
s := new(State)
err := g.Walk(t.refreshWalkFn(s))
return s, err
}
func (t *Terraform) refreshWalkFn(result *State) depgraph.WalkFunc {
var l sync.Mutex
// Initialize the result so we don't have to nil check everywhere
result.init()
cb := func(r *Resource) (map[string]string, error) {
for _, h := range t.hooks {
handleHook(h.PreRefresh(r.Id, r.State))
}
rs, err := r.Provider.Refresh(r.State)
if err != nil {
return nil, err
}
if rs == nil {
rs = new(ResourceState)
}
// Fix the type to be the type we have
rs.Type = r.State.Type
l.Lock()
result.Resources[r.Id] = rs
l.Unlock()
for _, h := range t.hooks {
handleHook(h.PostRefresh(r.Id, rs))
}
return nil, nil
}
return t.genericWalkFn(nil, cb)
}
func (t *Terraform) applyWalkFn(
result *State,
p *Plan) depgraph.WalkFunc {
var l sync.Mutex
// Initialize the result
result.init()
cb := func(r *Resource) (map[string]string, error) {
diff := r.Diff
if diff.Empty() {
return r.Vars(), nil
}
if !diff.Destroy {
var err error
diff, err = r.Provider.Diff(r.State, r.Config)
if err != nil {
return nil, err
}
}
// TODO(mitchellh): we need to verify the diff doesn't change
// anything and that the diff has no computed values (pre-computed)
for _, h := range t.hooks {
handleHook(h.PreApply(r.Id, r.State, diff))
}
// With the completed diff, apply!
log.Printf("[DEBUG] %s: Executing Apply", r.Id)
rs, err := r.Provider.Apply(r.State, diff)
if err != nil {
return nil, err
}
// Make sure the result is instantiated
if rs == nil {
rs = new(ResourceState)
}
// Force the resource state type to be our type
rs.Type = r.State.Type
var errs []error
for ak, av := range rs.Attributes {
// If the value is the unknown variable value, then it is an error.
// In this case we record the error and remove it from the state
if av == config.UnknownVariableValue {
errs = append(errs, fmt.Errorf(
"Attribute with unknown value: %s", ak))
delete(rs.Attributes, ak)
}
}
// Update the resulting diff
l.Lock()
if rs.ID == "" {
delete(result.Resources, r.Id)
} else {
result.Resources[r.Id] = rs
}
l.Unlock()
// Update the state for the resource itself
r.State = rs
for _, h := range t.hooks {
handleHook(h.PostApply(r.Id, r.State))
}
// Determine the new state and update variables
err = nil
if len(errs) > 0 {
err = &multierror.Error{Errors: errs}
}
return r.Vars(), err
}
return t.genericWalkFn(p.Vars, cb)
}
func (t *Terraform) planWalkFn(result *Plan, opts *PlanOpts) depgraph.WalkFunc {
var l sync.Mutex
// Initialize the result
result.init()
cb := func(r *Resource) (map[string]string, error) {
var diff *ResourceDiff
for _, h := range t.hooks {
handleHook(h.PreDiff(r.Id, r.State))
}
if opts.Destroy {
if r.State.ID != "" {
log.Printf("[DEBUG] %s: Making for destroy", r.Id)
diff = &ResourceDiff{Destroy: true}
} else {
log.Printf("[DEBUG] %s: Not marking for destroy, no ID", r.Id)
}
} else if r.Config == nil {
log.Printf("[DEBUG] %s: Orphan, marking for destroy", r.Id)
// This is an orphan (no config), so we mark it to be destroyed
diff = &ResourceDiff{Destroy: true}
} else {
log.Printf("[DEBUG] %s: Executing diff", r.Id)
// Get a diff from the newest state
var err error
diff, err = r.Provider.Diff(r.State, r.Config)
if err != nil {
return nil, err
}
}
l.Lock()
if !diff.Empty() {
result.Diff.Resources[r.Id] = diff
}
l.Unlock()
for _, h := range t.hooks {
handleHook(h.PostDiff(r.Id, diff))
}
// Determine the new state and update variables
if !diff.Empty() {
r.State = r.State.MergeDiff(diff)
}
return r.Vars(), nil
}
return t.genericWalkFn(opts.Vars, cb)
}
func (t *Terraform) genericWalkFn(
invars map[string]string,
cb genericWalkFunc) depgraph.WalkFunc {
var l sync.RWMutex
// Initialize the variables for application
vars := make(map[string]string)
for k, v := range invars {
vars[fmt.Sprintf("var.%s", k)] = v
}
// This will keep track of whether we're stopped or not
var stop uint32 = 0
return func(n *depgraph.Noun) error {
// If it is the root node, ignore
if n.Name == GraphRootNode {
return nil
}
// If we're stopped, return right away
if atomic.LoadUint32(&stop) != 0 {
return nil
}
switch m := n.Meta.(type) {
case *GraphNodeResource:
case *GraphNodeResourceProvider:
var rc *ResourceConfig
if m.Config != nil {
if err := m.Config.RawConfig.Interpolate(vars); err != nil {
panic(err)
}
rc = NewResourceConfig(m.Config.RawConfig)
}
for k, p := range m.Providers {
log.Printf("[INFO] Configuring provider: %s", k)
err := p.Configure(rc)
if err != nil {
return err
}
}
return nil
}
rn := n.Meta.(*GraphNodeResource)
l.RLock()
if len(vars) > 0 && rn.Config != nil {
if err := rn.Config.RawConfig.Interpolate(vars); err != nil {
panic(fmt.Sprintf("Interpolate error: %s", err))
}
// Force the config to be set later
rn.Resource.Config = nil
}
l.RUnlock()
// Make sure that at least some resource configuration is set
if !rn.Orphan {
if rn.Resource.Config == nil {
if rn.Config == nil {
rn.Resource.Config = new(ResourceConfig)
} else {
rn.Resource.Config = NewResourceConfig(rn.Config.RawConfig)
}
}
} else {
rn.Resource.Config = nil
}
// Handle recovery of special panic scenarios
defer func() {
if v := recover(); v != nil {
if v == HookActionHalt {
atomic.StoreUint32(&stop, 1)
} else {
panic(v)
}
}
}()
// Call the callack
log.Printf("[INFO] Walking: %s", rn.Resource.Id)
newVars, err := cb(rn.Resource)
if err != nil {
return err
}
if len(newVars) > 0 {
// Acquire a lock since this function is called in parallel
l.Lock()
defer l.Unlock()
// Update variables
for k, v := range newVars {
vars[k] = v
}
}
return nil
}
}
*/

View File

@ -1,9 +1,7 @@
package terraform
import (
"fmt"
"path/filepath"
"reflect"
"sync"
"testing"
@ -13,117 +11,6 @@ import (
// This is the directory where our test fixtures are.
const fixtureDir = "./test-fixtures"
func TestTerraformRefresh(t *testing.T) {
rpAWS := new(MockResourceProvider)
rpAWS.ResourcesReturn = []ResourceType{
ResourceType{Name: "aws_instance"},
}
c := testConfig(t, "refresh-basic")
tf := testTerraform2(t, &Config{
Providers: map[string]ResourceProviderFactory{
"aws": testProviderFuncFixed(rpAWS),
},
})
rpAWS.RefreshReturn = &ResourceState{
ID: "foo",
}
s, err := tf.Refresh(c, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if !rpAWS.RefreshCalled {
t.Fatal("refresh should be called")
}
if rpAWS.RefreshState.ID != "" {
t.Fatalf("bad: %#v", rpAWS.RefreshState)
}
if !reflect.DeepEqual(s.Resources["aws_instance.web"], rpAWS.RefreshReturn) {
t.Fatalf("bad: %#v", s.Resources)
}
for _, r := range s.Resources {
if r.Type == "" {
t.Fatalf("no type: %#v", r)
}
}
}
func TestTerraformRefresh_hook(t *testing.T) {
rpAWS := new(MockResourceProvider)
rpAWS.ResourcesReturn = []ResourceType{
ResourceType{Name: "aws_instance"},
}
h := new(MockHook)
c := testConfig(t, "refresh-basic")
tf := testTerraform2(t, &Config{
Hooks: []Hook{h},
Providers: map[string]ResourceProviderFactory{
"aws": testProviderFuncFixed(rpAWS),
},
})
if _, err := tf.Refresh(c, nil); err != nil {
t.Fatalf("err: %s", err)
}
if !h.PreRefreshCalled {
t.Fatal("should be called")
}
if h.PreRefreshState.Type != "aws_instance" {
t.Fatalf("bad: %#v", h.PreRefreshState)
}
if !h.PostRefreshCalled {
t.Fatal("should be called")
}
if h.PostRefreshState.Type != "aws_instance" {
t.Fatalf("bad: %#v", h.PostRefreshState)
}
}
func TestTerraformRefresh_state(t *testing.T) {
rpAWS := new(MockResourceProvider)
rpAWS.ResourcesReturn = []ResourceType{
ResourceType{Name: "aws_instance"},
}
c := testConfig(t, "refresh-basic")
tf := testTerraform2(t, &Config{
Providers: map[string]ResourceProviderFactory{
"aws": testProviderFuncFixed(rpAWS),
},
})
rpAWS.RefreshReturn = &ResourceState{
ID: "foo",
}
state := &State{
Resources: map[string]*ResourceState{
"aws_instance.web": &ResourceState{
ID: "bar",
},
},
}
s, err := tf.Refresh(c, state)
if err != nil {
t.Fatalf("err: %s", err)
}
if !rpAWS.RefreshCalled {
t.Fatal("refresh should be called")
}
if !reflect.DeepEqual(rpAWS.RefreshState, state.Resources["aws_instance.web"]) {
t.Fatalf("bad: %#v", rpAWS.RefreshState)
}
if !reflect.DeepEqual(s.Resources["aws_instance.web"], rpAWS.RefreshReturn) {
t.Fatalf("bad: %#v", s.Resources)
}
}
func testConfig(t *testing.T, name string) *config.Config {
c, err := config.Load(filepath.Join(fixtureDir, name, "main.tf"))
if err != nil {
@ -133,138 +20,6 @@ func testConfig(t *testing.T, name string) *config.Config {
return c
}
func testProviderFunc(n string, rs []string) ResourceProviderFactory {
resources := make([]ResourceType, len(rs))
for i, v := range rs {
resources[i] = ResourceType{
Name: v,
}
}
return func() (ResourceProvider, error) {
p := &MockResourceProvider{Meta: n}
applyFn := func(
s *ResourceState,
d *ResourceDiff) (*ResourceState, error) {
if d.Destroy {
return nil, nil
}
id := "foo"
if idAttr, ok := d.Attributes["id"]; ok && !idAttr.NewComputed {
id = idAttr.New
}
result := &ResourceState{
ID: id,
}
if d != nil {
result = result.MergeDiff(d)
}
if depAttr, ok := d.Attributes["dep"]; ok {
result.Dependencies = []ResourceDependency{
ResourceDependency{
ID: depAttr.New,
},
}
}
return result, nil
}
diffFn := func(
s *ResourceState,
c *ResourceConfig) (*ResourceDiff, error) {
var diff ResourceDiff
diff.Attributes = make(map[string]*ResourceAttrDiff)
diff.Attributes["type"] = &ResourceAttrDiff{
Old: "",
New: s.Type,
}
for k, v := range c.Raw {
if _, ok := v.(string); !ok {
continue
}
if k == "nil" {
return nil, nil
}
// This key is used for other purposes
if k == "compute_value" {
continue
}
if k == "compute" {
attrDiff := &ResourceAttrDiff{
Old: "",
New: "",
NewComputed: true,
}
if cv, ok := c.Config["compute_value"]; ok {
if cv.(string) == "1" {
attrDiff.NewComputed = false
attrDiff.New = fmt.Sprintf("computed_%s", v.(string))
}
}
diff.Attributes[v.(string)] = attrDiff
continue
}
// If this key is not computed, then look it up in the
// cleaned config.
found := false
for _, ck := range c.ComputedKeys {
if ck == k {
found = true
break
}
}
if !found {
v = c.Config[k]
}
attrDiff := &ResourceAttrDiff{
Old: "",
New: v.(string),
}
diff.Attributes[k] = attrDiff
}
for _, k := range c.ComputedKeys {
diff.Attributes[k] = &ResourceAttrDiff{
Old: "",
NewComputed: true,
}
}
return &diff, nil
}
refreshFn := func(s *ResourceState) (*ResourceState, error) {
if _, ok := s.Attributes["nil"]; ok {
return nil, nil
}
return s, nil
}
p.ApplyFn = applyFn
p.DiffFn = diffFn
p.RefreshFn = refreshFn
p.ResourcesReturn = resources
return p, nil
}
}
func testProviderFuncFixed(rp ResourceProvider) ResourceProviderFactory {
return func() (ResourceProvider, error) {
return rp, nil
@ -275,29 +30,6 @@ func testProviderMock(p ResourceProvider) *MockResourceProvider {
return p.(*MockResourceProvider)
}
func testTerraform2(t *testing.T, c *Config) *Terraform {
if c == nil {
c = new(Config)
}
if c.Providers == nil {
c.Providers = map[string]ResourceProviderFactory{
"aws": testProviderFunc("aws", []string{"aws_instance"}),
"do": testProviderFunc("do", []string{"do_droplet"}),
}
}
tf, err := New(c)
if err != nil {
t.Fatalf("err: %s", err)
}
if tf == nil {
t.Fatal("tf should not be nil")
}
return tf
}
// HookRecordApplyOrder is a test hook that records the order of applies
// by recording the PreApply event.
type HookRecordApplyOrder struct {