From 3e9ae69a1298e9a08c6b338d13d5b7e1fe090793 Mon Sep 17 00:00:00 2001 From: uturunku1 Date: Tue, 7 Dec 2021 15:35:04 -0800 Subject: [PATCH] [cloud] run tasks integration This change will await the completion of pre-apply run tasks if they exist on a run and then report the results. It also adds an abstraction when interacting with cloud integrations such as policy checking and cost estimation that simplify and unify output, although I did not go so far as to refactor those callers to use it yet. --- go.mod | 1 + go.sum | 4 +- internal/backend/remote/backend_common.go | 2 +- internal/cloud/backend_plan.go | 15 +++ internal/cloud/backend_runTasks.go | 139 ++++++++++++++++++++ internal/cloud/backend_runTasks_test.go | 146 ++++++++++++++++++++++ internal/cloud/cloud_integration.go | 117 +++++++++++++++++ 7 files changed, 421 insertions(+), 3 deletions(-) create mode 100644 internal/cloud/backend_runTasks.go create mode 100644 internal/cloud/backend_runTasks_test.go create mode 100644 internal/cloud/cloud_integration.go diff --git a/go.mod b/go.mod index 2cbdb0fc0..8b780419b 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/hashicorp/go-plugin v1.4.3 github.com/hashicorp/go-retryablehttp v0.7.0 github.com/hashicorp/go-tfe v0.21.0 + github.com/hashicorp/go-safetemp v1.0.0 github.com/hashicorp/go-uuid v1.0.2 github.com/hashicorp/go-version v1.3.0 github.com/hashicorp/hcl v0.0.0-20170504190234-a4b07c25de5f diff --git a/go.sum b/go.sum index 447f84bbb..e8ef3c7f2 100644 --- a/go.sum +++ b/go.sum @@ -407,8 +407,8 @@ github.com/hashicorp/go-slug v0.7.0/go.mod h1:Ib+IWBYfEfJGI1ZyXMGNbu2BU+aa3Dzu41 github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= -github.com/hashicorp/go-tfe v0.21.0 h1:P1QoeLkigDi4BXGQ//42kyXwfcHUqbh5jJemml6iQJs= -github.com/hashicorp/go-tfe v0.21.0/go.mod h1:gyXLXbpBVxA2F/6opah8XBsOkZJxHYQmghl0OWi8keI= +github.com/hashicorp/go-tfe v0.20.1-0.20211210153426-243d1b1eb033 h1:SoQhT2l6xHh7LOprw1F6MPzxSk65kSXdyfubOs8D+Ho= +github.com/hashicorp/go-tfe v0.20.1-0.20211210153426-243d1b1eb033/go.mod h1:gyXLXbpBVxA2F/6opah8XBsOkZJxHYQmghl0OWi8keI= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= diff --git a/internal/backend/remote/backend_common.go b/internal/backend/remote/backend_common.go index dc102f594..07437f3a3 100644 --- a/internal/backend/remote/backend_common.go +++ b/internal/backend/remote/backend_common.go @@ -299,7 +299,7 @@ func (b *Remote) costEstimate(stopCtx, cancelCtx context.Context, op *backend.Op deltaRepr := strings.Replace(ce.DeltaMonthlyCost, "-", "", 1) if b.CLI != nil { - b.CLI.Output(b.Colorize().Color(msgPrefix + ":\n")) + b.CLI.Output(b.Colorize().Color("[bold]" + msgPrefix + ":\n")) b.CLI.Output(b.Colorize().Color(fmt.Sprintf("Resources: %d of %d estimated", ce.MatchedResourcesCount, ce.ResourcesCount))) b.CLI.Output(b.Colorize().Color(fmt.Sprintf(" $%s/mo %s$%s", ce.ProposedMonthlyCost, sign, deltaRepr))) diff --git a/internal/cloud/backend_plan.go b/internal/cloud/backend_plan.go index 1956a9c82..2ec1439ea 100644 --- a/internal/cloud/backend_plan.go +++ b/internal/cloud/backend_plan.go @@ -351,9 +351,24 @@ in order to capture the filesystem context the remote workspace expects: } } + // Await collective run tasks + if len(r.TaskStage) > 0 { + context := NewIntegrationContext(stopCtx, cancelCtx, b, op, r) + err = b.runTasks(context, context.BeginOutput("Run Tasks (pre-apply)"), r.TaskStage[0].ID) + + if err != nil { + return r, err + } + } + return r, nil } +// String returns a pointer to the given string. +func String(v string) *string { + return &v +} + const planDefaultHeader = ` [reset][yellow]Running plan in Terraform Cloud. Output will stream here. Pressing Ctrl-C will stop streaming the logs, but will not stop the plan running remotely.[reset] diff --git a/internal/cloud/backend_runTasks.go b/internal/cloud/backend_runTasks.go new file mode 100644 index 000000000..8ef8679e4 --- /dev/null +++ b/internal/cloud/backend_runTasks.go @@ -0,0 +1,139 @@ +package cloud + +import ( + "context" + "fmt" + "strings" + + "github.com/hashicorp/go-tfe" +) + +type taskResultSummary struct { + pending int + failed int + failedMandatory int + passed int +} + +type taskStageReadFunc func(b *Cloud, stopCtx context.Context) (*tfe.TaskStage, error) + +func summarizeTaskResults(taskResults []*tfe.TaskResult) taskResultSummary { + var pe, er, erm, pa int + for _, task := range taskResults { + if task.Status == "running" || task.Status == "pending" { + pe++ + } else if task.Status == "passed" { + pa++ + } else { + // Everything else is a failure + er++ + if task.WorkspaceTaskEnforcementLevel == "mandatory" { + erm++ + } + } + } + + return taskResultSummary{ + pending: pe, + failed: er, + failedMandatory: erm, + passed: pa, + } +} + +// elapsedMessageMax is 50 chars: the length of this message with 6 digits +// 99 tasks still pending, 99 passed, 99 failed ... +const elapsedMessageMax int = 50 + +func (b *Cloud) runTasksWithTaskResults(context *IntegrationContext, output IntegrationOutputWriter, fetchTaskStage taskStageReadFunc) error { + // TODO: Do not fetch run tasks if there are no changes to infrastructure + // if !context.Run.HasChanges { + // output.Output("No Changes. Tasks will not run") + // output.End() + + // return nil + // } + + return context.Poll(func(i int) (bool, error) { + // TODO: get the stage that corresponds to an argument passed to this function + stage, err := fetchTaskStage(b, context.StopContext) + + if err != nil { + return false, generalError("Failed to retrieve pre-apply task stage", err) + } + + summary := summarizeTaskResults(stage.TaskResults) + if summary.pending > 0 { + message := fmt.Sprintf("%d tasks still pending, %d passed, %d failed ... ", summary.pending, summary.passed, summary.failed) + + if i%4 == 0 { + if i > 0 { + output.OutputElapsed(message, elapsedMessageMax) + } + } + return true, nil + } + + // No more tasks pending/running. Print all the results. + + // Track the first task name that is a mandatory enforcement level breach. + var firstMandatoryTaskFailed *string = nil + + if i == 0 { + output.Output(fmt.Sprintf("All tasks completed! %d passed, %d failed", summary.passed, summary.failed)) + } else { + output.OutputElapsed(fmt.Sprintf("All tasks completed! %d passed, %d failed", summary.passed, summary.failed), 50) + } + + output.Output("") + + for _, t := range stage.TaskResults { + capitalizedStatus := string(t.Status) + capitalizedStatus = strings.ToUpper(capitalizedStatus[:1]) + capitalizedStatus[1:] + + status := "[green]" + capitalizedStatus + if t.Status != "passed" { + level := string(t.WorkspaceTaskEnforcementLevel) + level = strings.ToUpper(level[:1]) + level[1:] + status = fmt.Sprintf("[red]%s (%s)", capitalizedStatus, level) + + if t.WorkspaceTaskEnforcementLevel == "mandatory" && firstMandatoryTaskFailed == nil { + firstMandatoryTaskFailed = &t.TaskName + } + } + + title := fmt.Sprintf(`%s ⸺ %s`, t.TaskName, status) + output.SubOutput(title) + + output.SubOutput(fmt.Sprintf("[dim]%s", t.Message)) + output.SubOutput("") + } + + // If a mandatory enforcement level is breached, return an error. + var taskErr error = nil + var overall string = "[green]Passed" + if firstMandatoryTaskFailed != nil { + overall = "[red]Failed" + taskErr = fmt.Errorf("the run failed because the run task, %s, is required to succeed", *firstMandatoryTaskFailed) + } else if summary.failed > 0 { // we have failures but none of them mandatory + overall = "[green]Passed with advisory failures" + } + + output.SubOutput("") + output.SubOutput("[bold]Overall Result: " + overall) + + output.End() + + return false, taskErr + }) +} + +func (b *Cloud) runTasks(ctx *IntegrationContext, output IntegrationOutputWriter, stageID string) error { + return b.runTasksWithTaskResults(ctx, output, func(b *Cloud, stopCtx context.Context) (*tfe.TaskStage, error) { + options := tfe.TaskStageReadOptions{ + Include: "task_results", + } + + return b.client.TaskStages.Read(ctx.StopContext, stageID, &options) + }) +} diff --git a/internal/cloud/backend_runTasks_test.go b/internal/cloud/backend_runTasks_test.go new file mode 100644 index 000000000..9cb133cd5 --- /dev/null +++ b/internal/cloud/backend_runTasks_test.go @@ -0,0 +1,146 @@ +package cloud + +import ( + "context" + "strings" + "testing" + + "github.com/hashicorp/go-tfe" +) + +type testIntegrationOutput struct { + ctx *IntegrationContext + output *strings.Builder + t *testing.T +} + +var _ IntegrationOutputWriter = (*testIntegrationOutput)(nil) // Compile time check + +func (s *testIntegrationOutput) End() { + s.output.WriteString("END\n") +} + +func (s *testIntegrationOutput) SubOutput(str string) { + s.output.WriteString(s.ctx.B.Colorize().Color("[reset]│ "+str) + "\n") +} + +func (s *testIntegrationOutput) Output(str string) { + s.output.WriteString(s.ctx.B.Colorize().Color("[reset]│ ") + str + "\n") +} + +func (s *testIntegrationOutput) OutputElapsed(message string, maxMessage int) { + s.output.WriteString("PENDING MESSAGE: " + message) +} + +func newMockIntegrationContext(b *Cloud, t *testing.T) (*IntegrationContext, *testIntegrationOutput) { + ctx := context.Background() + + // Retrieve the workspace used to run this operation in. + w, err := b.client.Workspaces.Read(ctx, b.organization, b.WorkspaceMapping.Name) + if err != nil { + t.Fatalf("error retrieving workspace: %v", err) + } + + // Create a new configuration version. + c, err := b.client.ConfigurationVersions.Create(ctx, w.ID, tfe.ConfigurationVersionCreateOptions{}) + if err != nil { + t.Fatalf("error creating configuration version: %v", err) + } + + // Create a pending run to block this run. + r, err := b.client.Runs.Create(ctx, tfe.RunCreateOptions{ + ConfigurationVersion: c, + Workspace: w, + }) + if err != nil { + t.Fatalf("error creating pending run: %v", err) + } + + op, configCleanup, done := testOperationPlan(t, "./testdata/plan") + defer configCleanup() + defer done(t) + + integrationContext := &IntegrationContext{ + B: b, + StopContext: ctx, + CancelContext: ctx, + Op: op, + Run: r, + } + + return integrationContext, &testIntegrationOutput{ + ctx: integrationContext, + output: &strings.Builder{}, + t: t, + } +} + +func TestCloud_runTasksWithTaskResults(t *testing.T) { + b, bCleanup := testBackendWithName(t) + defer bCleanup() + + integrationContext, writer := newMockIntegrationContext(b, t) + + cases := map[string]struct { + taskResults []*tfe.TaskResult + context *IntegrationContext + writer *testIntegrationOutput + expectedOutputs []string + isError bool + }{ + "all-succeeded": { + taskResults: []*tfe.TaskResult{ + {ID: "1", TaskName: "Mandatory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "mandatory"}, + {ID: "2", TaskName: "Advisory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "advisory"}, + }, + writer: writer, + context: integrationContext, + expectedOutputs: []string{"Overall Result: Passed\n"}, + isError: false, + }, + "mandatory-failed": { + taskResults: []*tfe.TaskResult{ + {ID: "1", TaskName: "Mandatory", Message: "500 Error", Status: "failed", WorkspaceTaskEnforcementLevel: "mandatory"}, + {ID: "2", TaskName: "Advisory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "advisory"}, + }, + writer: writer, + context: integrationContext, + expectedOutputs: []string{"Passed\n", "A-OK\n", "Overall Result: Failed\n"}, + isError: true, + }, + "advisory-failed": { + taskResults: []*tfe.TaskResult{ + {ID: "1", TaskName: "Mandatory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "mandatory"}, + {ID: "2", TaskName: "Advisory", Message: "500 Error", Status: "failed", WorkspaceTaskEnforcementLevel: "advisory"}, + }, + writer: writer, + context: integrationContext, + expectedOutputs: []string{"Failed (Advisory)", "Overall Result: Passed with advisory failure"}, + isError: false, + }, + } + + for caseName, c := range cases { + c.writer.output.Reset() + err := b.runTasksWithTaskResults(c.writer.ctx, c.writer, func(b *Cloud, stopCtx context.Context) (*tfe.TaskStage, error) { + return &tfe.TaskStage{ + TaskResults: c.taskResults, + }, nil + }) + + if c.isError && err == nil { + t.Fatalf("Expected %s to be error", caseName) + } + + if !c.isError && err != nil { + t.Errorf("Expected %s to not be error but received %s", caseName, err) + } + + output := c.writer.output.String() + for _, expected := range c.expectedOutputs { + if !strings.Contains(output, expected) { + t.Fatalf("Expected output to contain '%s' but it was:\n\n%s", expected, output) + } + } + } +} diff --git a/internal/cloud/cloud_integration.go b/internal/cloud/cloud_integration.go new file mode 100644 index 000000000..168e1ae26 --- /dev/null +++ b/internal/cloud/cloud_integration.go @@ -0,0 +1,117 @@ +package cloud + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/hashicorp/go-tfe" + "github.com/hashicorp/terraform/internal/backend" +) + +type IntegrationOutputWriter interface { + End() + OutputElapsed(message string, maxMessage int) + Output(str string) + SubOutput(str string) +} + +type IntegrationContext struct { + started time.Time + B *Cloud + StopContext context.Context + CancelContext context.Context + Op *backend.Operation + Run *tfe.Run +} + +type integrationCLIOutput struct { + ctx *IntegrationContext +} + +var _ IntegrationOutputWriter = (*integrationCLIOutput)(nil) // Compile time check + +func NewIntegrationContext(stopCtx, cancelCtx context.Context, b *Cloud, op *backend.Operation, r *tfe.Run) *IntegrationContext { + return &IntegrationContext{ + B: b, + StopContext: stopCtx, + CancelContext: cancelCtx, + Op: op, + Run: r, + } +} + +func (s *IntegrationContext) Poll(every func(i int) (bool, error)) error { + for i := 0; ; i++ { + select { + case <-s.StopContext.Done(): + return s.StopContext.Err() + case <-s.CancelContext.Done(): + return s.CancelContext.Err() + case <-time.After(backoff(backoffMin, backoffMax, i)): + // blocks for a time between min and max + } + + cont, err := every(i) + if !cont { + return err + } + } +} + +func (s *IntegrationContext) BeginOutput(name string) IntegrationOutputWriter { + var result IntegrationOutputWriter = &integrationCLIOutput{ + ctx: s, + } + + s.started = time.Now() + + if s.HasCLI() { + s.B.CLI.Output("\n------------------------------------------------------------------------\n") + } + + result.Output("[bold]" + name + ":\n") + + return result +} + +func (s *IntegrationContext) HasCLI() bool { + return s.B.CLI != nil +} + +func (s *integrationCLIOutput) End() { + if !s.ctx.HasCLI() { + return + } + + s.ctx.B.CLI.Output("\n------------------------------------------------------------------------\n") +} + +func (s *integrationCLIOutput) Output(str string) { + if !s.ctx.HasCLI() { + return + } + s.ctx.B.CLI.Output(s.ctx.B.Colorize().Color(str)) +} + +func (s *integrationCLIOutput) SubOutput(str string) { + if !s.ctx.HasCLI() { + return + } + s.ctx.B.CLI.Output(s.ctx.B.Colorize().Color(fmt.Sprintf("[reset]│ %s", str))) +} + +// Example pending output; the variable spacing (50 chars) allows up to 99 tasks (two digits) in each category: +// --------------- +// 13 tasks still pending, 0 passed, 0 failed ... +// 13 tasks still pending, 0 passed, 0 failed ... (8s elapsed) +// 13 tasks still pending, 0 passed, 0 failed ... (19s elapsed) +// 13 tasks still pending, 0 passed, 0 failed ... (33s elapsed) +func (s *integrationCLIOutput) OutputElapsed(message string, maxMessage int) { + if !s.ctx.HasCLI() { + return + } + elapsed := time.Since(s.ctx.started).Truncate(1 * time.Second) + s.ctx.B.CLI.Output(fmt.Sprintf("%-"+strconv.FormatInt(int64(maxMessage), 10)+"s", message) + s.ctx.B.Colorize().Color(fmt.Sprintf("[dim](%s elapsed)", elapsed))) +}