From 79c117877ee6544bf71d6bc41039b1ab4067a593 Mon Sep 17 00:00:00 2001 From: Jake Champlin Date: Fri, 3 Feb 2017 18:19:23 -0500 Subject: [PATCH] provider/nomad: Update jobspec parser Updates vendored Nomad jobspec parser such that parameterized nomad job files can no be parsed and used with Terraform. Also fixes tests to adhere to new jobspec version, and update documentation to reflect such as well. --- builtin/providers/nomad/resource_job_test.go | 77 +++- .../hashicorp/nomad/jobspec/parse.go | 104 ++++- .../hashicorp/nomad/nomad/structs/diff.go | 49 +++ .../hashicorp/nomad/nomad/structs/funcs.go | 103 ----- .../hashicorp/nomad/nomad/structs/structs.go | 365 +++++++++++++++--- .../docs/providers/nomad/r/job.html.markdown | 1 - 6 files changed, 503 insertions(+), 196 deletions(-) diff --git a/builtin/providers/nomad/resource_job_test.go b/builtin/providers/nomad/resource_job_test.go index 6562e2988..c43f5aa1f 100644 --- a/builtin/providers/nomad/resource_job_test.go +++ b/builtin/providers/nomad/resource_job_test.go @@ -1,6 +1,7 @@ package nomad import ( + "errors" "fmt" "strings" "testing" @@ -16,7 +17,7 @@ func TestResourceJob_basic(t *testing.T) { Providers: testProviders, PreCheck: func() { testAccPreCheck(t) }, Steps: []r.TestStep{ - r.TestStep{ + { Config: testResourceJob_initialConfig, Check: testResourceJob_initialCheck, }, @@ -31,14 +32,14 @@ func TestResourceJob_refresh(t *testing.T) { Providers: testProviders, PreCheck: func() { testAccPreCheck(t) }, Steps: []r.TestStep{ - r.TestStep{ + { Config: testResourceJob_initialConfig, Check: testResourceJob_initialCheck, }, // This should successfully cause the job to be recreated, // testing the Exists function. - r.TestStep{ + { PreConfig: testResourceJob_deregister(t, "foo"), Config: testResourceJob_initialConfig, }, @@ -51,20 +52,20 @@ func TestResourceJob_disableDestroyDeregister(t *testing.T) { Providers: testProviders, PreCheck: func() { testAccPreCheck(t) }, Steps: []r.TestStep{ - r.TestStep{ + { Config: testResourceJob_noDestroy, Check: testResourceJob_initialCheck, }, // Destroy with our setting set - r.TestStep{ + { Destroy: true, Config: testResourceJob_noDestroy, Check: testResourceJob_checkExists, }, // Re-apply without the setting set - r.TestStep{ + { Config: testResourceJob_initialConfig, Check: testResourceJob_checkExists, }, @@ -77,13 +78,13 @@ func TestResourceJob_idChange(t *testing.T) { Providers: testProviders, PreCheck: func() { testAccPreCheck(t) }, Steps: []r.TestStep{ - r.TestStep{ + { Config: testResourceJob_initialConfig, Check: testResourceJob_initialCheck, }, // Change our ID - r.TestStep{ + { Config: testResourceJob_updateConfig, Check: testResourceJob_updateCheck, }, @@ -91,6 +92,19 @@ func TestResourceJob_idChange(t *testing.T) { }) } +func TestResourceJob_parameterizedJob(t *testing.T) { + r.Test(t, r.TestCase{ + Providers: testProviders, + PreCheck: func() { testAccPreCheck(t) }, + Steps: []r.TestStep{ + { + Config: testResourceJob_parameterizedJob, + Check: testResourceJob_initialCheck, + }, + }, + }) +} + var testResourceJob_initialConfig = ` resource "nomad_job" "test" { jobspec = < 0 { + if err := parseParameterizedJob(&result.ParameterizedJob, o); err != nil { + return multierror.Prefix(err, "parameterized ->") + } + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { @@ -551,6 +563,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l "artifact", "config", "constraint", + "dispatch_payload", "driver", "env", "kill_timeout", @@ -573,6 +586,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l delete(m, "artifact") delete(m, "config") delete(m, "constraint") + delete(m, "dispatch_payload") delete(m, "env") delete(m, "logs") delete(m, "meta") @@ -716,6 +730,32 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l t.Vault = v } + // If we have a dispatch_payload block parse that + if o := listVal.Filter("dispatch_payload"); len(o.Items) > 0 { + if len(o.Items) > 1 { + return fmt.Errorf("only one dispatch_payload block is allowed in a task. Number of dispatch_payload blocks found: %d", len(o.Items)) + } + var m map[string]interface{} + dispatchBlock := o.Items[0] + + // Check for invalid keys + valid := []string{ + "file", + } + if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', dispatch_payload ->", n)) + } + + if err := hcl.DecodeObject(&m, dispatchBlock.Val); err != nil { + return err + } + + t.DispatchPayload = &structs.DispatchPayloadConfig{} + if err := mapstructure.WeakDecode(m, t.DispatchPayload); err != nil { + return err + } + } + *result = append(*result, &t) } @@ -797,13 +837,13 @@ func parseTemplates(result *[]*structs.Template, list *ast.ObjectList) error { for _, o := range list.Elem().Items { // Check for invalid keys valid := []string{ - "source", - "destination", - "data", "change_mode", "change_signal", + "data", + "destination", + "perms", + "source", "splay", - "once", } if err := checkHCLKeys(o.Val, valid); err != nil { return err @@ -1188,6 +1228,40 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error { return nil } +func parseParameterizedJob(result **structs.ParameterizedJobConfig, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'parameterized' block allowed per job") + } + + // Get our resource object + o := list.Items[0] + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + // Check for invalid keys + valid := []string{ + "payload", + "meta_required", + "meta_optional", + } + if err := checkHCLKeys(o.Val, valid); err != nil { + return err + } + + // Build the parameterized job block + var d structs.ParameterizedJobConfig + if err := mapstructure.WeakDecode(m, &d); err != nil { + return err + } + + *result = &d + return nil +} + func checkHCLKeys(node ast.Node, valid []string) error { var list *ast.ObjectList switch n := node.(type) { diff --git a/vendor/github.com/hashicorp/nomad/nomad/structs/diff.go b/vendor/github.com/hashicorp/nomad/nomad/structs/diff.go index 719e1b67f..f4ec05e69 100644 --- a/vendor/github.com/hashicorp/nomad/nomad/structs/diff.go +++ b/vendor/github.com/hashicorp/nomad/nomad/structs/diff.go @@ -130,6 +130,11 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) { diff.Objects = append(diff.Objects, pDiff) } + // ParameterizedJob diff + if cDiff := parameterizedJobDiff(j.ParameterizedJob, other.ParameterizedJob, contextual); cDiff != nil { + diff.Objects = append(diff.Objects, cDiff) + } + return diff, nil } @@ -370,6 +375,12 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) { diff.Objects = append(diff.Objects, lDiff) } + // Dispatch payload diff + dDiff := primitiveObjectDiff(t.DispatchPayload, other.DispatchPayload, nil, "DispatchPayload", contextual) + if dDiff != nil { + diff.Objects = append(diff.Objects, dDiff) + } + // Artifacts diff diffs := primitiveObjectSetDiff( interfaceSlice(t.Artifacts), @@ -629,6 +640,44 @@ func vaultDiff(old, new *Vault, contextual bool) *ObjectDiff { return diff } +// parameterizedJobDiff returns the diff of two parameterized job objects. If +// contextual diff is enabled, all fields will be returned, even if no diff +// occurred. +func parameterizedJobDiff(old, new *ParameterizedJobConfig, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "ParameterizedJob"} + var oldPrimitiveFlat, newPrimitiveFlat map[string]string + + if reflect.DeepEqual(old, new) { + return nil + } else if old == nil { + old = &ParameterizedJobConfig{} + diff.Type = DiffTypeAdded + newPrimitiveFlat = flatmap.Flatten(new, nil, true) + } else if new == nil { + new = &ParameterizedJobConfig{} + diff.Type = DiffTypeDeleted + oldPrimitiveFlat = flatmap.Flatten(old, nil, true) + } else { + diff.Type = DiffTypeEdited + oldPrimitiveFlat = flatmap.Flatten(old, nil, true) + newPrimitiveFlat = flatmap.Flatten(new, nil, true) + } + + // Diff the primitive fields. + diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) + + // Meta diffs + if optionalDiff := stringSetDiff(old.MetaOptional, new.MetaOptional, "MetaOptional", contextual); optionalDiff != nil { + diff.Objects = append(diff.Objects, optionalDiff) + } + + if requiredDiff := stringSetDiff(old.MetaRequired, new.MetaRequired, "MetaRequired", contextual); requiredDiff != nil { + diff.Objects = append(diff.Objects, requiredDiff) + } + + return diff +} + // Diff returns a diff of two resource objects. If contextual diff is enabled, // non-changed fields will still be returned. func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff { diff --git a/vendor/github.com/hashicorp/nomad/nomad/structs/funcs.go b/vendor/github.com/hashicorp/nomad/nomad/structs/funcs.go index 104bb58b4..479e61389 100644 --- a/vendor/github.com/hashicorp/nomad/nomad/structs/funcs.go +++ b/vendor/github.com/hashicorp/nomad/nomad/structs/funcs.go @@ -169,72 +169,6 @@ func GenerateUUID() string { buf[10:16]) } -// Helpers for copying generic structures. -func CopyMapStringString(m map[string]string) map[string]string { - l := len(m) - if l == 0 { - return nil - } - - c := make(map[string]string, l) - for k, v := range m { - c[k] = v - } - return c -} - -func CopyMapStringInt(m map[string]int) map[string]int { - l := len(m) - if l == 0 { - return nil - } - - c := make(map[string]int, l) - for k, v := range m { - c[k] = v - } - return c -} - -func CopyMapStringFloat64(m map[string]float64) map[string]float64 { - l := len(m) - if l == 0 { - return nil - } - - c := make(map[string]float64, l) - for k, v := range m { - c[k] = v - } - return c -} - -func CopySliceString(s []string) []string { - l := len(s) - if l == 0 { - return nil - } - - c := make([]string, l) - for i, v := range s { - c[i] = v - } - return c -} - -func CopySliceInt(s []int) []int { - l := len(s) - if l == 0 { - return nil - } - - c := make([]int, l) - for i, v := range s { - c[i] = v - } - return c -} - func CopySliceConstraints(s []*Constraint) []*Constraint { l := len(s) if l == 0 { @@ -248,27 +182,6 @@ func CopySliceConstraints(s []*Constraint) []*Constraint { return c } -// SliceStringIsSubset returns whether the smaller set of strings is a subset of -// the larger. If the smaller slice is not a subset, the offending elements are -// returned. -func SliceStringIsSubset(larger, smaller []string) (bool, []string) { - largerSet := make(map[string]struct{}, len(larger)) - for _, l := range larger { - largerSet[l] = struct{}{} - } - - subset := true - var offending []string - for _, s := range smaller { - if _, ok := largerSet[s]; !ok { - subset = false - offending = append(offending, s) - } - } - - return subset, offending -} - // VaultPoliciesSet takes the structure returned by VaultPolicies and returns // the set of required policies func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { @@ -288,19 +201,3 @@ func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { } return flattened } - -// MapStringStringSliceValueSet returns the set of values in a map[string][]string -func MapStringStringSliceValueSet(m map[string][]string) []string { - set := make(map[string]struct{}) - for _, slice := range m { - for _, v := range slice { - set[v] = struct{}{} - } - } - - flat := make([]string, 0, len(set)) - for k := range set { - flat = append(flat, k) - } - return flat -} diff --git a/vendor/github.com/hashicorp/nomad/nomad/structs/structs.go b/vendor/github.com/hashicorp/nomad/nomad/structs/structs.go index 9f94bcacf..3fe27853e 100644 --- a/vendor/github.com/hashicorp/nomad/nomad/structs/structs.go +++ b/vendor/github.com/hashicorp/nomad/nomad/structs/structs.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/args" "github.com/mitchellh/copystructure" "github.com/ugorji/go/codec" @@ -249,7 +250,8 @@ type JobEvaluateRequest struct { // JobSpecificRequest is used when we just need to specify a target job type JobSpecificRequest struct { - JobID string + JobID string + AllAllocs bool QueryOptions } @@ -272,6 +274,14 @@ type JobSummaryRequest struct { QueryOptions } +// JobDispatchRequest is used to dispatch a job based on a parameterized job +type JobDispatchRequest struct { + JobID string + Payload []byte + Meta map[string]string + WriteRequest +} + // NodeListRequest is used to parameterize a list request type NodeListRequest struct { QueryOptions @@ -525,6 +535,14 @@ type JobSummaryResponse struct { QueryMeta } +type JobDispatchResponse struct { + DispatchedJobID string + EvalID string + EvalCreateIndex uint64 + JobCreateIndex uint64 + QueryMeta +} + // JobListResponse is used for a list request type JobListResponse struct { Jobs []*JobListStub @@ -746,11 +764,11 @@ func (n *Node) Copy() *Node { } nn := new(Node) *nn = *n - nn.Attributes = CopyMapStringString(nn.Attributes) + nn.Attributes = helper.CopyMapStringString(nn.Attributes) nn.Resources = nn.Resources.Copy() nn.Reserved = nn.Reserved.Copy() - nn.Links = CopyMapStringString(nn.Links) - nn.Meta = CopyMapStringString(nn.Meta) + nn.Links = helper.CopyMapStringString(nn.Links) + nn.Meta = helper.CopyMapStringString(nn.Meta) return nn } @@ -1062,39 +1080,6 @@ const ( CoreJobPriority = JobMaxPriority * 2 ) -// JobSummary summarizes the state of the allocations of a job -type JobSummary struct { - JobID string - Summary map[string]TaskGroupSummary - - // Raft Indexes - CreateIndex uint64 - ModifyIndex uint64 -} - -// Copy returns a new copy of JobSummary -func (js *JobSummary) Copy() *JobSummary { - newJobSummary := new(JobSummary) - *newJobSummary = *js - newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary)) - for k, v := range js.Summary { - newTGSummary[k] = v - } - newJobSummary.Summary = newTGSummary - return newJobSummary -} - -// TaskGroup summarizes the state of all the allocations of a particular -// TaskGroup -type TaskGroupSummary struct { - Queued int - Complete int - Failed int - Running int - Starting int - Lost int -} - // Job is the scope of a scheduling request to Nomad. It is the largest // scoped object, and is a named collection of task groups. Each task group // is further composed of tasks. A task group (TG) is the unit of scheduling @@ -1146,6 +1131,13 @@ type Job struct { // Periodic is used to define the interval the job is run at. Periodic *PeriodicConfig + // ParameterizedJob is used to specify the job as a parameterized job + // for dispatching. + ParameterizedJob *ParameterizedJobConfig + + // Payload is the payload supplied when the job was dispatched. + Payload []byte + // Meta is used to associate arbitrary metadata with this // job. This is opaque to Nomad. Meta map[string]string @@ -1179,6 +1171,10 @@ func (j *Job) Canonicalize() { for _, tg := range j.TaskGroups { tg.Canonicalize(j) } + + if j.ParameterizedJob != nil { + j.ParameterizedJob.Canonicalize() + } } // Copy returns a deep copy of the Job. It is expected that callers use recover. @@ -1189,7 +1185,7 @@ func (j *Job) Copy() *Job { } nj := new(Job) *nj = *j - nj.Datacenters = CopySliceString(nj.Datacenters) + nj.Datacenters = helper.CopySliceString(nj.Datacenters) nj.Constraints = CopySliceConstraints(nj.Constraints) if j.TaskGroups != nil { @@ -1201,7 +1197,8 @@ func (j *Job) Copy() *Job { } nj.Periodic = nj.Periodic.Copy() - nj.Meta = CopyMapStringString(nj.Meta) + nj.Meta = helper.CopyMapStringString(nj.Meta) + nj.ParameterizedJob = nj.ParameterizedJob.Copy() return nj } @@ -1276,6 +1273,17 @@ func (j *Job) Validate() error { } } + if j.IsParameterized() { + if j.Type != JobTypeBatch { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch)) + } + + if err := j.ParameterizedJob.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -1289,6 +1297,42 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup { return nil } +// CombinedTaskMeta takes a TaskGroup and Task name and returns the combined +// meta data for the task. When joining Job, Group and Task Meta, the precedence +// is by deepest scope (Task > Group > Job). +func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string { + group := j.LookupTaskGroup(groupName) + if group == nil { + return nil + } + + task := group.LookupTask(taskName) + if task == nil { + return nil + } + + meta := helper.CopyMapStringString(task.Meta) + if meta == nil { + meta = make(map[string]string, len(group.Meta)+len(j.Meta)) + } + + // Add the group specific meta + for k, v := range group.Meta { + if _, ok := meta[k]; !ok { + meta[k] = v + } + } + + // Add the job specific meta + for k, v := range j.Meta { + if _, ok := meta[k]; !ok { + meta[k] = v + } + } + + return meta +} + // Stub is used to return a summary of the job func (j *Job) Stub(summary *JobSummary) *JobListStub { return &JobListStub{ @@ -1311,6 +1355,11 @@ func (j *Job) IsPeriodic() bool { return j.Periodic != nil } +// IsParameterized returns whether a job is parameterized job. +func (j *Job) IsParameterized() bool { + return j.ParameterizedJob != nil +} + // VaultPolicies returns the set of Vault policies per task group, per task func (j *Job) VaultPolicies() map[string]map[string]*Vault { policies := make(map[string]map[string]*Vault, len(j.TaskGroups)) @@ -1399,6 +1448,63 @@ type JobListStub struct { JobModifyIndex uint64 } +// JobSummary summarizes the state of the allocations of a job +type JobSummary struct { + JobID string + + // Summmary contains the summary per task group for the Job + Summary map[string]TaskGroupSummary + + // Children contains a summary for the children of this job. + Children *JobChildrenSummary + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 +} + +// Copy returns a new copy of JobSummary +func (js *JobSummary) Copy() *JobSummary { + newJobSummary := new(JobSummary) + *newJobSummary = *js + newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary)) + for k, v := range js.Summary { + newTGSummary[k] = v + } + newJobSummary.Summary = newTGSummary + newJobSummary.Children = newJobSummary.Children.Copy() + return newJobSummary +} + +// JobChildrenSummary contains the summary of children job statuses +type JobChildrenSummary struct { + Pending int64 + Running int64 + Dead int64 +} + +// Copy returns a new copy of a JobChildrenSummary +func (jc *JobChildrenSummary) Copy() *JobChildrenSummary { + if jc == nil { + return nil + } + + njc := new(JobChildrenSummary) + *njc = *jc + return njc +} + +// TaskGroup summarizes the state of all the allocations of a particular +// TaskGroup +type TaskGroupSummary struct { + Queued int + Complete int + Failed int + Running int + Starting int + Lost int +} + // UpdateStrategy is used to modify how updates are done type UpdateStrategy struct { // Stagger is the amount of time between the updates @@ -1525,6 +1631,96 @@ type PeriodicLaunch struct { ModifyIndex uint64 } +const ( + DispatchPayloadForbidden = "forbidden" + DispatchPayloadOptional = "optional" + DispatchPayloadRequired = "required" + + // DispatchLaunchSuffix is the string appended to the parameterized job's ID + // when dispatching instances of it. + DispatchLaunchSuffix = "/dispatch-" +) + +// ParameterizedJobConfig is used to configure the parameterized job +type ParameterizedJobConfig struct { + // Payload configure the payload requirements + Payload string + + // MetaRequired is metadata keys that must be specified by the dispatcher + MetaRequired []string `mapstructure:"meta_required"` + + // MetaOptional is metadata keys that may be specified by the dispatcher + MetaOptional []string `mapstructure:"meta_optional"` +} + +func (d *ParameterizedJobConfig) Validate() error { + var mErr multierror.Error + switch d.Payload { + case DispatchPayloadOptional, DispatchPayloadRequired, DispatchPayloadForbidden: + default: + multierror.Append(&mErr, fmt.Errorf("Unknown payload requirement: %q", d.Payload)) + } + + // Check that the meta configurations are disjoint sets + disjoint, offending := helper.SliceSetDisjoint(d.MetaRequired, d.MetaOptional) + if !disjoint { + multierror.Append(&mErr, fmt.Errorf("Required and optional meta keys should be disjoint. Following keys exist in both: %v", offending)) + } + + return mErr.ErrorOrNil() +} + +func (d *ParameterizedJobConfig) Canonicalize() { + if d.Payload == "" { + d.Payload = DispatchPayloadOptional + } +} + +func (d *ParameterizedJobConfig) Copy() *ParameterizedJobConfig { + if d == nil { + return nil + } + nd := new(ParameterizedJobConfig) + *nd = *d + nd.MetaOptional = helper.CopySliceString(nd.MetaOptional) + nd.MetaRequired = helper.CopySliceString(nd.MetaRequired) + return nd +} + +// DispatchedID returns an ID appropriate for a job dispatched against a +// particular parameterized job +func DispatchedID(templateID string, t time.Time) string { + u := GenerateUUID()[:8] + return fmt.Sprintf("%s%s%d-%s", templateID, DispatchLaunchSuffix, t.Unix(), u) +} + +// DispatchPayloadConfig configures how a task gets its input from a job dispatch +type DispatchPayloadConfig struct { + // File specifies a relative path to where the input data should be written + File string +} + +func (d *DispatchPayloadConfig) Copy() *DispatchPayloadConfig { + if d == nil { + return nil + } + nd := new(DispatchPayloadConfig) + *nd = *d + return nd +} + +func (d *DispatchPayloadConfig) Validate() error { + // Verify the destination doesn't escape + escaped, err := PathEscapesAllocDir("task/local/", d.File) + if err != nil { + return fmt.Errorf("invalid destination path: %v", err) + } else if escaped { + return fmt.Errorf("destination escapes allocation directory") + } + + return nil +} + var ( defaultServiceJobRestartPolicy = RestartPolicy{ Delay: 15 * time.Second, @@ -1656,7 +1852,7 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.Tasks = tasks } - ntg.Meta = CopyMapStringString(ntg.Meta) + ntg.Meta = helper.CopyMapStringString(ntg.Meta) if tg.EphemeralDisk != nil { ntg.EphemeralDisk = tg.EphemeralDisk.Copy() @@ -1919,7 +2115,7 @@ func (s *Service) Copy() *Service { } ns := new(Service) *ns = *s - ns.Tags = CopySliceString(ns.Tags) + ns.Tags = helper.CopySliceString(ns.Tags) if s.Checks != nil { checks := make([]*ServiceCheck, len(ns.Checks)) @@ -2076,6 +2272,9 @@ type Task struct { // Resources is the resources needed by this task Resources *Resources + // DispatchPayload configures how the task retrieves its input from a dispatch + DispatchPayload *DispatchPayloadConfig + // Meta is used to associate arbitrary metadata with this // task. This is opaque to Nomad. Meta map[string]string @@ -2098,7 +2297,7 @@ func (t *Task) Copy() *Task { } nt := new(Task) *nt = *t - nt.Env = CopyMapStringString(nt.Env) + nt.Env = helper.CopyMapStringString(nt.Env) if t.Services != nil { services := make([]*Service, len(nt.Services)) @@ -2112,7 +2311,8 @@ func (t *Task) Copy() *Task { nt.Vault = nt.Vault.Copy() nt.Resources = nt.Resources.Copy() - nt.Meta = CopyMapStringString(nt.Meta) + nt.Meta = helper.CopyMapStringString(nt.Meta) + nt.DispatchPayload = nt.DispatchPayload.Copy() if t.Artifacts != nil { artifacts := make([]*TaskArtifact, 0, len(t.Artifacts)) @@ -2277,6 +2477,13 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk) error { } } + // Validate the dispatch payload block if there + if t.DispatchPayload != nil { + if err := t.DispatchPayload.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Dispatch Payload validation failed: %v", err)) + } + } + return mErr.ErrorOrNil() } @@ -2294,10 +2501,13 @@ func validateServices(t *Task) error { outer := fmt.Errorf("service[%d] %+q validation failed: %s", i, service.Name, err) mErr.Errors = append(mErr.Errors, outer) } - if _, ok := knownServices[service.Name]; ok { + + // Ensure that services with the same name are not being registered for + // the same port + if _, ok := knownServices[service.Name+service.PortLabel]; ok { mErr.Errors = append(mErr.Errors, fmt.Errorf("service %q is duplicate", service.Name)) } - knownServices[service.Name] = struct{}{} + knownServices[service.Name+service.PortLabel] = struct{}{} if service.PortLabel != "" { servicePorts[service.PortLabel] = append(servicePorts[service.PortLabel], service.Name) @@ -2379,6 +2589,9 @@ type Template struct { // random wait between 0 and the given splay value before signalling the // application of a change Splay time.Duration `mapstructure:"splay"` + + // Perms is the permission the file should be written out with. + Perms string `mapstructure:"perms"` } // DefaultTemplate returns a default template. @@ -2386,6 +2599,7 @@ func DefaultTemplate() *Template { return &Template{ ChangeMode: TemplateChangeModeRestart, Splay: 5 * time.Second, + Perms: "0644", } } @@ -2418,7 +2632,7 @@ func (t *Template) Validate() error { } // Verify the destination doesn't escape - escaped, err := PathEscapesAllocDir(t.DestPath) + escaped, err := PathEscapesAllocDir("task", t.DestPath) if err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err)) } else if escaped { @@ -2441,6 +2655,13 @@ func (t *Template) Validate() error { multierror.Append(&mErr, fmt.Errorf("Must specify positive splay value")) } + // Verify the permissions + if t.Perms != "" { + if _, err := strconv.ParseUint(t.Perms, 8, 12); err != nil { + multierror.Append(&mErr, fmt.Errorf("Failed to parse %q as octal: %v", t.Perms, err)) + } + } + return mErr.ErrorOrNil() } @@ -2555,6 +2776,11 @@ const ( // TaskSiblingFailed indicates that a sibling task in the task group has // failed. TaskSiblingFailed = "Sibling task failed" + + // TaskDriverMessage is an informational event message emitted by + // drivers such as when they're performing a long running action like + // downloading an image. + TaskDriverMessage = "Driver" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -2613,6 +2839,9 @@ type TaskEvent struct { // TaskSignal is the signal that was sent to the task TaskSignal string + + // DriverMessage indicates a driver action being taken. + DriverMessage string } func (te *TaskEvent) GoString() string { @@ -2741,6 +2970,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent { return e } +func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent { + e.DriverMessage = m + return e +} + // TaskArtifact is an artifact to download before running the task. type TaskArtifact struct { // GetterSource is the source to download an artifact using go-getter @@ -2761,7 +2995,7 @@ func (ta *TaskArtifact) Copy() *TaskArtifact { } nta := new(TaskArtifact) *nta = *ta - nta.GetterOptions = CopyMapStringString(ta.GetterOptions) + nta.GetterOptions = helper.CopyMapStringString(ta.GetterOptions) return nta } @@ -2770,14 +3004,16 @@ func (ta *TaskArtifact) GoString() string { } // PathEscapesAllocDir returns if the given path escapes the allocation -// directory -func PathEscapesAllocDir(path string) (bool, error) { +// directory. The prefix allows adding a prefix if the path will be joined, for +// example a "task/local" prefix may be provided if the path will be joined +// against that prefix. +func PathEscapesAllocDir(prefix, path string) (bool, error) { // Verify the destination doesn't escape the tasks directory - alloc, err := filepath.Abs(filepath.Join("/", "foo/", "bar/")) + alloc, err := filepath.Abs(filepath.Join("/", "alloc-dir/", "alloc-id/")) if err != nil { return false, err } - abs, err := filepath.Abs(filepath.Join(alloc, path)) + abs, err := filepath.Abs(filepath.Join(alloc, prefix, path)) if err != nil { return false, err } @@ -2796,11 +3032,11 @@ func (ta *TaskArtifact) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("source must be specified")) } - escaped, err := PathEscapesAllocDir(ta.RelativeDest) + escaped, err := PathEscapesAllocDir("task", ta.RelativeDest) if err != nil { mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err)) } else if escaped { - mErr.Errors = append(mErr.Errors, fmt.Errorf("destination escapes task's directory")) + mErr.Errors = append(mErr.Errors, fmt.Errorf("destination escapes allocation directory")) } // Verify the checksum @@ -3311,12 +3547,12 @@ func (a *AllocMetric) Copy() *AllocMetric { } na := new(AllocMetric) *na = *a - na.NodesAvailable = CopyMapStringInt(na.NodesAvailable) - na.ClassFiltered = CopyMapStringInt(na.ClassFiltered) - na.ConstraintFiltered = CopyMapStringInt(na.ConstraintFiltered) - na.ClassExhausted = CopyMapStringInt(na.ClassExhausted) - na.DimensionExhausted = CopyMapStringInt(na.DimensionExhausted) - na.Scores = CopyMapStringFloat64(na.Scores) + na.NodesAvailable = helper.CopyMapStringInt(na.NodesAvailable) + na.ClassFiltered = helper.CopyMapStringInt(na.ClassFiltered) + na.ConstraintFiltered = helper.CopyMapStringInt(na.ConstraintFiltered) + na.ClassExhausted = helper.CopyMapStringInt(na.ClassExhausted) + na.DimensionExhausted = helper.CopyMapStringInt(na.DimensionExhausted) + na.Scores = helper.CopyMapStringFloat64(na.Scores) return na } @@ -3829,7 +4065,7 @@ type RecoverableError struct { // NewRecoverableError is used to wrap an error and mark it as recoverable or // not. -func NewRecoverableError(e error, recoverable bool) *RecoverableError { +func NewRecoverableError(e error, recoverable bool) error { if e == nil { return nil } @@ -3843,3 +4079,12 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError { func (r *RecoverableError) Error() string { return r.Err } + +// IsRecoverable returns true if error is a RecoverableError with +// Recoverable=true. Otherwise false is returned. +func IsRecoverable(e error) bool { + if re, ok := e.(*RecoverableError); ok { + return re.Recoverable + } + return false +} diff --git a/website/source/docs/providers/nomad/r/job.html.markdown b/website/source/docs/providers/nomad/r/job.html.markdown index 7936d5cf6..776b8aed7 100644 --- a/website/source/docs/providers/nomad/r/job.html.markdown +++ b/website/source/docs/providers/nomad/r/job.html.markdown @@ -50,7 +50,6 @@ job "foo" { resources { cpu = 20 memory = 10 - disk = 100 } logs {