diff --git a/backend/remote-state/consul/backend_state.go b/backend/remote-state/consul/backend_state.go
index 9a8fd080f..e77729446 100644
--- a/backend/remote-state/consul/backend_state.go
+++ b/backend/remote-state/consul/backend_state.go
@@ -91,9 +91,10 @@ func (b *Backend) State(name string) (state.State, error) {
 	// Build the state client
 	var stateMgr state.State = &remote.State{
 		Client: &RemoteClient{
-			Client: client,
-			Path:   path,
-			GZip:   gzip,
+			Client:    client,
+			Path:      path,
+			GZip:      gzip,
+			lockState: b.lock,
 		},
 	}
 
diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go
index b11f31ba1..eb9f8254c 100644
--- a/backend/remote-state/consul/client.go
+++ b/backend/remote-state/consul/client.go
@@ -7,6 +7,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"log"
+	"sync"
 	"time"
 
 	consulapi "github.com/hashicorp/consul/api"
@@ -26,11 +28,31 @@ type RemoteClient struct {
 	Path   string
 	GZip   bool
 
+	mu sync.Mutex
+	// lockState is true if we're using locks
+	lockState bool
+
+	// The index of the last state we wrote.
+	// If this is > 0, Put will perform a CAS to ensure that the state wasn't
+	// changed during the operation. This is important even with locks, because
+	// if the client loses the lock for some reason, then reacquires it, we
+	// need to make sure that the state was not modified.
+	modifyIndex uint64
+
 	consulLock *consulapi.Lock
 	lockCh     <-chan struct{}
+
+	info *state.LockInfo
+
+	// cancel the goroutine which is monitoring the lock.
+	monitorCancel chan struct{}
+	monitorDone   chan struct{}
 }
 
 func (c *RemoteClient) Get() (*remote.Payload, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	pair, _, err := c.Client.KV().Get(c.Path, nil)
 	if err != nil {
 		return nil, err
@@ -39,6 +61,8 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
 		return nil, nil
 	}
 
+	c.modifyIndex = pair.ModifyIndex
+
 	payload := pair.Value
 	// If the payload starts with 0x1f, it's gzip, not json
 	if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' {
@@ -57,6 +81,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
 }
 
 func (c *RemoteClient) Put(data []byte) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	payload := data
 	if c.GZip {
 		if compressedState, err := compressState(data); err == nil {
@@ -67,14 +94,50 @@ func (c *RemoteClient) Put(data []byte) error {
 	}
 
 	kv := c.Client.KV()
-	_, err := kv.Put(&consulapi.KVPair{
-		Key:   c.Path,
-		Value: payload,
-	}, nil)
-	return err
+
+	// default to doing a CAS
+	verb := consulapi.KVCAS
+
+	// Assume a 0 index doesn't need a CAS for now, since we are either
+	// creating a new state or purposely overwriting one.
+	if c.modifyIndex == 0 {
+		verb = consulapi.KVSet
+	}
+
+	// KV.Put doesn't return the new index, so we use a single-operation
+	// transaction to get the new index with a single request.
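+	// With the KVCAS verb, Consul applies the operation only when Index
+	// matches the key's current ModifyIndex, and rolls the transaction back
+	// otherwise; KVSet ignores Index.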
+	txOps := consulapi.KVTxnOps{
+		&consulapi.KVTxnOp{
+			Verb:  verb,
+			Key:   c.Path,
+			Value: payload,
+			Index: c.modifyIndex,
+		},
+	}
+
+	ok, resp, _, err := kv.Txn(txOps, nil)
+	if err != nil {
+		return err
+	}
+
+	// transaction was rolled back
+	if !ok {
+		return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
+	}
+
+	if len(resp.Results) != 1 {
+		// this probably shouldn't happen
+		return fmt.Errorf("expected only 1 response value, got: %d", len(resp.Results))
+	}
+
+	c.modifyIndex = resp.Results[0].ModifyIndex
+	return nil
 }
 
 func (c *RemoteClient) Delete() error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	kv := c.Client.KV()
 	_, err := kv.Delete(c.Path, nil)
 	return err
@@ -113,11 +176,22 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
 }
 
 func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if !c.lockState {
+		return "", nil
+	}
+
+	c.info = info
+
+	// These checks are only to ensure we strictly follow the specification.
+	// Terraform shouldn't ever re-lock, so provide errors for the 2 possible
+	// states if this is called.
 	select {
 	case <-c.lockCh:
 		// We had a lock, but lost it.
-		// Since we typically only call lock once, we shouldn't ever see this.
-		return "", errors.New("lost consul lock")
+		return "", errors.New("lost consul lock, cannot re-lock")
 	default:
 		if c.lockCh != nil {
 			// we have an active lock already
@@ -125,6 +199,13 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
 		}
 	}
 
+	return c.lock()
+}
+
+// called after a lock is acquired
+var testLockHook func()
+
+func (c *RemoteClient) lock() (string, error) {
 	if c.consulLock == nil {
 		opts := &consulapi.LockOptions{
 			Key: c.Path + lockSuffix,
@@ -163,19 +244,83 @@
 
 	c.lockCh = lockCh
 
-	err = c.putLockInfo(info)
+	err = c.putLockInfo(c.info)
 	if err != nil {
-		if unlockErr := c.Unlock(info.ID); unlockErr != nil {
+		if unlockErr := c.unlock(c.info.ID); unlockErr != nil {
 			err = multierror.Append(err, unlockErr)
 		}
 
 		return "", err
 	}
 
-	return info.ID, nil
+	// Start a goroutine to monitor the lock state.
+	// If we lose the lock due to communication issues with the consul agent,
+	// attempt to immediately reacquire the lock. Put will verify the integrity
+	// of the state by using a CAS operation.
+	c.monitorCancel = make(chan struct{})
+	c.monitorDone = make(chan struct{})
+	go func(cancel, done chan struct{}) {
+		defer func() {
+			close(done)
+		}()
+		select {
+		case <-c.lockCh:
+			for {
+				c.mu.Lock()
+				c.consulLock = nil
+				_, err := c.lock()
+				c.mu.Unlock()
+
+				if err != nil {
+					// We failed to get the lock, keep trying as long as
+					// terraform is running. There may be changes in progress,
+					// so there's no use in aborting. Either we eventually
+					// reacquire the lock, or a Put will fail on a CAS.
+					log.Printf("[ERROR] attempting to reacquire lock: %s", err)
+					time.Sleep(time.Second)
+
+					select {
+					case <-cancel:
+						return
+					default:
+					}
+					continue
+				}
+
+				// if the error was nil, the new lock started a new copy of
+				// this goroutine.
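+				// That new copy watches the freshly-made monitorCancel and
+				// monitorDone channels, so the ones captured here are stale
+				// and this goroutine can simply exit (closing its own done).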
+				return
+			}
+
+		case <-cancel:
+			return
+		}
+	}(c.monitorCancel, c.monitorDone)
+
+	if testLockHook != nil {
+		testLockHook()
+	}
+
+	return c.info.ID, nil
 }
 
 func (c *RemoteClient) Unlock(id string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if !c.lockState {
+		return nil
+	}
+
+	return c.unlock(id)
+}
+
+func (c *RemoteClient) unlock(id string) error {
+	// cancel our monitoring goroutine
+	if c.monitorCancel != nil {
+		close(c.monitorCancel)
+	}
+
 	// this doesn't use the lock id, because the lock is tied to the consul client.
 	if c.consulLock == nil || c.lockCh == nil {
 		return nil
@@ -187,20 +332,28 @@ func (c *RemoteClient) Unlock(id string) error {
 	default:
 	}
 
-	err := c.consulLock.Unlock()
+	kv := c.Client.KV()
+
+	var errs error
+
+	if _, err := kv.Delete(c.Path+lockInfoSuffix, nil); err != nil {
+		errs = multierror.Append(errs, err)
+	}
+
+	if err := c.consulLock.Unlock(); err != nil {
+		errs = multierror.Append(errs, err)
+	}
+
+	// the monitoring goroutine may be in a select on this chan, so we need to
+	// wait for it to return before changing the value.
+	<-c.monitorDone
 	c.lockCh = nil
 
 	// This is only cleanup, and will fail if the lock was immediately taken by
 	// another client, so we don't report an error to the user here.
 	c.consulLock.Destroy()
 
-	kv := c.Client.KV()
-	_, delErr := kv.Delete(c.Path+lockInfoSuffix, nil)
-	if delErr != nil {
-		err = multierror.Append(err, delErr)
-	}
-
-	return err
+	return errs
 }
 
 func compressState(data []byte) ([]byte, error) {
diff --git a/backend/remote-state/consul/client_test.go b/backend/remote-state/consul/client_test.go
index dc7988135..910c5e9eb 100644
--- a/backend/remote-state/consul/client_test.go
+++ b/backend/remote-state/consul/client_test.go
@@ -139,3 +139,52 @@ func TestConsul_destroyLock(t *testing.T) {
 		t.Fatalf("lock key not cleaned up at: %s", pair.Key)
 	}
 }
+
+func TestConsul_lostLock(t *testing.T) {
+	srv := newConsulTestServer(t)
+	defer srv.Stop()
+
+	path := fmt.Sprintf("tf-unit/%s", time.Now().String())
+
+	// create 2 instances to get 2 remote.Clients
+	sA, err := backend.TestBackendConfig(t, New(), map[string]interface{}{
+		"address": srv.HTTPAddr,
+		"path":    path,
+	}).State(backend.DefaultStateName)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sB, err := backend.TestBackendConfig(t, New(), map[string]interface{}{
+		"address": srv.HTTPAddr,
+		"path":    path + "-not-used",
+	}).State(backend.DefaultStateName)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	info := state.NewLockInfo()
+	info.Operation = "test-lost-lock"
+	id, err := sA.Lock(info)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	reLocked := make(chan struct{})
+	testLockHook = func() {
+		close(reLocked)
+	}
+
+	// now we use the second client to break the lock
+	kv := sB.(*remote.State).Client.(*RemoteClient).Client.KV()
+	_, err = kv.Delete(path+lockSuffix, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	<-reLocked
+
+	if err := sA.Unlock(id); err != nil {
+		t.Fatal(err)
+	}
+}
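Below is a minimal, standalone sketch of the check-and-set pattern `Put` uses above, for anyone reviewing the transaction semantics. It assumes only a Consul agent reachable via the default config; the `tf-demo/state` key and the JSON payload are placeholders for illustration, not anything Terraform itself writes:

```go
package main

import (
	"fmt"
	"log"

	consulapi "github.com/hashicorp/consul/api"
)

func main() {
	// Connect to a local agent (assumes consul is running with defaults).
	client, err := consulapi.NewClient(consulapi.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	kv := client.KV()

	// Read the current entry to learn its ModifyIndex; 0 means the key
	// doesn't exist yet, mirroring RemoteClient.Get storing pair.ModifyIndex.
	const key = "tf-demo/state" // placeholder key
	var index uint64
	pair, _, err := kv.Get(key, nil)
	if err != nil {
		log.Fatal(err)
	}
	if pair != nil {
		index = pair.ModifyIndex
	}

	// CAS when we hold a prior index, plain set when creating the key,
	// just as Put does above.
	verb := consulapi.KVCAS
	if index == 0 {
		verb = consulapi.KVSet
	}

	ops := consulapi.KVTxnOps{
		&consulapi.KVTxnOp{
			Verb:  verb,
			Key:   key,
			Value: []byte(`{"example":true}`), // placeholder payload
			Index: index,
		},
	}

	ok, resp, _, err := kv.Txn(ops, nil)
	if err != nil {
		log.Fatal(err) // transport-level failure
	}
	if !ok {
		// The key changed between Get and Txn; a real client would re-read
		// and retry or surface the conflict, as Terraform's Put does.
		log.Fatalf("CAS failed: %v", resp.Errors)
	}

	// The transaction response carries the new ModifyIndex; keep it so the
	// next write can CAS against it.
	fmt.Println("wrote state at index", resp.Results[0].ModifyIndex)
}
```

Using `KV.Txn` rather than `KV.CAS` here is deliberate: `KV.CAS` only reports success or failure, while the transaction response returns the entry's new `ModifyIndex`, so the next CAS write needs no extra `Get` round trip. That is the same reason the patch gives for the single-operation transaction in `Put`.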