190 lines
5.1 KiB
Go
190 lines
5.1 KiB
Go
package resource
|
|
|
|
import (
|
|
"log"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// StateRefreshFunc is a function type used for StateChangeConf that is
|
|
// responsible for refreshing the item being watched for a state change.
|
|
//
|
|
// It returns three results. `result` is any object that will be returned
|
|
// as the final object after waiting for state change. This allows you to
|
|
// return the final updated object, for example an EC2 instance after refreshing
|
|
// it.
|
|
//
|
|
// `state` is the latest state of that object. And `err` is any error that
|
|
// may have happened while refreshing the state.
|
|
type StateRefreshFunc func() (result interface{}, state string, err error)
|
|
|
|
// StateChangeConf is the configuration struct used for `WaitForState`.
|
|
type StateChangeConf struct {
|
|
Delay time.Duration // Wait this time before starting checks
|
|
Pending []string // States that are "allowed" and will continue trying
|
|
Refresh StateRefreshFunc // Refreshes the current state
|
|
Target []string // Target state
|
|
Timeout time.Duration // The amount of time to wait before timeout
|
|
MinTimeout time.Duration // Smallest time to wait before refreshes
|
|
PollInterval time.Duration // Override MinTimeout/backoff and only poll this often
|
|
NotFoundChecks int // Number of times to allow not found
|
|
|
|
// This is to work around inconsistent APIs
|
|
ContinuousTargetOccurence int // Number of times the Target state has to occur continuously
|
|
}
|
|
|
|
// WaitForState watches an object and waits for it to achieve the state
|
|
// specified in the configuration using the specified Refresh() func,
|
|
// waiting the number of seconds specified in the timeout configuration.
|
|
//
|
|
// If the Refresh function returns a error, exit immediately with that error.
|
|
//
|
|
// If the Refresh function returns a state other than the Target state or one
|
|
// listed in Pending, return immediately with an error.
|
|
//
|
|
// If the Timeout is exceeded before reaching the Target state, return an
|
|
// error.
|
|
//
|
|
// Otherwise, result the result of the first call to the Refresh function to
|
|
// reach the target state.
|
|
func (conf *StateChangeConf) WaitForState() (interface{}, error) {
|
|
log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target)
|
|
|
|
notfoundTick := 0
|
|
targetOccurence := 0
|
|
|
|
// Set a default for times to check for not found
|
|
if conf.NotFoundChecks == 0 {
|
|
conf.NotFoundChecks = 20
|
|
}
|
|
|
|
if conf.ContinuousTargetOccurence == 0 {
|
|
conf.ContinuousTargetOccurence = 1
|
|
}
|
|
|
|
// We can't safely read the result values if we timeout, so store them in
|
|
// an atomic.Value
|
|
type Result struct {
|
|
Result interface{}
|
|
State string
|
|
Error error
|
|
}
|
|
var lastResult atomic.Value
|
|
lastResult.Store(Result{})
|
|
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
defer close(doneCh)
|
|
|
|
// Wait for the delay
|
|
time.Sleep(conf.Delay)
|
|
|
|
wait := 100 * time.Millisecond
|
|
|
|
for {
|
|
res, currentState, err := conf.Refresh()
|
|
result := Result{
|
|
Result: res,
|
|
State: currentState,
|
|
Error: err,
|
|
}
|
|
lastResult.Store(result)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// If we're waiting for the absence of a thing, then return
|
|
if res == nil && len(conf.Target) == 0 {
|
|
targetOccurence += 1
|
|
if conf.ContinuousTargetOccurence == targetOccurence {
|
|
return
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
|
|
if res == nil {
|
|
// If we didn't find the resource, check if we have been
|
|
// not finding it for awhile, and if so, report an error.
|
|
notfoundTick += 1
|
|
if notfoundTick > conf.NotFoundChecks {
|
|
result.Error = &NotFoundError{
|
|
LastError: err,
|
|
}
|
|
lastResult.Store(result)
|
|
return
|
|
}
|
|
} else {
|
|
// Reset the counter for when a resource isn't found
|
|
notfoundTick = 0
|
|
found := false
|
|
|
|
for _, allowed := range conf.Target {
|
|
if currentState == allowed {
|
|
found = true
|
|
targetOccurence += 1
|
|
if conf.ContinuousTargetOccurence == targetOccurence {
|
|
return
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, allowed := range conf.Pending {
|
|
if currentState == allowed {
|
|
found = true
|
|
targetOccurence = 0
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
result.Error = &UnexpectedStateError{
|
|
LastError: err,
|
|
State: result.State,
|
|
ExpectedState: conf.Target,
|
|
}
|
|
lastResult.Store(result)
|
|
return
|
|
}
|
|
}
|
|
|
|
// If a poll interval has been specified, choose that interval.
|
|
// Otherwise bound the default value.
|
|
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
|
|
wait = conf.PollInterval
|
|
} else {
|
|
if wait < conf.MinTimeout {
|
|
wait = conf.MinTimeout
|
|
} else if wait > 10*time.Second {
|
|
wait = 10 * time.Second
|
|
}
|
|
}
|
|
|
|
log.Printf("[TRACE] Waiting %s before next try", wait)
|
|
time.Sleep(wait)
|
|
|
|
// Wait between refreshes using exponential backoff, except when
|
|
// waiting for the target state to reoccur.
|
|
if targetOccurence == 0 {
|
|
wait *= 2
|
|
}
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-doneCh:
|
|
r := lastResult.Load().(Result)
|
|
return r.Result, r.Error
|
|
case <-time.After(conf.Timeout):
|
|
r := lastResult.Load().(Result)
|
|
return nil, &TimeoutError{
|
|
LastError: r.Error,
|
|
LastState: r.State,
|
|
ExpectedState: conf.Target,
|
|
}
|
|
}
|
|
}
|