Merge pull request #8560 from hashicorp/jbardin/race2
Fix races in WaitForState
This commit is contained in:
commit
e0014198e1
|
@ -2,6 +2,7 @@ package resource
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -61,9 +62,15 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
|
||||||
conf.ContinuousTargetOccurence = 1
|
conf.ContinuousTargetOccurence = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
var result interface{}
|
// We can't safely read the result values if we timeout, so store them in
|
||||||
var resulterr error
|
// an atomic.Value
|
||||||
var currentState string
|
type Result struct {
|
||||||
|
Result interface{}
|
||||||
|
State string
|
||||||
|
Error error
|
||||||
|
}
|
||||||
|
var lastResult atomic.Value
|
||||||
|
lastResult.Store(Result{})
|
||||||
|
|
||||||
doneCh := make(chan struct{})
|
doneCh := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -74,39 +81,21 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
|
||||||
|
|
||||||
wait := 100 * time.Millisecond
|
wait := 100 * time.Millisecond
|
||||||
|
|
||||||
var err error
|
for {
|
||||||
for first := true; ; first = false {
|
res, currentState, err := conf.Refresh()
|
||||||
if !first {
|
result := Result{
|
||||||
// If a poll interval has been specified, choose that interval.
|
Result: res,
|
||||||
// Otherwise bound the default value.
|
State: currentState,
|
||||||
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
|
Error: err,
|
||||||
wait = conf.PollInterval
|
|
||||||
} else {
|
|
||||||
if wait < conf.MinTimeout {
|
|
||||||
wait = conf.MinTimeout
|
|
||||||
} else if wait > 10*time.Second {
|
|
||||||
wait = 10 * time.Second
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("[TRACE] Waiting %s before next try", wait)
|
|
||||||
time.Sleep(wait)
|
|
||||||
|
|
||||||
// Wait between refreshes using exponential backoff, except when
|
|
||||||
// waiting for the target state to reoccur.
|
|
||||||
if targetOccurence == 0 {
|
|
||||||
wait *= 2
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
lastResult.Store(result)
|
||||||
|
|
||||||
result, currentState, err = conf.Refresh()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resulterr = err
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we're waiting for the absence of a thing, then return
|
// If we're waiting for the absence of a thing, then return
|
||||||
if result == nil && len(conf.Target) == 0 {
|
if res == nil && len(conf.Target) == 0 {
|
||||||
targetOccurence += 1
|
targetOccurence += 1
|
||||||
if conf.ContinuousTargetOccurence == targetOccurence {
|
if conf.ContinuousTargetOccurence == targetOccurence {
|
||||||
return
|
return
|
||||||
|
@ -115,14 +104,15 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if result == nil {
|
if res == nil {
|
||||||
// If we didn't find the resource, check if we have been
|
// If we didn't find the resource, check if we have been
|
||||||
// not finding it for awhile, and if so, report an error.
|
// not finding it for awhile, and if so, report an error.
|
||||||
notfoundTick += 1
|
notfoundTick += 1
|
||||||
if notfoundTick > conf.NotFoundChecks {
|
if notfoundTick > conf.NotFoundChecks {
|
||||||
resulterr = &NotFoundError{
|
result.Error = &NotFoundError{
|
||||||
LastError: resulterr,
|
LastError: err,
|
||||||
}
|
}
|
||||||
|
lastResult.Store(result)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -151,24 +141,48 @@ func (conf *StateChangeConf) WaitForState() (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
resulterr = &UnexpectedStateError{
|
result.Error = &UnexpectedStateError{
|
||||||
LastError: resulterr,
|
LastError: err,
|
||||||
State: currentState,
|
State: result.State,
|
||||||
ExpectedState: conf.Target,
|
ExpectedState: conf.Target,
|
||||||
}
|
}
|
||||||
|
lastResult.Store(result)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If a poll interval has been specified, choose that interval.
|
||||||
|
// Otherwise bound the default value.
|
||||||
|
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
|
||||||
|
wait = conf.PollInterval
|
||||||
|
} else {
|
||||||
|
if wait < conf.MinTimeout {
|
||||||
|
wait = conf.MinTimeout
|
||||||
|
} else if wait > 10*time.Second {
|
||||||
|
wait = 10 * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[TRACE] Waiting %s before next try", wait)
|
||||||
|
time.Sleep(wait)
|
||||||
|
|
||||||
|
// Wait between refreshes using exponential backoff, except when
|
||||||
|
// waiting for the target state to reoccur.
|
||||||
|
if targetOccurence == 0 {
|
||||||
|
wait *= 2
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
return result, resulterr
|
r := lastResult.Load().(Result)
|
||||||
|
return r.Result, r.Error
|
||||||
case <-time.After(conf.Timeout):
|
case <-time.After(conf.Timeout):
|
||||||
|
r := lastResult.Load().(Result)
|
||||||
return nil, &TimeoutError{
|
return nil, &TimeoutError{
|
||||||
LastError: resulterr,
|
LastError: r.Error,
|
||||||
LastState: currentState,
|
LastState: r.State,
|
||||||
ExpectedState: conf.Target,
|
ExpectedState: conf.Target,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue