Merge pull request #19047 from hashicorp/f-watch-queues

backend/remote: print status updates while waiting for the run to start
This commit is contained in:
Sander van Harmelen 2018-10-11 20:31:07 +02:00 committed by GitHub
commit c3e9d78873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 347 additions and 32 deletions

View File

@ -32,7 +32,7 @@ plugin-dev: generate
# we run this one package at a time here because running the entire suite in
# one command creates memory usage issues when running in Travis-CI.
test: fmtcheck generate
go list $(TEST) | xargs -t -n4 go test $(TESTARGS) -timeout=60s -parallel=4
go list $(TEST) | xargs -t -n4 go test $(TESTARGS) -timeout=2m -parallel=4
# testacc runs acceptance tests
testacc: fmtcheck generate

View File

@ -148,19 +148,23 @@ func (b *Remote) opApply(stopCtx, cancelCtx context.Context, op *backend.Operati
}
}
r, err = b.waitForRun(stopCtx, cancelCtx, op, "apply", r, w)
if err != nil {
return r, err
}
if b.CLI != nil {
// Insert a blank line to separate the ouputs.
b.CLI.Output("")
}
logs, err := b.client.Applies.Logs(stopCtx, r.Apply.ID)
if err != nil {
return r, generalError("error retrieving logs", err)
}
scanner := bufio.NewScanner(logs)
skip := 0
for scanner.Scan() {
// Skip the first 3 lines to prevent duplicate output.
if skip < 3 {
skip++
continue
}
if b.CLI != nil {
b.CLI.Output(b.Colorize().Color(scanner.Text()))
}
@ -369,6 +373,4 @@ will cancel the remote apply if its still pending. If the apply started it
will stop streaming the logs, but will not stop the apply running remotely.
To view this run in a browser, visit:
https://%s/app/%s/%s/runs/%s[reset]
Waiting for the apply to start...
`

View File

@ -310,6 +310,36 @@ func (m *mockOrganizations) Delete(ctx context.Context, name string) error {
return nil
}
func (m *mockOrganizations) Capacity(ctx context.Context, name string) (*tfe.Capacity, error) {
var pending, running int
for _, r := range m.client.Runs.runs {
if r.Status == tfe.RunPending {
pending++
continue
}
running++
}
return &tfe.Capacity{Pending: pending, Running: running}, nil
}
func (m *mockOrganizations) RunQueue(ctx context.Context, name string, options tfe.RunQueueOptions) (*tfe.RunQueue, error) {
rq := &tfe.RunQueue{}
for _, r := range m.client.Runs.runs {
rq.Items = append(rq.Items, r)
}
rq.Pagination = &tfe.Pagination{
CurrentPage: 1,
NextPage: 1,
PreviousPage: 1,
TotalPages: 1,
TotalCount: len(rq.Items),
}
return rq, nil
}
type mockPlans struct {
client *mockClient
logs map[string]string
@ -629,6 +659,14 @@ func (m *mockRuns) Create(ctx context.Context, options tfe.RunCreateOptions) (*t
r.IsDestroy = *options.IsDestroy
}
w, ok := m.client.Workspaces.workspaceIDs[options.Workspace.ID]
if !ok {
return nil, tfe.ErrResourceNotFound
}
if w.CurrentRun == nil {
w.CurrentRun = r
}
m.runs[r.ID] = r
m.workspaces[options.Workspace.ID] = append(m.workspaces[options.Workspace.ID], r)
@ -691,6 +729,10 @@ func (m *mockRuns) Cancel(ctx context.Context, runID string, options tfe.RunCanc
panic("not implemented")
}
func (m *mockRuns) ForceCancel(ctx context.Context, runID string, options tfe.RunForceCancelOptions) error {
panic("not implemented")
}
func (m *mockRuns) Discard(ctx context.Context, runID string, options tfe.RunDiscardOptions) error {
panic("not implemented")
}

View File

@ -7,6 +7,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
"strings"
@ -181,11 +182,6 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation,
}()
}
r, err = b.client.Runs.Read(stopCtx, r.ID)
if err != nil {
return r, generalError("error retrieving run", err)
}
if b.CLI != nil {
header := planDefaultHeader
if op.Type == backend.OperationTypeApply {
@ -195,6 +191,16 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation,
header, b.hostname, b.organization, op.Workspace, r.ID)) + "\n"))
}
r, err = b.waitForRun(stopCtx, cancelCtx, op, "plan", r, w)
if err != nil {
return r, err
}
if b.CLI != nil {
// Insert a blank line to separate the ouputs.
b.CLI.Output("")
}
logs, err := b.client.Plans.Logs(stopCtx, r.Plan.ID)
if err != nil {
return r, generalError("error retrieving logs", err)
@ -213,6 +219,178 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation,
return r, nil
}
// backoff will perform exponential backoff based on the iteration and
// limited by the provided min and max (in milliseconds) durations.
func backoff(min, max float64, iter int) time.Duration {
backoff := math.Pow(2, float64(iter)/5) * min
if backoff > max {
backoff = max
}
return time.Duration(backoff) * time.Millisecond
}
func (b *Remote) waitForRun(stopCtx, cancelCtx context.Context, op *backend.Operation, opType string, r *tfe.Run, w *tfe.Workspace) (*tfe.Run, error) {
started := time.Now()
updated := started
for i := 0; ; i++ {
select {
case <-stopCtx.Done():
return r, stopCtx.Err()
case <-cancelCtx.Done():
return r, cancelCtx.Err()
case <-time.After(backoff(1000, 3000, i)):
// Timer up, show status
}
// Retrieve the run to get its current status.
r, err := b.client.Runs.Read(stopCtx, r.ID)
if err != nil {
return r, generalError("error retrieving run", err)
}
// Return if the run is no longer pending.
if r.Status != tfe.RunPending && r.Status != tfe.RunConfirmed {
if i == 0 && b.CLI != nil {
b.CLI.Output(b.Colorize().Color(fmt.Sprintf("Waiting for the %s to start...", opType)))
}
return r, nil
}
// Check if 30 seconds have passed since the last update.
current := time.Now()
if b.CLI != nil && (i == 0 || current.Sub(updated).Seconds() > 30) {
updated = current
position := 0
elapsed := ""
// Calculate and set the elapsed time.
if i > 0 {
elapsed = fmt.Sprintf(
" (%s elapsed)", current.Sub(started).Truncate(30*time.Second))
}
// Retrieve the workspace used to run this operation in.
w, err = b.client.Workspaces.Read(stopCtx, b.organization, w.Name)
if err != nil {
return nil, generalError("error retrieving workspace", err)
}
// If the workspace is locked the run will not be queued and we can
// update the status without making any expensive calls.
if w.Locked && w.CurrentRun != nil {
cr, err := b.client.Runs.Read(stopCtx, w.CurrentRun.ID)
if err != nil {
return r, generalError("error retrieving current run", err)
}
if cr.Status == tfe.RunPending {
b.CLI.Output(b.Colorize().Color(
"Waiting for the manually locked workspace to be unlocked..." + elapsed))
continue
}
}
// Skip checking the workspace queue when we are the current run.
if w.CurrentRun == nil || w.CurrentRun.ID != r.ID {
found := false
options := tfe.RunListOptions{}
runlist:
for {
rl, err := b.client.Runs.List(stopCtx, w.ID, options)
if err != nil {
return r, generalError("error retrieving run list", err)
}
// Loop through all runs to calculate the workspace queue position.
for _, item := range rl.Items {
if !found {
if r.ID == item.ID {
found = true
}
continue
}
// If the run is in a final state, ignore it and continue.
switch item.Status {
case tfe.RunApplied, tfe.RunCanceled, tfe.RunDiscarded, tfe.RunErrored:
continue
case tfe.RunPlanned:
if op.Type == backend.OperationTypePlan {
continue
}
}
// Increase the workspace queue position.
position++
// Stop searching when we reached the current run.
if w.CurrentRun != nil && w.CurrentRun.ID == item.ID {
break runlist
}
}
// Exit the loop when we've seen all pages.
if rl.CurrentPage >= rl.TotalPages {
break
}
// Update the page number to get the next page.
options.PageNumber = rl.NextPage
}
if position > 0 {
b.CLI.Output(b.Colorize().Color(fmt.Sprintf(
"Waiting for %d run(s) to finish before being queued...%s",
position,
elapsed,
)))
continue
}
}
options := tfe.RunQueueOptions{}
search:
for {
rq, err := b.client.Organizations.RunQueue(stopCtx, b.organization, options)
if err != nil {
return r, generalError("error retrieving queue", err)
}
// Search through all queued items to find our run.
for _, item := range rq.Items {
if r.ID == item.ID {
position = item.PositionInQueue
break search
}
}
// Exit the loop when we've seen all pages.
if rq.CurrentPage >= rq.TotalPages {
break
}
// Update the page number to get the next page.
options.PageNumber = rq.NextPage
}
if position > 0 {
c, err := b.client.Organizations.Capacity(stopCtx, b.organization)
if err != nil {
return r, generalError("error retrieving capacity", err)
}
b.CLI.Output(b.Colorize().Color(fmt.Sprintf(
"Waiting for %d queued run(s) to finish before starting...%s",
position-c.Running,
elapsed,
)))
continue
}
b.CLI.Output(b.Colorize().Color(fmt.Sprintf(
"Waiting for the %s to start...%s", opType, elapsed)))
}
}
}
const planErrNoQueueRunRights = `
Insufficient rights to generate a plan!
@ -289,8 +467,6 @@ const planDefaultHeader = `
will stop streaming the logs, but will not stop the plan running remotely.
To view this run in a browser, visit:
https://%s/app/%s/%s/runs/%s[reset]
Waiting for the plan to start...
`
// The newline in this error is to make it look good in the CLI!

View File

@ -31,6 +31,12 @@ type Organizations interface {
// Delete an organization by its name.
Delete(ctx context.Context, organization string) error
// Capacity shows the current run capacity of an organization.
Capacity(ctx context.Context, organization string) (*Capacity, error)
// RunQueue shows the current run queue of an organization.
RunQueue(ctx context.Context, organization string, options RunQueueOptions) (*RunQueue, error)
}
// organizations implements Organizations.
@ -80,6 +86,19 @@ type Organization struct {
TwoFactorConformant bool `jsonapi:"attr,two-factor-conformant"`
}
// Capacity represents the current run capacity of an organization.
type Capacity struct {
Organization string `jsonapi:"primary,organization-capacity"`
Pending int `jsonapi:"attr,pending"`
Running int `jsonapi:"attr,running"`
}
// RunQueue represents the current run queue of an organization.
type RunQueue struct {
*Pagination
Items []*Run
}
// OrganizationPermissions represents the organization permissions.
type OrganizationPermissions struct {
CanCreateTeam bool `json:"can-create-team"`
@ -242,3 +261,50 @@ func (s *organizations) Delete(ctx context.Context, organization string) error {
return s.client.do(ctx, req, nil)
}
// Capacity shows the currently used capacity of an organization.
func (s *organizations) Capacity(ctx context.Context, organization string) (*Capacity, error) {
if !validStringID(&organization) {
return nil, errors.New("Invalid value for organization")
}
u := fmt.Sprintf("organizations/%s/capacity", url.QueryEscape(organization))
req, err := s.client.newRequest("GET", u, nil)
if err != nil {
return nil, err
}
c := &Capacity{}
err = s.client.do(ctx, req, c)
if err != nil {
return nil, err
}
return c, nil
}
// RunQueueOptions represents the options for showing the queue.
type RunQueueOptions struct {
ListOptions
}
// RunQueue shows the current run queue of an organization.
func (s *organizations) RunQueue(ctx context.Context, organization string, options RunQueueOptions) (*RunQueue, error) {
if !validStringID(&organization) {
return nil, errors.New("Invalid value for organization")
}
u := fmt.Sprintf("organizations/%s/runs/queue", url.QueryEscape(organization))
req, err := s.client.newRequest("GET", u, &options)
if err != nil {
return nil, err
}
rq := &RunQueue{}
err = s.client.do(ctx, req, rq)
if err != nil {
return nil, err
}
return rq, nil
}

View File

@ -31,6 +31,9 @@ type Runs interface {
// Cancel a run by its ID.
Cancel(ctx context.Context, runID string, options RunCancelOptions) error
// Force-cancel a run by its ID.
ForceCancel(ctx context.Context, runID string, options RunForceCancelOptions) error
// Discard a run by its ID.
Discard(ctx context.Context, runID string, options RunDiscardOptions) error
}
@ -77,16 +80,18 @@ type RunList struct {
// Run represents a Terraform Enterprise run.
type Run struct {
ID string `jsonapi:"primary,runs"`
Actions *RunActions `jsonapi:"attr,actions"`
CreatedAt time.Time `jsonapi:"attr,created-at,iso8601"`
HasChanges bool `jsonapi:"attr,has-changes"`
IsDestroy bool `jsonapi:"attr,is-destroy"`
Message string `jsonapi:"attr,message"`
Permissions *RunPermissions `jsonapi:"attr,permissions"`
Source RunSource `jsonapi:"attr,source"`
Status RunStatus `jsonapi:"attr,status"`
StatusTimestamps *RunStatusTimestamps `jsonapi:"attr,status-timestamps"`
ID string `jsonapi:"primary,runs"`
Actions *RunActions `jsonapi:"attr,actions"`
CreatedAt time.Time `jsonapi:"attr,created-at,iso8601"`
ForceCancelAvailableAt time.Time `jsonapi:"attr,force-cancel-available-at,iso8601"`
HasChanges bool `jsonapi:"attr,has-changes"`
IsDestroy bool `jsonapi:"attr,is-destroy"`
Message string `jsonapi:"attr,message"`
Permissions *RunPermissions `jsonapi:"attr,permissions"`
PositionInQueue int `jsonapi:"attr,position-in-queue"`
Source RunSource `jsonapi:"attr,source"`
Status RunStatus `jsonapi:"attr,status"`
StatusTimestamps *RunStatusTimestamps `jsonapi:"attr,status-timestamps"`
// Relations
Apply *Apply `jsonapi:"relation,apply"`
@ -98,9 +103,10 @@ type Run struct {
// RunActions represents the run actions.
type RunActions struct {
IsCancelable bool `json:"is-cancelable"`
IsConfirmable bool `json:"is-confirmable"`
IsDiscardable bool `json:"is-discardable"`
IsCancelable bool `json:"is-cancelable"`
IsConfirmable bool `json:"is-confirmable"`
IsDiscardable bool `json:"is-discardable"`
IsForceCancelable bool `json:"is-force-cancelable"`
}
// RunPermissions represents the run permissions.
@ -108,6 +114,7 @@ type RunPermissions struct {
CanApply bool `json:"can-apply"`
CanCancel bool `json:"can-cancel"`
CanDiscard bool `json:"can-discard"`
CanForceCancel bool `json:"can-force-cancel"`
CanForceExecute bool `json:"can-force-execute"`
}
@ -259,6 +266,27 @@ func (s *runs) Cancel(ctx context.Context, runID string, options RunCancelOption
return s.client.do(ctx, req, nil)
}
// RunCancelOptions represents the options for force-canceling a run.
type RunForceCancelOptions struct {
// An optional comment explaining the reason for the force-cancel.
Comment *string `json:"comment,omitempty"`
}
// ForceCancel is used to forcefully cancel a run by its ID.
func (s *runs) ForceCancel(ctx context.Context, runID string, options RunForceCancelOptions) error {
if !validStringID(&runID) {
return errors.New("Invalid value for run ID")
}
u := fmt.Sprintf("runs/%s/actions/force-cancel", url.QueryEscape(runID))
req, err := s.client.newRequest("POST", u, &options)
if err != nil {
return err
}
return s.client.do(ctx, req, nil)
}
// RunDiscardOptions represents the options for discarding a run.
type RunDiscardOptions struct {
// An optional explanation for why the run was discarded.

View File

@ -72,6 +72,7 @@ type Workspace struct {
WorkingDirectory string `jsonapi:"attr,working-directory"`
// Relations
CurrentRun *Run `jsonapi:"relation,current-run"`
Organization *Organization `jsonapi:"relation,organization"`
SSHKey *SSHKey `jsonapi:"relation,ssh-key"`
}

6
vendor/vendor.json vendored
View File

@ -1804,10 +1804,10 @@
"revisionTime": "2018-07-12T07:51:27Z"
},
{
"checksumSHA1": "V2A92CHPEiPIsU4Wepl0ukznka8=",
"checksumSHA1": "9EZuhp7LWTAVsTDpP9DzajjmJxg=",
"path": "github.com/hashicorp/go-tfe",
"revision": "cca0c15746d89219f9732f6c07d267827fef25cd",
"revisionTime": "2018-09-20T19:42:22Z"
"revision": "ed986a3b38aba4630ca6ae7dbc876eb0d0c95c57",
"revisionTime": "2018-10-10T13:21:10Z"
},
{
"checksumSHA1": "85XUnluYJL7F55ptcwdmN8eSOsk=",