diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go index eb9f8254c..a0013cd2c 100644 --- a/backend/remote-state/consul/client.go +++ b/backend/remote-state/consul/client.go @@ -3,6 +3,7 @@ package consul import ( "bytes" "compress/gzip" + "context" "crypto/md5" "encoding/json" "errors" @@ -20,6 +21,15 @@ import ( const ( lockSuffix = "/.lock" lockInfoSuffix = "/.lockinfo" + + // The Session TTL associated with this lock. + lockSessionTTL = "15s" + + // the delay time from when a session is lost to when the + // lock is released by the server + lockDelay = 5 * time.Second + // interval between attempts to reacquire a lost lock + lockReacquireInterval = 2 * time.Second ) // RemoteClient is a remote client that stores data in Consul. @@ -44,9 +54,15 @@ type RemoteClient struct { info *state.LockInfo - // cancel the goroutine which is monitoring the lock. - monitorCancel chan struct{} - monitorDone chan struct{} + // cancel our goroutine which is monitoring the lock to automatically + // reacquire it when possible. + monitorCancel context.CancelFunc + monitorWG sync.WaitGroup + + // sessionCancel cancels the Context used for session.RenewPeriodic, and is + // called when unlocking, or before creating a new lock if the lock is + // lost. + sessionCancel context.CancelFunc } func (c *RemoteClient) Get() (*remote.Payload, error) { @@ -202,25 +218,41 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { return c.lock() } -// called after a lock is acquired -var testLockHook func() - +// the lock implementation. +// Only to be called while holding Client.mu func (c *RemoteClient) lock() (string, error) { - if c.consulLock == nil { - opts := &consulapi.LockOptions{ - Key: c.Path + lockSuffix, - // only wait briefly, so terraform has the choice to fail fast or - // retry as needed. 
- LockWaitTime: time.Second, - LockTryOnce: true, - } + // We create a new session here, so it can be canceled when the lock is + // lost or unlocked. + lockSession, err := c.createSession() + if err != nil { + return "", err + } - lock, err := c.Client.LockOpts(opts) - if err != nil { - return "", err - } + opts := &consulapi.LockOptions{ + Key: c.Path + lockSuffix, + Session: lockSession, - c.consulLock = lock + // only wait briefly, so terraform has the choice to fail fast or + // retry as needed. + LockWaitTime: time.Second, + LockTryOnce: true, + + // Don't let the lock monitor give up right away, as it's possible the + // session is still OK. While the session is refreshed at a rate of + // TTL/2, the lock monitor is an idle blocking request and is more + // susceptible to being closed by a lower network layer. + MonitorRetries: 5, + // + // The delay between lock monitor retries. + // While the session has a 15s TTL plus a 5s wait period on a lost + // lock, if we can't get our lock back in 10+ seconds something is + // wrong so we're going to drop the session and start over. + MonitorRetryTime: 2 * time.Second, + } + + c.consulLock, err = c.Client.LockOpts(opts) + if err != nil { + return "", err } lockErr := &state.LockError{} @@ -239,6 +271,7 @@ func (c *RemoteClient) lock() (string, error) { } lockErr.Info = lockInfo + return "", lockErr } @@ -257,16 +290,22 @@ func (c *RemoteClient) lock() (string, error) { // If we lose the lock to due communication issues with the consul agent, // attempt to immediately reacquire the lock. Put will verify the integrity // of the state by using a CAS operation. 
- c.monitorCancel = make(chan struct{}) - c.monitorDone = make(chan struct{}) - go func(cancel, done chan struct{}) { - defer func() { - close(done) - }() + ctx, cancel := context.WithCancel(context.Background()) + c.monitorCancel = cancel + c.monitorWG.Add(1) + go func() { + defer c.monitorWG.Done() select { case <-c.lockCh: + log.Println("[ERROR] lost consul lock") for { c.mu.Lock() + // We lost our lock, so we need to cancel the session too. + // The CancelFunc is only replaced while holding Client.mu, so + // this is safe to call here. This will be replaced by the + // lock() call below. + c.sessionCancel() + c.consulLock = nil _, err := c.lock() c.mu.Unlock() @@ -276,11 +315,11 @@ func (c *RemoteClient) lock() (string, error) { // terraform is running. There may be changes in progress, // so there's no use in aborting. Either we eventually // reacquire the lock, or a Put will fail on a CAS. - log.Printf("[ERROR] attempting to reacquire lock: %s", err) - time.Sleep(time.Second) + log.Printf("[ERROR] could not reacquire lock: %s", err) + time.Sleep(lockReacquireInterval) select { - case <-cancel: + case <-ctx.Done(): return default: } @@ -292,10 +331,10 @@ func (c *RemoteClient) lock() (string, error) { return } - case <-cancel: + case <-ctx.Done(): return } - }(c.monitorCancel, c.monitorDone) + }() if testLockHook != nil { testLockHook() @@ -304,6 +343,42 @@ func (c *RemoteClient) lock() (string, error) { return c.info.ID, nil } +// called after a lock is acquired +var testLockHook func() + +func (c *RemoteClient) createSession() (string, error) { + // create the context first. Even if the session creation fails, we assume + // that the CancelFunc is always callable. 
+ ctx, cancel := context.WithCancel(context.Background()) + c.sessionCancel = cancel + + session := c.Client.Session() + se := &consulapi.SessionEntry{ + Name: consulapi.DefaultLockSessionName, + TTL: lockSessionTTL, + LockDelay: lockDelay, + } + + id, _, err := session.Create(se, nil) + if err != nil { + return "", err + } + + log.Println("[INFO] created consul lock session", id) + + // keep the session renewed + // we need an adapter to convert the context's Done() channel to a + // bidirectional channel to satisfy the RenewPeriodic signature. + done := make(chan struct{}) + go func() { + <-ctx.Done() + close(done) + }() + go session.RenewPeriodic(lockSessionTTL, id, nil, done) + + return id, nil +} + func (c *RemoteClient) Unlock(id string) error { c.mu.Lock() defer c.mu.Unlock() @@ -315,17 +390,27 @@ func (c *RemoteClient) Unlock(id string) error { return c.unlock(id) } +// the unlock implementation. +// Only to be called while holding Client.mu func (c *RemoteClient) unlock(id string) error { - // cancel our monitoring goroutine - if c.monitorCancel != nil { - close(c.monitorCancel) - } - // this doesn't use the lock id, because the lock is tied to the consul client. if c.consulLock == nil || c.lockCh == nil { return nil } + // cancel our monitoring goroutine + c.monitorCancel() + + defer func() { + c.consulLock = nil + + // The consul session is only used for this single lock, so cancel it + // after we unlock. + // The session is only created and replaced holding Client.mu, so the + // CancelFunc must be non-nil. + c.sessionCancel() + }() + select { case <-c.lockCh: return errors.New("consul lock was lost") @@ -344,9 +429,9 @@ func (c *RemoteClient) unlock(id string) error { errs = multierror.Append(errs, err) } - // the monitoring goroutine may be in a select on this chan, so we need to + // the monitoring goroutine may be in a select on the lockCh, so we need to // wait for it to return before changing the value. 
- <-c.monitorDone + c.monitorWG.Wait() c.lockCh = nil // This is only cleanup, and will fail if the lock was immediately taken by