536 lines
13 KiB
Go
536 lines
13 KiB
Go
package swift
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gophercloud/gophercloud"
|
|
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/containers"
|
|
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/objects"
|
|
"github.com/gophercloud/gophercloud/pagination"
|
|
"github.com/hashicorp/terraform/states/remote"
|
|
"github.com/hashicorp/terraform/states/statemgr"
|
|
)
|
|
|
|
const (
|
|
consistencyTimeout = 15
|
|
|
|
// Suffix that will be appended to state file paths
|
|
// when locking
|
|
lockSuffix = ".lock"
|
|
|
|
// The TTL associated with this lock.
|
|
lockTTL = 60 * time.Second
|
|
|
|
// The Interval associated with this lock periodic renew.
|
|
lockRenewInterval = 30 * time.Second
|
|
|
|
// The amount of time we will retry to delete a container waiting for
|
|
// the objects to be deleted.
|
|
deleteRetryTimeout = 60 * time.Second
|
|
|
|
// delay when polling the objects
|
|
deleteRetryPollInterval = 5 * time.Second
|
|
)
|
|
|
|
// RemoteClient implements the Client interface for an Openstack Swift server.
|
|
// Implements "state/remote".ClientLocker
|
|
type RemoteClient struct {
|
|
client *gophercloud.ServiceClient
|
|
container string
|
|
archive bool
|
|
archiveContainer string
|
|
expireSecs int
|
|
objectName string
|
|
|
|
mu sync.Mutex
|
|
// lockState is true if we're using locks
|
|
lockState bool
|
|
|
|
info *statemgr.LockInfo
|
|
|
|
// lockCancel cancels the Context use for lockRenewPeriodic, and is
|
|
// called when unlocking, or before creating a new lock if the lock is
|
|
// lost.
|
|
lockCancel context.CancelFunc
|
|
}
|
|
|
|
func (c *RemoteClient) ListObjectsNames(prefix string, delim string) ([]string, error) {
|
|
if err := c.ensureContainerExists(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// List our raw path
|
|
listOpts := objects.ListOpts{
|
|
Full: false,
|
|
Prefix: prefix,
|
|
Delimiter: delim,
|
|
}
|
|
|
|
result := []string{}
|
|
pager := objects.List(c.client, c.container, listOpts)
|
|
// Define an anonymous function to be executed on each page's iteration
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
objectList, err := objects.ExtractNames(page)
|
|
if err != nil {
|
|
return false, fmt.Errorf("Error extracting names from objects from page %+v", err)
|
|
}
|
|
for _, object := range objectList {
|
|
result = append(result, object)
|
|
}
|
|
return true, nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return result, nil
|
|
|
|
}
|
|
|
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|
payload, err := c.get(c.objectName)
|
|
|
|
// 404 response is to be expected if the object doesn't already exist!
|
|
if _, ok := err.(gophercloud.ErrDefault404); ok {
|
|
log.Println("[DEBUG] Object doesn't exist to download.")
|
|
return nil, nil
|
|
}
|
|
|
|
return payload, err
|
|
}
|
|
|
|
// swift is eventually constistent. Consistency
|
|
// is ensured by the Get func which will always try
|
|
// to retrieve the most recent object
|
|
func (c *RemoteClient) Put(data []byte) error {
|
|
if c.expireSecs != 0 {
|
|
log.Printf("[DEBUG] ExpireSecs = %d", c.expireSecs)
|
|
return c.put(c.objectName, data, c.expireSecs, "")
|
|
}
|
|
|
|
return c.put(c.objectName, data, -1, "")
|
|
|
|
}
|
|
|
|
func (c *RemoteClient) Delete() error {
|
|
return c.delete(c.objectName)
|
|
}
|
|
|
|
func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.lockState {
|
|
return "", nil
|
|
}
|
|
|
|
log.Printf("[DEBUG] Acquiring Lock %#v on %s/%s", info, c.container, c.objectName)
|
|
|
|
// This check only is to ensure we strictly follow the specification.
|
|
// Terraform shouldn't ever re-lock, so provide errors for the possible
|
|
// states if this is called.
|
|
if c.info != nil {
|
|
// we have an active lock already
|
|
return "", fmt.Errorf("state %q already locked", c.lockFilePath())
|
|
}
|
|
|
|
// update the path we're using
|
|
info.Path = c.lockFilePath()
|
|
|
|
if err := c.writeLockInfo(info, lockTTL, "*"); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
log.Printf("[DEBUG] Acquired Lock %s on %s", info.ID, c.objectName)
|
|
|
|
c.info = info
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.lockCancel = cancel
|
|
|
|
// keep the lock renewed
|
|
go c.lockRenewPeriodic(ctx, info)
|
|
|
|
return info.ID, nil
|
|
}
|
|
|
|
func (c *RemoteClient) Unlock(id string) error {
|
|
c.mu.Lock()
|
|
|
|
if !c.lockState {
|
|
return nil
|
|
}
|
|
|
|
defer func() {
|
|
// The periodic lock renew is canceled
|
|
// the lockCancel func may not be nil in most usecases
|
|
// but can typically be nil when using a second client
|
|
// to ForceUnlock the state based on the same lock Id
|
|
if c.lockCancel != nil {
|
|
c.lockCancel()
|
|
}
|
|
c.info = nil
|
|
c.mu.Unlock()
|
|
}()
|
|
|
|
log.Printf("[DEBUG] Releasing Lock %s on %s", id, c.objectName)
|
|
|
|
info, err := c.lockInfo()
|
|
if err != nil {
|
|
return c.lockError(fmt.Errorf("failed to retrieve lock info: %s", err), nil)
|
|
}
|
|
|
|
c.info = info
|
|
|
|
// conflicting lock
|
|
if info.ID != id {
|
|
return c.lockError(fmt.Errorf("lock id %q does not match existing lock", id), info)
|
|
}
|
|
|
|
// before the lock object deletion is ordered, we shall
|
|
// stop periodic renew
|
|
if c.lockCancel != nil {
|
|
c.lockCancel()
|
|
}
|
|
|
|
if err = c.delete(c.lockFilePath()); err != nil {
|
|
return c.lockError(fmt.Errorf("error deleting lock with %q: %s", id, err), info)
|
|
}
|
|
|
|
// Swift is eventually consistent; we have to wait until
|
|
// the lock is effectively deleted to return, or raise
|
|
// an error if deadline is reached.
|
|
|
|
warning := `
|
|
WARNING: Waiting for lock deletion timed out.
|
|
Swift has accepted the deletion order of the lock %s/%s.
|
|
But as it is eventually consistent, complete deletion
|
|
may happen later.
|
|
`
|
|
deadline := time.Now().Add(deleteRetryTimeout)
|
|
for {
|
|
if time.Now().Before(deadline) {
|
|
info, err := c.lockInfo()
|
|
|
|
// 404 response is to be expected if the lock deletion
|
|
// has been processed
|
|
if _, ok := err.(gophercloud.ErrDefault404); ok {
|
|
log.Println("[DEBUG] Lock has been deleted.")
|
|
return nil
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// conflicting lock
|
|
if info.ID != id {
|
|
log.Printf("[DEBUG] Someone else has acquired a lock: %v.", info)
|
|
return nil
|
|
}
|
|
|
|
log.Printf("[DEBUG] Lock is still there, delete again and wait %v.", deleteRetryPollInterval)
|
|
c.delete(c.lockFilePath())
|
|
time.Sleep(deleteRetryPollInterval)
|
|
continue
|
|
}
|
|
|
|
return fmt.Errorf(warning, c.container, c.lockFilePath())
|
|
}
|
|
|
|
}
|
|
|
|
func (c *RemoteClient) get(object string) (*remote.Payload, error) {
|
|
log.Printf("[DEBUG] Getting object %s/%s", c.container, object)
|
|
result := objects.Download(c.client, c.container, object, objects.DownloadOpts{Newest: true})
|
|
|
|
// Extract any errors from result
|
|
_, err := result.Extract()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bytes, err := result.ExtractContent()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
hash := md5.Sum(bytes)
|
|
payload := &remote.Payload{
|
|
Data: bytes,
|
|
MD5: hash[:md5.Size],
|
|
}
|
|
|
|
return payload, nil
|
|
}
|
|
|
|
func (c *RemoteClient) put(object string, data []byte, deleteAfter int, ifNoneMatch string) error {
|
|
log.Printf("[DEBUG] Writing object in %s/%s", c.container, object)
|
|
if err := c.ensureContainerExists(); err != nil {
|
|
return err
|
|
}
|
|
|
|
contentType := "application/json"
|
|
contentLength := int64(len(data))
|
|
|
|
createOpts := objects.CreateOpts{
|
|
Content: bytes.NewReader(data),
|
|
ContentType: contentType,
|
|
ContentLength: int64(contentLength),
|
|
}
|
|
|
|
if deleteAfter >= 0 {
|
|
createOpts.DeleteAfter = deleteAfter
|
|
}
|
|
|
|
if ifNoneMatch != "" {
|
|
createOpts.IfNoneMatch = ifNoneMatch
|
|
}
|
|
|
|
result := objects.Create(c.client, c.container, object, createOpts)
|
|
if result.Err != nil {
|
|
return result.Err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RemoteClient) deleteContainer() error {
|
|
log.Printf("[DEBUG] Deleting container %s", c.container)
|
|
|
|
warning := `
|
|
WARNING: Waiting for container %s deletion timed out.
|
|
It may have been left in your Openstack account and may incur storage charges.
|
|
error was: %s
|
|
`
|
|
|
|
deadline := time.Now().Add(deleteRetryTimeout)
|
|
|
|
// Swift is eventually consistent; we have to retry until
|
|
// all objects are effectively deleted to delete the container
|
|
// If we still have objects in the container, or raise
|
|
// an error if deadline is reached
|
|
for {
|
|
if time.Now().Before(deadline) {
|
|
// Remove any objects
|
|
c.cleanObjects()
|
|
|
|
// Delete the container
|
|
log.Printf("[DEBUG] Deleting container %s", c.container)
|
|
deleteResult := containers.Delete(c.client, c.container)
|
|
if deleteResult.Err != nil {
|
|
// container is not found, thus has been deleted
|
|
if _, ok := deleteResult.Err.(gophercloud.ErrDefault404); ok {
|
|
return nil
|
|
}
|
|
|
|
// 409 http error is raised when deleting a container with
|
|
// remaining objects
|
|
if respErr, ok := deleteResult.Err.(gophercloud.ErrUnexpectedResponseCode); ok && respErr.Actual == 409 {
|
|
time.Sleep(deleteRetryPollInterval)
|
|
log.Printf("[DEBUG] Remaining objects, failed to delete container, retrying...")
|
|
continue
|
|
}
|
|
|
|
return fmt.Errorf(warning, deleteResult.Err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf(warning, c.container, "timeout reached")
|
|
}
|
|
|
|
}
|
|
|
|
// Helper function to delete Swift objects within a container
|
|
func (c *RemoteClient) cleanObjects() error {
|
|
// Get a slice of object names
|
|
objectNames, err := c.objectNames(c.container)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, object := range objectNames {
|
|
log.Printf("[DEBUG] Deleting object %s from container %s", object, c.container)
|
|
result := objects.Delete(c.client, c.container, object, nil)
|
|
if result.Err == nil {
|
|
continue
|
|
}
|
|
|
|
// if object is not found, it has already been deleted
|
|
if _, ok := result.Err.(gophercloud.ErrDefault404); !ok {
|
|
return fmt.Errorf("Error deleting object %s from container %s: %v", object, c.container, result.Err)
|
|
}
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
func (c *RemoteClient) delete(object string) error {
|
|
log.Printf("[DEBUG] Deleting object %s/%s", c.container, object)
|
|
|
|
result := objects.Delete(c.client, c.container, object, nil)
|
|
|
|
if result.Err != nil {
|
|
return result.Err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *RemoteClient) writeLockInfo(info *statemgr.LockInfo, deleteAfter time.Duration, ifNoneMatch string) error {
|
|
err := c.put(c.lockFilePath(), info.Marshal(), int(deleteAfter.Seconds()), ifNoneMatch)
|
|
|
|
if httpErr, ok := err.(gophercloud.ErrUnexpectedResponseCode); ok && httpErr.Actual == 412 {
|
|
log.Printf("[DEBUG] Couldn't write lock %s. One already exists.", info.ID)
|
|
info2, err2 := c.lockInfo()
|
|
if err2 != nil {
|
|
return fmt.Errorf("Couldn't read lock info: %v", err2)
|
|
}
|
|
|
|
return c.lockError(err, info2)
|
|
}
|
|
|
|
if err != nil {
|
|
return c.lockError(err, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *RemoteClient) lockError(err error, conflictingLock *statemgr.LockInfo) *statemgr.LockError {
|
|
lockErr := &statemgr.LockError{
|
|
Err: err,
|
|
Info: conflictingLock,
|
|
}
|
|
|
|
return lockErr
|
|
}
|
|
|
|
// lockInfo reads the lock file, parses its contents and returns the parsed
|
|
// LockInfo struct.
|
|
func (c *RemoteClient) lockInfo() (*statemgr.LockInfo, error) {
|
|
raw, err := c.get(c.lockFilePath())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
info := &statemgr.LockInfo{}
|
|
|
|
if err := json.Unmarshal(raw.Data, info); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
func (c *RemoteClient) lockRenewPeriodic(ctx context.Context, info *statemgr.LockInfo) error {
|
|
log.Printf("[DEBUG] Renew lock %v", info)
|
|
|
|
waitDur := lockRenewInterval
|
|
lastRenewTime := time.Now()
|
|
var lastErr error
|
|
for {
|
|
if time.Since(lastRenewTime) > lockTTL {
|
|
return lastErr
|
|
}
|
|
select {
|
|
case <-time.After(waitDur):
|
|
c.mu.Lock()
|
|
// Unlock may have released the mu.Lock
|
|
// in which case we shouldn't renew the lock
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("[DEBUG] Stopping Periodic renew of lock %v", info)
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
info2, err := c.lockInfo()
|
|
if _, ok := err.(gophercloud.ErrDefault404); ok {
|
|
log.Println("[DEBUG] Lock has expired trying to reacquire.")
|
|
err = nil
|
|
}
|
|
|
|
if err == nil && (info2 == nil || info.ID == info2.ID) {
|
|
info2 = info
|
|
log.Printf("[DEBUG] Renewing lock %v.", info)
|
|
err = c.writeLockInfo(info, lockTTL, "")
|
|
}
|
|
|
|
c.mu.Unlock()
|
|
|
|
if err != nil {
|
|
log.Printf("[ERROR] could not reacquire lock (%v): %s", info, err)
|
|
waitDur = time.Second
|
|
lastErr = err
|
|
continue
|
|
}
|
|
|
|
// conflicting lock
|
|
if info2.ID != info.ID {
|
|
return c.lockError(fmt.Errorf("lock id %q does not match existing lock %q", info.ID, info2.ID), info2)
|
|
}
|
|
|
|
waitDur = lockRenewInterval
|
|
lastRenewTime = time.Now()
|
|
|
|
case <-ctx.Done():
|
|
log.Printf("[DEBUG] Stopping Periodic renew of lock %s", info.ID)
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *RemoteClient) lockFilePath() string {
|
|
return c.objectName + lockSuffix
|
|
}
|
|
|
|
func (c *RemoteClient) ensureContainerExists() error {
|
|
containerOpts := &containers.CreateOpts{}
|
|
|
|
if c.archive {
|
|
log.Printf("[DEBUG] Creating archive container %s", c.archiveContainer)
|
|
result := containers.Create(c.client, c.archiveContainer, nil)
|
|
if result.Err != nil {
|
|
log.Printf("[DEBUG] Error creating archive container %s: %s", c.archiveContainer, result.Err)
|
|
return result.Err
|
|
}
|
|
|
|
log.Printf("[DEBUG] Enabling Versioning on container %s", c.container)
|
|
containerOpts.VersionsLocation = c.archiveContainer
|
|
}
|
|
|
|
log.Printf("[DEBUG] Creating container %s", c.container)
|
|
result := containers.Create(c.client, c.container, containerOpts)
|
|
if result.Err != nil {
|
|
return result.Err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Helper function to get a list of objects in a Swift container
|
|
func (c *RemoteClient) objectNames(container string) (objectNames []string, err error) {
|
|
_ = objects.List(c.client, container, nil).EachPage(func(page pagination.Page) (bool, error) {
|
|
// Get a slice of object names
|
|
names, err := objects.ExtractNames(page)
|
|
if err != nil {
|
|
return false, fmt.Errorf("Error extracting object names from page: %s", err)
|
|
}
|
|
for _, object := range names {
|
|
objectNames = append(objectNames, object)
|
|
}
|
|
|
|
return true, nil
|
|
})
|
|
return
|
|
}
|