From e8330b6f53f028234363a82a7d750e4a738ddffc Mon Sep 17 00:00:00 2001 From: James Bardin Date: Sun, 28 May 2017 15:16:51 -0400 Subject: [PATCH 1/3] use CAS for consul state Put --- backend/remote-state/consul/client.go | 76 +++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 5 deletions(-) diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go index b11f31ba1..be9873417 100644 --- a/backend/remote-state/consul/client.go +++ b/backend/remote-state/consul/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" consulapi "github.com/hashicorp/consul/api" @@ -26,11 +27,25 @@ type RemoteClient struct { Path string GZip bool + mu sync.Mutex + + // The index of the last state we wrote. + // If this is > 0, Put will perform a CAS to ensure that the state wasn't + // changed during the operation. This is important even with locks, because + // if the client loses the lock for some reason, then reacquires it, we + // need to make sure that the state was not modified. + modifyIndex uint64 + consulLock *consulapi.Lock lockCh <-chan struct{} + + info *state.LockInfo } func (c *RemoteClient) Get() (*remote.Payload, error) { + c.mu.Lock() + defer c.mu.Unlock() + pair, _, err := c.Client.KV().Get(c.Path, nil) if err != nil { return nil, err @@ -39,6 +54,8 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { return nil, nil } + c.modifyIndex = pair.ModifyIndex + payload := pair.Value // If the payload starts with 0x1f, it's gzip, not json if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' { @@ -57,6 +74,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { } func (c *RemoteClient) Put(data []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + payload := data if c.GZip { if compressedState, err := compressState(data); err == nil { @@ -67,14 +87,50 @@ func (c *RemoteClient) Put(data []byte) error { } kv := c.Client.KV() - _, err := kv.Put(&consulapi.KVPair{ - Key: c.Path, - Value: payload, - }, nil) - return err + + verb := consulapi.KVCAS + + // Assume a 0 index doesn't need a CAS for now, since we are either + // creating a new state or purposely overwriting one. + if c.modifyIndex == 0 { + verb = consulapi.KVSet + } + + // KV.Put doesn't return the new index, so we use a single operation + // transaction to get the new index with a single request. + txOps := consulapi.KVTxnOps{ + &consulapi.KVTxnOp{ + Verb: verb, + Key: c.Path, + Value: payload, + Index: c.modifyIndex, + }, + } + + ok, resp, _, err := kv.Txn(txOps, nil) + if err != nil { + return err + } + + // transaction was rolled back + if !ok { + return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors) + } + + if len(resp.Results) != 1 { + // this probably shouldn't happen + return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results)) + } + + c.modifyIndex = resp.Results[0].ModifyIndex + + return nil } func (c *RemoteClient) Delete() error { + c.mu.Lock() + defer c.mu.Unlock() + kv := c.Client.KV() _, err := kv.Delete(c.Path, nil) return err @@ -113,6 +169,9 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) { } func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + select { case <-c.lockCh: // We had a lock, but lost it. @@ -125,6 +184,10 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { } } + return c.lock(info) +} + +func (c *RemoteClient) lock(info *state.LockInfo) (string, error) { if c.consulLock == nil { opts := &consulapi.LockOptions{ Key: c.Path + lockSuffix, @@ -176,6 +239,9 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { } func (c *RemoteClient) Unlock(id string) error { + c.mu.Lock() + defer c.mu.Unlock() + // this doesn't use the lock id, because the lock is tied to the consul client. if c.consulLock == nil || c.lockCh == nil { return nil From 3df48bfc277e53aa963b1c1e06aefcc5e99984f0 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Sun, 28 May 2017 16:07:24 -0400 Subject: [PATCH 2/3] relock consul when lock is lost Consul locks are based on liveness, and may be lost due timeouts, network issued, etc. If the client determines the lock was lost, attempt to reacquire the lock immediately. The client was also not using the `lock` config option. Disable locks if that is not set. --- backend/remote-state/consul/backend_state.go | 7 +- backend/remote-state/consul/client.go | 119 ++++++++++++++++--- 2 files changed, 107 insertions(+), 19 deletions(-) diff --git a/backend/remote-state/consul/backend_state.go b/backend/remote-state/consul/backend_state.go index 9a8fd080f..e77729446 100644 --- a/backend/remote-state/consul/backend_state.go +++ b/backend/remote-state/consul/backend_state.go @@ -91,9 +91,10 @@ func (b *Backend) State(name string) (state.State, error) { // Build the state client var stateMgr state.State = &remote.State{ Client: &RemoteClient{ - Client: client, - Path: path, - GZip: gzip, + Client: client, + Path: path, + GZip: gzip, + lockState: b.lock, }, } diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go index be9873417..eb9f8254c 100644 --- a/backend/remote-state/consul/client.go +++ b/backend/remote-state/consul/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "sync" "time" @@ -28,6 +29,8 @@ type RemoteClient struct { GZip bool mu sync.Mutex + // lockState is true if we're using locks + lockState bool // The index of the last state we wrote. // If this is > 0, Put will perform a CAS to ensure that the state wasn't @@ -40,6 +43,10 @@ type RemoteClient struct { lockCh <-chan struct{} info *state.LockInfo + + // cancel the goroutine which is monitoring the lock. + monitorCancel chan struct{} + monitorDone chan struct{} } func (c *RemoteClient) Get() (*remote.Payload, error) { @@ -88,6 +95,7 @@ func (c *RemoteClient) Put(data []byte) error { kv := c.Client.KV() + // default to doing a CAS verb := consulapi.KVCAS // Assume a 0 index doesn't need a CAS for now, since we are either @@ -123,7 +131,6 @@ func (c *RemoteClient) Put(data []byte) error { } c.modifyIndex = resp.Results[0].ModifyIndex - return nil } @@ -172,11 +179,19 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { c.mu.Lock() defer c.mu.Unlock() + if !c.lockState { + return "", nil + } + + c.info = info + + // These checks only are to ensure we strictly follow the specification. + // Terraform shouldn't ever re-lock, so provide errors for the 2 possible + // states if this is called. select { case <-c.lockCh: // We had a lock, but lost it. - // Since we typically only call lock once, we shouldn't ever see this. - return "", errors.New("lost consul lock") + return "", errors.New("lost consul lock, cannot re-lock") default: if c.lockCh != nil { // we have an active lock already @@ -184,10 +199,13 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { } } - return c.lock(info) + return c.lock() } -func (c *RemoteClient) lock(info *state.LockInfo) (string, error) { +// called after a lock is acquired +var testLockHook func() + +func (c *RemoteClient) lock() (string, error) { if c.consulLock == nil { opts := &consulapi.LockOptions{ Key: c.Path + lockSuffix, @@ -226,22 +244,83 @@ func (c *RemoteClient) lock(info *state.LockInfo) (string, error) { c.lockCh = lockCh - err = c.putLockInfo(info) + err = c.putLockInfo(c.info) if err != nil { - if unlockErr := c.Unlock(info.ID); unlockErr != nil { + if unlockErr := c.unlock(c.info.ID); unlockErr != nil { err = multierror.Append(err, unlockErr) } return "", err } - return info.ID, nil + // Start a goroutine to monitor the lock state. + // If we lose the lock to due communication issues with the consul agent, + // attempt to immediately reacquire the lock. Put will verify the integrity + // of the state by using a CAS operation. + c.monitorCancel = make(chan struct{}) + c.monitorDone = make(chan struct{}) + go func(cancel, done chan struct{}) { + defer func() { + close(done) + }() + select { + case <-c.lockCh: + for { + c.mu.Lock() + c.consulLock = nil + _, err := c.lock() + c.mu.Unlock() + + if err != nil { + // We failed to get the lock, keep trying as long as + // terraform is running. There may be changes in progress, + // so there's no use in aborting. Either we eventually + // reacquire the lock, or a Put will fail on a CAS. + log.Printf("[ERROR] attempting to reacquire lock: %s", err) + time.Sleep(time.Second) + + select { + case <-cancel: + return + default: + } + continue + } + + // if the error was nil, the new lock started a new copy of + // this goroutine. + return + } + + case <-cancel: + return + } + }(c.monitorCancel, c.monitorDone) + + if testLockHook != nil { + testLockHook() + } + + return c.info.ID, nil } func (c *RemoteClient) Unlock(id string) error { c.mu.Lock() defer c.mu.Unlock() + if !c.lockState { + return nil + } + + return c.unlock(id) +} + +func (c *RemoteClient) unlock(id string) error { + // cancel our monitoring goroutine + if c.monitorCancel != nil { + close(c.monitorCancel) + } + // this doesn't use the lock id, because the lock is tied to the consul client. if c.consulLock == nil || c.lockCh == nil { return nil @@ -253,20 +332,28 @@ func (c *RemoteClient) Unlock(id string) error { default: } - err := c.consulLock.Unlock() + kv := c.Client.KV() + + var errs error + + if _, err := kv.Delete(c.Path+lockInfoSuffix, nil); err != nil { + errs = multierror.Append(errs, err) + } + + if err := c.consulLock.Unlock(); err != nil { + errs = multierror.Append(errs, err) + } + + // the monitoring goroutine may be in a select on this chan, so we need to + // wait for it to return before changing the value. + <-c.monitorDone c.lockCh = nil // This is only cleanup, and will fail if the lock was immediately taken by // another client, so we don't report an error to the user here. c.consulLock.Destroy() - kv := c.Client.KV() - _, delErr := kv.Delete(c.Path+lockInfoSuffix, nil) - if delErr != nil { - err = multierror.Append(err, delErr) - } - - return err + return errs } func compressState(data []byte) ([]byte, error) { From 82eba5801d2f692a70c01491f849c07c5c664eb9 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Tue, 30 May 2017 11:07:56 -0400 Subject: [PATCH 3/3] Test losing and reacquiring a consul lock --- backend/remote-state/consul/client_test.go | 49 ++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/backend/remote-state/consul/client_test.go b/backend/remote-state/consul/client_test.go index dc7988135..910c5e9eb 100644 --- a/backend/remote-state/consul/client_test.go +++ b/backend/remote-state/consul/client_test.go @@ -139,3 +139,52 @@ func TestConsul_destroyLock(t *testing.T) { t.Fatalf("lock key not cleaned up at: %s", pair.Key) } } + +func TestConsul_lostLock(t *testing.T) { + srv := newConsulTestServer(t) + defer srv.Stop() + + path := fmt.Sprintf("tf-unit/%s", time.Now().String()) + + // create 2 instances to get 2 remote.Clients + sA, err := backend.TestBackendConfig(t, New(), map[string]interface{}{ + "address": srv.HTTPAddr, + "path": path, + }).State(backend.DefaultStateName) + if err != nil { + t.Fatal(err) + } + + sB, err := backend.TestBackendConfig(t, New(), map[string]interface{}{ + "address": srv.HTTPAddr, + "path": path + "-not-used", + }).State(backend.DefaultStateName) + if err != nil { + t.Fatal(err) + } + + info := state.NewLockInfo() + info.Operation = "test-lost-lock" + id, err := sA.Lock(info) + if err != nil { + t.Fatal(err) + } + + reLocked := make(chan struct{}) + testLockHook = func() { + close(reLocked) + } + + // now we use the second client to break the lock + kv := sB.(*remote.State).Client.(*RemoteClient).Client.KV() + _, err = kv.Delete(path+lockSuffix, nil) + if err != nil { + t.Fatal(err) + } + + <-reLocked + + if err := sA.Unlock(id); err != nil { + t.Fatal(err) + } +}