Add state locking to consul backend
Use consul locks to implement state locking. The lock path is state path + "/.lock" which matches the consul cli default for locks. Lockinfo is stored at path + "/.lockinfo".
This commit is contained in:
parent
5ca5a3c78a
commit
54cac349a3
|
@ -2,15 +2,35 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
|
"github.com/hashicorp/errwrap"
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/hashicorp/terraform/state/remote"
|
"github.com/hashicorp/terraform/state/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
lockSuffix = "/.lock"
|
||||||
|
lockInfoSuffix = "/.lockinfo"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO: use single LockInfo struct
|
||||||
|
type lockInfo struct {
|
||||||
|
Created time.Time
|
||||||
|
Info string
|
||||||
|
}
|
||||||
|
|
||||||
// RemoteClient is a remote client that stores data in Consul.
|
// RemoteClient is a remote client that stores data in Consul.
|
||||||
type RemoteClient struct {
|
type RemoteClient struct {
|
||||||
Client *consulapi.Client
|
Client *consulapi.Client
|
||||||
Path string
|
Path string
|
||||||
|
|
||||||
|
consulLock *consulapi.Lock
|
||||||
|
lockCh <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||||
|
@ -43,3 +63,121 @@ func (c *RemoteClient) Delete() error {
|
||||||
_, err := kv.Delete(c.Path, nil)
|
_, err := kv.Delete(c.Path, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) putLockInfo(info string) error {
|
||||||
|
li := &lockInfo{
|
||||||
|
Created: time.Now().UTC(),
|
||||||
|
Info: info,
|
||||||
|
}
|
||||||
|
|
||||||
|
js, err := json.Marshal(li)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
kv := c.Client.KV()
|
||||||
|
_, err = kv.Put(&consulapi.KVPair{
|
||||||
|
Key: c.Path + lockInfoSuffix,
|
||||||
|
Value: js,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) getLockInfo() (*lockInfo, error) {
|
||||||
|
path := c.Path + lockInfoSuffix
|
||||||
|
pair, _, err := c.Client.KV().Get(path, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pair == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
li := &lockInfo{}
|
||||||
|
err = json.Unmarshal(pair.Value, li)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errwrap.Wrapf("error unmarshaling lock info: {{err}}", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return li, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) Lock(info string) error {
|
||||||
|
select {
|
||||||
|
case <-c.lockCh:
|
||||||
|
// We had a lock, but lost it.
|
||||||
|
// Since we typically only call lock once, we shouldn't ever see this.
|
||||||
|
return errors.New("lost consul lock")
|
||||||
|
default:
|
||||||
|
if c.lockCh != nil {
|
||||||
|
// we have an active lock already
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.consulLock == nil {
|
||||||
|
opts := &consulapi.LockOptions{
|
||||||
|
Key: c.Path + lockSuffix,
|
||||||
|
// We currently don't procide any options to block terraform and
|
||||||
|
// retry lock acquisition, but we can wait briefly in case the
|
||||||
|
// lock is about to be freed.
|
||||||
|
LockWaitTime: time.Second,
|
||||||
|
LockTryOnce: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
lock, err := c.Client.LockOpts(opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c.consulLock = lock
|
||||||
|
}
|
||||||
|
|
||||||
|
lockCh, err := c.consulLock.Lock(make(chan struct{}))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if lockCh == nil {
|
||||||
|
lockInfo, e := c.getLockInfo()
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
return fmt.Errorf("state locked: created:%s, info:%q",
|
||||||
|
lockInfo.Created, lockInfo.Info)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.lockCh = lockCh
|
||||||
|
|
||||||
|
err = c.putLockInfo(info)
|
||||||
|
if err != nil {
|
||||||
|
err = multierror.Append(err, c.Unlock())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *RemoteClient) Unlock() error {
|
||||||
|
if c.consulLock == nil || c.lockCh == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.lockCh:
|
||||||
|
return errors.New("consul lock was lost")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.consulLock.Unlock()
|
||||||
|
c.lockCh = nil
|
||||||
|
|
||||||
|
kv := c.Client.KV()
|
||||||
|
_, delErr := kv.Delete(c.Path+lockInfoSuffix, nil)
|
||||||
|
if delErr != nil {
|
||||||
|
err = multierror.Append(err, delErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -27,3 +28,25 @@ func TestRemoteClient(t *testing.T) {
|
||||||
// Test
|
// Test
|
||||||
remotestate.TestClient(t, b)
|
remotestate.TestClient(t, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsul_stateLock(t *testing.T) {
|
||||||
|
addr := os.Getenv("CONSUL_HTTP_ADDR")
|
||||||
|
if addr == "" {
|
||||||
|
t.Log("consul lock tests require a running consul instance")
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
path := "testing" //fmt.Sprintf("tf-unit/%s", time.Now().String())
|
||||||
|
|
||||||
|
// create 2 instances to get 2 remote.Clients
|
||||||
|
a := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": addr,
|
||||||
|
"path": path,
|
||||||
|
})
|
||||||
|
b := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": addr,
|
||||||
|
"path": path,
|
||||||
|
})
|
||||||
|
|
||||||
|
remotestate.TestRemoteLocks(t, a, b)
|
||||||
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/terraform/backend"
|
"github.com/hashicorp/terraform/backend"
|
||||||
|
"github.com/hashicorp/terraform/state"
|
||||||
"github.com/hashicorp/terraform/state/remote"
|
"github.com/hashicorp/terraform/state/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,3 +16,57 @@ func TestClient(t *testing.T, raw backend.Backend) {
|
||||||
|
|
||||||
remote.TestClient(t, b.client)
|
remote.TestClient(t, b.client)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test the lock implementation for a remote.Client.
|
||||||
|
// This test requires 2 backend instances, in oder to have multiple remote
|
||||||
|
// clients since some implementations may tie the client to the lock, or may
|
||||||
|
// have reentrant locks.
|
||||||
|
func TestRemoteLocks(t *testing.T, a, b backend.Backend) {
|
||||||
|
sA, err := a.State()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("failed to get state from backend A:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sB, err := b.State()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("failed to get state from backend B:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lockerA, ok := sA.(state.Locker)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("client A not a state.Locker")
|
||||||
|
}
|
||||||
|
|
||||||
|
lockerB, ok := sB.(state.Locker)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("client B not a state.Locker")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerA.Lock("test client A"); err != nil {
|
||||||
|
t.Fatal("unable to get initial lock:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerB.Lock("test client B"); err == nil {
|
||||||
|
lockerA.Unlock()
|
||||||
|
t.Fatal("client B obtained lock while held by client A")
|
||||||
|
} else {
|
||||||
|
t.Log("lock info error:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerA.Unlock(); err != nil {
|
||||||
|
t.Fatal("error unlocking client A", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerB.Lock("test client B"); err != nil {
|
||||||
|
t.Fatal("unable to obtain lock from client B")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lockerB.Unlock(); err != nil {
|
||||||
|
t.Fatal("error unlocking client B:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// unlock should be repeatable
|
||||||
|
if err := lockerA.Unlock(); err != nil {
|
||||||
|
t.Fatal("Unlock error from client A when state was not locked:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue