have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated session is still active. Most commonly, the long request to watch the lock key may error out, while the session is continually refreshed at a rate of TTL/2. First have the lock monitor retry the lock internally for at least 10 seconds (5 attempts with the default 2 second wait time). In most cases this will reconnect on the first try, keeping the lock channel open. If the consul lock can't recover itself, then cancel the session as soon as possible (terminating the PreiodicRenew will call Session.Destroy), and start over. In the worse case, the consul agents were split, and the session still exists on the leader so we may need to wait for the old session TTL, plus the LockWait time to renew the lock. We use a Context for the cancellation channels here, because that removes the need to worry about double-closes and nil channels. It requires an awkward adapter goroutine for now to convert the Done() `<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code safer in the long run.
This commit is contained in:
parent
8e2ee53ed3
commit
bcb11f6d89
|
@ -3,6 +3,7 @@ package consul
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
|
"context"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -20,6 +21,15 @@ import (
|
||||||
const (
|
const (
|
||||||
lockSuffix = "/.lock"
|
lockSuffix = "/.lock"
|
||||||
lockInfoSuffix = "/.lockinfo"
|
lockInfoSuffix = "/.lockinfo"
|
||||||
|
|
||||||
|
// The Session TTL associated with this lock.
|
||||||
|
lockSessionTTL = "15s"
|
||||||
|
|
||||||
|
// the delay time from when a session is lost to when the
|
||||||
|
// lock is released by the server
|
||||||
|
lockDelay = 5 * time.Second
|
||||||
|
// interval between attempts to reacquire a lost lock
|
||||||
|
lockReacquireInterval = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// RemoteClient is a remote client that stores data in Consul.
|
// RemoteClient is a remote client that stores data in Consul.
|
||||||
|
@ -44,9 +54,15 @@ type RemoteClient struct {
|
||||||
|
|
||||||
info *state.LockInfo
|
info *state.LockInfo
|
||||||
|
|
||||||
// cancel the goroutine which is monitoring the lock.
|
// cancel our goroutine which is monitoring the lock to automatically
|
||||||
monitorCancel chan struct{}
|
// reacquire it when possible.
|
||||||
monitorDone chan struct{}
|
monitorCancel context.CancelFunc
|
||||||
|
monitorWG sync.WaitGroup
|
||||||
|
|
||||||
|
// sessionCancel cancels the Context use for session.RenewPeriodic, and is
|
||||||
|
// called when unlocking, or before creating a new lock if the lock is
|
||||||
|
// lost.
|
||||||
|
sessionCancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||||
|
@ -202,25 +218,41 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
||||||
return c.lock()
|
return c.lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// called after a lock is acquired
|
// the lock implementation.
|
||||||
var testLockHook func()
|
// Only to be called while holding Client.mu
|
||||||
|
|
||||||
func (c *RemoteClient) lock() (string, error) {
|
func (c *RemoteClient) lock() (string, error) {
|
||||||
if c.consulLock == nil {
|
// We create a new session here, so it can be canceled when the lock is
|
||||||
opts := &consulapi.LockOptions{
|
// lost or unlocked.
|
||||||
Key: c.Path + lockSuffix,
|
lockSession, err := c.createSession()
|
||||||
// only wait briefly, so terraform has the choice to fail fast or
|
|
||||||
// retry as needed.
|
|
||||||
LockWaitTime: time.Second,
|
|
||||||
LockTryOnce: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
lock, err := c.Client.LockOpts(opts)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.consulLock = lock
|
opts := &consulapi.LockOptions{
|
||||||
|
Key: c.Path + lockSuffix,
|
||||||
|
Session: lockSession,
|
||||||
|
|
||||||
|
// only wait briefly, so terraform has the choice to fail fast or
|
||||||
|
// retry as needed.
|
||||||
|
LockWaitTime: time.Second,
|
||||||
|
LockTryOnce: true,
|
||||||
|
|
||||||
|
// Don't let the lock monitor give up right away, as it's possible the
|
||||||
|
// session is still OK. While the session is refreshed at a rate of
|
||||||
|
// TTL/2, the lock monitor is an idle blocking request and is more
|
||||||
|
// susceptible to being closed by a lower network layer.
|
||||||
|
MonitorRetries: 5,
|
||||||
|
//
|
||||||
|
// The delay between lock monitor retries.
|
||||||
|
// While the session has a 15s TTL plus a 5s wait period on a lost
|
||||||
|
// lock, if we can't get our lock back in 10+ seconds something is
|
||||||
|
// wrong so we're going to drop the session and start over.
|
||||||
|
MonitorRetryTime: 2 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.consulLock, err = c.Client.LockOpts(opts)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
lockErr := &state.LockError{}
|
lockErr := &state.LockError{}
|
||||||
|
@ -239,6 +271,7 @@ func (c *RemoteClient) lock() (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
lockErr.Info = lockInfo
|
lockErr.Info = lockInfo
|
||||||
|
|
||||||
return "", lockErr
|
return "", lockErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,16 +290,22 @@ func (c *RemoteClient) lock() (string, error) {
|
||||||
// If we lose the lock to due communication issues with the consul agent,
|
// If we lose the lock to due communication issues with the consul agent,
|
||||||
// attempt to immediately reacquire the lock. Put will verify the integrity
|
// attempt to immediately reacquire the lock. Put will verify the integrity
|
||||||
// of the state by using a CAS operation.
|
// of the state by using a CAS operation.
|
||||||
c.monitorCancel = make(chan struct{})
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
c.monitorDone = make(chan struct{})
|
c.monitorCancel = cancel
|
||||||
go func(cancel, done chan struct{}) {
|
c.monitorWG.Add(1)
|
||||||
defer func() {
|
go func() {
|
||||||
close(done)
|
defer c.monitorWG.Done()
|
||||||
}()
|
|
||||||
select {
|
select {
|
||||||
case <-c.lockCh:
|
case <-c.lockCh:
|
||||||
|
log.Println("[ERROR] lost consul lock")
|
||||||
for {
|
for {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
// We lost our lock, so we need to cancel the session too.
|
||||||
|
// The CancelFunc is only replaced while holding Client.mu, so
|
||||||
|
// this is safe to call here. This will be replaced by the
|
||||||
|
// lock() call below.
|
||||||
|
c.sessionCancel()
|
||||||
|
|
||||||
c.consulLock = nil
|
c.consulLock = nil
|
||||||
_, err := c.lock()
|
_, err := c.lock()
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
@ -276,11 +315,11 @@ func (c *RemoteClient) lock() (string, error) {
|
||||||
// terraform is running. There may be changes in progress,
|
// terraform is running. There may be changes in progress,
|
||||||
// so there's no use in aborting. Either we eventually
|
// so there's no use in aborting. Either we eventually
|
||||||
// reacquire the lock, or a Put will fail on a CAS.
|
// reacquire the lock, or a Put will fail on a CAS.
|
||||||
log.Printf("[ERROR] attempting to reacquire lock: %s", err)
|
log.Printf("[ERROR] could not reacquire lock: %s", err)
|
||||||
time.Sleep(time.Second)
|
time.Sleep(lockReacquireInterval)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-cancel:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
@ -292,10 +331,10 @@ func (c *RemoteClient) lock() (string, error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-cancel:
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}(c.monitorCancel, c.monitorDone)
|
}()
|
||||||
|
|
||||||
if testLockHook != nil {
|
if testLockHook != nil {
|
||||||
testLockHook()
|
testLockHook()
|
||||||
|
@ -304,6 +343,42 @@ func (c *RemoteClient) lock() (string, error) {
|
||||||
return c.info.ID, nil
|
return c.info.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// called after a lock is acquired
|
||||||
|
var testLockHook func()
|
||||||
|
|
||||||
|
func (c *RemoteClient) createSession() (string, error) {
|
||||||
|
// create the context first. Even if the session creation fails, we assume
|
||||||
|
// that the CancelFunc is always callable.
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
c.sessionCancel = cancel
|
||||||
|
|
||||||
|
session := c.Client.Session()
|
||||||
|
se := &consulapi.SessionEntry{
|
||||||
|
Name: consulapi.DefaultLockSessionName,
|
||||||
|
TTL: lockSessionTTL,
|
||||||
|
LockDelay: lockDelay,
|
||||||
|
}
|
||||||
|
|
||||||
|
id, _, err := session.Create(se, nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("[INFO] created consul lock session", id)
|
||||||
|
|
||||||
|
// keep the session renewed
|
||||||
|
// we need an adapter to convert the session Done() channel to a
|
||||||
|
// non-directional channel to satisfy the RenewPeriodic signature.
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
go session.RenewPeriodic(lockSessionTTL, id, nil, done)
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Unlock(id string) error {
|
func (c *RemoteClient) Unlock(id string) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
@ -315,17 +390,27 @@ func (c *RemoteClient) Unlock(id string) error {
|
||||||
return c.unlock(id)
|
return c.unlock(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the unlock implementation.
|
||||||
|
// Only to be called while holding Client.mu
|
||||||
func (c *RemoteClient) unlock(id string) error {
|
func (c *RemoteClient) unlock(id string) error {
|
||||||
// cancel our monitoring goroutine
|
|
||||||
if c.monitorCancel != nil {
|
|
||||||
close(c.monitorCancel)
|
|
||||||
}
|
|
||||||
|
|
||||||
// this doesn't use the lock id, because the lock is tied to the consul client.
|
// this doesn't use the lock id, because the lock is tied to the consul client.
|
||||||
if c.consulLock == nil || c.lockCh == nil {
|
if c.consulLock == nil || c.lockCh == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancel our monitoring goroutine
|
||||||
|
c.monitorCancel()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
c.consulLock = nil
|
||||||
|
|
||||||
|
// The consul session is only used for this single lock, so cancel it
|
||||||
|
// after we unlock.
|
||||||
|
// The session is only created and replaced holding Client.mu, so the
|
||||||
|
// CancelFunc must be non-nil.
|
||||||
|
c.sessionCancel()
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c.lockCh:
|
case <-c.lockCh:
|
||||||
return errors.New("consul lock was lost")
|
return errors.New("consul lock was lost")
|
||||||
|
@ -344,9 +429,9 @@ func (c *RemoteClient) unlock(id string) error {
|
||||||
errs = multierror.Append(errs, err)
|
errs = multierror.Append(errs, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the monitoring goroutine may be in a select on this chan, so we need to
|
// the monitoring goroutine may be in a select on the lockCh, so we need to
|
||||||
// wait for it to return before changing the value.
|
// wait for it to return before changing the value.
|
||||||
<-c.monitorDone
|
c.monitorWG.Wait()
|
||||||
c.lockCh = nil
|
c.lockCh = nil
|
||||||
|
|
||||||
// This is only cleanup, and will fail if the lock was immediately taken by
|
// This is only cleanup, and will fail if the lock was immediately taken by
|
||||||
|
|
Loading…
Reference in New Issue