Merge pull request #16289 from hashicorp/jbardin/consul-lock
some consul lock improvements
This commit is contained in:
commit
344fa54638
|
@ -2,7 +2,9 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/terraform/backend"
|
"github.com/hashicorp/terraform/backend"
|
||||||
|
@ -100,6 +102,7 @@ type Backend struct {
|
||||||
*schema.Backend
|
*schema.Backend
|
||||||
|
|
||||||
// The fields below are set from configure
|
// The fields below are set from configure
|
||||||
|
client *consulapi.Client
|
||||||
configData *schema.ResourceData
|
configData *schema.ResourceData
|
||||||
lock bool
|
lock bool
|
||||||
}
|
}
|
||||||
|
@ -111,16 +114,14 @@ func (b *Backend) configure(ctx context.Context) error {
|
||||||
// Store the lock information
|
// Store the lock information
|
||||||
b.lock = b.configData.Get("lock").(bool)
|
b.lock = b.configData.Get("lock").(bool)
|
||||||
|
|
||||||
// Initialize a client to test config
|
|
||||||
_, err := b.clientRaw()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Backend) clientRaw() (*consulapi.Client, error) {
|
|
||||||
data := b.configData
|
data := b.configData
|
||||||
|
|
||||||
// Configure the client
|
// Configure the client
|
||||||
config := consulapi.DefaultConfig()
|
config := consulapi.DefaultConfig()
|
||||||
|
|
||||||
|
// replace the default Transport Dialer to reduce the KeepAlive
|
||||||
|
config.Transport.DialContext = dialContext
|
||||||
|
|
||||||
if v, ok := data.GetOk("access_token"); ok && v.(string) != "" {
|
if v, ok := data.GetOk("access_token"); ok && v.(string) != "" {
|
||||||
config.Token = v.(string)
|
config.Token = v.(string)
|
||||||
}
|
}
|
||||||
|
@ -162,5 +163,18 @@ func (b *Backend) clientRaw() (*consulapi.Client, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return consulapi.NewClient(config)
|
client, err := consulapi.NewClient(config)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b.client = client
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dialContext is the DialContext function assigned to the consul client config's Transport.
|
||||||
|
// It is kept in a package-level var so tests can swap in a different dialer and restore the original afterwards.
|
||||||
|
var dialContext = (&net.Dialer{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
KeepAlive: 17 * time.Second,
|
||||||
|
}).DialContext
|
||||||
|
|
|
@ -15,15 +15,9 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (b *Backend) States() ([]string, error) {
|
func (b *Backend) States() ([]string, error) {
|
||||||
// Get the Consul client
|
|
||||||
client, err := b.clientRaw()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// List our raw path
|
// List our raw path
|
||||||
prefix := b.configData.Get("path").(string) + keyEnvPrefix
|
prefix := b.configData.Get("path").(string) + keyEnvPrefix
|
||||||
keys, _, err := client.KV().Keys(prefix, "/", nil)
|
keys, _, err := b.client.KV().Keys(prefix, "/", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -60,28 +54,16 @@ func (b *Backend) DeleteState(name string) error {
|
||||||
return fmt.Errorf("can't delete default state")
|
return fmt.Errorf("can't delete default state")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the Consul API client
|
|
||||||
client, err := b.clientRaw()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine the path of the data
|
// Determine the path of the data
|
||||||
path := b.path(name)
|
path := b.path(name)
|
||||||
|
|
||||||
// Delete it. We just delete it without any locking since
|
// Delete it. We just delete it without any locking since
|
||||||
// the DeleteState API is documented as such.
|
// the DeleteState API is documented as such.
|
||||||
_, err = client.KV().Delete(path, nil)
|
_, err := b.client.KV().Delete(path, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Backend) State(name string) (state.State, error) {
|
func (b *Backend) State(name string) (state.State, error) {
|
||||||
// Get the Consul API client
|
|
||||||
client, err := b.clientRaw()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine the path of the data
|
// Determine the path of the data
|
||||||
path := b.path(name)
|
path := b.path(name)
|
||||||
|
|
||||||
|
@ -91,7 +73,7 @@ func (b *Backend) State(name string) (state.State, error) {
|
||||||
// Build the state client
|
// Build the state client
|
||||||
var stateMgr state.State = &remote.State{
|
var stateMgr state.State = &remote.State{
|
||||||
Client: &RemoteClient{
|
Client: &RemoteClient{
|
||||||
Client: client,
|
Client: b.client,
|
||||||
Path: path,
|
Path: path,
|
||||||
GZip: gzip,
|
GZip: gzip,
|
||||||
lockState: b.lock,
|
lockState: b.lock,
|
||||||
|
@ -103,6 +85,11 @@ func (b *Backend) State(name string) (state.State, error) {
|
||||||
stateMgr = &state.LockDisabled{Inner: stateMgr}
|
stateMgr = &state.LockDisabled{Inner: stateMgr}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the default state always exists
|
||||||
|
if name == backend.DefaultStateName {
|
||||||
|
return stateMgr, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Grab a lock, we use this to write an empty state if one doesn't
|
// Grab a lock, we use this to write an empty state if one doesn't
|
||||||
// exist already. We have to write an empty state as a sentinel value
|
// exist already. We have to write an empty state as a sentinel value
|
||||||
// so States() knows it exists.
|
// so States() knows it exists.
|
||||||
|
|
|
@ -32,6 +32,8 @@ const (
|
||||||
lockReacquireInterval = 2 * time.Second
|
lockReacquireInterval = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// lostLockErr is the sentinel error returned by unlock when c.lockCh is ready
// to receive, which is how the client reports that the consul lock was lost
// before it could be released. Tests compare against it directly.
var lostLockErr = errors.New("consul lock was lost")
|
||||||
|
|
||||||
// RemoteClient is a remote client that stores data in Consul.
|
// RemoteClient is a remote client that stores data in Consul.
|
||||||
type RemoteClient struct {
|
type RemoteClient struct {
|
||||||
Client *consulapi.Client
|
Client *consulapi.Client
|
||||||
|
@ -228,6 +230,9 @@ func (c *RemoteClient) lock() (string, error) {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// store the session ID for correlation with consul logs
|
||||||
|
c.info.Info = "consul session: " + lockSession
|
||||||
|
|
||||||
opts := &consulapi.LockOptions{
|
opts := &consulapi.LockOptions{
|
||||||
Key: c.Path + lockSuffix,
|
Key: c.Path + lockSuffix,
|
||||||
Session: lockSession,
|
Session: lockSession,
|
||||||
|
@ -406,7 +411,7 @@ func (c *RemoteClient) unlock(id string) error {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-c.lockCh:
|
case <-c.lockCh:
|
||||||
return errors.New("consul lock was lost")
|
return lostLockErr
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -173,6 +176,7 @@ func TestConsul_lostLock(t *testing.T) {
|
||||||
reLocked := make(chan struct{})
|
reLocked := make(chan struct{})
|
||||||
testLockHook = func() {
|
testLockHook = func() {
|
||||||
close(reLocked)
|
close(reLocked)
|
||||||
|
testLockHook = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we use the second client to break the lock
|
// now we use the second client to break the lock
|
||||||
|
@ -188,3 +192,100 @@ func TestConsul_lostLock(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsul_lostLockConnection(t *testing.T) {
|
||||||
|
srv := newConsulTestServer(t)
|
||||||
|
defer srv.Stop()
|
||||||
|
|
||||||
|
// create an "unreliable" network by closing all the consul client's
|
||||||
|
// network connections
|
||||||
|
conns := &unreliableConns{}
|
||||||
|
origDialFn := dialContext
|
||||||
|
defer func() {
|
||||||
|
dialContext = origDialFn
|
||||||
|
}()
|
||||||
|
dialContext = conns.DialContext
|
||||||
|
|
||||||
|
path := fmt.Sprintf("tf-unit/%s", time.Now().String())
|
||||||
|
|
||||||
|
b := backend.TestBackendConfig(t, New(), map[string]interface{}{
|
||||||
|
"address": srv.HTTPAddr,
|
||||||
|
"path": path,
|
||||||
|
})
|
||||||
|
|
||||||
|
s, err := b.State(backend.DefaultStateName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
info := state.NewLockInfo()
|
||||||
|
info.Operation = "test-lost-lock-connection"
|
||||||
|
id, err := s.Lock(info)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// set a callback to know when the monitor loop re-connects
|
||||||
|
dialed := make(chan struct{})
|
||||||
|
conns.dialCallback = func() {
|
||||||
|
close(dialed)
|
||||||
|
conns.dialCallback = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// kill any open connections
|
||||||
|
// once the consul client is fixed, we should loop over this a few time to
|
||||||
|
// be sure, since we can't hook into the client's internal lock monitor
|
||||||
|
// loop.
|
||||||
|
conns.Kill()
|
||||||
|
// wait for a new connection to be dialed, and kill it again
|
||||||
|
<-dialed
|
||||||
|
conns.Kill()
|
||||||
|
|
||||||
|
// since the lock monitor loop is hidden in the consul api client, we can
|
||||||
|
// only wait a bit to make sure we were notified of the failure
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// once the consul client can reconnect properly, there will no longer be
|
||||||
|
// an error here
|
||||||
|
//if err := s.Unlock(id); err != nil {
|
||||||
|
if err := s.Unlock(id); err != lostLockErr {
|
||||||
|
t.Fatalf("expected lost lock error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unreliableConns simulates an unreliable network for tests. It records every
// connection it dials so that all of them can be forcibly expired with Kill.
//
// The embedded Mutex guards conns and dialCallback; code that assigns
// dialCallback while dials may be in flight should hold the lock.
type unreliableConns struct {
	sync.Mutex
	// conns holds the connections dialed since the last call to Kill.
	conns []net.Conn
	// dialCallback, when non-nil, is invoked with the lock held after each
	// successful dial.
	dialCallback func()
}

// DialContext dials addr on network netw, records the resulting connection so
// Kill can expire it later, and then runs dialCallback if one is set. On a
// dial error nothing is recorded, the callback is not run, and the error is
// returned unchanged.
func (u *unreliableConns) DialContext(ctx context.Context, netw, addr string) (net.Conn, error) {
	// Dial before taking the lock so a slow dial cannot stall Kill or other
	// dials; only the bookkeeping below needs mutual exclusion.
	var dialer net.Dialer
	conn, err := dialer.DialContext(ctx, netw, addr)
	if err != nil {
		return nil, err
	}

	u.Lock()
	defer u.Unlock()

	u.conns = append(u.conns, conn)

	if u.dialCallback != nil {
		u.dialCallback()
	}

	return conn, nil
}

// Kill these with a deadline, just to make sure we don't end up with any EOFs
// that get ignored.
//
// Every recorded connection gets an already-expired deadline, and the record
// is then cleared so a later Kill only affects connections dialed since.
func (u *unreliableConns) Kill() {
	u.Lock()
	defer u.Unlock()

	for _, conn := range u.conns {
		// net.Conn already provides SetDeadline, so no type assertion is
		// needed and non-TCP connections cannot cause a panic. The error is
		// deliberately ignored: this is best effort, and a connection that is
		// already closed is as good as killed.
		_ = conn.SetDeadline(time.Now())
	}
	u.conns = nil
}
|
||||||
|
|
Loading…
Reference in New Issue