provider/nomad: Update jobspec parser

Updates vendored Nomad jobspec parser such that parameterized nomad job files can no be parsed and used with Terraform.
Also fixes tests to adhere to new jobspec version, and update documentation to reflect such as well.
This commit is contained in:
Jake Champlin 2017-02-03 18:19:23 -05:00
parent b6636159ad
commit 79c117877e
No known key found for this signature in database
GPG Key ID: DC31F41958EF4AC2
6 changed files with 503 additions and 196 deletions

View File

@ -1,6 +1,7 @@
package nomad
import (
"errors"
"fmt"
"strings"
"testing"
@ -16,7 +17,7 @@ func TestResourceJob_basic(t *testing.T) {
Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{
r.TestStep{
{
Config: testResourceJob_initialConfig,
Check: testResourceJob_initialCheck,
},
@ -31,14 +32,14 @@ func TestResourceJob_refresh(t *testing.T) {
Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{
r.TestStep{
{
Config: testResourceJob_initialConfig,
Check: testResourceJob_initialCheck,
},
// This should successfully cause the job to be recreated,
// testing the Exists function.
r.TestStep{
{
PreConfig: testResourceJob_deregister(t, "foo"),
Config: testResourceJob_initialConfig,
},
@ -51,20 +52,20 @@ func TestResourceJob_disableDestroyDeregister(t *testing.T) {
Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{
r.TestStep{
{
Config: testResourceJob_noDestroy,
Check: testResourceJob_initialCheck,
},
// Destroy with our setting set
r.TestStep{
{
Destroy: true,
Config: testResourceJob_noDestroy,
Check: testResourceJob_checkExists,
},
// Re-apply without the setting set
r.TestStep{
{
Config: testResourceJob_initialConfig,
Check: testResourceJob_checkExists,
},
@ -77,13 +78,13 @@ func TestResourceJob_idChange(t *testing.T) {
Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{
r.TestStep{
{
Config: testResourceJob_initialConfig,
Check: testResourceJob_initialCheck,
},
// Change our ID
r.TestStep{
{
Config: testResourceJob_updateConfig,
Check: testResourceJob_updateCheck,
},
@ -91,6 +92,19 @@ func TestResourceJob_idChange(t *testing.T) {
})
}
func TestResourceJob_parameterizedJob(t *testing.T) {
r.Test(t, r.TestCase{
Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{
{
Config: testResourceJob_parameterizedJob,
Check: testResourceJob_initialCheck,
},
},
})
}
var testResourceJob_initialConfig = `
resource "nomad_job" "test" {
jobspec = <<EOT
@ -108,7 +122,6 @@ job "foo" {
resources {
cpu = 20
memory = 10
disk = 100
}
logs {
@ -140,7 +153,6 @@ job "foo" {
resources {
cpu = 20
memory = 10
disk = 100
}
logs {
@ -157,12 +169,12 @@ EOT
func testResourceJob_initialCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["nomad_job.test"]
if resourceState == nil {
return fmt.Errorf("resource not found in state")
return errors.New("resource not found in state")
}
instanceState := resourceState.Primary
if instanceState == nil {
return fmt.Errorf("resource has no primary instance")
return errors.New("resource has no primary instance")
}
jobID := instanceState.ID
@ -200,7 +212,7 @@ func testResourceJob_checkDestroy(jobID string) r.TestCheckFunc {
return nil
}
if err == nil {
err = fmt.Errorf("not destroyed")
err = errors.New("not destroyed")
}
return err
@ -234,7 +246,6 @@ job "bar" {
resources {
cpu = 20
memory = 10
disk = 100
}
logs {
@ -251,12 +262,12 @@ EOT
func testResourceJob_updateCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["nomad_job.test"]
if resourceState == nil {
return fmt.Errorf("resource not found in state")
return errors.New("resource not found in state")
}
instanceState := resourceState.Primary
if instanceState == nil {
return fmt.Errorf("resource has no primary instance")
return errors.New("resource has no primary instance")
}
jobID := instanceState.ID
@ -275,9 +286,41 @@ func testResourceJob_updateCheck(s *terraform.State) error {
// Verify foo doesn't exist
_, _, err := client.Jobs().Info("foo", nil)
if err == nil {
return fmt.Errorf("reading foo success")
return errors.New("reading foo success")
}
}
return nil
}
var testResourceJob_parameterizedJob = `
resource "nomad_job" "test" {
jobspec = <<EOT
job "bar" {
datacenters = ["dc1"]
type = "batch"
parameterized {
payload = "required"
}
group "foo" {
task "foo" {
driver = "raw_exec"
config {
command = "/bin/sleep"
args = ["1"]
}
resources {
cpu = 20
memory = 10
}
logs {
max_files = 3
max_file_size = 10
}
}
}
}
EOT
}
`

View File

@ -83,10 +83,13 @@ func ParseFile(path string) (*structs.Job, error) {
}
func parseJob(result *structs.Job, list *ast.ObjectList) error {
list = list.Children()
if len(list.Items) != 1 {
return fmt.Errorf("only one 'job' block allowed")
}
list = list.Children()
if len(list.Items) != 1 {
return fmt.Errorf("'job' block missing name")
}
// Get our job object
obj := list.Items[0]
@ -101,6 +104,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
delete(m, "update")
delete(m, "periodic")
delete(m, "vault")
delete(m, "parameterized")
// Set the ID and name to the object key
result.ID = obj.Keys[0].Token.Value().(string)
@ -126,19 +130,20 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
// Check for invalid keys
valid := []string{
"id",
"name",
"region",
"all_at_once",
"type",
"priority",
"datacenters",
"constraint",
"update",
"periodic",
"meta",
"task",
"datacenters",
"parameterized",
"group",
"id",
"meta",
"name",
"periodic",
"priority",
"region",
"task",
"type",
"update",
"vault",
"vault_token",
}
@ -167,6 +172,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
}
}
// If we have a parameterized definition, then parse that
if o := listVal.Filter("parameterized"); len(o.Items) > 0 {
if err := parseParameterizedJob(&result.ParameterizedJob, o); err != nil {
return multierror.Prefix(err, "parameterized ->")
}
}
// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@ -551,6 +563,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
"artifact",
"config",
"constraint",
"dispatch_payload",
"driver",
"env",
"kill_timeout",
@ -573,6 +586,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
delete(m, "artifact")
delete(m, "config")
delete(m, "constraint")
delete(m, "dispatch_payload")
delete(m, "env")
delete(m, "logs")
delete(m, "meta")
@ -716,6 +730,32 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
t.Vault = v
}
// If we have a dispatch_payload block parse that
if o := listVal.Filter("dispatch_payload"); len(o.Items) > 0 {
if len(o.Items) > 1 {
return fmt.Errorf("only one dispatch_payload block is allowed in a task. Number of dispatch_payload blocks found: %d", len(o.Items))
}
var m map[string]interface{}
dispatchBlock := o.Items[0]
// Check for invalid keys
valid := []string{
"file",
}
if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', dispatch_payload ->", n))
}
if err := hcl.DecodeObject(&m, dispatchBlock.Val); err != nil {
return err
}
t.DispatchPayload = &structs.DispatchPayloadConfig{}
if err := mapstructure.WeakDecode(m, t.DispatchPayload); err != nil {
return err
}
}
*result = append(*result, &t)
}
@ -797,13 +837,13 @@ func parseTemplates(result *[]*structs.Template, list *ast.ObjectList) error {
for _, o := range list.Elem().Items {
// Check for invalid keys
valid := []string{
"source",
"destination",
"data",
"change_mode",
"change_signal",
"data",
"destination",
"perms",
"source",
"splay",
"once",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
@ -1188,6 +1228,40 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
return nil
}
func parseParameterizedJob(result **structs.ParameterizedJobConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'parameterized' block allowed per job")
}
// Get our resource object
o := list.Items[0]
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
// Check for invalid keys
valid := []string{
"payload",
"meta_required",
"meta_optional",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
}
// Build the parameterized job block
var d structs.ParameterizedJobConfig
if err := mapstructure.WeakDecode(m, &d); err != nil {
return err
}
*result = &d
return nil
}
func checkHCLKeys(node ast.Node, valid []string) error {
var list *ast.ObjectList
switch n := node.(type) {

View File

@ -130,6 +130,11 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
diff.Objects = append(diff.Objects, pDiff)
}
// ParameterizedJob diff
if cDiff := parameterizedJobDiff(j.ParameterizedJob, other.ParameterizedJob, contextual); cDiff != nil {
diff.Objects = append(diff.Objects, cDiff)
}
return diff, nil
}
@ -370,6 +375,12 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) {
diff.Objects = append(diff.Objects, lDiff)
}
// Dispatch payload diff
dDiff := primitiveObjectDiff(t.DispatchPayload, other.DispatchPayload, nil, "DispatchPayload", contextual)
if dDiff != nil {
diff.Objects = append(diff.Objects, dDiff)
}
// Artifacts diff
diffs := primitiveObjectSetDiff(
interfaceSlice(t.Artifacts),
@ -629,6 +640,44 @@ func vaultDiff(old, new *Vault, contextual bool) *ObjectDiff {
return diff
}
// parameterizedJobDiff returns the diff of two parameterized job objects. If
// contextual diff is enabled, all fields will be returned, even if no diff
// occurred.
func parameterizedJobDiff(old, new *ParameterizedJobConfig, contextual bool) *ObjectDiff {
diff := &ObjectDiff{Type: DiffTypeNone, Name: "ParameterizedJob"}
var oldPrimitiveFlat, newPrimitiveFlat map[string]string
if reflect.DeepEqual(old, new) {
return nil
} else if old == nil {
old = &ParameterizedJobConfig{}
diff.Type = DiffTypeAdded
newPrimitiveFlat = flatmap.Flatten(new, nil, true)
} else if new == nil {
new = &ParameterizedJobConfig{}
diff.Type = DiffTypeDeleted
oldPrimitiveFlat = flatmap.Flatten(old, nil, true)
} else {
diff.Type = DiffTypeEdited
oldPrimitiveFlat = flatmap.Flatten(old, nil, true)
newPrimitiveFlat = flatmap.Flatten(new, nil, true)
}
// Diff the primitive fields.
diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual)
// Meta diffs
if optionalDiff := stringSetDiff(old.MetaOptional, new.MetaOptional, "MetaOptional", contextual); optionalDiff != nil {
diff.Objects = append(diff.Objects, optionalDiff)
}
if requiredDiff := stringSetDiff(old.MetaRequired, new.MetaRequired, "MetaRequired", contextual); requiredDiff != nil {
diff.Objects = append(diff.Objects, requiredDiff)
}
return diff
}
// Diff returns a diff of two resource objects. If contextual diff is enabled,
// non-changed fields will still be returned.
func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff {

View File

@ -169,72 +169,6 @@ func GenerateUUID() string {
buf[10:16])
}
// Helpers for copying generic structures.
func CopyMapStringString(m map[string]string) map[string]string {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]string, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopyMapStringInt(m map[string]int) map[string]int {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]int, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopyMapStringFloat64(m map[string]float64) map[string]float64 {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]float64, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopySliceString(s []string) []string {
l := len(s)
if l == 0 {
return nil
}
c := make([]string, l)
for i, v := range s {
c[i] = v
}
return c
}
func CopySliceInt(s []int) []int {
l := len(s)
if l == 0 {
return nil
}
c := make([]int, l)
for i, v := range s {
c[i] = v
}
return c
}
func CopySliceConstraints(s []*Constraint) []*Constraint {
l := len(s)
if l == 0 {
@ -248,27 +182,6 @@ func CopySliceConstraints(s []*Constraint) []*Constraint {
return c
}
// SliceStringIsSubset returns whether the smaller set of strings is a subset of
// the larger. If the smaller slice is not a subset, the offending elements are
// returned.
func SliceStringIsSubset(larger, smaller []string) (bool, []string) {
largerSet := make(map[string]struct{}, len(larger))
for _, l := range larger {
largerSet[l] = struct{}{}
}
subset := true
var offending []string
for _, s := range smaller {
if _, ok := largerSet[s]; !ok {
subset = false
offending = append(offending, s)
}
}
return subset, offending
}
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
@ -288,19 +201,3 @@ func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
}
return flattened
}
// MapStringStringSliceValueSet returns the set of values in a map[string][]string
func MapStringStringSliceValueSet(m map[string][]string) []string {
set := make(map[string]struct{})
for _, slice := range m {
for _, v := range slice {
set[v] = struct{}{}
}
}
flat := make([]string, 0, len(set))
for k := range set {
flat = append(flat, k)
}
return flat
}

View File

@ -24,6 +24,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
"github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec"
@ -249,7 +250,8 @@ type JobEvaluateRequest struct {
// JobSpecificRequest is used when we just need to specify a target job
type JobSpecificRequest struct {
JobID string
JobID string
AllAllocs bool
QueryOptions
}
@ -272,6 +274,14 @@ type JobSummaryRequest struct {
QueryOptions
}
// JobDispatchRequest is used to dispatch a job based on a parameterized job
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
WriteRequest
}
// NodeListRequest is used to parameterize a list request
type NodeListRequest struct {
QueryOptions
@ -525,6 +535,14 @@ type JobSummaryResponse struct {
QueryMeta
}
type JobDispatchResponse struct {
DispatchedJobID string
EvalID string
EvalCreateIndex uint64
JobCreateIndex uint64
QueryMeta
}
// JobListResponse is used for a list request
type JobListResponse struct {
Jobs []*JobListStub
@ -746,11 +764,11 @@ func (n *Node) Copy() *Node {
}
nn := new(Node)
*nn = *n
nn.Attributes = CopyMapStringString(nn.Attributes)
nn.Attributes = helper.CopyMapStringString(nn.Attributes)
nn.Resources = nn.Resources.Copy()
nn.Reserved = nn.Reserved.Copy()
nn.Links = CopyMapStringString(nn.Links)
nn.Meta = CopyMapStringString(nn.Meta)
nn.Links = helper.CopyMapStringString(nn.Links)
nn.Meta = helper.CopyMapStringString(nn.Meta)
return nn
}
@ -1062,39 +1080,6 @@ const (
CoreJobPriority = JobMaxPriority * 2
)
// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
Summary map[string]TaskGroupSummary
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// Copy returns a new copy of JobSummary
func (js *JobSummary) Copy() *JobSummary {
newJobSummary := new(JobSummary)
*newJobSummary = *js
newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary))
for k, v := range js.Summary {
newTGSummary[k] = v
}
newJobSummary.Summary = newTGSummary
return newJobSummary
}
// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}
// Job is the scope of a scheduling request to Nomad. It is the largest
// scoped object, and is a named collection of task groups. Each task group
// is further composed of tasks. A task group (TG) is the unit of scheduling
@ -1146,6 +1131,13 @@ type Job struct {
// Periodic is used to define the interval the job is run at.
Periodic *PeriodicConfig
// ParameterizedJob is used to specify the job as a parameterized job
// for dispatching.
ParameterizedJob *ParameterizedJobConfig
// Payload is the payload supplied when the job was dispatched.
Payload []byte
// Meta is used to associate arbitrary metadata with this
// job. This is opaque to Nomad.
Meta map[string]string
@ -1179,6 +1171,10 @@ func (j *Job) Canonicalize() {
for _, tg := range j.TaskGroups {
tg.Canonicalize(j)
}
if j.ParameterizedJob != nil {
j.ParameterizedJob.Canonicalize()
}
}
// Copy returns a deep copy of the Job. It is expected that callers use recover.
@ -1189,7 +1185,7 @@ func (j *Job) Copy() *Job {
}
nj := new(Job)
*nj = *j
nj.Datacenters = CopySliceString(nj.Datacenters)
nj.Datacenters = helper.CopySliceString(nj.Datacenters)
nj.Constraints = CopySliceConstraints(nj.Constraints)
if j.TaskGroups != nil {
@ -1201,7 +1197,8 @@ func (j *Job) Copy() *Job {
}
nj.Periodic = nj.Periodic.Copy()
nj.Meta = CopyMapStringString(nj.Meta)
nj.Meta = helper.CopyMapStringString(nj.Meta)
nj.ParameterizedJob = nj.ParameterizedJob.Copy()
return nj
}
@ -1276,6 +1273,17 @@ func (j *Job) Validate() error {
}
}
if j.IsParameterized() {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
}
if err := j.ParameterizedJob.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
@ -1289,6 +1297,42 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup {
return nil
}
// CombinedTaskMeta takes a TaskGroup and Task name and returns the combined
// meta data for the task. When joining Job, Group and Task Meta, the precedence
// is by deepest scope (Task > Group > Job).
func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string {
group := j.LookupTaskGroup(groupName)
if group == nil {
return nil
}
task := group.LookupTask(taskName)
if task == nil {
return nil
}
meta := helper.CopyMapStringString(task.Meta)
if meta == nil {
meta = make(map[string]string, len(group.Meta)+len(j.Meta))
}
// Add the group specific meta
for k, v := range group.Meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
// Add the job specific meta
for k, v := range j.Meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
return meta
}
// Stub is used to return a summary of the job
func (j *Job) Stub(summary *JobSummary) *JobListStub {
return &JobListStub{
@ -1311,6 +1355,11 @@ func (j *Job) IsPeriodic() bool {
return j.Periodic != nil
}
// IsParameterized returns whether a job is parameterized job.
func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil
}
// VaultPolicies returns the set of Vault policies per task group, per task
func (j *Job) VaultPolicies() map[string]map[string]*Vault {
policies := make(map[string]map[string]*Vault, len(j.TaskGroups))
@ -1399,6 +1448,63 @@ type JobListStub struct {
JobModifyIndex uint64
}
// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
// Summmary contains the summary per task group for the Job
Summary map[string]TaskGroupSummary
// Children contains a summary for the children of this job.
Children *JobChildrenSummary
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// Copy returns a new copy of JobSummary
func (js *JobSummary) Copy() *JobSummary {
newJobSummary := new(JobSummary)
*newJobSummary = *js
newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary))
for k, v := range js.Summary {
newTGSummary[k] = v
}
newJobSummary.Summary = newTGSummary
newJobSummary.Children = newJobSummary.Children.Copy()
return newJobSummary
}
// JobChildrenSummary contains the summary of children job statuses
type JobChildrenSummary struct {
Pending int64
Running int64
Dead int64
}
// Copy returns a new copy of a JobChildrenSummary
func (jc *JobChildrenSummary) Copy() *JobChildrenSummary {
if jc == nil {
return nil
}
njc := new(JobChildrenSummary)
*njc = *jc
return njc
}
// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}
// UpdateStrategy is used to modify how updates are done
type UpdateStrategy struct {
// Stagger is the amount of time between the updates
@ -1525,6 +1631,96 @@ type PeriodicLaunch struct {
ModifyIndex uint64
}
const (
DispatchPayloadForbidden = "forbidden"
DispatchPayloadOptional = "optional"
DispatchPayloadRequired = "required"
// DispatchLaunchSuffix is the string appended to the parameterized job's ID
// when dispatching instances of it.
DispatchLaunchSuffix = "/dispatch-"
)
// ParameterizedJobConfig is used to configure the parameterized job
type ParameterizedJobConfig struct {
// Payload configure the payload requirements
Payload string
// MetaRequired is metadata keys that must be specified by the dispatcher
MetaRequired []string `mapstructure:"meta_required"`
// MetaOptional is metadata keys that may be specified by the dispatcher
MetaOptional []string `mapstructure:"meta_optional"`
}
func (d *ParameterizedJobConfig) Validate() error {
var mErr multierror.Error
switch d.Payload {
case DispatchPayloadOptional, DispatchPayloadRequired, DispatchPayloadForbidden:
default:
multierror.Append(&mErr, fmt.Errorf("Unknown payload requirement: %q", d.Payload))
}
// Check that the meta configurations are disjoint sets
disjoint, offending := helper.SliceSetDisjoint(d.MetaRequired, d.MetaOptional)
if !disjoint {
multierror.Append(&mErr, fmt.Errorf("Required and optional meta keys should be disjoint. Following keys exist in both: %v", offending))
}
return mErr.ErrorOrNil()
}
func (d *ParameterizedJobConfig) Canonicalize() {
if d.Payload == "" {
d.Payload = DispatchPayloadOptional
}
}
func (d *ParameterizedJobConfig) Copy() *ParameterizedJobConfig {
if d == nil {
return nil
}
nd := new(ParameterizedJobConfig)
*nd = *d
nd.MetaOptional = helper.CopySliceString(nd.MetaOptional)
nd.MetaRequired = helper.CopySliceString(nd.MetaRequired)
return nd
}
// DispatchedID returns an ID appropriate for a job dispatched against a
// particular parameterized job
func DispatchedID(templateID string, t time.Time) string {
u := GenerateUUID()[:8]
return fmt.Sprintf("%s%s%d-%s", templateID, DispatchLaunchSuffix, t.Unix(), u)
}
// DispatchPayloadConfig configures how a task gets its input from a job dispatch
type DispatchPayloadConfig struct {
// File specifies a relative path to where the input data should be written
File string
}
func (d *DispatchPayloadConfig) Copy() *DispatchPayloadConfig {
if d == nil {
return nil
}
nd := new(DispatchPayloadConfig)
*nd = *d
return nd
}
func (d *DispatchPayloadConfig) Validate() error {
// Verify the destination doesn't escape
escaped, err := PathEscapesAllocDir("task/local/", d.File)
if err != nil {
return fmt.Errorf("invalid destination path: %v", err)
} else if escaped {
return fmt.Errorf("destination escapes allocation directory")
}
return nil
}
var (
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
@ -1656,7 +1852,7 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.Tasks = tasks
}
ntg.Meta = CopyMapStringString(ntg.Meta)
ntg.Meta = helper.CopyMapStringString(ntg.Meta)
if tg.EphemeralDisk != nil {
ntg.EphemeralDisk = tg.EphemeralDisk.Copy()
@ -1919,7 +2115,7 @@ func (s *Service) Copy() *Service {
}
ns := new(Service)
*ns = *s
ns.Tags = CopySliceString(ns.Tags)
ns.Tags = helper.CopySliceString(ns.Tags)
if s.Checks != nil {
checks := make([]*ServiceCheck, len(ns.Checks))
@ -2076,6 +2272,9 @@ type Task struct {
// Resources is the resources needed by this task
Resources *Resources
// DispatchPayload configures how the task retrieves its input from a dispatch
DispatchPayload *DispatchPayloadConfig
// Meta is used to associate arbitrary metadata with this
// task. This is opaque to Nomad.
Meta map[string]string
@ -2098,7 +2297,7 @@ func (t *Task) Copy() *Task {
}
nt := new(Task)
*nt = *t
nt.Env = CopyMapStringString(nt.Env)
nt.Env = helper.CopyMapStringString(nt.Env)
if t.Services != nil {
services := make([]*Service, len(nt.Services))
@ -2112,7 +2311,8 @@ func (t *Task) Copy() *Task {
nt.Vault = nt.Vault.Copy()
nt.Resources = nt.Resources.Copy()
nt.Meta = CopyMapStringString(nt.Meta)
nt.Meta = helper.CopyMapStringString(nt.Meta)
nt.DispatchPayload = nt.DispatchPayload.Copy()
if t.Artifacts != nil {
artifacts := make([]*TaskArtifact, 0, len(t.Artifacts))
@ -2277,6 +2477,13 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk) error {
}
}
// Validate the dispatch payload block if there
if t.DispatchPayload != nil {
if err := t.DispatchPayload.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Dispatch Payload validation failed: %v", err))
}
}
return mErr.ErrorOrNil()
}
@ -2294,10 +2501,13 @@ func validateServices(t *Task) error {
outer := fmt.Errorf("service[%d] %+q validation failed: %s", i, service.Name, err)
mErr.Errors = append(mErr.Errors, outer)
}
if _, ok := knownServices[service.Name]; ok {
// Ensure that services with the same name are not being registered for
// the same port
if _, ok := knownServices[service.Name+service.PortLabel]; ok {
mErr.Errors = append(mErr.Errors, fmt.Errorf("service %q is duplicate", service.Name))
}
knownServices[service.Name] = struct{}{}
knownServices[service.Name+service.PortLabel] = struct{}{}
if service.PortLabel != "" {
servicePorts[service.PortLabel] = append(servicePorts[service.PortLabel], service.Name)
@ -2379,6 +2589,9 @@ type Template struct {
// random wait between 0 and the given splay value before signalling the
// application of a change
Splay time.Duration `mapstructure:"splay"`
// Perms is the permission the file should be written out with.
Perms string `mapstructure:"perms"`
}
// DefaultTemplate returns a default template.
@ -2386,6 +2599,7 @@ func DefaultTemplate() *Template {
return &Template{
ChangeMode: TemplateChangeModeRestart,
Splay: 5 * time.Second,
Perms: "0644",
}
}
@ -2418,7 +2632,7 @@ func (t *Template) Validate() error {
}
// Verify the destination doesn't escape
escaped, err := PathEscapesAllocDir(t.DestPath)
escaped, err := PathEscapesAllocDir("task", t.DestPath)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err))
} else if escaped {
@ -2441,6 +2655,13 @@ func (t *Template) Validate() error {
multierror.Append(&mErr, fmt.Errorf("Must specify positive splay value"))
}
// Verify the permissions
if t.Perms != "" {
if _, err := strconv.ParseUint(t.Perms, 8, 12); err != nil {
multierror.Append(&mErr, fmt.Errorf("Failed to parse %q as octal: %v", t.Perms, err))
}
}
return mErr.ErrorOrNil()
}
@ -2555,6 +2776,11 @@ const (
// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
TaskSiblingFailed = "Sibling task failed"
// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"
)
// TaskEvent is an event that effects the state of a task and contains meta-data
@ -2613,6 +2839,9 @@ type TaskEvent struct {
// TaskSignal is the signal that was sent to the task
TaskSignal string
// DriverMessage indicates a driver action being taken.
DriverMessage string
}
func (te *TaskEvent) GoString() string {
@ -2741,6 +2970,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent {
return e
}
func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent {
e.DriverMessage = m
return e
}
// TaskArtifact is an artifact to download before running the task.
type TaskArtifact struct {
// GetterSource is the source to download an artifact using go-getter
@ -2761,7 +2995,7 @@ func (ta *TaskArtifact) Copy() *TaskArtifact {
}
nta := new(TaskArtifact)
*nta = *ta
nta.GetterOptions = CopyMapStringString(ta.GetterOptions)
nta.GetterOptions = helper.CopyMapStringString(ta.GetterOptions)
return nta
}
@ -2770,14 +3004,16 @@ func (ta *TaskArtifact) GoString() string {
}
// PathEscapesAllocDir returns if the given path escapes the allocation
// directory
func PathEscapesAllocDir(path string) (bool, error) {
// directory. The prefix allows adding a prefix if the path will be joined, for
// example a "task/local" prefix may be provided if the path will be joined
// against that prefix.
func PathEscapesAllocDir(prefix, path string) (bool, error) {
// Verify the destination doesn't escape the tasks directory
alloc, err := filepath.Abs(filepath.Join("/", "foo/", "bar/"))
alloc, err := filepath.Abs(filepath.Join("/", "alloc-dir/", "alloc-id/"))
if err != nil {
return false, err
}
abs, err := filepath.Abs(filepath.Join(alloc, path))
abs, err := filepath.Abs(filepath.Join(alloc, prefix, path))
if err != nil {
return false, err
}
@ -2796,11 +3032,11 @@ func (ta *TaskArtifact) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("source must be specified"))
}
escaped, err := PathEscapesAllocDir(ta.RelativeDest)
escaped, err := PathEscapesAllocDir("task", ta.RelativeDest)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err))
} else if escaped {
mErr.Errors = append(mErr.Errors, fmt.Errorf("destination escapes task's directory"))
mErr.Errors = append(mErr.Errors, fmt.Errorf("destination escapes allocation directory"))
}
// Verify the checksum
@ -3311,12 +3547,12 @@ func (a *AllocMetric) Copy() *AllocMetric {
}
na := new(AllocMetric)
*na = *a
na.NodesAvailable = CopyMapStringInt(na.NodesAvailable)
na.ClassFiltered = CopyMapStringInt(na.ClassFiltered)
na.ConstraintFiltered = CopyMapStringInt(na.ConstraintFiltered)
na.ClassExhausted = CopyMapStringInt(na.ClassExhausted)
na.DimensionExhausted = CopyMapStringInt(na.DimensionExhausted)
na.Scores = CopyMapStringFloat64(na.Scores)
na.NodesAvailable = helper.CopyMapStringInt(na.NodesAvailable)
na.ClassFiltered = helper.CopyMapStringInt(na.ClassFiltered)
na.ConstraintFiltered = helper.CopyMapStringInt(na.ConstraintFiltered)
na.ClassExhausted = helper.CopyMapStringInt(na.ClassExhausted)
na.DimensionExhausted = helper.CopyMapStringInt(na.DimensionExhausted)
na.Scores = helper.CopyMapStringFloat64(na.Scores)
return na
}
@ -3829,7 +4065,7 @@ type RecoverableError struct {
// NewRecoverableError is used to wrap an error and mark it as recoverable or
// not.
func NewRecoverableError(e error, recoverable bool) *RecoverableError {
func NewRecoverableError(e error, recoverable bool) error {
if e == nil {
return nil
}
@ -3843,3 +4079,12 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError {
func (r *RecoverableError) Error() string {
return r.Err
}
// IsRecoverable returns true if error is a RecoverableError with
// Recoverable=true. Otherwise false is returned.
func IsRecoverable(e error) bool {
if re, ok := e.(*RecoverableError); ok {
return re.Recoverable
}
return false
}

View File

@ -50,7 +50,6 @@ job "foo" {
resources {
cpu = 20
memory = 10
disk = 100
}
logs {