Merge pull request #26435 from hashicorp/jbardin/races
Fix race conditions
This commit is contained in:
commit
c51104fb7c
|
@ -12,10 +12,12 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
tfe "github.com/hashicorp/go-tfe"
|
tfe "github.com/hashicorp/go-tfe"
|
||||||
"github.com/hashicorp/terraform/terraform"
|
"github.com/hashicorp/terraform/terraform"
|
||||||
|
"github.com/mitchellh/copystructure"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockClient struct {
|
type mockClient struct {
|
||||||
|
@ -693,6 +695,8 @@ func (m *mockPolicyChecks) Logs(ctx context.Context, policyCheckID string) (io.R
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockRuns struct {
|
type mockRuns struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
client *mockClient
|
client *mockClient
|
||||||
runs map[string]*tfe.Run
|
runs map[string]*tfe.Run
|
||||||
workspaces map[string][]*tfe.Run
|
workspaces map[string][]*tfe.Run
|
||||||
|
@ -712,13 +716,21 @@ func newMockRuns(client *mockClient) *mockRuns {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRuns) List(ctx context.Context, workspaceID string, options tfe.RunListOptions) (*tfe.RunList, error) {
|
func (m *mockRuns) List(ctx context.Context, workspaceID string, options tfe.RunListOptions) (*tfe.RunList, error) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
w, ok := m.client.Workspaces.workspaceIDs[workspaceID]
|
w, ok := m.client.Workspaces.workspaceIDs[workspaceID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, tfe.ErrResourceNotFound
|
return nil, tfe.ErrResourceNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
rl := &tfe.RunList{
|
rl := &tfe.RunList{}
|
||||||
Items: m.workspaces[w.ID],
|
for _, run := range m.workspaces[w.ID] {
|
||||||
|
rc, err := copystructure.Copy(run)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
rl.Items = append(rl.Items, rc.(*tfe.Run))
|
||||||
}
|
}
|
||||||
|
|
||||||
rl.Pagination = &tfe.Pagination{
|
rl.Pagination = &tfe.Pagination{
|
||||||
|
@ -733,6 +745,9 @@ func (m *mockRuns) List(ctx context.Context, workspaceID string, options tfe.Run
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRuns) Create(ctx context.Context, options tfe.RunCreateOptions) (*tfe.Run, error) {
|
func (m *mockRuns) Create(ctx context.Context, options tfe.RunCreateOptions) (*tfe.Run, error) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
a, err := m.client.Applies.create(options.ConfigurationVersion.ID, options.Workspace.ID)
|
a, err := m.client.Applies.create(options.ConfigurationVersion.ID, options.Workspace.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -798,6 +813,9 @@ func (m *mockRuns) Create(ctx context.Context, options tfe.RunCreateOptions) (*t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRuns) Read(ctx context.Context, runID string) (*tfe.Run, error) {
|
func (m *mockRuns) Read(ctx context.Context, runID string) (*tfe.Run, error) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
r, ok := m.runs[runID]
|
r, ok := m.runs[runID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, tfe.ErrResourceNotFound
|
return nil, tfe.ErrResourceNotFound
|
||||||
|
@ -833,10 +851,19 @@ func (m *mockRuns) Read(ctx context.Context, runID string) (*tfe.Run, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
// we must return a copy for the client
|
||||||
|
rc, err := copystructure.Copy(r)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc.(*tfe.Run), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRuns) Apply(ctx context.Context, runID string, options tfe.RunApplyOptions) error {
|
func (m *mockRuns) Apply(ctx context.Context, runID string, options tfe.RunApplyOptions) error {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
r, ok := m.runs[runID]
|
r, ok := m.runs[runID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return tfe.ErrResourceNotFound
|
return tfe.ErrResourceNotFound
|
||||||
|
@ -859,6 +886,9 @@ func (m *mockRuns) ForceCancel(ctx context.Context, runID string, options tfe.Ru
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRuns) Discard(ctx context.Context, runID string, options tfe.RunDiscardOptions) error {
|
func (m *mockRuns) Discard(ctx context.Context, runID string, options tfe.RunDiscardOptions) error {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
r, ok := m.runs[runID]
|
r, ok := m.runs[runID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return tfe.ErrResourceNotFound
|
return tfe.ErrResourceNotFound
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/terraform/builtin/provisioners/puppet/bolt"
|
"github.com/hashicorp/terraform/builtin/provisioners/puppet/bolt"
|
||||||
|
@ -39,6 +40,8 @@ type provisioner struct {
|
||||||
instanceState *terraform.InstanceState
|
instanceState *terraform.InstanceState
|
||||||
output terraform.UIOutput
|
output terraform.UIOutput
|
||||||
comm communicator.Communicator
|
comm communicator.Communicator
|
||||||
|
|
||||||
|
outputWG sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
type csrAttributes struct {
|
type csrAttributes struct {
|
||||||
|
@ -297,8 +300,11 @@ func (p *provisioner) runCommand(command string) (stdout string, err error) {
|
||||||
outR, outW := io.Pipe()
|
outR, outW := io.Pipe()
|
||||||
errR, errW := io.Pipe()
|
errR, errW := io.Pipe()
|
||||||
outTee := io.TeeReader(outR, &stdoutBuffer)
|
outTee := io.TeeReader(outR, &stdoutBuffer)
|
||||||
|
|
||||||
|
p.outputWG.Add(2)
|
||||||
go p.copyToOutput(outTee)
|
go p.copyToOutput(outTee)
|
||||||
go p.copyToOutput(errR)
|
go p.copyToOutput(errR)
|
||||||
|
|
||||||
defer outW.Close()
|
defer outW.Close()
|
||||||
defer errW.Close()
|
defer errW.Close()
|
||||||
|
|
||||||
|
@ -315,12 +321,19 @@ func (p *provisioner) runCommand(command string) (stdout string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cmd.Wait()
|
err = cmd.Wait()
|
||||||
|
|
||||||
|
outW.Close()
|
||||||
|
errW.Close()
|
||||||
|
p.outputWG.Wait()
|
||||||
|
|
||||||
stdout = strings.TrimSpace(stdoutBuffer.String())
|
stdout = strings.TrimSpace(stdoutBuffer.String())
|
||||||
|
|
||||||
return stdout, err
|
return stdout, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *provisioner) copyToOutput(reader io.Reader) {
|
func (p *provisioner) copyToOutput(reader io.Reader) {
|
||||||
|
defer p.outputWG.Done()
|
||||||
|
|
||||||
lr := linereader.New(reader)
|
lr := linereader.New(reader)
|
||||||
for line := range lr.Ch {
|
for line := range lr.Ch {
|
||||||
p.output.Output(line)
|
p.output.Output(line)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
@ -40,21 +41,54 @@ type reattachConfigAddr struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type providerServer struct {
|
type providerServer struct {
|
||||||
|
sync.Mutex
|
||||||
*grpcplugin.GRPCProviderServer
|
*grpcplugin.GRPCProviderServer
|
||||||
planResourceChangeCalled bool
|
planResourceChangeCalled bool
|
||||||
applyResourceChangeCalled bool
|
applyResourceChangeCalled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *providerServer) PlanResourceChange(ctx context.Context, req *proto.PlanResourceChange_Request) (*proto.PlanResourceChange_Response, error) {
|
func (p *providerServer) PlanResourceChange(ctx context.Context, req *proto.PlanResourceChange_Request) (*proto.PlanResourceChange_Response, error) {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
p.planResourceChangeCalled = true
|
p.planResourceChangeCalled = true
|
||||||
return p.GRPCProviderServer.PlanResourceChange(ctx, req)
|
return p.GRPCProviderServer.PlanResourceChange(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *providerServer) ApplyResourceChange(ctx context.Context, req *proto.ApplyResourceChange_Request) (*proto.ApplyResourceChange_Response, error) {
|
func (p *providerServer) ApplyResourceChange(ctx context.Context, req *proto.ApplyResourceChange_Request) (*proto.ApplyResourceChange_Response, error) {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
p.applyResourceChangeCalled = true
|
p.applyResourceChangeCalled = true
|
||||||
return p.GRPCProviderServer.ApplyResourceChange(ctx, req)
|
return p.GRPCProviderServer.ApplyResourceChange(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *providerServer) PlanResourceChangeCalled() bool {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
return p.planResourceChangeCalled
|
||||||
|
}
|
||||||
|
func (p *providerServer) ResetPlanResourceChangeCalled() {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
p.planResourceChangeCalled = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *providerServer) ApplyResourceChangeCalled() bool {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
return p.applyResourceChangeCalled
|
||||||
|
}
|
||||||
|
func (p *providerServer) ResetApplyResourceChangeCalled() {
|
||||||
|
p.Lock()
|
||||||
|
defer p.Unlock()
|
||||||
|
|
||||||
|
p.applyResourceChangeCalled = false
|
||||||
|
}
|
||||||
|
|
||||||
func TestUnmanagedSeparatePlan(t *testing.T) {
|
func TestUnmanagedSeparatePlan(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -129,7 +163,7 @@ func TestUnmanagedSeparatePlan(t *testing.T) {
|
||||||
t.Fatalf("unexpected plan error: %s\nstderr:\n%s", err, stderr)
|
t.Fatalf("unexpected plan error: %s\nstderr:\n%s", err, stderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !provider.planResourceChangeCalled {
|
if !provider.PlanResourceChangeCalled() {
|
||||||
t.Error("PlanResourceChange not called on in-process provider")
|
t.Error("PlanResourceChange not called on in-process provider")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,10 +173,10 @@ func TestUnmanagedSeparatePlan(t *testing.T) {
|
||||||
t.Fatalf("unexpected apply error: %s\nstderr:\n%s", err, stderr)
|
t.Fatalf("unexpected apply error: %s\nstderr:\n%s", err, stderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !provider.applyResourceChangeCalled {
|
if !provider.ApplyResourceChangeCalled() {
|
||||||
t.Error("ApplyResourceChange not called on in-process provider")
|
t.Error("ApplyResourceChange not called on in-process provider")
|
||||||
}
|
}
|
||||||
provider.applyResourceChangeCalled = false
|
provider.ResetApplyResourceChangeCalled()
|
||||||
|
|
||||||
//// DESTROY
|
//// DESTROY
|
||||||
_, stderr, err = tf.Run("destroy", "-auto-approve")
|
_, stderr, err = tf.Run("destroy", "-auto-approve")
|
||||||
|
@ -150,7 +184,7 @@ func TestUnmanagedSeparatePlan(t *testing.T) {
|
||||||
t.Fatalf("unexpected destroy error: %s\nstderr:\n%s", err, stderr)
|
t.Fatalf("unexpected destroy error: %s\nstderr:\n%s", err, stderr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !provider.applyResourceChangeCalled {
|
if !provider.ApplyResourceChangeCalled() {
|
||||||
t.Error("ApplyResourceChange (destroy) not called on in-process provider")
|
t.Error("ApplyResourceChange (destroy) not called on in-process provider")
|
||||||
}
|
}
|
||||||
cancel()
|
cancel()
|
||||||
|
|
|
@ -562,11 +562,13 @@ The -target option is not for routine use, and is provided only for exceptional
|
||||||
p.Changes = c.changes
|
p.Changes = c.changes
|
||||||
|
|
||||||
c.refreshState.SyncWrapper().RemovePlannedResourceInstanceObjects()
|
c.refreshState.SyncWrapper().RemovePlannedResourceInstanceObjects()
|
||||||
p.State = c.refreshState.DeepCopy()
|
|
||||||
|
refreshedState := c.refreshState.DeepCopy()
|
||||||
|
p.State = refreshedState
|
||||||
|
|
||||||
// replace the working state with the updated state, so that immediate calls
|
// replace the working state with the updated state, so that immediate calls
|
||||||
// to Apply work as expected.
|
// to Apply work as expected.
|
||||||
c.state = c.refreshState
|
c.state = refreshedState
|
||||||
|
|
||||||
return p, diags
|
return p, diags
|
||||||
}
|
}
|
||||||
|
@ -743,12 +745,11 @@ func (c *Context) graphWalker(operation walkOperation) *ContextGraphWalker {
|
||||||
switch operation {
|
switch operation {
|
||||||
case walkValidate:
|
case walkValidate:
|
||||||
// validate should not use any state
|
// validate should not use any state
|
||||||
s := states.NewState()
|
state = states.NewState().SyncWrapper()
|
||||||
state = s.SyncWrapper()
|
|
||||||
|
|
||||||
// validate currently uses the plan graph, so we have to populate the
|
// validate currently uses the plan graph, so we have to populate the
|
||||||
// refreshState.
|
// refreshState.
|
||||||
refreshState = s.SyncWrapper()
|
refreshState = states.NewState().SyncWrapper()
|
||||||
|
|
||||||
case walkPlan:
|
case walkPlan:
|
||||||
state = c.state.SyncWrapper()
|
state = c.state.SyncWrapper()
|
||||||
|
|
|
@ -10135,9 +10135,15 @@ func TestContext2Apply_ProviderMeta_apply_set(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var pmMu sync.Mutex
|
||||||
arcPMs := map[string]cty.Value{}
|
arcPMs := map[string]cty.Value{}
|
||||||
|
|
||||||
p.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse {
|
p.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse {
|
||||||
|
pmMu.Lock()
|
||||||
|
defer pmMu.Unlock()
|
||||||
arcPMs[req.TypeName] = req.ProviderMeta
|
arcPMs[req.TypeName] = req.ProviderMeta
|
||||||
|
|
||||||
s := req.PlannedState.AsValueMap()
|
s := req.PlannedState.AsValueMap()
|
||||||
s["id"] = cty.StringVal("ID")
|
s["id"] = cty.StringVal("ID")
|
||||||
return providers.ApplyResourceChangeResponse{
|
return providers.ApplyResourceChangeResponse{
|
||||||
|
@ -10209,9 +10215,13 @@ func TestContext2Apply_ProviderMeta_apply_unset(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
var pmMu sync.Mutex
|
||||||
arcPMs := map[string]cty.Value{}
|
arcPMs := map[string]cty.Value{}
|
||||||
p.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse {
|
p.ApplyResourceChangeFn = func(req providers.ApplyResourceChangeRequest) providers.ApplyResourceChangeResponse {
|
||||||
|
pmMu.Lock()
|
||||||
|
defer pmMu.Unlock()
|
||||||
arcPMs[req.TypeName] = req.ProviderMeta
|
arcPMs[req.TypeName] = req.ProviderMeta
|
||||||
|
|
||||||
s := req.PlannedState.AsValueMap()
|
s := req.PlannedState.AsValueMap()
|
||||||
s["id"] = cty.StringVal("ID")
|
s["id"] = cty.StringVal("ID")
|
||||||
return providers.ApplyResourceChangeResponse{
|
return providers.ApplyResourceChangeResponse{
|
||||||
|
|
|
@ -79,8 +79,8 @@ type BuiltinEvalContext struct {
|
||||||
var _ EvalContext = (*BuiltinEvalContext)(nil)
|
var _ EvalContext = (*BuiltinEvalContext)(nil)
|
||||||
|
|
||||||
func (ctx *BuiltinEvalContext) WithPath(path addrs.ModuleInstance) EvalContext {
|
func (ctx *BuiltinEvalContext) WithPath(path addrs.ModuleInstance) EvalContext {
|
||||||
ctx.pathSet = true
|
|
||||||
newCtx := *ctx
|
newCtx := *ctx
|
||||||
|
newCtx.pathSet = true
|
||||||
newCtx.PathValue = path
|
newCtx.PathValue = path
|
||||||
return &newCtx
|
return &newCtx
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue