Switch pg backend to session-level advisory locking, to avoid rollback of partial state updates

This commit is contained in:
Mars Hall 2019-03-04 17:01:08 -08:00
parent 3c34c047f7
commit 31c9776d55
3 changed files with 33 additions and 104 deletions

View File

@ -1,11 +1,9 @@
package pg package pg
import ( import (
"context"
"crypto/md5" "crypto/md5"
"database/sql" "database/sql"
"fmt" "fmt"
"sync"
uuid "github.com/hashicorp/go-uuid" uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/terraform/state" "github.com/hashicorp/terraform/state"
@ -19,24 +17,12 @@ type RemoteClient struct {
Name string Name string
SchemaName string SchemaName string
// In-flight database transaction. Empty unless Locked.
txn *sql.Tx
txnMux sync.Mutex
info *state.LockInfo info *state.LockInfo
} }
func (c *RemoteClient) Get() (*remote.Payload, error) { func (c *RemoteClient) Get() (*remote.Payload, error) {
query := `SELECT data FROM %s.%s WHERE name = $1` query := `SELECT data FROM %s.%s WHERE name = $1`
var row *sql.Row row := c.Client.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
// Use the open transaction when present
if c.txn != nil {
row = c.txn.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
} else {
row = c.Client.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
}
var data []byte var data []byte
err := row.Scan(&data) err := row.Scan(&data)
switch { switch {
@ -58,16 +44,7 @@ func (c *RemoteClient) Put(data []byte) error {
query := `INSERT INTO %s.%s (name, data) VALUES ($1, $2) query := `INSERT INTO %s.%s (name, data) VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE ON CONFLICT (name) DO UPDATE
SET data = $2 WHERE %s.name = $1` SET data = $2 WHERE %s.name = $1`
var err error _, err := c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
// Use the open transaction when present
if c.txn != nil {
_, err = c.txn.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
} else {
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
}
if err != nil { if err != nil {
return err return err
} }
@ -76,16 +53,7 @@ func (c *RemoteClient) Put(data []byte) error {
func (c *RemoteClient) Delete() error { func (c *RemoteClient) Delete() error {
query := `DELETE FROM %s.%s WHERE name = $1` query := `DELETE FROM %s.%s WHERE name = $1`
var err error _, err := c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
// Use the open transaction when present
if c.txn != nil {
_, err = c.txn.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
} else {
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
}
if err != nil { if err != nil {
return err return err
} }
@ -95,70 +63,42 @@ func (c *RemoteClient) Delete() error {
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
var err error var err error
var lockID string var lockID string
var txn *sql.Tx
if info.ID == "" { if info.ID == "" {
lockID, err = uuid.GenerateUUID() lockID, err = uuid.GenerateUUID()
if err != nil { if err != nil {
return "", err return "", err
} }
info.Operation = "client"
info.ID = lockID info.ID = lockID
} }
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
if c.txn == nil {
// Most strict transaction isolation to prevent cross-talk
// between incomplete state transactions.
txn, err = c.Client.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return "", &state.LockError{Info: info, Err: err}
}
c.txn = txn
} else {
return "", &state.LockError{Info: info, Err: fmt.Errorf("Client is already in a locking transaction")}
}
// Do not wait before giving up on a contended lock.
_, err = c.Client.Exec(`SET LOCAL lock_timeout = 0`)
if err != nil {
c.rollback(info)
return "", err
}
// Try to acquire lock for the existing row. // Try to acquire lock for the existing row.
query := `SELECT pg_try_advisory_xact_lock(%s.id) FROM %s.%s WHERE %s.name = $1` query := `SELECT %s.id, pg_try_advisory_lock(%s.id) FROM %s.%s WHERE %s.name = $1`
row := c.txn.QueryRow(fmt.Sprintf(query, statesTableName, c.SchemaName, statesTableName, statesTableName), c.Name) row := c.Client.QueryRow(fmt.Sprintf(query, statesTableName, statesTableName, c.SchemaName, statesTableName, statesTableName), c.Name)
var didLock []byte var pgLockId, didLock []byte
err = row.Scan(&didLock) err = row.Scan(&pgLockId, &didLock)
switch { switch {
case err == sql.ErrNoRows: case err == sql.ErrNoRows:
// When the row does not yet exist in state, take // When the row does not yet exist in state, take
// the `-1` lock to create the new row. // the `-1` lock to create the new row.
innerRow := c.txn.QueryRow(`SELECT pg_try_advisory_xact_lock(-1)`) innerRow := c.Client.QueryRow(`SELECT pg_try_advisory_lock(-1)`)
var innerDidLock []byte var innerDidLock []byte
err := innerRow.Scan(&innerDidLock) err := innerRow.Scan(&innerDidLock)
if err != nil { if err != nil {
c.rollback(info)
return "", &state.LockError{Info: info, Err: err} return "", &state.LockError{Info: info, Err: err}
} }
if string(innerDidLock) == "false" { if string(innerDidLock) == "false" {
c.rollback(info) return "", &state.LockError{Info: info, Err: fmt.Errorf("Already locked for workspace creation: %s", c.Name)}
return "", &state.LockError{Info: info, Err: fmt.Errorf("Workspace is already locked: %s", c.Name)}
} }
info.Path = "-1"
case err != nil: case err != nil:
c.rollback(info)
return "", &state.LockError{Info: info, Err: err} return "", &state.LockError{Info: info, Err: err}
case string(didLock) == "false": case string(didLock) == "false":
c.rollback(info)
return "", &state.LockError{Info: info, Err: fmt.Errorf("Workspace is already locked: %s", c.Name)} return "", &state.LockError{Info: info, Err: fmt.Errorf("Workspace is already locked: %s", c.Name)}
default: default:
info.Path = string(pgLockId)
} }
c.info = info
return info.ID, nil return info.ID, nil
} }
@ -168,35 +108,18 @@ func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
} }
func (c *RemoteClient) Unlock(id string) error { func (c *RemoteClient) Unlock(id string) error {
// Take exclusive access to the database transaction if c.info != nil && c.info.Path != "" {
c.txnMux.Lock() query := `SELECT pg_advisory_unlock(%s)`
defer c.txnMux.Unlock() row := c.Client.QueryRow(fmt.Sprintf(query, c.info.Path))
if c.txn != nil { var didUnlock []byte
err := c.txn.Commit() err := row.Scan(&didUnlock)
if err != nil { if err != nil {
return err return &state.LockError{Info: c.info, Err: err}
} }
c.txn = nil if string(didUnlock) == "false" {
} return &state.LockError{Info: c.info, Err: fmt.Errorf("Workspace is already unlocked: %s", c.Name)}
c.info = nil
return nil
}
// This must be called from any code path where the
// transaction would not be committed (unlocked),
// otherwise the transactions will leak and prevent
// the process from exiting cleanly.
//
// Does not use mutex because this will implicitly be
// called from within an already mutex'd scope.
func (c *RemoteClient) rollback(info *state.LockInfo) error {
if c.txn != nil {
err := c.txn.Rollback()
if err != nil {
return err
}
c.txn = nil
} }
c.info = nil c.info = nil
}
return nil return nil
} }

View File

@ -59,12 +59,18 @@ func TestRemoteLocks(t *testing.T) {
"conn_str": connStr, "conn_str": connStr,
"schema_name": schemaName, "schema_name": schemaName,
}) })
b := backend.TestBackendConfig(t, New(), config).(*Backend)
s, err := b.StateMgr(backend.DefaultStateName) b1 := backend.TestBackendConfig(t, New(), config).(*Backend)
s1, err := b1.StateMgr(backend.DefaultStateName)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
remote.TestRemoteLocks(t, s.(*remote.State).Client, s.(*remote.State).Client) b2 := backend.TestBackendConfig(t, New(), config).(*Backend)
s2, err := b2.StateMgr(backend.DefaultStateName)
if err != nil {
t.Fatal(err)
}
remote.TestRemoteLocks(t, s1.(*remote.State).Client, s2.(*remote.State).Client)
} }

View File

@ -80,7 +80,7 @@ This backend creates one table **states** in the automatically-managed Postgres
The table is keyed by the [workspace](/docs/state/workspaces.html) name. If workspaces are not in use, the name `default` is used. The table is keyed by the [workspace](/docs/state/workspaces.html) name. If workspaces are not in use, the name `default` is used.
Locking is supported using [Postgres advisory locks](https://www.postgresql.org/docs/9.5/explicit-locking.html#ADVISORY-LOCKS). [`force-unlock`](https://www.terraform.io/docs/commands/force-unlock.html) is not supported, because these database-native locks will automatically unlock when the transaction is aborted or the connection fails. To see outstanding locks in a Postgres server, use the [`pg_locks` system view](https://www.postgresql.org/docs/9.5/view-pg-locks.html). Locking is supported using [Postgres advisory locks](https://www.postgresql.org/docs/9.5/explicit-locking.html#ADVISORY-LOCKS). [`force-unlock`](https://www.terraform.io/docs/commands/force-unlock.html) is not supported, because these database-native locks will automatically unlock when the session is aborted or the connection fails. To see outstanding locks in a Postgres server, use the [`pg_locks` system view](https://www.postgresql.org/docs/9.5/view-pg-locks.html).
The **states** table contains: The **states** table contains: