[cloud] run tasks integration

This change will await the completion of pre-apply run tasks if they
exist on a run and then report the results.

It also adds an abstraction when interacting with cloud integrations such
as policy checking and cost estimation that simplify and unify output,
although I did not go so far as to refactor those callers to use it yet.
This commit is contained in:
uturunku1 2021-12-07 15:35:04 -08:00 committed by Sebastian Rivera
parent 0900c7e0bb
commit 3e9ae69a12
7 changed files with 421 additions and 3 deletions

1
go.mod
View File

@ -42,6 +42,7 @@ require (
github.com/hashicorp/go-plugin v1.4.3
github.com/hashicorp/go-retryablehttp v0.7.0
github.com/hashicorp/go-tfe v0.21.0
github.com/hashicorp/go-safetemp v1.0.0
github.com/hashicorp/go-uuid v1.0.2
github.com/hashicorp/go-version v1.3.0
github.com/hashicorp/hcl v0.0.0-20170504190234-a4b07c25de5f

4
go.sum
View File

@ -407,8 +407,8 @@ github.com/hashicorp/go-slug v0.7.0/go.mod h1:Ib+IWBYfEfJGI1ZyXMGNbu2BU+aa3Dzu41
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-tfe v0.21.0 h1:P1QoeLkigDi4BXGQ//42kyXwfcHUqbh5jJemml6iQJs=
github.com/hashicorp/go-tfe v0.21.0/go.mod h1:gyXLXbpBVxA2F/6opah8XBsOkZJxHYQmghl0OWi8keI=
github.com/hashicorp/go-tfe v0.20.1-0.20211210153426-243d1b1eb033 h1:SoQhT2l6xHh7LOprw1F6MPzxSk65kSXdyfubOs8D+Ho=
github.com/hashicorp/go-tfe v0.20.1-0.20211210153426-243d1b1eb033/go.mod h1:gyXLXbpBVxA2F/6opah8XBsOkZJxHYQmghl0OWi8keI=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=

View File

@ -299,7 +299,7 @@ func (b *Remote) costEstimate(stopCtx, cancelCtx context.Context, op *backend.Op
deltaRepr := strings.Replace(ce.DeltaMonthlyCost, "-", "", 1)
if b.CLI != nil {
b.CLI.Output(b.Colorize().Color(msgPrefix + ":\n"))
b.CLI.Output(b.Colorize().Color("[bold]" + msgPrefix + ":\n"))
b.CLI.Output(b.Colorize().Color(fmt.Sprintf("Resources: %d of %d estimated", ce.MatchedResourcesCount, ce.ResourcesCount)))
b.CLI.Output(b.Colorize().Color(fmt.Sprintf(" $%s/mo %s$%s", ce.ProposedMonthlyCost, sign, deltaRepr)))

View File

@ -351,9 +351,24 @@ in order to capture the filesystem context the remote workspace expects:
}
}
// Await collective run tasks
if len(r.TaskStage) > 0 {
context := NewIntegrationContext(stopCtx, cancelCtx, b, op, r)
err = b.runTasks(context, context.BeginOutput("Run Tasks (pre-apply)"), r.TaskStage[0].ID)
if err != nil {
return r, err
}
}
return r, nil
}
// String returns a pointer to the given string.
func String(v string) *string {
return &v
}
const planDefaultHeader = `
[reset][yellow]Running plan in Terraform Cloud. Output will stream here. Pressing Ctrl-C
will stop streaming the logs, but will not stop the plan running remotely.[reset]

View File

@ -0,0 +1,139 @@
package cloud
import (
"context"
"fmt"
"strings"
"github.com/hashicorp/go-tfe"
)
type taskResultSummary struct {
pending int
failed int
failedMandatory int
passed int
}
type taskStageReadFunc func(b *Cloud, stopCtx context.Context) (*tfe.TaskStage, error)
func summarizeTaskResults(taskResults []*tfe.TaskResult) taskResultSummary {
var pe, er, erm, pa int
for _, task := range taskResults {
if task.Status == "running" || task.Status == "pending" {
pe++
} else if task.Status == "passed" {
pa++
} else {
// Everything else is a failure
er++
if task.WorkspaceTaskEnforcementLevel == "mandatory" {
erm++
}
}
}
return taskResultSummary{
pending: pe,
failed: er,
failedMandatory: erm,
passed: pa,
}
}
// elapsedMessageMax is 50 chars: the length of this message with 6 digits
// 99 tasks still pending, 99 passed, 99 failed ...
const elapsedMessageMax int = 50
func (b *Cloud) runTasksWithTaskResults(context *IntegrationContext, output IntegrationOutputWriter, fetchTaskStage taskStageReadFunc) error {
// TODO: Do not fetch run tasks if there are no changes to infrastructure
// if !context.Run.HasChanges {
// output.Output("No Changes. Tasks will not run")
// output.End()
// return nil
// }
return context.Poll(func(i int) (bool, error) {
// TODO: get the stage that corresponds to an argument passed to this function
stage, err := fetchTaskStage(b, context.StopContext)
if err != nil {
return false, generalError("Failed to retrieve pre-apply task stage", err)
}
summary := summarizeTaskResults(stage.TaskResults)
if summary.pending > 0 {
message := fmt.Sprintf("%d tasks still pending, %d passed, %d failed ... ", summary.pending, summary.passed, summary.failed)
if i%4 == 0 {
if i > 0 {
output.OutputElapsed(message, elapsedMessageMax)
}
}
return true, nil
}
// No more tasks pending/running. Print all the results.
// Track the first task name that is a mandatory enforcement level breach.
var firstMandatoryTaskFailed *string = nil
if i == 0 {
output.Output(fmt.Sprintf("All tasks completed! %d passed, %d failed", summary.passed, summary.failed))
} else {
output.OutputElapsed(fmt.Sprintf("All tasks completed! %d passed, %d failed", summary.passed, summary.failed), 50)
}
output.Output("")
for _, t := range stage.TaskResults {
capitalizedStatus := string(t.Status)
capitalizedStatus = strings.ToUpper(capitalizedStatus[:1]) + capitalizedStatus[1:]
status := "[green]" + capitalizedStatus
if t.Status != "passed" {
level := string(t.WorkspaceTaskEnforcementLevel)
level = strings.ToUpper(level[:1]) + level[1:]
status = fmt.Sprintf("[red]%s (%s)", capitalizedStatus, level)
if t.WorkspaceTaskEnforcementLevel == "mandatory" && firstMandatoryTaskFailed == nil {
firstMandatoryTaskFailed = &t.TaskName
}
}
title := fmt.Sprintf(`%s ⸺ %s`, t.TaskName, status)
output.SubOutput(title)
output.SubOutput(fmt.Sprintf("[dim]%s", t.Message))
output.SubOutput("")
}
// If a mandatory enforcement level is breached, return an error.
var taskErr error = nil
var overall string = "[green]Passed"
if firstMandatoryTaskFailed != nil {
overall = "[red]Failed"
taskErr = fmt.Errorf("the run failed because the run task, %s, is required to succeed", *firstMandatoryTaskFailed)
} else if summary.failed > 0 { // we have failures but none of them mandatory
overall = "[green]Passed with advisory failures"
}
output.SubOutput("")
output.SubOutput("[bold]Overall Result: " + overall)
output.End()
return false, taskErr
})
}
func (b *Cloud) runTasks(ctx *IntegrationContext, output IntegrationOutputWriter, stageID string) error {
return b.runTasksWithTaskResults(ctx, output, func(b *Cloud, stopCtx context.Context) (*tfe.TaskStage, error) {
options := tfe.TaskStageReadOptions{
Include: "task_results",
}
return b.client.TaskStages.Read(ctx.StopContext, stageID, &options)
})
}

View File

@ -0,0 +1,146 @@
package cloud
import (
"context"
"strings"
"testing"
"github.com/hashicorp/go-tfe"
)
type testIntegrationOutput struct {
ctx *IntegrationContext
output *strings.Builder
t *testing.T
}
var _ IntegrationOutputWriter = (*testIntegrationOutput)(nil) // Compile time check
func (s *testIntegrationOutput) End() {
s.output.WriteString("END\n")
}
func (s *testIntegrationOutput) SubOutput(str string) {
s.output.WriteString(s.ctx.B.Colorize().Color("[reset]│ "+str) + "\n")
}
func (s *testIntegrationOutput) Output(str string) {
s.output.WriteString(s.ctx.B.Colorize().Color("[reset]│ ") + str + "\n")
}
func (s *testIntegrationOutput) OutputElapsed(message string, maxMessage int) {
s.output.WriteString("PENDING MESSAGE: " + message)
}
func newMockIntegrationContext(b *Cloud, t *testing.T) (*IntegrationContext, *testIntegrationOutput) {
ctx := context.Background()
// Retrieve the workspace used to run this operation in.
w, err := b.client.Workspaces.Read(ctx, b.organization, b.WorkspaceMapping.Name)
if err != nil {
t.Fatalf("error retrieving workspace: %v", err)
}
// Create a new configuration version.
c, err := b.client.ConfigurationVersions.Create(ctx, w.ID, tfe.ConfigurationVersionCreateOptions{})
if err != nil {
t.Fatalf("error creating configuration version: %v", err)
}
// Create a pending run to block this run.
r, err := b.client.Runs.Create(ctx, tfe.RunCreateOptions{
ConfigurationVersion: c,
Workspace: w,
})
if err != nil {
t.Fatalf("error creating pending run: %v", err)
}
op, configCleanup, done := testOperationPlan(t, "./testdata/plan")
defer configCleanup()
defer done(t)
integrationContext := &IntegrationContext{
B: b,
StopContext: ctx,
CancelContext: ctx,
Op: op,
Run: r,
}
return integrationContext, &testIntegrationOutput{
ctx: integrationContext,
output: &strings.Builder{},
t: t,
}
}
func TestCloud_runTasksWithTaskResults(t *testing.T) {
b, bCleanup := testBackendWithName(t)
defer bCleanup()
integrationContext, writer := newMockIntegrationContext(b, t)
cases := map[string]struct {
taskResults []*tfe.TaskResult
context *IntegrationContext
writer *testIntegrationOutput
expectedOutputs []string
isError bool
}{
"all-succeeded": {
taskResults: []*tfe.TaskResult{
{ID: "1", TaskName: "Mandatory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "mandatory"},
{ID: "2", TaskName: "Advisory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "advisory"},
},
writer: writer,
context: integrationContext,
expectedOutputs: []string{"Overall Result: Passed\n"},
isError: false,
},
"mandatory-failed": {
taskResults: []*tfe.TaskResult{
{ID: "1", TaskName: "Mandatory", Message: "500 Error", Status: "failed", WorkspaceTaskEnforcementLevel: "mandatory"},
{ID: "2", TaskName: "Advisory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "advisory"},
},
writer: writer,
context: integrationContext,
expectedOutputs: []string{"Passed\n", "A-OK\n", "Overall Result: Failed\n"},
isError: true,
},
"advisory-failed": {
taskResults: []*tfe.TaskResult{
{ID: "1", TaskName: "Mandatory", Message: "A-OK", Status: "passed", WorkspaceTaskEnforcementLevel: "mandatory"},
{ID: "2", TaskName: "Advisory", Message: "500 Error", Status: "failed", WorkspaceTaskEnforcementLevel: "advisory"},
},
writer: writer,
context: integrationContext,
expectedOutputs: []string{"Failed (Advisory)", "Overall Result: Passed with advisory failure"},
isError: false,
},
}
for caseName, c := range cases {
c.writer.output.Reset()
err := b.runTasksWithTaskResults(c.writer.ctx, c.writer, func(b *Cloud, stopCtx context.Context) (*tfe.TaskStage, error) {
return &tfe.TaskStage{
TaskResults: c.taskResults,
}, nil
})
if c.isError && err == nil {
t.Fatalf("Expected %s to be error", caseName)
}
if !c.isError && err != nil {
t.Errorf("Expected %s to not be error but received %s", caseName, err)
}
output := c.writer.output.String()
for _, expected := range c.expectedOutputs {
if !strings.Contains(output, expected) {
t.Fatalf("Expected output to contain '%s' but it was:\n\n%s", expected, output)
}
}
}
}

View File

@ -0,0 +1,117 @@
package cloud
import (
"context"
"fmt"
"strconv"
"time"
"github.com/hashicorp/go-tfe"
"github.com/hashicorp/terraform/internal/backend"
)
type IntegrationOutputWriter interface {
End()
OutputElapsed(message string, maxMessage int)
Output(str string)
SubOutput(str string)
}
type IntegrationContext struct {
started time.Time
B *Cloud
StopContext context.Context
CancelContext context.Context
Op *backend.Operation
Run *tfe.Run
}
type integrationCLIOutput struct {
ctx *IntegrationContext
}
var _ IntegrationOutputWriter = (*integrationCLIOutput)(nil) // Compile time check
func NewIntegrationContext(stopCtx, cancelCtx context.Context, b *Cloud, op *backend.Operation, r *tfe.Run) *IntegrationContext {
return &IntegrationContext{
B: b,
StopContext: stopCtx,
CancelContext: cancelCtx,
Op: op,
Run: r,
}
}
func (s *IntegrationContext) Poll(every func(i int) (bool, error)) error {
for i := 0; ; i++ {
select {
case <-s.StopContext.Done():
return s.StopContext.Err()
case <-s.CancelContext.Done():
return s.CancelContext.Err()
case <-time.After(backoff(backoffMin, backoffMax, i)):
// blocks for a time between min and max
}
cont, err := every(i)
if !cont {
return err
}
}
}
func (s *IntegrationContext) BeginOutput(name string) IntegrationOutputWriter {
var result IntegrationOutputWriter = &integrationCLIOutput{
ctx: s,
}
s.started = time.Now()
if s.HasCLI() {
s.B.CLI.Output("\n------------------------------------------------------------------------\n")
}
result.Output("[bold]" + name + ":\n")
return result
}
func (s *IntegrationContext) HasCLI() bool {
return s.B.CLI != nil
}
func (s *integrationCLIOutput) End() {
if !s.ctx.HasCLI() {
return
}
s.ctx.B.CLI.Output("\n------------------------------------------------------------------------\n")
}
func (s *integrationCLIOutput) Output(str string) {
if !s.ctx.HasCLI() {
return
}
s.ctx.B.CLI.Output(s.ctx.B.Colorize().Color(str))
}
func (s *integrationCLIOutput) SubOutput(str string) {
if !s.ctx.HasCLI() {
return
}
s.ctx.B.CLI.Output(s.ctx.B.Colorize().Color(fmt.Sprintf("[reset]│ %s", str)))
}
// Example pending output; the variable spacing (50 chars) allows up to 99 tasks (two digits) in each category:
// ---------------
// 13 tasks still pending, 0 passed, 0 failed ...
// 13 tasks still pending, 0 passed, 0 failed ... (8s elapsed)
// 13 tasks still pending, 0 passed, 0 failed ... (19s elapsed)
// 13 tasks still pending, 0 passed, 0 failed ... (33s elapsed)
func (s *integrationCLIOutput) OutputElapsed(message string, maxMessage int) {
if !s.ctx.HasCLI() {
return
}
elapsed := time.Since(s.ctx.started).Truncate(1 * time.Second)
s.ctx.B.CLI.Output(fmt.Sprintf("%-"+strconv.FormatInt(int64(maxMessage), 10)+"s", message) + s.ctx.B.Colorize().Color(fmt.Sprintf("[dim](%s elapsed)", elapsed)))
}