212 lines
4.3 KiB
Go
212 lines
4.3 KiB
Go
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
etcdv3 "github.com/coreos/etcd/clientv3"
|
|
etcdv3sync "github.com/coreos/etcd/clientv3/concurrency"
|
|
"github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/terraform/states/remote"
|
|
"github.com/hashicorp/terraform/states/statemgr"
|
|
)
|
|
|
|
const (
|
|
lockAcquireTimeout = 2 * time.Second
|
|
lockInfoSuffix = ".lockinfo"
|
|
)
|
|
|
|
// RemoteClient is a remote client that will store data in etcd.
|
|
type RemoteClient struct {
|
|
Client *etcdv3.Client
|
|
DoLock bool
|
|
Key string
|
|
|
|
etcdMutex *etcdv3sync.Mutex
|
|
etcdSession *etcdv3sync.Session
|
|
info *statemgr.LockInfo
|
|
mu sync.Mutex
|
|
modRevision int64
|
|
}
|
|
|
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
res, err := c.Client.KV.Get(context.TODO(), c.Key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if res.Count == 0 {
|
|
return nil, nil
|
|
}
|
|
if res.Count >= 2 {
|
|
return nil, fmt.Errorf("Expected a single result but got %d.", res.Count)
|
|
}
|
|
|
|
c.modRevision = res.Kvs[0].ModRevision
|
|
|
|
payload := res.Kvs[0].Value
|
|
md5 := md5.Sum(payload)
|
|
|
|
return &remote.Payload{
|
|
Data: payload,
|
|
MD5: md5[:],
|
|
}, nil
|
|
}
|
|
|
|
func (c *RemoteClient) Put(data []byte) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
res, err := etcdv3.NewKV(c.Client).Txn(context.TODO()).If(
|
|
etcdv3.Compare(etcdv3.ModRevision(c.Key), "=", c.modRevision),
|
|
).Then(
|
|
etcdv3.OpPut(c.Key, string(data)),
|
|
etcdv3.OpGet(c.Key),
|
|
).Commit()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !res.Succeeded {
|
|
return fmt.Errorf("The transaction did not succeed.")
|
|
}
|
|
if len(res.Responses) != 2 {
|
|
return fmt.Errorf("Expected two responses but got %d.", len(res.Responses))
|
|
}
|
|
|
|
c.modRevision = res.Responses[1].GetResponseRange().Kvs[0].ModRevision
|
|
return nil
|
|
}
|
|
|
|
func (c *RemoteClient) Delete() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
_, err := c.Client.KV.Delete(context.TODO(), c.Key)
|
|
return err
|
|
}
|
|
|
|
func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.DoLock {
|
|
return "", nil
|
|
}
|
|
if c.etcdSession != nil {
|
|
return "", fmt.Errorf("state %q already locked", c.Key)
|
|
}
|
|
|
|
c.info = info
|
|
return c.lock()
|
|
}
|
|
|
|
func (c *RemoteClient) Unlock(id string) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.DoLock {
|
|
return nil
|
|
}
|
|
|
|
return c.unlock(id)
|
|
}
|
|
|
|
func (c *RemoteClient) deleteLockInfo(info *statemgr.LockInfo) error {
|
|
res, err := c.Client.KV.Delete(context.TODO(), c.Key+lockInfoSuffix)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if res.Deleted == 0 {
|
|
return fmt.Errorf("No keys deleted for %s when deleting lock info.", c.Key+lockInfoSuffix)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *RemoteClient) getLockInfo() (*statemgr.LockInfo, error) {
|
|
res, err := c.Client.KV.Get(context.TODO(), c.Key+lockInfoSuffix)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if res.Count == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
li := &statemgr.LockInfo{}
|
|
err = json.Unmarshal(res.Kvs[0].Value, li)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error unmarshaling lock info: %s.", err)
|
|
}
|
|
|
|
return li, nil
|
|
}
|
|
|
|
func (c *RemoteClient) putLockInfo(info *statemgr.LockInfo) error {
|
|
c.info.Path = c.etcdMutex.Key()
|
|
c.info.Created = time.Now().UTC()
|
|
|
|
_, err := c.Client.KV.Put(context.TODO(), c.Key+lockInfoSuffix, string(c.info.Marshal()))
|
|
return err
|
|
}
|
|
|
|
func (c *RemoteClient) lock() (string, error) {
|
|
session, err := etcdv3sync.NewSession(c.Client)
|
|
if err != nil {
|
|
return "", nil
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.TODO(), lockAcquireTimeout)
|
|
defer cancel()
|
|
|
|
mutex := etcdv3sync.NewMutex(session, c.Key)
|
|
if err1 := mutex.Lock(ctx); err1 != nil {
|
|
lockInfo, err2 := c.getLockInfo()
|
|
if err2 != nil {
|
|
return "", &statemgr.LockError{Err: err2}
|
|
}
|
|
return "", &statemgr.LockError{Info: lockInfo, Err: err1}
|
|
}
|
|
|
|
c.etcdMutex = mutex
|
|
c.etcdSession = session
|
|
|
|
err = c.putLockInfo(c.info)
|
|
if err != nil {
|
|
if unlockErr := c.unlock(c.info.ID); unlockErr != nil {
|
|
err = multierror.Append(err, unlockErr)
|
|
}
|
|
return "", err
|
|
}
|
|
|
|
return c.info.ID, nil
|
|
}
|
|
|
|
func (c *RemoteClient) unlock(id string) error {
|
|
if c.etcdMutex == nil {
|
|
return nil
|
|
}
|
|
|
|
var errs error
|
|
|
|
if err := c.deleteLockInfo(c.info); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
if err := c.etcdMutex.Unlock(context.TODO()); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
if err := c.etcdSession.Close(); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
|
|
c.etcdMutex = nil
|
|
c.etcdSession = nil
|
|
|
|
return errs
|
|
}
|