Merge pull request #11691 from hashicorp/f-update-nomad-jobspec-parser

provider/nomad: Update jobspec parser
This commit is contained in:
Jake Champlin 2017-02-06 12:02:37 -05:00 committed by GitHub
commit 17dfa0c8e3
16 changed files with 1500 additions and 196 deletions

View File

@ -1,6 +1,7 @@
package nomad package nomad
import ( import (
"errors"
"fmt" "fmt"
"strings" "strings"
"testing" "testing"
@ -16,7 +17,7 @@ func TestResourceJob_basic(t *testing.T) {
Providers: testProviders, Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) }, PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{ Steps: []r.TestStep{
r.TestStep{ {
Config: testResourceJob_initialConfig, Config: testResourceJob_initialConfig,
Check: testResourceJob_initialCheck, Check: testResourceJob_initialCheck,
}, },
@ -31,14 +32,14 @@ func TestResourceJob_refresh(t *testing.T) {
Providers: testProviders, Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) }, PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{ Steps: []r.TestStep{
r.TestStep{ {
Config: testResourceJob_initialConfig, Config: testResourceJob_initialConfig,
Check: testResourceJob_initialCheck, Check: testResourceJob_initialCheck,
}, },
// This should successfully cause the job to be recreated, // This should successfully cause the job to be recreated,
// testing the Exists function. // testing the Exists function.
r.TestStep{ {
PreConfig: testResourceJob_deregister(t, "foo"), PreConfig: testResourceJob_deregister(t, "foo"),
Config: testResourceJob_initialConfig, Config: testResourceJob_initialConfig,
}, },
@ -51,20 +52,20 @@ func TestResourceJob_disableDestroyDeregister(t *testing.T) {
Providers: testProviders, Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) }, PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{ Steps: []r.TestStep{
r.TestStep{ {
Config: testResourceJob_noDestroy, Config: testResourceJob_noDestroy,
Check: testResourceJob_initialCheck, Check: testResourceJob_initialCheck,
}, },
// Destroy with our setting set // Destroy with our setting set
r.TestStep{ {
Destroy: true, Destroy: true,
Config: testResourceJob_noDestroy, Config: testResourceJob_noDestroy,
Check: testResourceJob_checkExists, Check: testResourceJob_checkExists,
}, },
// Re-apply without the setting set // Re-apply without the setting set
r.TestStep{ {
Config: testResourceJob_initialConfig, Config: testResourceJob_initialConfig,
Check: testResourceJob_checkExists, Check: testResourceJob_checkExists,
}, },
@ -77,13 +78,13 @@ func TestResourceJob_idChange(t *testing.T) {
Providers: testProviders, Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) }, PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{ Steps: []r.TestStep{
r.TestStep{ {
Config: testResourceJob_initialConfig, Config: testResourceJob_initialConfig,
Check: testResourceJob_initialCheck, Check: testResourceJob_initialCheck,
}, },
// Change our ID // Change our ID
r.TestStep{ {
Config: testResourceJob_updateConfig, Config: testResourceJob_updateConfig,
Check: testResourceJob_updateCheck, Check: testResourceJob_updateCheck,
}, },
@ -91,6 +92,19 @@ func TestResourceJob_idChange(t *testing.T) {
}) })
} }
func TestResourceJob_parameterizedJob(t *testing.T) {
r.Test(t, r.TestCase{
Providers: testProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []r.TestStep{
{
Config: testResourceJob_parameterizedJob,
Check: testResourceJob_initialCheck,
},
},
})
}
var testResourceJob_initialConfig = ` var testResourceJob_initialConfig = `
resource "nomad_job" "test" { resource "nomad_job" "test" {
jobspec = <<EOT jobspec = <<EOT
@ -108,7 +122,6 @@ job "foo" {
resources { resources {
cpu = 20 cpu = 20
memory = 10 memory = 10
disk = 100
} }
logs { logs {
@ -140,7 +153,6 @@ job "foo" {
resources { resources {
cpu = 20 cpu = 20
memory = 10 memory = 10
disk = 100
} }
logs { logs {
@ -157,12 +169,12 @@ EOT
func testResourceJob_initialCheck(s *terraform.State) error { func testResourceJob_initialCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["nomad_job.test"] resourceState := s.Modules[0].Resources["nomad_job.test"]
if resourceState == nil { if resourceState == nil {
return fmt.Errorf("resource not found in state") return errors.New("resource not found in state")
} }
instanceState := resourceState.Primary instanceState := resourceState.Primary
if instanceState == nil { if instanceState == nil {
return fmt.Errorf("resource has no primary instance") return errors.New("resource has no primary instance")
} }
jobID := instanceState.ID jobID := instanceState.ID
@ -200,7 +212,7 @@ func testResourceJob_checkDestroy(jobID string) r.TestCheckFunc {
return nil return nil
} }
if err == nil { if err == nil {
err = fmt.Errorf("not destroyed") err = errors.New("not destroyed")
} }
return err return err
@ -234,7 +246,6 @@ job "bar" {
resources { resources {
cpu = 20 cpu = 20
memory = 10 memory = 10
disk = 100
} }
logs { logs {
@ -251,12 +262,12 @@ EOT
func testResourceJob_updateCheck(s *terraform.State) error { func testResourceJob_updateCheck(s *terraform.State) error {
resourceState := s.Modules[0].Resources["nomad_job.test"] resourceState := s.Modules[0].Resources["nomad_job.test"]
if resourceState == nil { if resourceState == nil {
return fmt.Errorf("resource not found in state") return errors.New("resource not found in state")
} }
instanceState := resourceState.Primary instanceState := resourceState.Primary
if instanceState == nil { if instanceState == nil {
return fmt.Errorf("resource has no primary instance") return errors.New("resource has no primary instance")
} }
jobID := instanceState.ID jobID := instanceState.ID
@ -275,9 +286,41 @@ func testResourceJob_updateCheck(s *terraform.State) error {
// Verify foo doesn't exist // Verify foo doesn't exist
_, _, err := client.Jobs().Info("foo", nil) _, _, err := client.Jobs().Info("foo", nil)
if err == nil { if err == nil {
return fmt.Errorf("reading foo success") return errors.New("reading foo success")
} }
} }
return nil return nil
} }
var testResourceJob_parameterizedJob = `
resource "nomad_job" "test" {
jobspec = <<EOT
job "bar" {
datacenters = ["dc1"]
type = "batch"
parameterized {
payload = "required"
}
group "foo" {
task "foo" {
driver = "raw_exec"
config {
command = "/bin/sleep"
args = ["1"]
}
resources {
cpu = 20
memory = 10
}
logs {
max_files = 3
max_file_size = 10
}
}
}
}
EOT
}
`

View File

@ -0,0 +1,60 @@
package discover
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime"
"github.com/kardianos/osext"
)
// Checks the current executable, then $GOPATH/bin, and finally the CWD, in that
// order. If it can't be found, an error is returned.
func NomadExecutable() (string, error) {
nomadExe := "nomad"
if runtime.GOOS == "windows" {
nomadExe = "nomad.exe"
}
// Check the current executable.
bin, err := osext.Executable()
if err != nil {
return "", fmt.Errorf("Failed to determine the nomad executable: %v", err)
}
if filepath.Base(bin) == nomadExe {
return bin, nil
}
// Check the $PATH
if bin, err := exec.LookPath(nomadExe); err == nil {
return bin, nil
}
// Check the $GOPATH.
bin = filepath.Join(os.Getenv("GOPATH"), "bin", nomadExe)
if _, err := os.Stat(bin); err == nil {
return bin, nil
}
// Check the CWD.
pwd, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("Could not find Nomad executable (%v): %v", nomadExe, err)
}
bin = filepath.Join(pwd, nomadExe)
if _, err := os.Stat(bin); err == nil {
return bin, nil
}
// Check CWD/bin
bin = filepath.Join(pwd, "bin", nomadExe)
if _, err := os.Stat(bin); err == nil {
return bin, nil
}
return "", fmt.Errorf("Could not find Nomad executable (%v)", nomadExe)
}

169
vendor/github.com/hashicorp/nomad/helper/fields/data.go generated vendored Normal file
View File

@ -0,0 +1,169 @@
package fields
import (
"fmt"
"github.com/hashicorp/go-multierror"
"github.com/mitchellh/mapstructure"
)
// FieldData contains the raw data and the schema that the data should adhere to
type FieldData struct {
Raw map[string]interface{}
Schema map[string]*FieldSchema
}
// Validate cycles through the raw data and validates conversions in the schema.
// It also checks for the existence and value of required fields.
func (d *FieldData) Validate() error {
var result *multierror.Error
// Scan for missing required fields
for field, schema := range d.Schema {
if schema.Required {
_, ok := d.Raw[field]
if !ok {
result = multierror.Append(result, fmt.Errorf(
"field %q is required", field))
}
}
}
// Validate field type and value
for field, value := range d.Raw {
schema, ok := d.Schema[field]
if !ok {
result = multierror.Append(result, fmt.Errorf(
"%q is an invalid field", field))
continue
}
switch schema.Type {
case TypeBool, TypeInt, TypeMap, TypeArray, TypeString:
val, _, err := d.getPrimitive(field, schema)
if err != nil {
result = multierror.Append(result, fmt.Errorf(
"field %q with input %q doesn't seem to be of type %s",
field, value, schema.Type))
}
// Check that we don't have an empty value for required fields
if schema.Required && val == schema.Type.Zero() {
result = multierror.Append(result, fmt.Errorf(
"field %q is required, but no value was found", field))
}
default:
result = multierror.Append(result, fmt.Errorf(
"unknown field type %s for field %s", schema.Type, field))
}
}
return result.ErrorOrNil()
}
// Get gets the value for the given field. If the key is an invalid field,
// FieldData will panic. If you want a safer version of this method, use
// GetOk. If the field k is not set, the default value (if set) will be
// returned, otherwise the zero value will be returned.
func (d *FieldData) Get(k string) interface{} {
schema, ok := d.Schema[k]
if !ok {
panic(fmt.Sprintf("field %s not in the schema", k))
}
value, ok := d.GetOk(k)
if !ok {
value = schema.DefaultOrZero()
}
return value
}
// GetOk gets the value for the given field. The second return value
// will be false if the key is invalid or the key is not set at all.
func (d *FieldData) GetOk(k string) (interface{}, bool) {
schema, ok := d.Schema[k]
if !ok {
return nil, false
}
result, ok, err := d.GetOkErr(k)
if err != nil {
panic(fmt.Sprintf("error reading %s: %s", k, err))
}
if ok && result == nil {
result = schema.DefaultOrZero()
}
return result, ok
}
// GetOkErr is the most conservative of all the Get methods. It returns
// whether key is set or not, but also an error value. The error value is
// non-nil if the field doesn't exist or there was an error parsing the
// field value.
func (d *FieldData) GetOkErr(k string) (interface{}, bool, error) {
schema, ok := d.Schema[k]
if !ok {
return nil, false, fmt.Errorf("unknown field: %s", k)
}
switch schema.Type {
case TypeBool, TypeInt, TypeMap, TypeArray, TypeString:
return d.getPrimitive(k, schema)
default:
return nil, false,
fmt.Errorf("unknown field type %s for field %s", schema.Type, k)
}
}
// getPrimitive tries to convert the raw value of a field to its data type as
// defined in the schema. It does strict type checking, so the value will need
// to be able to convert to the appropriate type directly.
func (d *FieldData) getPrimitive(
k string, schema *FieldSchema) (interface{}, bool, error) {
raw, ok := d.Raw[k]
if !ok {
return nil, false, nil
}
switch schema.Type {
case TypeBool:
var result bool
if err := mapstructure.Decode(raw, &result); err != nil {
return nil, true, err
}
return result, true, nil
case TypeInt:
var result int
if err := mapstructure.Decode(raw, &result); err != nil {
return nil, true, err
}
return result, true, nil
case TypeString:
var result string
if err := mapstructure.Decode(raw, &result); err != nil {
return nil, true, err
}
return result, true, nil
case TypeMap:
var result map[string]interface{}
if err := mapstructure.Decode(raw, &result); err != nil {
return nil, true, err
}
return result, true, nil
case TypeArray:
var result []interface{}
if err := mapstructure.Decode(raw, &result); err != nil {
return nil, true, err
}
return result, true, nil
default:
panic(fmt.Sprintf("Unknown type: %s", schema.Type))
}
}

View File

@ -0,0 +1,19 @@
package fields
// FieldSchema is a basic schema to describe the format of a configuration field
type FieldSchema struct {
Type FieldType
Default interface{}
Description string
Required bool
}
// DefaultOrZero returns the default value if it is set, or otherwise
// the zero value of the type.
func (s *FieldSchema) DefaultOrZero() interface{} {
if s.Default != nil {
return s.Default
}
return s.Type.Zero()
}

View File

@ -0,0 +1,47 @@
package fields
// FieldType is the enum of types that a field can be.
type FieldType uint
const (
TypeInvalid FieldType = 0
TypeString FieldType = iota
TypeInt
TypeBool
TypeMap
TypeArray
)
func (t FieldType) String() string {
switch t {
case TypeString:
return "string"
case TypeInt:
return "integer"
case TypeBool:
return "boolean"
case TypeMap:
return "map"
case TypeArray:
return "array"
default:
return "unknown type"
}
}
func (t FieldType) Zero() interface{} {
switch t {
case TypeString:
return ""
case TypeInt:
return 0
case TypeBool:
return false
case TypeMap:
return map[string]interface{}{}
case TypeArray:
return []interface{}{}
default:
panic("unknown type: " + t.String())
}
}

View File

@ -0,0 +1,60 @@
package flaghelper
import (
"strconv"
"strings"
"time"
)
// StringFlag implements the flag.Value interface and allows multiple
// calls to the same variable to append a list.
type StringFlag []string
func (s *StringFlag) String() string {
return strings.Join(*s, ",")
}
func (s *StringFlag) Set(value string) error {
*s = append(*s, value)
return nil
}
// FuncVar is a type of flag that accepts a function that is the string
// given
// by the user.
type FuncVar func(s string) error
func (f FuncVar) Set(s string) error { return f(s) }
func (f FuncVar) String() string { return "" }
func (f FuncVar) IsBoolFlag() bool { return false }
// FuncBoolVar is a type of flag that accepts a function, converts the
// user's
// value to a bool, and then calls the given function.
type FuncBoolVar func(b bool) error
func (f FuncBoolVar) Set(s string) error {
v, err := strconv.ParseBool(s)
if err != nil {
return err
}
return f(v)
}
func (f FuncBoolVar) String() string { return "" }
func (f FuncBoolVar) IsBoolFlag() bool { return true }
// FuncDurationVar is a type of flag that
// accepts a function, converts the
// user's value to a duration, and then
// calls the given function.
type FuncDurationVar func(d time.Duration) error
func (f FuncDurationVar) Set(s string) error {
v, err := time.ParseDuration(s)
if err != nil {
return err
}
return f(v)
}
func (f FuncDurationVar) String() string { return "" }
func (f FuncDurationVar) IsBoolFlag() bool { return false }

156
vendor/github.com/hashicorp/nomad/helper/funcs.go generated vendored Normal file
View File

@ -0,0 +1,156 @@
package helper
import "regexp"
// validUUID is used to check if a given string looks like a UUID
var validUUID = regexp.MustCompile(`(?i)^[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}$`)
// IsUUID returns true if the given string is a valid UUID.
func IsUUID(str string) bool {
const uuidLen = 36
if len(str) != uuidLen {
return false
}
return validUUID.MatchString(str)
}
// boolToPtr returns the pointer to a boolean
func BoolToPtr(b bool) *bool {
return &b
}
// MapStringStringSliceValueSet returns the set of values in a map[string][]string
func MapStringStringSliceValueSet(m map[string][]string) []string {
set := make(map[string]struct{})
for _, slice := range m {
for _, v := range slice {
set[v] = struct{}{}
}
}
flat := make([]string, 0, len(set))
for k := range set {
flat = append(flat, k)
}
return flat
}
func SliceStringToSet(s []string) map[string]struct{} {
m := make(map[string]struct{}, (len(s)+1)/2)
for _, k := range s {
m[k] = struct{}{}
}
return m
}
// SliceStringIsSubset returns whether the smaller set of strings is a subset of
// the larger. If the smaller slice is not a subset, the offending elements are
// returned.
func SliceStringIsSubset(larger, smaller []string) (bool, []string) {
largerSet := make(map[string]struct{}, len(larger))
for _, l := range larger {
largerSet[l] = struct{}{}
}
subset := true
var offending []string
for _, s := range smaller {
if _, ok := largerSet[s]; !ok {
subset = false
offending = append(offending, s)
}
}
return subset, offending
}
func SliceSetDisjoint(first, second []string) (bool, []string) {
contained := make(map[string]struct{}, len(first))
for _, k := range first {
contained[k] = struct{}{}
}
offending := make(map[string]struct{})
for _, k := range second {
if _, ok := contained[k]; ok {
offending[k] = struct{}{}
}
}
if len(offending) == 0 {
return true, nil
}
flattened := make([]string, 0, len(offending))
for k := range offending {
flattened = append(flattened, k)
}
return false, flattened
}
// Helpers for copying generic structures.
func CopyMapStringString(m map[string]string) map[string]string {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]string, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopyMapStringInt(m map[string]int) map[string]int {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]int, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopyMapStringFloat64(m map[string]float64) map[string]float64 {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]float64, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopySliceString(s []string) []string {
l := len(s)
if l == 0 {
return nil
}
c := make([]string, l)
for i, v := range s {
c[i] = v
}
return c
}
func CopySliceInt(s []int) []int {
l := len(s)
if l == 0 {
return nil
}
c := make([]int, l)
for i, v := range s {
c[i] = v
}
return c
}

View File

@ -0,0 +1,43 @@
package gatedwriter
import (
"io"
"sync"
)
// Writer is an io.Writer implementation that buffers all of its
// data into an internal buffer until it is told to let data through.
type Writer struct {
Writer io.Writer
buf [][]byte
flush bool
lock sync.RWMutex
}
// Flush tells the Writer to flush any buffered data and to stop
// buffering.
func (w *Writer) Flush() {
w.lock.Lock()
w.flush = true
w.lock.Unlock()
for _, p := range w.buf {
w.Write(p)
}
w.buf = nil
}
func (w *Writer) Write(p []byte) (n int, err error) {
w.lock.RLock()
defer w.lock.RUnlock()
if w.flush {
return w.Writer.Write(p)
}
p2 := make([]byte, len(p))
copy(p2, p)
w.buf = append(w.buf, p2)
return len(p), nil
}

67
vendor/github.com/hashicorp/nomad/helper/stats/cpu.go generated vendored Normal file
View File

@ -0,0 +1,67 @@
package stats
import (
"fmt"
"math"
"sync"
"github.com/shirou/gopsutil/cpu"
)
var (
cpuMhzPerCore float64
cpuModelName string
cpuNumCores int
cpuTotalTicks float64
onceLer sync.Once
)
func Init() error {
var err error
onceLer.Do(func() {
if cpuNumCores, err = cpu.Counts(true); err != nil {
err = fmt.Errorf("Unable to determine the number of CPU cores available: %v", err)
return
}
var cpuInfo []cpu.InfoStat
if cpuInfo, err = cpu.Info(); err != nil {
err = fmt.Errorf("Unable to obtain CPU information: %v", err)
return
}
for _, cpu := range cpuInfo {
cpuModelName = cpu.ModelName
cpuMhzPerCore = cpu.Mhz
break
}
// Floor all of the values such that small difference don't cause the
// node to fall into a unique computed node class
cpuMhzPerCore = math.Floor(cpuMhzPerCore)
cpuTotalTicks = math.Floor(float64(cpuNumCores) * cpuMhzPerCore)
})
return err
}
// CPUModelName returns the number of CPU cores available
func CPUNumCores() int {
return cpuNumCores
}
// CPUMHzPerCore returns the MHz per CPU core
func CPUMHzPerCore() float64 {
return cpuMhzPerCore
}
// CPUModelName returns the model name of the CPU
func CPUModelName() string {
return cpuModelName
}
// TotalTicksAvailable calculates the total frequency available across all
// cores
func TotalTicksAvailable() float64 {
return cpuTotalTicks
}

View File

@ -0,0 +1,118 @@
// Package testtask implements a portable set of commands useful as stand-ins
// for user tasks.
package testtask
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"time"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kardianos/osext"
)
// Path returns the path to the currently running executable.
func Path() string {
path, err := osext.Executable()
if err != nil {
panic(err)
}
return path
}
// SetEnv configures the environment of the task so that Run executes a testtask
// script when called from within cmd.
func SetEnv(env *env.TaskEnvironment) {
env.AppendEnvvars(map[string]string{"TEST_TASK": "execute"})
}
// SetCmdEnv configures the environment of cmd so that Run executes a testtask
// script when called from within cmd.
func SetCmdEnv(cmd *exec.Cmd) {
cmd.Env = append(os.Environ(), "TEST_TASK=execute")
}
// SetTaskEnv configures the environment of t so that Run executes a testtask
// script when called from within t.
func SetTaskEnv(t *structs.Task) {
if t.Env == nil {
t.Env = map[string]string{}
}
t.Env["TEST_TASK"] = "execute"
}
// Run interprets os.Args as a testtask script if the current program was
// launched with an environment configured by SetCmdEnv or SetTaskEnv. It
// returns false if the environment was not set by this package.
func Run() bool {
switch tm := os.Getenv("TEST_TASK"); tm {
case "":
return false
case "execute":
execute()
return true
default:
fmt.Fprintf(os.Stderr, "unexpected value for TEST_TASK, \"%s\"\n", tm)
os.Exit(1)
return true
}
}
func execute() {
if len(os.Args) < 2 {
fmt.Fprintln(os.Stderr, "no command provided")
os.Exit(1)
}
args := os.Args[1:]
// popArg removes the first argument from args and returns it.
popArg := func() string {
s := args[0]
args = args[1:]
return s
}
// execute a sequence of operations from args
for len(args) > 0 {
switch cmd := popArg(); cmd {
case "sleep":
// sleep <dur>: sleep for a duration indicated by the first
// argument
if len(args) < 1 {
fmt.Fprintln(os.Stderr, "expected arg for sleep")
os.Exit(1)
}
dur, err := time.ParseDuration(popArg())
if err != nil {
fmt.Fprintf(os.Stderr, "could not parse sleep time: %v", err)
os.Exit(1)
}
time.Sleep(dur)
case "echo":
// echo <msg>: write the msg followed by a newline to stdout.
fmt.Println(popArg())
case "write":
// write <msg> <file>: write a message to a file. The first
// argument is the msg. The second argument is the path to the
// target file.
if len(args) < 2 {
fmt.Fprintln(os.Stderr, "expected two args for write")
os.Exit(1)
}
msg := popArg()
file := popArg()
ioutil.WriteFile(file, []byte(msg), 0666)
default:
fmt.Fprintln(os.Stderr, "unknown command:", cmd)
os.Exit(1)
}
}
}

View File

@ -0,0 +1,258 @@
package tlsutil
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"time"
)
// RegionSpecificWrapper is used to invoke a static Region and turns a
// RegionWrapper into a Wrapper type.
func RegionSpecificWrapper(region string, tlsWrap RegionWrapper) Wrapper {
if tlsWrap == nil {
return nil
}
return func(conn net.Conn) (net.Conn, error) {
return tlsWrap(region, conn)
}
}
// RegionWrapper is a function that is used to wrap a non-TLS connection and
// returns an appropriate TLS connection or error. This takes a Region as an
// argument.
type RegionWrapper func(region string, conn net.Conn) (net.Conn, error)
// Wrapper wraps a connection and enables TLS on it.
type Wrapper func(conn net.Conn) (net.Conn, error)
// Config used to create tls.Config
type Config struct {
// VerifyIncoming is used to verify the authenticity of incoming connections.
// This means that TCP requests are forbidden, only allowing for TLS. TLS connections
// must match a provided certificate authority. This can be used to force client auth.
VerifyIncoming bool
// VerifyOutgoing is used to verify the authenticity of outgoing connections.
// This means that TLS requests are used, and TCP requests are not made. TLS connections
// must match a provided certificate authority. This is used to verify authenticity of
// server nodes.
VerifyOutgoing bool
// VerifyServerHostname is used to enable hostname verification of servers. This
// ensures that the certificate presented is valid for server.<datacenter>.<domain>.
// This prevents a compromised client from being restarted as a server, and then
// intercepting request traffic as well as being added as a raft peer. This should be
// enabled by default with VerifyOutgoing, but for legacy reasons we cannot break
// existing clients.
VerifyServerHostname bool
// CAFile is a path to a certificate authority file. This is used with VerifyIncoming
// or VerifyOutgoing to verify the TLS connection.
CAFile string
// CertFile is used to provide a TLS certificate that is used for serving TLS connections.
// Must be provided to serve TLS connections.
CertFile string
// KeyFile is used to provide a TLS key that is used for serving TLS connections.
// Must be provided to serve TLS connections.
KeyFile string
}
// AppendCA opens and parses the CA file and adds the certificates to
// the provided CertPool.
func (c *Config) AppendCA(pool *x509.CertPool) error {
if c.CAFile == "" {
return nil
}
// Read the file
data, err := ioutil.ReadFile(c.CAFile)
if err != nil {
return fmt.Errorf("Failed to read CA file: %v", err)
}
if !pool.AppendCertsFromPEM(data) {
return fmt.Errorf("Failed to parse any CA certificates")
}
return nil
}
// KeyPair is used to open and parse a certificate and key file
func (c *Config) KeyPair() (*tls.Certificate, error) {
if c.CertFile == "" || c.KeyFile == "" {
return nil, nil
}
cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
if err != nil {
return nil, fmt.Errorf("Failed to load cert/key pair: %v", err)
}
return &cert, err
}
// OutgoingTLSConfig generates a TLS configuration for outgoing
// requests. It will return a nil config if this configuration should
// not use TLS for outgoing connections.
func (c *Config) OutgoingTLSConfig() (*tls.Config, error) {
// If VerifyServerHostname is true, that implies VerifyOutgoing
if c.VerifyServerHostname {
c.VerifyOutgoing = true
}
if !c.VerifyOutgoing {
return nil, nil
}
// Create the tlsConfig
tlsConfig := &tls.Config{
RootCAs: x509.NewCertPool(),
InsecureSkipVerify: true,
}
if c.VerifyServerHostname {
tlsConfig.InsecureSkipVerify = false
}
// Ensure we have a CA if VerifyOutgoing is set
if c.VerifyOutgoing && c.CAFile == "" {
return nil, fmt.Errorf("VerifyOutgoing set, and no CA certificate provided!")
}
// Parse the CA cert if any
err := c.AppendCA(tlsConfig.RootCAs)
if err != nil {
return nil, err
}
// Add cert/key
cert, err := c.KeyPair()
if err != nil {
return nil, err
} else if cert != nil {
tlsConfig.Certificates = []tls.Certificate{*cert}
}
return tlsConfig, nil
}
// OutgoingTLSWrapper returns a a Wrapper based on the OutgoingTLS
// configuration. If hostname verification is on, the wrapper
// will properly generate the dynamic server name for verification.
func (c *Config) OutgoingTLSWrapper() (RegionWrapper, error) {
// Get the TLS config
tlsConfig, err := c.OutgoingTLSConfig()
if err != nil {
return nil, err
}
// Check if TLS is not enabled
if tlsConfig == nil {
return nil, nil
}
// Generate the wrapper based on hostname verification
if c.VerifyServerHostname {
wrapper := func(region string, conn net.Conn) (net.Conn, error) {
conf := *tlsConfig
conf.ServerName = "server." + region + ".nomad"
return WrapTLSClient(conn, &conf)
}
return wrapper, nil
} else {
wrapper := func(dc string, c net.Conn) (net.Conn, error) {
return WrapTLSClient(c, tlsConfig)
}
return wrapper, nil
}
}
// Wrap a net.Conn into a client tls connection, performing any
// additional verification as needed.
//
// As of go 1.3, crypto/tls only supports either doing no certificate
// verification, or doing full verification including of the peer's
// DNS name. For consul, we want to validate that the certificate is
// signed by a known CA, but because consul doesn't use DNS names for
// node names, we don't verify the certificate DNS names. Since go 1.3
// no longer supports this mode of operation, we have to do it
// manually.
func WrapTLSClient(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) {
var err error
var tlsConn *tls.Conn
tlsConn = tls.Client(conn, tlsConfig)
// If crypto/tls is doing verification, there's no need to do
// our own.
if tlsConfig.InsecureSkipVerify == false {
return tlsConn, nil
}
if err = tlsConn.Handshake(); err != nil {
tlsConn.Close()
return nil, err
}
// The following is lightly-modified from the doFullHandshake
// method in crypto/tls's handshake_client.go.
opts := x509.VerifyOptions{
Roots: tlsConfig.RootCAs,
CurrentTime: time.Now(),
DNSName: "",
Intermediates: x509.NewCertPool(),
}
certs := tlsConn.ConnectionState().PeerCertificates
for i, cert := range certs {
if i == 0 {
continue
}
opts.Intermediates.AddCert(cert)
}
_, err = certs[0].Verify(opts)
if err != nil {
tlsConn.Close()
return nil, err
}
return tlsConn, err
}
// IncomingTLSConfig generates a TLS configuration for incoming requests
func (c *Config) IncomingTLSConfig() (*tls.Config, error) {
// Create the tlsConfig
tlsConfig := &tls.Config{
ClientCAs: x509.NewCertPool(),
ClientAuth: tls.NoClientCert,
}
// Parse the CA cert if any
err := c.AppendCA(tlsConfig.ClientCAs)
if err != nil {
return nil, err
}
// Add cert/key
cert, err := c.KeyPair()
if err != nil {
return nil, err
} else if cert != nil {
tlsConfig.Certificates = []tls.Certificate{*cert}
}
// Check if we require verification
if c.VerifyIncoming {
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
if c.CAFile == "" {
return nil, fmt.Errorf("VerifyIncoming set, and no CA certificate provided!")
}
if cert == nil {
return nil, fmt.Errorf("VerifyIncoming set, and no Cert/Key pair provided!")
}
}
return tlsConfig, nil
}

View File

@ -83,10 +83,13 @@ func ParseFile(path string) (*structs.Job, error) {
} }
func parseJob(result *structs.Job, list *ast.ObjectList) error { func parseJob(result *structs.Job, list *ast.ObjectList) error {
list = list.Children()
if len(list.Items) != 1 { if len(list.Items) != 1 {
return fmt.Errorf("only one 'job' block allowed") return fmt.Errorf("only one 'job' block allowed")
} }
list = list.Children()
if len(list.Items) != 1 {
return fmt.Errorf("'job' block missing name")
}
// Get our job object // Get our job object
obj := list.Items[0] obj := list.Items[0]
@ -101,6 +104,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
delete(m, "update") delete(m, "update")
delete(m, "periodic") delete(m, "periodic")
delete(m, "vault") delete(m, "vault")
delete(m, "parameterized")
// Set the ID and name to the object key // Set the ID and name to the object key
result.ID = obj.Keys[0].Token.Value().(string) result.ID = obj.Keys[0].Token.Value().(string)
@ -126,19 +130,20 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
// Check for invalid keys // Check for invalid keys
valid := []string{ valid := []string{
"id",
"name",
"region",
"all_at_once", "all_at_once",
"type",
"priority",
"datacenters",
"constraint", "constraint",
"update", "datacenters",
"periodic", "parameterized",
"meta",
"task",
"group", "group",
"id",
"meta",
"name",
"periodic",
"priority",
"region",
"task",
"type",
"update",
"vault", "vault",
"vault_token", "vault_token",
} }
@ -167,6 +172,13 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
} }
} }
// If we have a parameterized definition, then parse that
if o := listVal.Filter("parameterized"); len(o.Items) > 0 {
if err := parseParameterizedJob(&result.ParameterizedJob, o); err != nil {
return multierror.Prefix(err, "parameterized ->")
}
}
// Parse out meta fields. These are in HCL as a list so we need // Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them. // to iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@ -551,6 +563,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
"artifact", "artifact",
"config", "config",
"constraint", "constraint",
"dispatch_payload",
"driver", "driver",
"env", "env",
"kill_timeout", "kill_timeout",
@ -573,6 +586,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
delete(m, "artifact") delete(m, "artifact")
delete(m, "config") delete(m, "config")
delete(m, "constraint") delete(m, "constraint")
delete(m, "dispatch_payload")
delete(m, "env") delete(m, "env")
delete(m, "logs") delete(m, "logs")
delete(m, "meta") delete(m, "meta")
@ -716,6 +730,32 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
t.Vault = v t.Vault = v
} }
// If we have a dispatch_payload block parse that
if o := listVal.Filter("dispatch_payload"); len(o.Items) > 0 {
if len(o.Items) > 1 {
return fmt.Errorf("only one dispatch_payload block is allowed in a task. Number of dispatch_payload blocks found: %d", len(o.Items))
}
var m map[string]interface{}
dispatchBlock := o.Items[0]
// Check for invalid keys
valid := []string{
"file",
}
if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', dispatch_payload ->", n))
}
if err := hcl.DecodeObject(&m, dispatchBlock.Val); err != nil {
return err
}
t.DispatchPayload = &structs.DispatchPayloadConfig{}
if err := mapstructure.WeakDecode(m, t.DispatchPayload); err != nil {
return err
}
}
*result = append(*result, &t) *result = append(*result, &t)
} }
@ -797,13 +837,13 @@ func parseTemplates(result *[]*structs.Template, list *ast.ObjectList) error {
for _, o := range list.Elem().Items { for _, o := range list.Elem().Items {
// Check for invalid keys // Check for invalid keys
valid := []string{ valid := []string{
"source",
"destination",
"data",
"change_mode", "change_mode",
"change_signal", "change_signal",
"data",
"destination",
"perms",
"source",
"splay", "splay",
"once",
} }
if err := checkHCLKeys(o.Val, valid); err != nil { if err := checkHCLKeys(o.Val, valid); err != nil {
return err return err
@ -1188,6 +1228,40 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
return nil return nil
} }
func parseParameterizedJob(result **structs.ParameterizedJobConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'parameterized' block allowed per job")
}
// Get our resource object
o := list.Items[0]
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
// Check for invalid keys
valid := []string{
"payload",
"meta_required",
"meta_optional",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
}
// Build the parameterized job block
var d structs.ParameterizedJobConfig
if err := mapstructure.WeakDecode(m, &d); err != nil {
return err
}
*result = &d
return nil
}
func checkHCLKeys(node ast.Node, valid []string) error { func checkHCLKeys(node ast.Node, valid []string) error {
var list *ast.ObjectList var list *ast.ObjectList
switch n := node.(type) { switch n := node.(type) {

View File

@ -130,6 +130,11 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
diff.Objects = append(diff.Objects, pDiff) diff.Objects = append(diff.Objects, pDiff)
} }
// ParameterizedJob diff
if cDiff := parameterizedJobDiff(j.ParameterizedJob, other.ParameterizedJob, contextual); cDiff != nil {
diff.Objects = append(diff.Objects, cDiff)
}
return diff, nil return diff, nil
} }
@ -370,6 +375,12 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) {
diff.Objects = append(diff.Objects, lDiff) diff.Objects = append(diff.Objects, lDiff)
} }
// Dispatch payload diff
dDiff := primitiveObjectDiff(t.DispatchPayload, other.DispatchPayload, nil, "DispatchPayload", contextual)
if dDiff != nil {
diff.Objects = append(diff.Objects, dDiff)
}
// Artifacts diff // Artifacts diff
diffs := primitiveObjectSetDiff( diffs := primitiveObjectSetDiff(
interfaceSlice(t.Artifacts), interfaceSlice(t.Artifacts),
@ -629,6 +640,44 @@ func vaultDiff(old, new *Vault, contextual bool) *ObjectDiff {
return diff return diff
} }
// parameterizedJobDiff returns the diff of two parameterized job objects. If
// contextual diff is enabled, all fields will be returned, even if no diff
// occurred.
func parameterizedJobDiff(old, new *ParameterizedJobConfig, contextual bool) *ObjectDiff {
diff := &ObjectDiff{Type: DiffTypeNone, Name: "ParameterizedJob"}
var oldPrimitiveFlat, newPrimitiveFlat map[string]string
if reflect.DeepEqual(old, new) {
return nil
} else if old == nil {
old = &ParameterizedJobConfig{}
diff.Type = DiffTypeAdded
newPrimitiveFlat = flatmap.Flatten(new, nil, true)
} else if new == nil {
new = &ParameterizedJobConfig{}
diff.Type = DiffTypeDeleted
oldPrimitiveFlat = flatmap.Flatten(old, nil, true)
} else {
diff.Type = DiffTypeEdited
oldPrimitiveFlat = flatmap.Flatten(old, nil, true)
newPrimitiveFlat = flatmap.Flatten(new, nil, true)
}
// Diff the primitive fields.
diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual)
// Meta diffs
if optionalDiff := stringSetDiff(old.MetaOptional, new.MetaOptional, "MetaOptional", contextual); optionalDiff != nil {
diff.Objects = append(diff.Objects, optionalDiff)
}
if requiredDiff := stringSetDiff(old.MetaRequired, new.MetaRequired, "MetaRequired", contextual); requiredDiff != nil {
diff.Objects = append(diff.Objects, requiredDiff)
}
return diff
}
// Diff returns a diff of two resource objects. If contextual diff is enabled, // Diff returns a diff of two resource objects. If contextual diff is enabled,
// non-changed fields will still be returned. // non-changed fields will still be returned.
func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff { func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff {

View File

@ -169,72 +169,6 @@ func GenerateUUID() string {
buf[10:16]) buf[10:16])
} }
// Helpers for copying generic structures.
func CopyMapStringString(m map[string]string) map[string]string {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]string, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopyMapStringInt(m map[string]int) map[string]int {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]int, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopyMapStringFloat64(m map[string]float64) map[string]float64 {
l := len(m)
if l == 0 {
return nil
}
c := make(map[string]float64, l)
for k, v := range m {
c[k] = v
}
return c
}
func CopySliceString(s []string) []string {
l := len(s)
if l == 0 {
return nil
}
c := make([]string, l)
for i, v := range s {
c[i] = v
}
return c
}
func CopySliceInt(s []int) []int {
l := len(s)
if l == 0 {
return nil
}
c := make([]int, l)
for i, v := range s {
c[i] = v
}
return c
}
func CopySliceConstraints(s []*Constraint) []*Constraint { func CopySliceConstraints(s []*Constraint) []*Constraint {
l := len(s) l := len(s)
if l == 0 { if l == 0 {
@ -248,27 +182,6 @@ func CopySliceConstraints(s []*Constraint) []*Constraint {
return c return c
} }
// SliceStringIsSubset returns whether the smaller set of strings is a subset of
// the larger. If the smaller slice is not a subset, the offending elements are
// returned.
func SliceStringIsSubset(larger, smaller []string) (bool, []string) {
largerSet := make(map[string]struct{}, len(larger))
for _, l := range larger {
largerSet[l] = struct{}{}
}
subset := true
var offending []string
for _, s := range smaller {
if _, ok := largerSet[s]; !ok {
subset = false
offending = append(offending, s)
}
}
return subset, offending
}
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns // VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies // the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
@ -288,19 +201,3 @@ func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {
} }
return flattened return flattened
} }
// MapStringStringSliceValueSet returns the set of values in a map[string][]string
func MapStringStringSliceValueSet(m map[string][]string) []string {
set := make(map[string]struct{})
for _, slice := range m {
for _, v := range slice {
set[v] = struct{}{}
}
}
flat := make([]string, 0, len(set))
for k := range set {
flat = append(flat, k)
}
return flat
}

View File

@ -24,6 +24,7 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args" "github.com/hashicorp/nomad/helper/args"
"github.com/mitchellh/copystructure" "github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec" "github.com/ugorji/go/codec"
@ -250,6 +251,7 @@ type JobEvaluateRequest struct {
// JobSpecificRequest is used when we just need to specify a target job // JobSpecificRequest is used when we just need to specify a target job
type JobSpecificRequest struct { type JobSpecificRequest struct {
JobID string JobID string
AllAllocs bool
QueryOptions QueryOptions
} }
@ -272,6 +274,14 @@ type JobSummaryRequest struct {
QueryOptions QueryOptions
} }
// JobDispatchRequest is used to dispatch a job based on a parameterized job
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
WriteRequest
}
// NodeListRequest is used to parameterize a list request // NodeListRequest is used to parameterize a list request
type NodeListRequest struct { type NodeListRequest struct {
QueryOptions QueryOptions
@ -525,6 +535,14 @@ type JobSummaryResponse struct {
QueryMeta QueryMeta
} }
type JobDispatchResponse struct {
DispatchedJobID string
EvalID string
EvalCreateIndex uint64
JobCreateIndex uint64
QueryMeta
}
// JobListResponse is used for a list request // JobListResponse is used for a list request
type JobListResponse struct { type JobListResponse struct {
Jobs []*JobListStub Jobs []*JobListStub
@ -746,11 +764,11 @@ func (n *Node) Copy() *Node {
} }
nn := new(Node) nn := new(Node)
*nn = *n *nn = *n
nn.Attributes = CopyMapStringString(nn.Attributes) nn.Attributes = helper.CopyMapStringString(nn.Attributes)
nn.Resources = nn.Resources.Copy() nn.Resources = nn.Resources.Copy()
nn.Reserved = nn.Reserved.Copy() nn.Reserved = nn.Reserved.Copy()
nn.Links = CopyMapStringString(nn.Links) nn.Links = helper.CopyMapStringString(nn.Links)
nn.Meta = CopyMapStringString(nn.Meta) nn.Meta = helper.CopyMapStringString(nn.Meta)
return nn return nn
} }
@ -1062,39 +1080,6 @@ const (
CoreJobPriority = JobMaxPriority * 2 CoreJobPriority = JobMaxPriority * 2
) )
// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
Summary map[string]TaskGroupSummary
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// Copy returns a new copy of JobSummary
func (js *JobSummary) Copy() *JobSummary {
newJobSummary := new(JobSummary)
*newJobSummary = *js
newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary))
for k, v := range js.Summary {
newTGSummary[k] = v
}
newJobSummary.Summary = newTGSummary
return newJobSummary
}
// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}
// Job is the scope of a scheduling request to Nomad. It is the largest // Job is the scope of a scheduling request to Nomad. It is the largest
// scoped object, and is a named collection of task groups. Each task group // scoped object, and is a named collection of task groups. Each task group
// is further composed of tasks. A task group (TG) is the unit of scheduling // is further composed of tasks. A task group (TG) is the unit of scheduling
@ -1146,6 +1131,13 @@ type Job struct {
// Periodic is used to define the interval the job is run at. // Periodic is used to define the interval the job is run at.
Periodic *PeriodicConfig Periodic *PeriodicConfig
// ParameterizedJob is used to specify the job as a parameterized job
// for dispatching.
ParameterizedJob *ParameterizedJobConfig
// Payload is the payload supplied when the job was dispatched.
Payload []byte
// Meta is used to associate arbitrary metadata with this // Meta is used to associate arbitrary metadata with this
// job. This is opaque to Nomad. // job. This is opaque to Nomad.
Meta map[string]string Meta map[string]string
@ -1179,6 +1171,10 @@ func (j *Job) Canonicalize() {
for _, tg := range j.TaskGroups { for _, tg := range j.TaskGroups {
tg.Canonicalize(j) tg.Canonicalize(j)
} }
if j.ParameterizedJob != nil {
j.ParameterizedJob.Canonicalize()
}
} }
// Copy returns a deep copy of the Job. It is expected that callers use recover. // Copy returns a deep copy of the Job. It is expected that callers use recover.
@ -1189,7 +1185,7 @@ func (j *Job) Copy() *Job {
} }
nj := new(Job) nj := new(Job)
*nj = *j *nj = *j
nj.Datacenters = CopySliceString(nj.Datacenters) nj.Datacenters = helper.CopySliceString(nj.Datacenters)
nj.Constraints = CopySliceConstraints(nj.Constraints) nj.Constraints = CopySliceConstraints(nj.Constraints)
if j.TaskGroups != nil { if j.TaskGroups != nil {
@ -1201,7 +1197,8 @@ func (j *Job) Copy() *Job {
} }
nj.Periodic = nj.Periodic.Copy() nj.Periodic = nj.Periodic.Copy()
nj.Meta = CopyMapStringString(nj.Meta) nj.Meta = helper.CopyMapStringString(nj.Meta)
nj.ParameterizedJob = nj.ParameterizedJob.Copy()
return nj return nj
} }
@ -1276,6 +1273,17 @@ func (j *Job) Validate() error {
} }
} }
if j.IsParameterized() {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
}
if err := j.ParameterizedJob.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
@ -1289,6 +1297,42 @@ func (j *Job) LookupTaskGroup(name string) *TaskGroup {
return nil return nil
} }
// CombinedTaskMeta takes a TaskGroup and Task name and returns the combined
// meta data for the task. When joining Job, Group and Task Meta, the precedence
// is by deepest scope (Task > Group > Job).
func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string {
group := j.LookupTaskGroup(groupName)
if group == nil {
return nil
}
task := group.LookupTask(taskName)
if task == nil {
return nil
}
meta := helper.CopyMapStringString(task.Meta)
if meta == nil {
meta = make(map[string]string, len(group.Meta)+len(j.Meta))
}
// Add the group specific meta
for k, v := range group.Meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
// Add the job specific meta
for k, v := range j.Meta {
if _, ok := meta[k]; !ok {
meta[k] = v
}
}
return meta
}
// Stub is used to return a summary of the job // Stub is used to return a summary of the job
func (j *Job) Stub(summary *JobSummary) *JobListStub { func (j *Job) Stub(summary *JobSummary) *JobListStub {
return &JobListStub{ return &JobListStub{
@ -1311,6 +1355,11 @@ func (j *Job) IsPeriodic() bool {
return j.Periodic != nil return j.Periodic != nil
} }
// IsParameterized returns whether a job is parameterized job.
func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil
}
// VaultPolicies returns the set of Vault policies per task group, per task // VaultPolicies returns the set of Vault policies per task group, per task
func (j *Job) VaultPolicies() map[string]map[string]*Vault { func (j *Job) VaultPolicies() map[string]map[string]*Vault {
policies := make(map[string]map[string]*Vault, len(j.TaskGroups)) policies := make(map[string]map[string]*Vault, len(j.TaskGroups))
@ -1399,6 +1448,63 @@ type JobListStub struct {
JobModifyIndex uint64 JobModifyIndex uint64
} }
// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
// Summmary contains the summary per task group for the Job
Summary map[string]TaskGroupSummary
// Children contains a summary for the children of this job.
Children *JobChildrenSummary
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
// Copy returns a new copy of JobSummary
func (js *JobSummary) Copy() *JobSummary {
newJobSummary := new(JobSummary)
*newJobSummary = *js
newTGSummary := make(map[string]TaskGroupSummary, len(js.Summary))
for k, v := range js.Summary {
newTGSummary[k] = v
}
newJobSummary.Summary = newTGSummary
newJobSummary.Children = newJobSummary.Children.Copy()
return newJobSummary
}
// JobChildrenSummary contains the summary of children job statuses
type JobChildrenSummary struct {
Pending int64
Running int64
Dead int64
}
// Copy returns a new copy of a JobChildrenSummary
func (jc *JobChildrenSummary) Copy() *JobChildrenSummary {
if jc == nil {
return nil
}
njc := new(JobChildrenSummary)
*njc = *jc
return njc
}
// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}
// UpdateStrategy is used to modify how updates are done // UpdateStrategy is used to modify how updates are done
type UpdateStrategy struct { type UpdateStrategy struct {
// Stagger is the amount of time between the updates // Stagger is the amount of time between the updates
@ -1525,6 +1631,96 @@ type PeriodicLaunch struct {
ModifyIndex uint64 ModifyIndex uint64
} }
const (
DispatchPayloadForbidden = "forbidden"
DispatchPayloadOptional = "optional"
DispatchPayloadRequired = "required"
// DispatchLaunchSuffix is the string appended to the parameterized job's ID
// when dispatching instances of it.
DispatchLaunchSuffix = "/dispatch-"
)
// ParameterizedJobConfig is used to configure the parameterized job
type ParameterizedJobConfig struct {
// Payload configure the payload requirements
Payload string
// MetaRequired is metadata keys that must be specified by the dispatcher
MetaRequired []string `mapstructure:"meta_required"`
// MetaOptional is metadata keys that may be specified by the dispatcher
MetaOptional []string `mapstructure:"meta_optional"`
}
func (d *ParameterizedJobConfig) Validate() error {
var mErr multierror.Error
switch d.Payload {
case DispatchPayloadOptional, DispatchPayloadRequired, DispatchPayloadForbidden:
default:
multierror.Append(&mErr, fmt.Errorf("Unknown payload requirement: %q", d.Payload))
}
// Check that the meta configurations are disjoint sets
disjoint, offending := helper.SliceSetDisjoint(d.MetaRequired, d.MetaOptional)
if !disjoint {
multierror.Append(&mErr, fmt.Errorf("Required and optional meta keys should be disjoint. Following keys exist in both: %v", offending))
}
return mErr.ErrorOrNil()
}
func (d *ParameterizedJobConfig) Canonicalize() {
if d.Payload == "" {
d.Payload = DispatchPayloadOptional
}
}
func (d *ParameterizedJobConfig) Copy() *ParameterizedJobConfig {
if d == nil {
return nil
}
nd := new(ParameterizedJobConfig)
*nd = *d
nd.MetaOptional = helper.CopySliceString(nd.MetaOptional)
nd.MetaRequired = helper.CopySliceString(nd.MetaRequired)
return nd
}
// DispatchedID returns an ID appropriate for a job dispatched against a
// particular parameterized job
func DispatchedID(templateID string, t time.Time) string {
u := GenerateUUID()[:8]
return fmt.Sprintf("%s%s%d-%s", templateID, DispatchLaunchSuffix, t.Unix(), u)
}
// DispatchPayloadConfig configures how a task gets its input from a job dispatch
type DispatchPayloadConfig struct {
// File specifies a relative path to where the input data should be written
File string
}
func (d *DispatchPayloadConfig) Copy() *DispatchPayloadConfig {
if d == nil {
return nil
}
nd := new(DispatchPayloadConfig)
*nd = *d
return nd
}
func (d *DispatchPayloadConfig) Validate() error {
// Verify the destination doesn't escape
escaped, err := PathEscapesAllocDir("task/local/", d.File)
if err != nil {
return fmt.Errorf("invalid destination path: %v", err)
} else if escaped {
return fmt.Errorf("destination escapes allocation directory")
}
return nil
}
var ( var (
defaultServiceJobRestartPolicy = RestartPolicy{ defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second, Delay: 15 * time.Second,
@ -1656,7 +1852,7 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.Tasks = tasks ntg.Tasks = tasks
} }
ntg.Meta = CopyMapStringString(ntg.Meta) ntg.Meta = helper.CopyMapStringString(ntg.Meta)
if tg.EphemeralDisk != nil { if tg.EphemeralDisk != nil {
ntg.EphemeralDisk = tg.EphemeralDisk.Copy() ntg.EphemeralDisk = tg.EphemeralDisk.Copy()
@ -1919,7 +2115,7 @@ func (s *Service) Copy() *Service {
} }
ns := new(Service) ns := new(Service)
*ns = *s *ns = *s
ns.Tags = CopySliceString(ns.Tags) ns.Tags = helper.CopySliceString(ns.Tags)
if s.Checks != nil { if s.Checks != nil {
checks := make([]*ServiceCheck, len(ns.Checks)) checks := make([]*ServiceCheck, len(ns.Checks))
@ -2076,6 +2272,9 @@ type Task struct {
// Resources is the resources needed by this task // Resources is the resources needed by this task
Resources *Resources Resources *Resources
// DispatchPayload configures how the task retrieves its input from a dispatch
DispatchPayload *DispatchPayloadConfig
// Meta is used to associate arbitrary metadata with this // Meta is used to associate arbitrary metadata with this
// task. This is opaque to Nomad. // task. This is opaque to Nomad.
Meta map[string]string Meta map[string]string
@ -2098,7 +2297,7 @@ func (t *Task) Copy() *Task {
} }
nt := new(Task) nt := new(Task)
*nt = *t *nt = *t
nt.Env = CopyMapStringString(nt.Env) nt.Env = helper.CopyMapStringString(nt.Env)
if t.Services != nil { if t.Services != nil {
services := make([]*Service, len(nt.Services)) services := make([]*Service, len(nt.Services))
@ -2112,7 +2311,8 @@ func (t *Task) Copy() *Task {
nt.Vault = nt.Vault.Copy() nt.Vault = nt.Vault.Copy()
nt.Resources = nt.Resources.Copy() nt.Resources = nt.Resources.Copy()
nt.Meta = CopyMapStringString(nt.Meta) nt.Meta = helper.CopyMapStringString(nt.Meta)
nt.DispatchPayload = nt.DispatchPayload.Copy()
if t.Artifacts != nil { if t.Artifacts != nil {
artifacts := make([]*TaskArtifact, 0, len(t.Artifacts)) artifacts := make([]*TaskArtifact, 0, len(t.Artifacts))
@ -2277,6 +2477,13 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk) error {
} }
} }
// Validate the dispatch payload block if there
if t.DispatchPayload != nil {
if err := t.DispatchPayload.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Dispatch Payload validation failed: %v", err))
}
}
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
@ -2294,10 +2501,13 @@ func validateServices(t *Task) error {
outer := fmt.Errorf("service[%d] %+q validation failed: %s", i, service.Name, err) outer := fmt.Errorf("service[%d] %+q validation failed: %s", i, service.Name, err)
mErr.Errors = append(mErr.Errors, outer) mErr.Errors = append(mErr.Errors, outer)
} }
if _, ok := knownServices[service.Name]; ok {
// Ensure that services with the same name are not being registered for
// the same port
if _, ok := knownServices[service.Name+service.PortLabel]; ok {
mErr.Errors = append(mErr.Errors, fmt.Errorf("service %q is duplicate", service.Name)) mErr.Errors = append(mErr.Errors, fmt.Errorf("service %q is duplicate", service.Name))
} }
knownServices[service.Name] = struct{}{} knownServices[service.Name+service.PortLabel] = struct{}{}
if service.PortLabel != "" { if service.PortLabel != "" {
servicePorts[service.PortLabel] = append(servicePorts[service.PortLabel], service.Name) servicePorts[service.PortLabel] = append(servicePorts[service.PortLabel], service.Name)
@ -2379,6 +2589,9 @@ type Template struct {
// random wait between 0 and the given splay value before signalling the // random wait between 0 and the given splay value before signalling the
// application of a change // application of a change
Splay time.Duration `mapstructure:"splay"` Splay time.Duration `mapstructure:"splay"`
// Perms is the permission the file should be written out with.
Perms string `mapstructure:"perms"`
} }
// DefaultTemplate returns a default template. // DefaultTemplate returns a default template.
@ -2386,6 +2599,7 @@ func DefaultTemplate() *Template {
return &Template{ return &Template{
ChangeMode: TemplateChangeModeRestart, ChangeMode: TemplateChangeModeRestart,
Splay: 5 * time.Second, Splay: 5 * time.Second,
Perms: "0644",
} }
} }
@ -2418,7 +2632,7 @@ func (t *Template) Validate() error {
} }
// Verify the destination doesn't escape // Verify the destination doesn't escape
escaped, err := PathEscapesAllocDir(t.DestPath) escaped, err := PathEscapesAllocDir("task", t.DestPath)
if err != nil { if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err)) mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err))
} else if escaped { } else if escaped {
@ -2441,6 +2655,13 @@ func (t *Template) Validate() error {
multierror.Append(&mErr, fmt.Errorf("Must specify positive splay value")) multierror.Append(&mErr, fmt.Errorf("Must specify positive splay value"))
} }
// Verify the permissions
if t.Perms != "" {
if _, err := strconv.ParseUint(t.Perms, 8, 12); err != nil {
multierror.Append(&mErr, fmt.Errorf("Failed to parse %q as octal: %v", t.Perms, err))
}
}
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
@ -2555,6 +2776,11 @@ const (
// TaskSiblingFailed indicates that a sibling task in the task group has // TaskSiblingFailed indicates that a sibling task in the task group has
// failed. // failed.
TaskSiblingFailed = "Sibling task failed" TaskSiblingFailed = "Sibling task failed"
// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"
) )
// TaskEvent is an event that effects the state of a task and contains meta-data // TaskEvent is an event that effects the state of a task and contains meta-data
@ -2613,6 +2839,9 @@ type TaskEvent struct {
// TaskSignal is the signal that was sent to the task // TaskSignal is the signal that was sent to the task
TaskSignal string TaskSignal string
// DriverMessage indicates a driver action being taken.
DriverMessage string
} }
func (te *TaskEvent) GoString() string { func (te *TaskEvent) GoString() string {
@ -2741,6 +2970,11 @@ func (e *TaskEvent) SetVaultRenewalError(err error) *TaskEvent {
return e return e
} }
func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent {
e.DriverMessage = m
return e
}
// TaskArtifact is an artifact to download before running the task. // TaskArtifact is an artifact to download before running the task.
type TaskArtifact struct { type TaskArtifact struct {
// GetterSource is the source to download an artifact using go-getter // GetterSource is the source to download an artifact using go-getter
@ -2761,7 +2995,7 @@ func (ta *TaskArtifact) Copy() *TaskArtifact {
} }
nta := new(TaskArtifact) nta := new(TaskArtifact)
*nta = *ta *nta = *ta
nta.GetterOptions = CopyMapStringString(ta.GetterOptions) nta.GetterOptions = helper.CopyMapStringString(ta.GetterOptions)
return nta return nta
} }
@ -2770,14 +3004,16 @@ func (ta *TaskArtifact) GoString() string {
} }
// PathEscapesAllocDir returns if the given path escapes the allocation // PathEscapesAllocDir returns if the given path escapes the allocation
// directory // directory. The prefix allows adding a prefix if the path will be joined, for
func PathEscapesAllocDir(path string) (bool, error) { // example a "task/local" prefix may be provided if the path will be joined
// against that prefix.
func PathEscapesAllocDir(prefix, path string) (bool, error) {
// Verify the destination doesn't escape the tasks directory // Verify the destination doesn't escape the tasks directory
alloc, err := filepath.Abs(filepath.Join("/", "foo/", "bar/")) alloc, err := filepath.Abs(filepath.Join("/", "alloc-dir/", "alloc-id/"))
if err != nil { if err != nil {
return false, err return false, err
} }
abs, err := filepath.Abs(filepath.Join(alloc, path)) abs, err := filepath.Abs(filepath.Join(alloc, prefix, path))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -2796,11 +3032,11 @@ func (ta *TaskArtifact) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("source must be specified")) mErr.Errors = append(mErr.Errors, fmt.Errorf("source must be specified"))
} }
escaped, err := PathEscapesAllocDir(ta.RelativeDest) escaped, err := PathEscapesAllocDir("task", ta.RelativeDest)
if err != nil { if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err)) mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid destination path: %v", err))
} else if escaped { } else if escaped {
mErr.Errors = append(mErr.Errors, fmt.Errorf("destination escapes task's directory")) mErr.Errors = append(mErr.Errors, fmt.Errorf("destination escapes allocation directory"))
} }
// Verify the checksum // Verify the checksum
@ -3311,12 +3547,12 @@ func (a *AllocMetric) Copy() *AllocMetric {
} }
na := new(AllocMetric) na := new(AllocMetric)
*na = *a *na = *a
na.NodesAvailable = CopyMapStringInt(na.NodesAvailable) na.NodesAvailable = helper.CopyMapStringInt(na.NodesAvailable)
na.ClassFiltered = CopyMapStringInt(na.ClassFiltered) na.ClassFiltered = helper.CopyMapStringInt(na.ClassFiltered)
na.ConstraintFiltered = CopyMapStringInt(na.ConstraintFiltered) na.ConstraintFiltered = helper.CopyMapStringInt(na.ConstraintFiltered)
na.ClassExhausted = CopyMapStringInt(na.ClassExhausted) na.ClassExhausted = helper.CopyMapStringInt(na.ClassExhausted)
na.DimensionExhausted = CopyMapStringInt(na.DimensionExhausted) na.DimensionExhausted = helper.CopyMapStringInt(na.DimensionExhausted)
na.Scores = CopyMapStringFloat64(na.Scores) na.Scores = helper.CopyMapStringFloat64(na.Scores)
return na return na
} }
@ -3829,7 +4065,7 @@ type RecoverableError struct {
// NewRecoverableError is used to wrap an error and mark it as recoverable or // NewRecoverableError is used to wrap an error and mark it as recoverable or
// not. // not.
func NewRecoverableError(e error, recoverable bool) *RecoverableError { func NewRecoverableError(e error, recoverable bool) error {
if e == nil { if e == nil {
return nil return nil
} }
@ -3843,3 +4079,12 @@ func NewRecoverableError(e error, recoverable bool) *RecoverableError {
func (r *RecoverableError) Error() string { func (r *RecoverableError) Error() string {
return r.Err return r.Err
} }
// IsRecoverable returns true if error is a RecoverableError with
// Recoverable=true. Otherwise false is returned.
func IsRecoverable(e error) bool {
if re, ok := e.(*RecoverableError); ok {
return re.Recoverable
}
return false
}

View File

@ -50,7 +50,6 @@ job "foo" {
resources { resources {
cpu = 20 cpu = 20
memory = 10 memory = 10
disk = 100
} }
logs { logs {