2017-01-19 05:49:01 +01:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
2017-03-09 08:00:19 +01:00
|
|
|
"bytes"
|
|
|
|
"compress/gzip"
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
"context"
|
2017-01-19 05:49:01 +01:00
|
|
|
"crypto/md5"
|
2017-02-07 16:05:53 +01:00
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
2017-02-15 20:01:18 +01:00
|
|
|
"fmt"
|
2017-05-28 22:07:24 +02:00
|
|
|
"log"
|
2020-08-13 16:29:43 +02:00
|
|
|
"strings"
|
2017-05-28 21:16:51 +02:00
|
|
|
"sync"
|
2017-02-07 16:05:53 +01:00
|
|
|
"time"
|
2017-01-19 05:49:01 +01:00
|
|
|
|
|
|
|
consulapi "github.com/hashicorp/consul/api"
|
2017-02-07 16:05:53 +01:00
|
|
|
multierror "github.com/hashicorp/go-multierror"
|
2020-08-11 17:43:01 +02:00
|
|
|
"github.com/hashicorp/terraform/states/remote"
|
|
|
|
"github.com/hashicorp/terraform/states/statemgr"
|
2017-01-19 05:49:01 +01:00
|
|
|
)
|
|
|
|
|
2017-02-07 16:05:53 +01:00
|
|
|
// Key suffixes and timing parameters used for the Consul state lock.
const (
	// lockSuffix is appended to the sanitized state path to build the lock key.
	lockSuffix = "/.lock"
	// lockInfoSuffix is appended to the sanitized state path to build the key
	// under which the lock metadata is stored.
	lockInfoSuffix = "/.lockinfo"

	// lockSessionTTL is the TTL of the session associated with the lock.
	lockSessionTTL = "15s"

	// lockDelay is the time between a session being lost and the server
	// releasing the lock.
	lockDelay = 5 * time.Second

	// lockReacquireInterval is the pause between two attempts to reacquire a
	// lost lock.
	lockReacquireInterval = 2 * time.Second
)
|
|
|
|
|
2017-10-08 18:57:11 +02:00
|
|
|
// lostLockErr is the error value describing a consul lock that was held and
// then lost.
var lostLockErr = errors.New("consul lock was lost")
|
|
|
|
|
2017-01-19 05:49:01 +01:00
|
|
|
// RemoteClient is a remote client that stores data in Consul.
|
|
|
|
type RemoteClient struct {
|
|
|
|
Client *consulapi.Client
|
|
|
|
Path string
|
2017-03-13 08:17:33 +01:00
|
|
|
GZip bool
|
2017-02-07 16:05:53 +01:00
|
|
|
|
2017-05-28 21:16:51 +02:00
|
|
|
mu sync.Mutex
|
2017-05-28 22:07:24 +02:00
|
|
|
// lockState is true if we're using locks
|
|
|
|
lockState bool
|
2017-05-28 21:16:51 +02:00
|
|
|
|
|
|
|
// The index of the last state we wrote.
|
|
|
|
// If this is > 0, Put will perform a CAS to ensure that the state wasn't
|
|
|
|
// changed during the operation. This is important even with locks, because
|
|
|
|
// if the client loses the lock for some reason, then reacquires it, we
|
|
|
|
// need to make sure that the state was not modified.
|
|
|
|
modifyIndex uint64
|
|
|
|
|
2017-02-07 16:05:53 +01:00
|
|
|
consulLock *consulapi.Lock
|
|
|
|
lockCh <-chan struct{}
|
2017-05-28 21:16:51 +02:00
|
|
|
|
2020-08-11 17:43:01 +02:00
|
|
|
info *statemgr.LockInfo
|
2017-05-28 22:07:24 +02:00
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// cancel our goroutine which is monitoring the lock to automatically
|
|
|
|
// reacquire it when possible.
|
|
|
|
monitorCancel context.CancelFunc
|
|
|
|
monitorWG sync.WaitGroup
|
|
|
|
|
|
|
|
// sessionCancel cancels the Context use for session.RenewPeriodic, and is
|
|
|
|
// called when unlocking, or before creating a new lock if the lock is
|
|
|
|
// lost.
|
|
|
|
sessionCancel context.CancelFunc
|
2017-01-19 05:49:01 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
2017-05-28 21:16:51 +02:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2020-08-14 17:38:18 +02:00
|
|
|
kv := c.Client.KV()
|
|
|
|
|
|
|
|
chunked, hash, chunks, pair, err := c.chunkedMode()
|
2017-01-19 05:49:01 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pair == nil {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2017-05-28 21:16:51 +02:00
|
|
|
c.modifyIndex = pair.ModifyIndex
|
|
|
|
|
2020-08-14 17:38:18 +02:00
|
|
|
var payload []byte
|
|
|
|
if chunked {
|
|
|
|
for _, c := range chunks {
|
|
|
|
pair, _, err := kv.Get(c, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pair == nil {
|
|
|
|
return nil, fmt.Errorf("Key %q could not be found", c)
|
|
|
|
}
|
|
|
|
payload = append(payload, pair.Value[:]...)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
payload = pair.Value
|
|
|
|
}
|
|
|
|
|
2017-03-09 08:00:19 +01:00
|
|
|
// If the payload starts with 0x1f, it's gzip, not json
|
2020-08-14 17:38:18 +02:00
|
|
|
if len(payload) >= 1 && payload[0] == '\x1f' {
|
|
|
|
payload, err = uncompressState(payload)
|
|
|
|
if err != nil {
|
2017-03-09 08:00:19 +01:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-14 17:38:18 +02:00
|
|
|
md5 := md5.Sum(payload)
|
|
|
|
|
|
|
|
if hash != "" && fmt.Sprintf("%x", md5) != hash {
|
|
|
|
return nil, fmt.Errorf("The remote state does not match the expected hash")
|
|
|
|
}
|
|
|
|
|
2017-01-19 05:49:01 +01:00
|
|
|
return &remote.Payload{
|
2017-03-09 08:00:19 +01:00
|
|
|
Data: payload,
|
2017-01-19 05:49:01 +01:00
|
|
|
MD5: md5[:],
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *RemoteClient) Put(data []byte) error {
|
2020-08-14 17:38:18 +02:00
|
|
|
// The state can be stored in 4 different ways, based on the payload size
|
|
|
|
// and whether the user enabled gzip:
|
|
|
|
// - single entry mode with plain JSON: a single JSON is stored at
|
|
|
|
// "tfstate/my_project"
|
|
|
|
// - single entry mode gzip: the JSON payload is first gziped and stored at
|
|
|
|
// "tfstate/my_project"
|
|
|
|
// - chunked mode with plain JSON: the JSON payload is split in pieces and
|
|
|
|
// stored like so:
|
|
|
|
// - "tfstate/my_project" -> a JSON payload that contains the path of
|
|
|
|
// the chunks and an MD5 sum like so:
|
|
|
|
// {
|
|
|
|
// "current-hash": "abcdef1234",
|
|
|
|
// "chunks": [
|
|
|
|
// "tfstate/my_project/tfstate.abcdef1234/0",
|
|
|
|
// "tfstate/my_project/tfstate.abcdef1234/1",
|
|
|
|
// "tfstate/my_project/tfstate.abcdef1234/2",
|
|
|
|
// ]
|
|
|
|
// }
|
|
|
|
// - "tfstate/my_project/tfstate.abcdef1234/0" -> The first chunk
|
|
|
|
// - "tfstate/my_project/tfstate.abcdef1234/1" -> The next one
|
|
|
|
// - ...
|
|
|
|
// - chunked mode with gzip: the same system but we gziped the JSON payload
|
|
|
|
// before splitting it in chunks
|
|
|
|
//
|
|
|
|
// When overwritting the current state, we need to clean the old chunks if
|
|
|
|
// we were in chunked mode (no matter whether we need to use chunks for the
|
|
|
|
// new one). To do so based on the 4 possibilities above we look at the
|
|
|
|
// value at "tfstate/my_project" and if it is:
|
|
|
|
// - absent then it's a new state and there will be nothing to cleanup,
|
|
|
|
// - not a JSON payload we were in single entry mode with gzip so there will
|
|
|
|
// be nothing to cleanup
|
|
|
|
// - a JSON payload, then we were either single entry mode with plain JSON
|
|
|
|
// or in chunked mode. To differentiate between the two we look whether a
|
|
|
|
// "current-hash" key is present in the payload. If we find one we were
|
|
|
|
// in chunked mode and we will need to remove the old chunks (whether or
|
|
|
|
// not we were using gzip does not matter in that case).
|
|
|
|
|
2017-05-28 21:16:51 +02:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2020-08-14 17:38:18 +02:00
|
|
|
kv := c.Client.KV()
|
|
|
|
|
|
|
|
// First we determine what mode we were using and to prepare the cleanup
|
|
|
|
chunked, hash, _, _, err := c.chunkedMode()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
cleanupOldChunks := func() {}
|
|
|
|
if chunked {
|
|
|
|
cleanupOldChunks = func() {
|
|
|
|
// We ignore all errors that can happen here because we already
|
|
|
|
// saved the new state and there is no way to return a warning to
|
|
|
|
// the user. We may end up with dangling chunks but there is no way
|
|
|
|
// to be sure we won't.
|
|
|
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
|
|
|
|
kv.DeleteTree(path, nil)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-03-09 08:00:19 +01:00
|
|
|
payload := data
|
2017-03-13 08:17:33 +01:00
|
|
|
if c.GZip {
|
2017-03-09 08:00:19 +01:00
|
|
|
if compressedState, err := compressState(data); err == nil {
|
|
|
|
payload = compressedState
|
|
|
|
} else {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
// default to doing a CAS
|
2017-05-28 21:16:51 +02:00
|
|
|
verb := consulapi.KVCAS
|
|
|
|
|
|
|
|
// Assume a 0 index doesn't need a CAS for now, since we are either
|
|
|
|
// creating a new state or purposely overwriting one.
|
|
|
|
if c.modifyIndex == 0 {
|
|
|
|
verb = consulapi.KVSet
|
|
|
|
}
|
|
|
|
|
2020-08-14 17:38:18 +02:00
|
|
|
// If the payload is too large we first write the chunks and replace it
|
|
|
|
// 524288 is the default value, we just hope the user did not set a smaller
|
|
|
|
// one but there is really no reason for them to do so, if they changed it
|
|
|
|
// it is certainly to set a larger value.
|
|
|
|
limit := 524288
|
|
|
|
if len(payload) > limit {
|
|
|
|
md5 := md5.Sum(data)
|
|
|
|
chunks := split(payload, limit)
|
|
|
|
chunkPaths := make([]string, 0)
|
|
|
|
|
|
|
|
// First we write the new chunks
|
|
|
|
for i, p := range chunks {
|
|
|
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
|
|
|
|
chunkPaths = append(chunkPaths, path)
|
|
|
|
_, err := kv.Put(&consulapi.KVPair{
|
|
|
|
Key: path,
|
|
|
|
Value: p,
|
|
|
|
}, nil)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// We update the link to point to the new chunks
|
|
|
|
payload, err = json.Marshal(map[string]interface{}{
|
|
|
|
"current-hash": fmt.Sprintf("%x", md5),
|
|
|
|
"chunks": chunkPaths,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var txOps consulapi.KVTxnOps
|
2017-05-28 21:16:51 +02:00
|
|
|
// KV.Put doesn't return the new index, so we use a single operation
|
|
|
|
// transaction to get the new index with a single request.
|
2020-08-14 17:38:18 +02:00
|
|
|
txOps = consulapi.KVTxnOps{
|
2017-05-28 21:16:51 +02:00
|
|
|
&consulapi.KVTxnOp{
|
|
|
|
Verb: verb,
|
|
|
|
Key: c.Path,
|
|
|
|
Value: payload,
|
|
|
|
Index: c.modifyIndex,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
ok, resp, _, err := kv.Txn(txOps, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// transaction was rolled back
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(resp.Results) != 1 {
|
|
|
|
// this probably shouldn't happen
|
|
|
|
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
|
|
|
|
}
|
|
|
|
|
|
|
|
c.modifyIndex = resp.Results[0].ModifyIndex
|
2020-08-14 17:38:18 +02:00
|
|
|
|
|
|
|
// We remove all the old chunks
|
|
|
|
cleanupOldChunks()
|
|
|
|
|
2017-05-28 21:16:51 +02:00
|
|
|
return nil
|
2017-01-19 05:49:01 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *RemoteClient) Delete() error {
|
2017-05-28 21:16:51 +02:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2017-01-19 05:49:01 +01:00
|
|
|
kv := c.Client.KV()
|
2020-08-14 17:38:18 +02:00
|
|
|
|
|
|
|
chunked, hash, _, _, err := c.chunkedMode()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = kv.Delete(c.Path, nil)
|
|
|
|
|
|
|
|
// If there were chunks we need to remove them
|
|
|
|
if chunked {
|
|
|
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
|
|
|
|
kv.DeleteTree(path, nil)
|
|
|
|
}
|
|
|
|
|
2017-01-19 05:49:01 +01:00
|
|
|
return err
|
|
|
|
}
|
2017-02-07 16:05:53 +01:00
|
|
|
|
2020-08-13 16:29:43 +02:00
|
|
|
func (c *RemoteClient) lockPath() string {
|
|
|
|
// we sanitize the path for the lock as Consul does not like having
|
|
|
|
// two consecutive slashes for the lock path
|
|
|
|
return strings.TrimRight(c.Path, "/")
|
|
|
|
}
|
|
|
|
|
2020-08-11 17:43:01 +02:00
|
|
|
func (c *RemoteClient) putLockInfo(info *statemgr.LockInfo) error {
|
2017-02-15 00:19:47 +01:00
|
|
|
info.Path = c.Path
|
|
|
|
info.Created = time.Now().UTC()
|
2017-02-07 16:05:53 +01:00
|
|
|
|
|
|
|
kv := c.Client.KV()
|
2017-02-15 20:01:18 +01:00
|
|
|
_, err := kv.Put(&consulapi.KVPair{
|
2020-08-13 16:29:43 +02:00
|
|
|
Key: c.lockPath() + lockInfoSuffix,
|
2017-02-15 20:01:18 +01:00
|
|
|
Value: info.Marshal(),
|
2017-02-07 16:05:53 +01:00
|
|
|
}, nil)
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-08-11 17:43:01 +02:00
|
|
|
func (c *RemoteClient) getLockInfo() (*statemgr.LockInfo, error) {
|
2020-08-13 16:29:43 +02:00
|
|
|
path := c.lockPath() + lockInfoSuffix
|
2017-02-07 16:05:53 +01:00
|
|
|
pair, _, err := c.Client.KV().Get(path, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if pair == nil {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2020-08-11 17:43:01 +02:00
|
|
|
li := &statemgr.LockInfo{}
|
2017-02-07 16:05:53 +01:00
|
|
|
err = json.Unmarshal(pair.Value, li)
|
|
|
|
if err != nil {
|
2017-02-15 20:01:18 +01:00
|
|
|
return nil, fmt.Errorf("error unmarshaling lock info: %s", err)
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
return li, nil
|
|
|
|
}
|
|
|
|
|
2020-08-11 17:43:01 +02:00
|
|
|
func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
|
2017-05-28 21:16:51 +02:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
if !c.lockState {
|
|
|
|
return "", nil
|
|
|
|
}
|
|
|
|
|
|
|
|
c.info = info
|
|
|
|
|
|
|
|
// These checks only are to ensure we strictly follow the specification.
|
|
|
|
// Terraform shouldn't ever re-lock, so provide errors for the 2 possible
|
|
|
|
// states if this is called.
|
2017-02-07 16:05:53 +01:00
|
|
|
select {
|
|
|
|
case <-c.lockCh:
|
|
|
|
// We had a lock, but lost it.
|
2017-05-28 22:07:24 +02:00
|
|
|
return "", errors.New("lost consul lock, cannot re-lock")
|
2017-02-07 16:05:53 +01:00
|
|
|
default:
|
|
|
|
if c.lockCh != nil {
|
|
|
|
// we have an active lock already
|
2017-04-06 20:04:50 +02:00
|
|
|
return "", fmt.Errorf("state %q already locked", c.Path)
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
return c.lock()
|
2017-05-28 21:16:51 +02:00
|
|
|
}
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// the lock implementation.
|
|
|
|
// Only to be called while holding Client.mu
|
2017-05-28 22:07:24 +02:00
|
|
|
func (c *RemoteClient) lock() (string, error) {
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// We create a new session here, so it can be canceled when the lock is
|
|
|
|
// lost or unlocked.
|
|
|
|
lockSession, err := c.createSession()
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
2017-02-07 16:05:53 +01:00
|
|
|
|
2017-10-08 17:24:43 +02:00
|
|
|
// store the session ID for correlation with consul logs
|
|
|
|
c.info.Info = "consul session: " + lockSession
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
opts := &consulapi.LockOptions{
|
2020-08-13 16:29:43 +02:00
|
|
|
Key: c.lockPath() + lockSuffix,
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
Session: lockSession,
|
|
|
|
|
|
|
|
// only wait briefly, so terraform has the choice to fail fast or
|
|
|
|
// retry as needed.
|
|
|
|
LockWaitTime: time.Second,
|
|
|
|
LockTryOnce: true,
|
|
|
|
|
|
|
|
// Don't let the lock monitor give up right away, as it's possible the
|
|
|
|
// session is still OK. While the session is refreshed at a rate of
|
|
|
|
// TTL/2, the lock monitor is an idle blocking request and is more
|
|
|
|
// susceptible to being closed by a lower network layer.
|
|
|
|
MonitorRetries: 5,
|
|
|
|
//
|
|
|
|
// The delay between lock monitor retries.
|
|
|
|
// While the session has a 15s TTL plus a 5s wait period on a lost
|
|
|
|
// lock, if we can't get our lock back in 10+ seconds something is
|
|
|
|
// wrong so we're going to drop the session and start over.
|
|
|
|
MonitorRetryTime: 2 * time.Second,
|
|
|
|
}
|
2017-02-07 16:05:53 +01:00
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
c.consulLock, err = c.Client.LockOpts(opts)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
|
2020-08-11 17:43:01 +02:00
|
|
|
lockErr := &statemgr.LockError{}
|
2017-02-15 20:01:18 +01:00
|
|
|
|
2017-02-07 16:05:53 +01:00
|
|
|
lockCh, err := c.consulLock.Lock(make(chan struct{}))
|
|
|
|
if err != nil {
|
2017-02-15 20:01:18 +01:00
|
|
|
lockErr.Err = err
|
|
|
|
return "", lockErr
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
if lockCh == nil {
|
|
|
|
lockInfo, e := c.getLockInfo()
|
|
|
|
if e != nil {
|
2017-02-15 20:01:18 +01:00
|
|
|
lockErr.Err = e
|
|
|
|
return "", lockErr
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
2017-02-15 20:01:18 +01:00
|
|
|
|
|
|
|
lockErr.Info = lockInfo
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
|
2017-02-15 20:01:18 +01:00
|
|
|
return "", lockErr
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
c.lockCh = lockCh
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
err = c.putLockInfo(c.info)
|
2017-02-07 16:05:53 +01:00
|
|
|
if err != nil {
|
2017-05-28 22:07:24 +02:00
|
|
|
if unlockErr := c.unlock(c.info.ID); unlockErr != nil {
|
2017-02-15 20:01:18 +01:00
|
|
|
err = multierror.Append(err, unlockErr)
|
|
|
|
}
|
|
|
|
|
2017-02-15 00:19:47 +01:00
|
|
|
return "", err
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
// Start a goroutine to monitor the lock state.
|
|
|
|
// If we lose the lock to due communication issues with the consul agent,
|
|
|
|
// attempt to immediately reacquire the lock. Put will verify the integrity
|
|
|
|
// of the state by using a CAS operation.
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
c.monitorCancel = cancel
|
|
|
|
c.monitorWG.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer c.monitorWG.Done()
|
2017-05-28 22:07:24 +02:00
|
|
|
select {
|
|
|
|
case <-c.lockCh:
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
log.Println("[ERROR] lost consul lock")
|
2017-05-28 22:07:24 +02:00
|
|
|
for {
|
|
|
|
c.mu.Lock()
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// We lost our lock, so we need to cancel the session too.
|
|
|
|
// The CancelFunc is only replaced while holding Client.mu, so
|
|
|
|
// this is safe to call here. This will be replaced by the
|
|
|
|
// lock() call below.
|
|
|
|
c.sessionCancel()
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
c.consulLock = nil
|
|
|
|
_, err := c.lock()
|
|
|
|
c.mu.Unlock()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
// We failed to get the lock, keep trying as long as
|
|
|
|
// terraform is running. There may be changes in progress,
|
|
|
|
// so there's no use in aborting. Either we eventually
|
|
|
|
// reacquire the lock, or a Put will fail on a CAS.
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
log.Printf("[ERROR] could not reacquire lock: %s", err)
|
|
|
|
time.Sleep(lockReacquireInterval)
|
2017-05-28 22:07:24 +02:00
|
|
|
|
|
|
|
select {
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
case <-ctx.Done():
|
2017-05-28 22:07:24 +02:00
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// if the error was nil, the new lock started a new copy of
|
|
|
|
// this goroutine.
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
case <-ctx.Done():
|
2017-05-28 22:07:24 +02:00
|
|
|
return
|
|
|
|
}
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
}()
|
2017-05-28 22:07:24 +02:00
|
|
|
|
|
|
|
if testLockHook != nil {
|
|
|
|
testLockHook()
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.info.ID, nil
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// testLockHook, when non-nil, is called after a lock is acquired, just before
// the lock ID is returned to the caller. It is nil unless something assigns it.
var testLockHook func()
|
|
|
|
|
|
|
|
func (c *RemoteClient) createSession() (string, error) {
|
|
|
|
// create the context first. Even if the session creation fails, we assume
|
|
|
|
// that the CancelFunc is always callable.
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
c.sessionCancel = cancel
|
|
|
|
|
|
|
|
session := c.Client.Session()
|
|
|
|
se := &consulapi.SessionEntry{
|
|
|
|
Name: consulapi.DefaultLockSessionName,
|
|
|
|
TTL: lockSessionTTL,
|
|
|
|
LockDelay: lockDelay,
|
|
|
|
}
|
|
|
|
|
|
|
|
id, _, err := session.Create(se, nil)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Println("[INFO] created consul lock session", id)
|
|
|
|
|
|
|
|
// keep the session renewed
|
2017-07-17 17:28:45 +02:00
|
|
|
go session.RenewPeriodic(lockSessionTTL, id, nil, ctx.Done())
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
|
|
|
|
return id, nil
|
|
|
|
}
|
|
|
|
|
2017-02-15 00:19:47 +01:00
|
|
|
func (c *RemoteClient) Unlock(id string) error {
|
2017-05-28 21:16:51 +02:00
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
if !c.lockState {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.unlock(id)
|
|
|
|
}
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PeriodicRenew will call Session.Destroy),
and start over. In the worst case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// the unlock implementation.
|
|
|
|
// Only to be called while holding Client.mu
|
2017-05-28 22:07:24 +02:00
|
|
|
func (c *RemoteClient) unlock(id string) error {
|
2017-02-15 18:02:37 +01:00
|
|
|
// this doesn't use the lock id, because the lock is tied to the consul client.
|
2017-02-07 16:05:53 +01:00
|
|
|
if c.consulLock == nil || c.lockCh == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// cancel our monitoring goroutine
|
|
|
|
c.monitorCancel()
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
c.consulLock = nil
|
|
|
|
|
|
|
|
// The consul session is only used for this single lock, so cancel it
|
|
|
|
// after we unlock.
|
|
|
|
// The session is only created and replaced holding Client.mu, so the
|
|
|
|
// CancelFunc must be non-nil.
|
|
|
|
c.sessionCancel()
|
|
|
|
}()
|
|
|
|
|
2017-02-07 16:05:53 +01:00
|
|
|
select {
|
|
|
|
case <-c.lockCh:
|
2017-10-08 18:57:11 +02:00
|
|
|
return lostLockErr
|
2017-02-07 16:05:53 +01:00
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
kv := c.Client.KV()
|
|
|
|
|
|
|
|
var errs error
|
|
|
|
|
2020-08-13 16:29:43 +02:00
|
|
|
if _, err := kv.Delete(c.lockPath()+lockInfoSuffix, nil); err != nil {
|
2017-05-28 22:07:24 +02:00
|
|
|
errs = multierror.Append(errs, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := c.consulLock.Unlock(); err != nil {
|
|
|
|
errs = multierror.Append(errs, err)
|
|
|
|
}
|
|
|
|
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
// the monitoring goroutine may be in a select on the lockCh, so we need to
|
2017-05-28 22:07:24 +02:00
|
|
|
// wait for it to return before changing the value.
|
have the consul client manage the lock session
When a consul lock is lost, there is a possibility that the associated
session is still active. Most commonly, the long request to watch the
lock key may error out, while the session is continually refreshed at a
rate of TTL/2.
First have the lock monitor retry the lock internally for at least 10
seconds (5 attempts with the default 2 second wait time). In most cases
this will reconnect on the first try, keeping the lock channel open.
If the consul lock can't recover itself, then cancel the session as soon
as possible (terminating the PreiodicRenew will call Session.Destroy),
and start over. In the worse case, the consul agents were split, and the
session still exists on the leader so we may need to wait for the old
session TTL, plus the LockWait time to renew the lock.
We use a Context for the cancellation channels here, because that
removes the need to worry about double-closes and nil channels. It
requires an awkward adapter goroutine for now to convert the Done()
`<-chan` to a `chan` for PeriodicRenew, but makes the rest of the code
safer in the long run.
2017-07-12 17:35:18 +02:00
|
|
|
c.monitorWG.Wait()
|
2017-02-07 16:05:53 +01:00
|
|
|
c.lockCh = nil
|
|
|
|
|
2017-04-06 20:04:50 +02:00
|
|
|
// This is only cleanup, and will fail if the lock was immediately taken by
|
|
|
|
// another client, so we don't report an error to the user here.
|
|
|
|
c.consulLock.Destroy()
|
|
|
|
|
2017-05-28 22:07:24 +02:00
|
|
|
return errs
|
2017-02-07 16:05:53 +01:00
|
|
|
}
|
2017-03-09 08:00:19 +01:00
|
|
|
|
|
|
|
// compressState gzips data and returns the compressed bytes. The first error
// hit while writing, flushing or closing the gzip stream is returned and no
// data is produced.
func compressState(data []byte) ([]byte, error) {
	var out bytes.Buffer
	zw := gzip.NewWriter(&out)

	// Each step only runs if every previous one succeeded.
	_, err := zw.Write(data)
	if err == nil {
		err = zw.Flush()
	}
	if err == nil {
		err = zw.Close()
	}
	if err != nil {
		return nil, err
	}

	return out.Bytes(), nil
}
|
|
|
|
|
|
|
|
// uncompressState decodes a gzip stream and returns the uncompressed bytes.
//
// It returns an error when data does not start with a valid gzip header, when
// decompression fails part-way through, or when the gzip trailer (checksum and
// size) does not match the decoded content. No data is returned alongside an
// error.
func uncompressState(data []byte) ([]byte, error) {
	gz, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	b := new(bytes.Buffer)
	// The gzip reader reports checksum and size mismatches from Read, not from
	// Close, so the ReadFrom error must be checked or a corrupted payload is
	// silently accepted as valid state.
	if _, err := b.ReadFrom(gz); err != nil {
		// Best-effort close: the read error is the one worth reporting.
		gz.Close()
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}
|
2020-08-14 17:38:18 +02:00
|
|
|
|
|
|
|
// split cuts payload into consecutive pieces of exactly limit bytes, followed
// by one shorter piece holding whatever is left over (if anything). The
// returned slices share payload's backing array; nothing is copied.
func split(payload []byte, limit int) [][]byte {
	parts := make([][]byte, 0, len(payload)/limit+1)

	rest := payload
	for len(rest) >= limit {
		parts = append(parts, rest[:limit])
		rest = rest[limit:]
	}

	// Keep the trailing remainder, which is shorter than limit.
	if len(rest) != 0 {
		parts = append(parts, rest)
	}
	return parts
}
|
|
|
|
|
|
|
|
func (c *RemoteClient) chunkedMode() (bool, string, []string, *consulapi.KVPair, error) {
|
|
|
|
kv := c.Client.KV()
|
|
|
|
pair, _, err := kv.Get(c.Path, nil)
|
|
|
|
if err != nil {
|
|
|
|
return false, "", nil, pair, err
|
|
|
|
}
|
|
|
|
if pair != nil {
|
|
|
|
var d map[string]interface{}
|
|
|
|
err = json.Unmarshal(pair.Value, &d)
|
|
|
|
// If there is an error when unmarshaling the payload, the state has
|
|
|
|
// probably been gziped in single entry mode.
|
|
|
|
if err == nil {
|
|
|
|
// If we find the "current-hash" key we were in chunked mode
|
|
|
|
hash, ok := d["current-hash"]
|
|
|
|
if ok {
|
|
|
|
chunks := make([]string, 0)
|
|
|
|
for _, c := range d["chunks"].([]interface{}) {
|
|
|
|
chunks = append(chunks, c.(string))
|
|
|
|
}
|
|
|
|
return true, hash.(string), chunks, pair, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false, "", nil, pair, nil
|
|
|
|
}
|