Merge pull request #14930 from hashicorp/jbardin/consul-cas
Use CAS to Put consul state, and reacquire locks when necessary
This commit is contained in:
commit
9548987f96
|
@ -94,6 +94,7 @@ func (b *Backend) State(name string) (state.State, error) {
|
||||||
Client: client,
|
Client: client,
|
||||||
Path: path,
|
Path: path,
|
||||||
GZip: gzip,
|
GZip: gzip,
|
||||||
|
lockState: b.lock,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
|
@ -26,11 +28,31 @@ type RemoteClient struct {
|
||||||
Path string
|
Path string
|
||||||
GZip bool
|
GZip bool
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
// lockState is true if we're using locks
|
||||||
|
lockState bool
|
||||||
|
|
||||||
|
// The index of the last state we wrote.
|
||||||
|
// If this is > 0, Put will perform a CAS to ensure that the state wasn't
|
||||||
|
// changed during the operation. This is important even with locks, because
|
||||||
|
// if the client loses the lock for some reason, then reacquires it, we
|
||||||
|
// need to make sure that the state was not modified.
|
||||||
|
modifyIndex uint64
|
||||||
|
|
||||||
consulLock *consulapi.Lock
|
consulLock *consulapi.Lock
|
||||||
lockCh <-chan struct{}
|
lockCh <-chan struct{}
|
||||||
|
|
||||||
|
info *state.LockInfo
|
||||||
|
|
||||||
|
// cancel the goroutine which is monitoring the lock.
|
||||||
|
monitorCancel chan struct{}
|
||||||
|
monitorDone chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
pair, _, err := c.Client.KV().Get(c.Path, nil)
|
pair, _, err := c.Client.KV().Get(c.Path, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -39,6 +61,8 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.modifyIndex = pair.ModifyIndex
|
||||||
|
|
||||||
payload := pair.Value
|
payload := pair.Value
|
||||||
// If the payload starts with 0x1f, it's gzip, not json
|
// If the payload starts with 0x1f, it's gzip, not json
|
||||||
if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' {
|
if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' {
|
||||||
|
@ -57,6 +81,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Put(data []byte) error {
|
func (c *RemoteClient) Put(data []byte) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
payload := data
|
payload := data
|
||||||
if c.GZip {
|
if c.GZip {
|
||||||
if compressedState, err := compressState(data); err == nil {
|
if compressedState, err := compressState(data); err == nil {
|
||||||
|
@ -67,14 +94,50 @@ func (c *RemoteClient) Put(data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
kv := c.Client.KV()
|
kv := c.Client.KV()
|
||||||
_, err := kv.Put(&consulapi.KVPair{
|
|
||||||
|
// default to doing a CAS
|
||||||
|
verb := consulapi.KVCAS
|
||||||
|
|
||||||
|
// Assume a 0 index doesn't need a CAS for now, since we are either
|
||||||
|
// creating a new state or purposely overwriting one.
|
||||||
|
if c.modifyIndex == 0 {
|
||||||
|
verb = consulapi.KVSet
|
||||||
|
}
|
||||||
|
|
||||||
|
// KV.Put doesn't return the new index, so we use a single operation
|
||||||
|
// transaction to get the new index with a single request.
|
||||||
|
txOps := consulapi.KVTxnOps{
|
||||||
|
&consulapi.KVTxnOp{
|
||||||
|
Verb: verb,
|
||||||
Key: c.Path,
|
Key: c.Path,
|
||||||
Value: payload,
|
Value: payload,
|
||||||
}, nil)
|
Index: c.modifyIndex,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, resp, _, err := kv.Txn(txOps, nil)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// transaction was rolled back
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Results) != 1 {
|
||||||
|
// this probably shouldn't happen
|
||||||
|
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
|
||||||
|
}
|
||||||
|
|
||||||
|
c.modifyIndex = resp.Results[0].ModifyIndex
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Delete() error {
|
func (c *RemoteClient) Delete() error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
kv := c.Client.KV()
|
kv := c.Client.KV()
|
||||||
_, err := kv.Delete(c.Path, nil)
|
_, err := kv.Delete(c.Path, nil)
|
||||||
return err
|
return err
|
||||||
|
@ -113,11 +176,22 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if !c.lockState {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c.info = info
|
||||||
|
|
||||||
|
// These checks only are to ensure we strictly follow the specification.
|
||||||
|
// Terraform shouldn't ever re-lock, so provide errors for the 2 possible
|
||||||
|
// states if this is called.
|
||||||
select {
|
select {
|
||||||
case <-c.lockCh:
|
case <-c.lockCh:
|
||||||
// We had a lock, but lost it.
|
// We had a lock, but lost it.
|
||||||
// Since we typically only call lock once, we shouldn't ever see this.
|
return "", errors.New("lost consul lock, cannot re-lock")
|
||||||
return "", errors.New("lost consul lock")
|
|
||||||
default:
|
default:
|
||||||
if c.lockCh != nil {
|
if c.lockCh != nil {
|
||||||
// we have an active lock already
|
// we have an active lock already
|
||||||
|
@ -125,6 +199,13 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return c.lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// called after a lock is acquired
|
||||||
|
var testLockHook func()
|
||||||
|
|
||||||
|
func (c *RemoteClient) lock() (string, error) {
|
||||||
if c.consulLock == nil {
|
if c.consulLock == nil {
|
||||||
opts := &consulapi.LockOptions{
|
opts := &consulapi.LockOptions{
|
||||||
Key: c.Path + lockSuffix,
|
Key: c.Path + lockSuffix,
|
||||||
|
@ -163,19 +244,83 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
||||||
|
|
||||||
c.lockCh = lockCh
|
c.lockCh = lockCh
|
||||||
|
|
||||||
err = c.putLockInfo(info)
|
err = c.putLockInfo(c.info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if unlockErr := c.Unlock(info.ID); unlockErr != nil {
|
if unlockErr := c.unlock(c.info.ID); unlockErr != nil {
|
||||||
err = multierror.Append(err, unlockErr)
|
err = multierror.Append(err, unlockErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return info.ID, nil
|
// Start a goroutine to monitor the lock state.
|
||||||
|
// If we lose the lock to due communication issues with the consul agent,
|
||||||
|
// attempt to immediately reacquire the lock. Put will verify the integrity
|
||||||
|
// of the state by using a CAS operation.
|
||||||
|
c.monitorCancel = make(chan struct{})
|
||||||
|
c.monitorDone = make(chan struct{})
|
||||||
|
go func(cancel, done chan struct{}) {
|
||||||
|
defer func() {
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-c.lockCh:
|
||||||
|
for {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.consulLock = nil
|
||||||
|
_, err := c.lock()
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// We failed to get the lock, keep trying as long as
|
||||||
|
// terraform is running. There may be changes in progress,
|
||||||
|
// so there's no use in aborting. Either we eventually
|
||||||
|
// reacquire the lock, or a Put will fail on a CAS.
|
||||||
|
log.Printf("[ERROR] attempting to reacquire lock: %s", err)
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-cancel:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the error was nil, the new lock started a new copy of
|
||||||
|
// this goroutine.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-cancel:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(c.monitorCancel, c.monitorDone)
|
||||||
|
|
||||||
|
if testLockHook != nil {
|
||||||
|
testLockHook()
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.info.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Unlock(id string) error {
|
func (c *RemoteClient) Unlock(id string) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if !c.lockState {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.unlock(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) unlock(id string) error {
|
||||||
|
// cancel our monitoring goroutine
|
||||||
|
if c.monitorCancel != nil {
|
||||||
|
close(c.monitorCancel)
|
||||||
|
}
|
||||||
|
|
||||||
// this doesn't use the lock id, because the lock is tied to the consul client.
|
// this doesn't use the lock id, because the lock is tied to the consul client.
|
||||||
if c.consulLock == nil || c.lockCh == nil {
|
if c.consulLock == nil || c.lockCh == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -187,20 +332,28 @@ func (c *RemoteClient) Unlock(id string) error {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.consulLock.Unlock()
|
kv := c.Client.KV()
|
||||||
|
|
||||||
|
var errs error
|
||||||
|
|
||||||
|
if _, err := kv.Delete(c.Path+lockInfoSuffix, nil); err != nil {
|
||||||
|
errs = multierror.Append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.consulLock.Unlock(); err != nil {
|
||||||
|
errs = multierror.Append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the monitoring goroutine may be in a select on this chan, so we need to
|
||||||
|
// wait for it to return before changing the value.
|
||||||
|
<-c.monitorDone
|
||||||
c.lockCh = nil
|
c.lockCh = nil
|
||||||
|
|
||||||
// This is only cleanup, and will fail if the lock was immediately taken by
|
// This is only cleanup, and will fail if the lock was immediately taken by
|
||||||
// another client, so we don't report an error to the user here.
|
// another client, so we don't report an error to the user here.
|
||||||
c.consulLock.Destroy()
|
c.consulLock.Destroy()
|
||||||
|
|
||||||
kv := c.Client.KV()
|
return errs
|
||||||
_, delErr := kv.Delete(c.Path+lockInfoSuffix, nil)
|
|
||||||
if delErr != nil {
|
|
||||||
err = multierror.Append(err, delErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func compressState(data []byte) ([]byte, error) {
|
func compressState(data []byte) ([]byte, error) {
|
||||||
|
|
|
@ -139,3 +139,52 @@ func TestConsul_destroyLock(t *testing.T) {
|
||||||
t.Fatalf("lock key not cleaned up at: %s", pair.Key)
|
t.Fatalf("lock key not cleaned up at: %s", pair.Key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsul_lostLock(t *testing.T) {
|
||||||
|
srv := newConsulTestServer(t)
|
||||||
|
defer srv.Stop()
|
||||||
|
|
||||||
|
path := fmt.Sprintf("tf-unit/%s", time.Now().String())
|
||||||
|
|
||||||
|
// create 2 instances to get 2 remote.Clients
|
||||||
|
sA, err := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": srv.HTTPAddr,
|
||||||
|
"path": path,
|
||||||
|
}).State(backend.DefaultStateName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sB, err := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": srv.HTTPAddr,
|
||||||
|
"path": path + "-not-used",
|
||||||
|
}).State(backend.DefaultStateName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
info := state.NewLockInfo()
|
||||||
|
info.Operation = "test-lost-lock"
|
||||||
|
id, err := sA.Lock(info)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reLocked := make(chan struct{})
|
||||||
|
testLockHook = func() {
|
||||||
|
close(reLocked)
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we use the second client to break the lock
|
||||||
|
kv := sB.(*remote.State).Client.(*RemoteClient).Client.KV()
|
||||||
|
_, err = kv.Delete(path+lockSuffix, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-reLocked
|
||||||
|
|
||||||
|
if err := sA.Unlock(id); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue