Merge pull request #17323 from hashicorp/jbardin/shutdown
Fix race in writing state during hard cancelation
This commit is contained in:
commit
a65089fcea
|
@ -145,8 +145,6 @@ type Operation struct {
|
|||
|
||||
// RunningOperation is the result of starting an operation.
|
||||
type RunningOperation struct {
|
||||
// Context should be used to track Done and Err for errors.
|
||||
//
|
||||
// For implementers of a backend, this context should not wrap the
|
||||
// passed in context. Otherwise, canceling the parent context will
|
||||
// immediately mark this context as "done" but those aren't the semantics
|
||||
|
@ -154,6 +152,16 @@ type RunningOperation struct {
|
|||
// is fully done.
|
||||
context.Context
|
||||
|
||||
// Stop requests the operation to complete early, by calling Stop on all
|
||||
// the plugins. If the process needs to terminate immediately, call Cancel.
|
||||
Stop context.CancelFunc
|
||||
|
||||
// Cancel is the context.CancelFunc associated with the embedded context,
|
||||
// and can be called to terminate the operation early.
|
||||
// Once Cancel is called, the operation should return as soon as possible
|
||||
// to avoid running operations during process exit.
|
||||
Cancel context.CancelFunc
|
||||
|
||||
// Err is the error of the operation. This is populated after
|
||||
// the operation has completed.
|
||||
Err error
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
@ -224,7 +225,7 @@ func (b *Local) State(name string) (state.State, error) {
|
|||
// name conflicts, assume that the field is overwritten if set.
|
||||
func (b *Local) Operation(ctx context.Context, op *backend.Operation) (*backend.RunningOperation, error) {
|
||||
// Determine the function to call for our operation
|
||||
var f func(context.Context, *backend.Operation, *backend.RunningOperation)
|
||||
var f func(context.Context, context.Context, *backend.Operation, *backend.RunningOperation)
|
||||
switch op.Type {
|
||||
case backend.OperationTypeRefresh:
|
||||
f = b.opRefresh
|
||||
|
@ -244,20 +245,82 @@ func (b *Local) Operation(ctx context.Context, op *backend.Operation) (*backend.
|
|||
b.opLock.Lock()
|
||||
|
||||
// Build our running operation
|
||||
runningCtx, runningCtxCancel := context.WithCancel(context.Background())
|
||||
runningOp := &backend.RunningOperation{Context: runningCtx}
|
||||
// the runninCtx is only used to block until the operation returns.
|
||||
runningCtx, done := context.WithCancel(context.Background())
|
||||
runningOp := &backend.RunningOperation{
|
||||
Context: runningCtx,
|
||||
}
|
||||
|
||||
// stopCtx wraps the context passed in, and is used to signal a graceful Stop.
|
||||
stopCtx, stop := context.WithCancel(ctx)
|
||||
runningOp.Stop = stop
|
||||
|
||||
// cancelCtx is used to cancel the operation immediately, usually
|
||||
// indicating that the process is exiting.
|
||||
cancelCtx, cancel := context.WithCancel(context.Background())
|
||||
runningOp.Cancel = cancel
|
||||
|
||||
// Do it
|
||||
go func() {
|
||||
defer done()
|
||||
defer stop()
|
||||
defer cancel()
|
||||
|
||||
defer b.opLock.Unlock()
|
||||
defer runningCtxCancel()
|
||||
f(ctx, op, runningOp)
|
||||
f(stopCtx, cancelCtx, op, runningOp)
|
||||
}()
|
||||
|
||||
// Return
|
||||
return runningOp, nil
|
||||
}
|
||||
|
||||
// opWait wats for the operation to complete, and a stop signal or a
|
||||
// cancelation signal.
|
||||
func (b *Local) opWait(
|
||||
doneCh <-chan struct{},
|
||||
stopCtx context.Context,
|
||||
cancelCtx context.Context,
|
||||
tfCtx *terraform.Context,
|
||||
opState state.State) (canceled bool) {
|
||||
// Wait for the operation to finish or for us to be interrupted so
|
||||
// we can handle it properly.
|
||||
select {
|
||||
case <-stopCtx.Done():
|
||||
if b.CLI != nil {
|
||||
b.CLI.Output("stopping operation...")
|
||||
}
|
||||
|
||||
// try to force a PersistState just in case the process is terminated
|
||||
// before we can complete.
|
||||
if err := opState.PersistState(); err != nil {
|
||||
// We can't error out from here, but warn the user if there was an error.
|
||||
// If this isn't transient, we will catch it again below, and
|
||||
// attempt to save the state another way.
|
||||
if b.CLI != nil {
|
||||
b.CLI.Error(fmt.Sprintf(earlyStateWriteErrorFmt, err))
|
||||
}
|
||||
}
|
||||
|
||||
// Stop execution
|
||||
go tfCtx.Stop()
|
||||
|
||||
select {
|
||||
case <-cancelCtx.Done():
|
||||
log.Println("[WARN] running operation canceled")
|
||||
// if the operation was canceled, we need to return immediately
|
||||
canceled = true
|
||||
case <-doneCh:
|
||||
}
|
||||
case <-cancelCtx.Done():
|
||||
// this should not be called without first attempting to stop the
|
||||
// operation
|
||||
log.Println("[ERROR] running operation canceled without Stop")
|
||||
canceled = true
|
||||
case <-doneCh:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Colorize returns the Colorize structure that can be used for colorizing
|
||||
// output. This is gauranteed to always return a non-nil value and so is useful
|
||||
// as a helper to wrap any potentially colored strings.
|
||||
|
|
|
@ -19,7 +19,8 @@ import (
|
|||
)
|
||||
|
||||
func (b *Local) opApply(
|
||||
ctx context.Context,
|
||||
stopCtx context.Context,
|
||||
cancelCtx context.Context,
|
||||
op *backend.Operation,
|
||||
runningOp *backend.RunningOperation) {
|
||||
log.Printf("[INFO] backend/local: starting Apply operation")
|
||||
|
@ -55,7 +56,7 @@ func (b *Local) opApply(
|
|||
}
|
||||
|
||||
if op.LockState {
|
||||
lockCtx, cancel := context.WithTimeout(ctx, op.StateLockTimeout)
|
||||
lockCtx, cancel := context.WithTimeout(stopCtx, op.StateLockTimeout)
|
||||
defer cancel()
|
||||
|
||||
lockInfo := state.NewLockInfo()
|
||||
|
@ -153,32 +154,8 @@ func (b *Local) opApply(
|
|||
applyState = tfCtx.State()
|
||||
}()
|
||||
|
||||
// Wait for the apply to finish or for us to be interrupted so
|
||||
// we can handle it properly.
|
||||
err = nil
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if b.CLI != nil {
|
||||
b.CLI.Output("stopping apply operation...")
|
||||
}
|
||||
|
||||
// try to force a PersistState just in case the process is terminated
|
||||
// before we can complete.
|
||||
if err := opState.PersistState(); err != nil {
|
||||
// We can't error out from here, but warn the user if there was an error.
|
||||
// If this isn't transient, we will catch it again below, and
|
||||
// attempt to save the state another way.
|
||||
if b.CLI != nil {
|
||||
b.CLI.Error(fmt.Sprintf(earlyStateWriteErrorFmt, err))
|
||||
}
|
||||
}
|
||||
|
||||
// Stop execution
|
||||
go tfCtx.Stop()
|
||||
|
||||
// Wait for completion still
|
||||
<-doneCh
|
||||
case <-doneCh:
|
||||
if b.opWait(doneCh, stopCtx, cancelCtx, tfCtx, opState) {
|
||||
return
|
||||
}
|
||||
|
||||
// Store the final state
|
||||
|
|
|
@ -19,7 +19,8 @@ import (
|
|||
)
|
||||
|
||||
func (b *Local) opPlan(
|
||||
ctx context.Context,
|
||||
stopCtx context.Context,
|
||||
cancelCtx context.Context,
|
||||
op *backend.Operation,
|
||||
runningOp *backend.RunningOperation) {
|
||||
log.Printf("[INFO] backend/local: starting Plan operation")
|
||||
|
@ -62,7 +63,7 @@ func (b *Local) opPlan(
|
|||
}
|
||||
|
||||
if op.LockState {
|
||||
lockCtx, cancel := context.WithTimeout(ctx, op.StateLockTimeout)
|
||||
lockCtx, cancel := context.WithTimeout(stopCtx, op.StateLockTimeout)
|
||||
defer cancel()
|
||||
|
||||
lockInfo := state.NewLockInfo()
|
||||
|
@ -111,18 +112,8 @@ func (b *Local) opPlan(
|
|||
plan, planErr = tfCtx.Plan()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if b.CLI != nil {
|
||||
b.CLI.Output("stopping plan operation...")
|
||||
}
|
||||
|
||||
// Stop execution
|
||||
go tfCtx.Stop()
|
||||
|
||||
// Wait for completion still
|
||||
<-doneCh
|
||||
case <-doneCh:
|
||||
if b.opWait(doneCh, stopCtx, cancelCtx, tfCtx, opState) {
|
||||
return
|
||||
}
|
||||
|
||||
if planErr != nil {
|
||||
|
|
|
@ -17,7 +17,8 @@ import (
|
|||
)
|
||||
|
||||
func (b *Local) opRefresh(
|
||||
ctx context.Context,
|
||||
stopCtx context.Context,
|
||||
cancelCtx context.Context,
|
||||
op *backend.Operation,
|
||||
runningOp *backend.RunningOperation) {
|
||||
// Check if our state exists if we're performing a refresh operation. We
|
||||
|
@ -53,7 +54,7 @@ func (b *Local) opRefresh(
|
|||
}
|
||||
|
||||
if op.LockState {
|
||||
lockCtx, cancel := context.WithTimeout(ctx, op.StateLockTimeout)
|
||||
lockCtx, cancel := context.WithTimeout(stopCtx, op.StateLockTimeout)
|
||||
defer cancel()
|
||||
|
||||
lockInfo := state.NewLockInfo()
|
||||
|
@ -90,18 +91,8 @@ func (b *Local) opRefresh(
|
|||
log.Printf("[INFO] backend/local: refresh calling Refresh")
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if b.CLI != nil {
|
||||
b.CLI.Output("stopping refresh operation...")
|
||||
}
|
||||
|
||||
// Stop execution
|
||||
go tfCtx.Stop()
|
||||
|
||||
// Wait for completion still
|
||||
<-doneCh
|
||||
case <-doneCh:
|
||||
if b.opWait(doneCh, stopCtx, cancelCtx, tfCtx, opState) {
|
||||
return
|
||||
}
|
||||
|
||||
// write the resulting state to the running op
|
||||
|
|
|
@ -2,7 +2,6 @@ package command
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
|
@ -158,38 +157,9 @@ func (c *ApplyCommand) Run(args []string) int {
|
|||
opReq.AutoApprove = autoApprove
|
||||
opReq.DestroyForce = destroyForce
|
||||
|
||||
// Perform the operation
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
defer ctxCancel()
|
||||
|
||||
op, err := b.Operation(ctx, opReq)
|
||||
op, err := c.RunOperation(b, opReq)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error starting operation: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Wait for the operation to complete or an interrupt to occur
|
||||
select {
|
||||
case <-c.ShutdownCh:
|
||||
// Cancel our context so we can start gracefully exiting
|
||||
ctxCancel()
|
||||
|
||||
// Notify the user
|
||||
c.Ui.Output(outputInterrupt)
|
||||
|
||||
// Still get the result, since there is still one
|
||||
select {
|
||||
case <-c.ShutdownCh:
|
||||
c.Ui.Error(
|
||||
"Two interrupts received. Exiting immediately. Note that data\n" +
|
||||
"loss may have occurred.")
|
||||
return 1
|
||||
case <-op.Done():
|
||||
}
|
||||
case <-op.Done():
|
||||
if err := op.Err; err != nil {
|
||||
diags = diags.Append(err)
|
||||
}
|
||||
diags = diags.Append(err)
|
||||
}
|
||||
|
||||
c.showDiagnostics(diags)
|
||||
|
|
|
@ -824,12 +824,11 @@ func TestApply_refresh(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestApply_shutdown(t *testing.T) {
|
||||
cancelled := false
|
||||
stopped := make(chan struct{})
|
||||
cancelled := make(chan struct{})
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
statePath := testTempFile(t)
|
||||
p := testProvider()
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
ui := new(cli.MockUi)
|
||||
c := &ApplyCommand{
|
||||
|
@ -841,8 +840,7 @@ func TestApply_shutdown(t *testing.T) {
|
|||
}
|
||||
|
||||
p.StopFn = func() error {
|
||||
close(stopped)
|
||||
cancelled = true
|
||||
close(cancelled)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -858,15 +856,26 @@ func TestApply_shutdown(t *testing.T) {
|
|||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
var once sync.Once
|
||||
p.ApplyFn = func(
|
||||
*terraform.InstanceInfo,
|
||||
*terraform.InstanceState,
|
||||
*terraform.InstanceDiff) (*terraform.InstanceState, error) {
|
||||
|
||||
// only cancel once
|
||||
if !cancelled {
|
||||
once.Do(func() {
|
||||
shutdownCh <- struct{}{}
|
||||
<-stopped
|
||||
}
|
||||
})
|
||||
|
||||
// Because of the internal lock in the MockProvider, we can't
|
||||
// coordiante directly with the calling of Stop, and making the
|
||||
// MockProvider concurrent is disruptive to a lot of existing tests.
|
||||
// Wait here a moment to help make sure the main goroutine gets to the
|
||||
// Stop call before we exit, or the plan may finish before it can be
|
||||
// canceled.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
return &terraform.InstanceState{
|
||||
ID: "foo",
|
||||
Attributes: map[string]string{
|
||||
|
@ -888,7 +897,9 @@ func TestApply_shutdown(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if !cancelled {
|
||||
select {
|
||||
case <-cancelled:
|
||||
default:
|
||||
t.Fatal("command not cancelled")
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package command
|
|||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
|
@ -259,6 +260,54 @@ func (m *Meta) StdinPiped() bool {
|
|||
return fi.Mode()&os.ModeNamedPipe != 0
|
||||
}
|
||||
|
||||
func (m *Meta) RunOperation(b backend.Enhanced, opReq *backend.Operation) (*backend.RunningOperation, error) {
|
||||
op, err := b.Operation(context.Background(), opReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error starting operation: %s", err)
|
||||
}
|
||||
|
||||
// Wait for the operation to complete or an interrupt to occur
|
||||
select {
|
||||
case <-m.ShutdownCh:
|
||||
// gracefully stop the operation
|
||||
op.Stop()
|
||||
|
||||
// Notify the user
|
||||
m.Ui.Output(outputInterrupt)
|
||||
|
||||
// Still get the result, since there is still one
|
||||
select {
|
||||
case <-m.ShutdownCh:
|
||||
m.Ui.Error(
|
||||
"Two interrupts received. Exiting immediately. Note that data\n" +
|
||||
"loss may have occurred.")
|
||||
|
||||
// cancel the operation completely
|
||||
op.Cancel()
|
||||
|
||||
// the operation should return asap
|
||||
// but timeout just in case
|
||||
select {
|
||||
case <-op.Done():
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
|
||||
return nil, errors.New("operation canceled")
|
||||
|
||||
case <-op.Done():
|
||||
// operation completed after Stop
|
||||
}
|
||||
case <-op.Done():
|
||||
// operation completed normally
|
||||
}
|
||||
|
||||
if op.Err != nil {
|
||||
return op, op.Err
|
||||
}
|
||||
|
||||
return op, nil
|
||||
}
|
||||
|
||||
const (
|
||||
ProviderSkipVerifyEnvVar = "TF_SKIP_PROVIDER_VERIFY"
|
||||
)
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
|
@ -107,35 +106,9 @@ func (c *PlanCommand) Run(args []string) int {
|
|||
opReq.Type = backend.OperationTypePlan
|
||||
|
||||
// Perform the operation
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
defer ctxCancel()
|
||||
|
||||
op, err := b.Operation(ctx, opReq)
|
||||
op, err := c.RunOperation(b, opReq)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error starting operation: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
select {
|
||||
case <-c.ShutdownCh:
|
||||
// Cancel our context so we can start gracefully exiting
|
||||
ctxCancel()
|
||||
|
||||
// Notify the user
|
||||
c.Ui.Output(outputInterrupt)
|
||||
|
||||
// Still get the result, since there is still one
|
||||
select {
|
||||
case <-c.ShutdownCh:
|
||||
c.Ui.Error(
|
||||
"Two interrupts received. Exiting immediately")
|
||||
return 1
|
||||
case <-op.Done():
|
||||
}
|
||||
case <-op.Done():
|
||||
if err := op.Err; err != nil {
|
||||
diags = diags.Append(err)
|
||||
}
|
||||
diags = diags.Append(err)
|
||||
}
|
||||
|
||||
c.showDiagnostics(diags)
|
||||
|
|
|
@ -835,8 +835,8 @@ func TestPlan_detailedExitcode_emptyDiff(t *testing.T) {
|
|||
|
||||
func TestPlan_shutdown(t *testing.T) {
|
||||
cancelled := make(chan struct{})
|
||||
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
p := testProvider()
|
||||
ui := new(cli.MockUi)
|
||||
c := &PlanCommand{
|
||||
|
@ -880,8 +880,10 @@ func TestPlan_shutdown(t *testing.T) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
if code := c.Run([]string{testFixturePath("apply-shutdown")}); code != 0 {
|
||||
t.Fatalf("bad: %d\n\n%s", code, ui.ErrorWriter.String())
|
||||
if code := c.Run([]string{testFixturePath("apply-shutdown")}); code != 1 {
|
||||
// FIXME: we should be able to avoid the error during evaluation
|
||||
// the early exit isn't caught before the interpolation is evaluated
|
||||
t.Fatal(ui.OutputWriter.String())
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
|
@ -75,16 +74,8 @@ func (c *RefreshCommand) Run(args []string) int {
|
|||
opReq.Type = backend.OperationTypeRefresh
|
||||
opReq.Module = mod
|
||||
|
||||
// Perform the operation
|
||||
op, err := b.Operation(context.Background(), opReq)
|
||||
op, err := c.RunOperation(b, opReq)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error starting operation: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
// Wait for the operation to complete
|
||||
<-op.Done()
|
||||
if err := op.Err; err != nil {
|
||||
diags = diags.Append(err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue