use CAS for consul state Put
This commit is contained in:
parent
46be301806
commit
e8330b6f53
|
@ -7,6 +7,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
consulapi "github.com/hashicorp/consul/api"
|
||||
|
@ -26,11 +27,25 @@ type RemoteClient struct {
|
|||
Path string
|
||||
GZip bool
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
// The index of the last state we wrote.
|
||||
// If this is > 0, Put will perform a CAS to ensure that the state wasn't
|
||||
// changed during the operation. This is important even with locks, because
|
||||
// if the client loses the lock for some reason, then reacquires it, we
|
||||
// need to make sure that the state was not modified.
|
||||
modifyIndex uint64
|
||||
|
||||
consulLock *consulapi.Lock
|
||||
lockCh <-chan struct{}
|
||||
|
||||
info *state.LockInfo
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
pair, _, err := c.Client.KV().Get(c.Path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -39,6 +54,8 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
c.modifyIndex = pair.ModifyIndex
|
||||
|
||||
payload := pair.Value
|
||||
// If the payload starts with 0x1f, it's gzip, not json
|
||||
if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' {
|
||||
|
@ -57,6 +74,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|||
}
|
||||
|
||||
func (c *RemoteClient) Put(data []byte) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
payload := data
|
||||
if c.GZip {
|
||||
if compressedState, err := compressState(data); err == nil {
|
||||
|
@ -67,14 +87,50 @@ func (c *RemoteClient) Put(data []byte) error {
|
|||
}
|
||||
|
||||
kv := c.Client.KV()
|
||||
_, err := kv.Put(&consulapi.KVPair{
|
||||
Key: c.Path,
|
||||
Value: payload,
|
||||
}, nil)
|
||||
return err
|
||||
|
||||
verb := consulapi.KVCAS
|
||||
|
||||
// Assume a 0 index doesn't need a CAS for now, since we are either
|
||||
// creating a new state or purposely overwriting one.
|
||||
if c.modifyIndex == 0 {
|
||||
verb = consulapi.KVSet
|
||||
}
|
||||
|
||||
// KV.Put doesn't return the new index, so we use a single operation
|
||||
// transaction to get the new index with a single request.
|
||||
txOps := consulapi.KVTxnOps{
|
||||
&consulapi.KVTxnOp{
|
||||
Verb: verb,
|
||||
Key: c.Path,
|
||||
Value: payload,
|
||||
Index: c.modifyIndex,
|
||||
},
|
||||
}
|
||||
|
||||
ok, resp, _, err := kv.Txn(txOps, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// transaction was rolled back
|
||||
if !ok {
|
||||
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
|
||||
}
|
||||
|
||||
if len(resp.Results) != 1 {
|
||||
// this probably shouldn't happen
|
||||
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
|
||||
}
|
||||
|
||||
c.modifyIndex = resp.Results[0].ModifyIndex
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Delete() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
kv := c.Client.KV()
|
||||
_, err := kv.Delete(c.Path, nil)
|
||||
return err
|
||||
|
@ -113,6 +169,9 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
|
|||
}
|
||||
|
||||
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-c.lockCh:
|
||||
// We had a lock, but lost it.
|
||||
|
@ -125,6 +184,10 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return c.lock(info)
|
||||
}
|
||||
|
||||
func (c *RemoteClient) lock(info *state.LockInfo) (string, error) {
|
||||
if c.consulLock == nil {
|
||||
opts := &consulapi.LockOptions{
|
||||
Key: c.Path + lockSuffix,
|
||||
|
@ -176,6 +239,9 @@ func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
|||
}
|
||||
|
||||
func (c *RemoteClient) Unlock(id string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// this doesn't use the lock id, because the lock is tied to the consul client.
|
||||
if c.consulLock == nil || c.lockCh == nil {
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue