terraform/backend/remote-state/pg/client.go

203 lines
5.1 KiB
Go

package pg
import (
"context"
"crypto/md5"
"database/sql"
"fmt"
"sync"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/terraform/state"
"github.com/hashicorp/terraform/state/remote"
_ "github.com/lib/pq"
)
// RemoteClient is a remote client that stores data in a Postgres database
type RemoteClient struct {
Client *sql.DB
Name string
SchemaName string
// In-flight database transaction. Empty unless Locked.
txn *sql.Tx
txnMux sync.Mutex
info *state.LockInfo
}
func (c *RemoteClient) Get() (*remote.Payload, error) {
query := `SELECT data FROM %s.%s WHERE name = $1`
var row *sql.Row
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
// Use the open transaction when present
if c.txn != nil {
row = c.txn.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
} else {
row = c.Client.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
}
var data []byte
err := row.Scan(&data)
switch {
case err == sql.ErrNoRows:
// No existing state returns empty.
return nil, nil
case err != nil:
return nil, err
default:
md5 := md5.Sum(data)
return &remote.Payload{
Data: data,
MD5: md5[:],
}, nil
}
}
func (c *RemoteClient) Put(data []byte) error {
query := `INSERT INTO %s.%s (name, data) VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE
SET data = $2 WHERE %s.name = $1`
var err error
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
// Use the open transaction when present
if c.txn != nil {
_, err = c.txn.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
} else {
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
}
if err != nil {
return err
}
return nil
}
func (c *RemoteClient) Delete() error {
query := `DELETE FROM %s.%s WHERE name = $1`
var err error
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
// Use the open transaction when present
if c.txn != nil {
_, err = c.txn.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
} else {
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
}
if err != nil {
return err
}
return nil
}
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
var err error
var lockID string
var txn *sql.Tx
if info.ID == "" {
lockID, err = uuid.GenerateUUID()
if err != nil {
return "", err
}
info.Operation = "client"
info.ID = lockID
}
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
if c.txn == nil {
// Most strict transaction isolation to prevent cross-talk
// between incomplete state transactions.
txn, err = c.Client.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return "", &state.LockError{Info: info, Err: err}
}
c.txn = txn
} else {
return "", &state.LockError{Info: info, Err: fmt.Errorf("Client is already in a locking transaction")}
}
// Do not wait before giving up on a contended lock.
_, err = c.Client.Exec(`SET LOCAL lock_timeout = 0`)
if err != nil {
c.rollback(info)
return "", err
}
// Try to acquire lock for the existing row.
query := `SELECT pg_try_advisory_xact_lock(%s.id) FROM %s.%s WHERE %s.name = $1`
row := c.txn.QueryRow(fmt.Sprintf(query, statesTableName, c.SchemaName, statesTableName, statesTableName), c.Name)
var didLock []byte
err = row.Scan(&didLock)
switch {
case err == sql.ErrNoRows:
// When the row does not yet exist in state, take
// the `-1` lock to create the new row.
innerRow := c.txn.QueryRow(`SELECT pg_try_advisory_xact_lock(-1)`)
var innerDidLock []byte
err := innerRow.Scan(&innerDidLock)
if err != nil {
c.rollback(info)
return "", &state.LockError{Info: info, Err: err}
}
if string(innerDidLock) == "false" {
c.rollback(info)
return "", &state.LockError{Info: info, Err: fmt.Errorf("Workspace is already locked: %s", c.Name)}
}
case err != nil:
c.rollback(info)
return "", &state.LockError{Info: info, Err: err}
case string(didLock) == "false":
c.rollback(info)
return "", &state.LockError{Info: info, Err: fmt.Errorf("Workspace is already locked: %s", c.Name)}
default:
}
return info.ID, nil
}
func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
return c.info, nil
}
func (c *RemoteClient) Unlock(id string) error {
// Take exclusive access to the database transaction
c.txnMux.Lock()
defer c.txnMux.Unlock()
if c.txn != nil {
err := c.txn.Commit()
if err != nil {
return err
}
c.txn = nil
}
c.info = nil
return nil
}
// This must be called from any code path where the
// transaction would not be committed (unlocked),
// otherwise the transactions will leak and prevent
// the process from exiting cleanly.
//
// Does not use mutex because this will implicitly be
// called from within an already mutex'd scope.
func (c *RemoteClient) rollback(info *state.LockInfo) error {
if c.txn != nil {
err := c.txn.Rollback()
if err != nil {
return err
}
c.txn = nil
}
c.info = nil
return nil
}