Merge pull request #16490 from hashicorp/jbardin/update-consul

update consul packages
This commit is contained in:
James Bardin 2017-10-30 11:19:34 -04:00 committed by GitHub
commit 4a01bf0b97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 471 additions and 136 deletions

View File

@ -15,14 +15,28 @@ func TestBackend_impl(t *testing.T) {
var _ backend.Backend = new(Backend) var _ backend.Backend = new(Backend)
} }
func newConsulTestServer(t *testing.T) *testutil.TestServer { var srv *testutil.TestServer
skip := os.Getenv("TF_ACC") == "" && os.Getenv("TF_CONSUL_TEST") == ""
if skip { func TestMain(m *testing.M) {
t.Log("consul server tests require setting TF_ACC or TF_CONSUL_TEST") if os.Getenv("TF_ACC") == "" && os.Getenv("TF_CONSUL_TEST") == "" {
t.Skip() fmt.Println("consul server tests require setting TF_ACC or TF_CONSUL_TEST")
return
} }
srv, _ := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { var err error
srv, err = newConsulTestServer()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
rc := m.Run()
srv.Stop()
os.Exit(rc)
}
func newConsulTestServer() (*testutil.TestServer, error) {
srv, err := testutil.NewTestServerConfig(func(c *testutil.TestServerConfig) {
c.LogLevel = "warn" c.LogLevel = "warn"
if !testing.Verbose() { if !testing.Verbose() {
@ -31,13 +45,10 @@ func newConsulTestServer(t *testing.T) *testutil.TestServer {
} }
}) })
return srv return srv, err
} }
func TestBackend(t *testing.T) { func TestBackend(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
path := fmt.Sprintf("tf-unit/%s", time.Now().String()) path := fmt.Sprintf("tf-unit/%s", time.Now().String())
// Get the backend. We need two to test locking. // Get the backend. We need two to test locking.
@ -56,9 +67,6 @@ func TestBackend(t *testing.T) {
} }
func TestBackend_lockDisabled(t *testing.T) { func TestBackend_lockDisabled(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
path := fmt.Sprintf("tf-unit/%s", time.Now().String()) path := fmt.Sprintf("tf-unit/%s", time.Now().String())
// Get the backend. We need two to test locking. // Get the backend. We need two to test locking.
@ -79,9 +87,6 @@ func TestBackend_lockDisabled(t *testing.T) {
} }
func TestBackend_gzip(t *testing.T) { func TestBackend_gzip(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
// Get the backend // Get the backend
b := backend.TestBackendConfig(t, New(), map[string]interface{}{ b := backend.TestBackendConfig(t, New(), map[string]interface{}{
"address": srv.HTTPAddr, "address": srv.HTTPAddr,

View File

@ -19,9 +19,6 @@ func TestRemoteClient_impl(t *testing.T) {
} }
func TestRemoteClient(t *testing.T) { func TestRemoteClient(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
// Get the backend // Get the backend
b := backend.TestBackendConfig(t, New(), map[string]interface{}{ b := backend.TestBackendConfig(t, New(), map[string]interface{}{
"address": srv.HTTPAddr, "address": srv.HTTPAddr,
@ -40,9 +37,6 @@ func TestRemoteClient(t *testing.T) {
// test the gzip functionality of the client // test the gzip functionality of the client
func TestRemoteClient_gzipUpgrade(t *testing.T) { func TestRemoteClient_gzipUpgrade(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
statePath := fmt.Sprintf("tf-unit/%s", time.Now().String()) statePath := fmt.Sprintf("tf-unit/%s", time.Now().String())
// Get the backend // Get the backend
@ -78,9 +72,6 @@ func TestRemoteClient_gzipUpgrade(t *testing.T) {
} }
func TestConsul_stateLock(t *testing.T) { func TestConsul_stateLock(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
path := fmt.Sprintf("tf-unit/%s", time.Now().String()) path := fmt.Sprintf("tf-unit/%s", time.Now().String())
// create 2 instances to get 2 remote.Clients // create 2 instances to get 2 remote.Clients
@ -104,9 +95,6 @@ func TestConsul_stateLock(t *testing.T) {
} }
func TestConsul_destroyLock(t *testing.T) { func TestConsul_destroyLock(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
// Get the backend // Get the backend
b := backend.TestBackendConfig(t, New(), map[string]interface{}{ b := backend.TestBackendConfig(t, New(), map[string]interface{}{
"address": srv.HTTPAddr, "address": srv.HTTPAddr,
@ -144,9 +132,6 @@ func TestConsul_destroyLock(t *testing.T) {
} }
func TestConsul_lostLock(t *testing.T) { func TestConsul_lostLock(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
path := fmt.Sprintf("tf-unit/%s", time.Now().String()) path := fmt.Sprintf("tf-unit/%s", time.Now().String())
// create 2 instances to get 2 remote.Clients // create 2 instances to get 2 remote.Clients
@ -194,9 +179,6 @@ func TestConsul_lostLock(t *testing.T) {
} }
func TestConsul_lostLockConnection(t *testing.T) { func TestConsul_lostLockConnection(t *testing.T) {
srv := newConsulTestServer(t)
defer srv.Stop()
// create an "unreliable" network by closing all the consul client's // create an "unreliable" network by closing all the consul client's
// network connections // network connections
conns := &unreliableConns{} conns := &unreliableConns{}
@ -225,31 +207,17 @@ func TestConsul_lostLockConnection(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// set a callback to know when the monitor loop re-connects // kill the connection a few times
dialed := make(chan struct{}) for i := 0; i < 3; i++ {
conns.dialCallback = func() { dialed := conns.dialedDone()
close(dialed)
conns.dialCallback = nil
}
// kill any open connections // kill any open connections
// once the consul client is fixed, we should loop over this a few time to
// be sure, since we can't hook into the client's internal lock monitor
// loop.
conns.Kill() conns.Kill()
// wait for a new connection to be dialed, and kill it again // wait for a new connection to be dialed, and kill it again
<-dialed <-dialed
conns.Kill() }
// since the lock monitor loop is hidden in the consul api client, we can if err := s.Unlock(id); err != nil {
// only wait a bit to make sure we were notified of the failure t.Fatal("unlock error:", err)
time.Sleep(time.Second)
// once the consul client can reconnect properly, there will no longer be
// an error here
//if err := s.Unlock(id); err != nil {
if err := s.Unlock(id); err != lostLockErr {
t.Fatalf("expected lost lock error, got %v", err)
} }
} }
@ -278,6 +246,18 @@ func (u *unreliableConns) DialContext(ctx context.Context, netw, addr string) (n
return conn, nil return conn, nil
} }
func (u *unreliableConns) dialedDone() chan struct{} {
u.Lock()
defer u.Unlock()
dialed := make(chan struct{})
u.dialCallback = func() {
defer close(dialed)
u.dialCallback = nil
}
return dialed
}
// Kill these with a deadline, just to make sure we don't end up with any EOFs // Kill these with a deadline, just to make sure we don't end up with any EOFs
// that get ignored. // that get ignored.
func (u *unreliableConns) Kill() { func (u *unreliableConns) Kill() {

View File

@ -42,6 +42,24 @@ func (c *Client) ACL() *ACL {
return &ACL{c} return &ACL{c}
} }
// Bootstrap is used to perform a one-time ACL bootstrap operation on a cluster
// to get the first management token.
func (a *ACL) Bootstrap() (string, *WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/bootstrap")
rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// Create is used to generate a new token with the given parameters // Create is used to generate a new token with the given parameters
func (a *ACL) Create(acl *ACLEntry, q *WriteOptions) (string, *WriteMeta, error) { func (a *ACL) Create(acl *ACLEntry, q *WriteOptions) (string, *WriteMeta, error) {
r := a.c.newRequest("PUT", "/v1/acl/create") r := a.c.newRequest("PUT", "/v1/acl/create")

View File

@ -44,6 +44,19 @@ type AgentMember struct {
DelegateCur uint8 DelegateCur uint8
} }
// AllSegments is used to select for all segments in MembersOpts.
const AllSegments = "_all"
// MembersOpts is used for querying member information.
type MembersOpts struct {
// WAN is whether to show members from the WAN.
WAN bool
// Segment is the LAN segment to show members for. Setting this to the
// AllSegments value above will show members in all segments.
Segment string
}
// AgentServiceRegistration is used to register a new service // AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct { type AgentServiceRegistration struct {
ID string `json:",omitempty"` ID string `json:",omitempty"`
@ -67,7 +80,8 @@ type AgentCheckRegistration struct {
// AgentServiceCheck is used to define a node or service level check // AgentServiceCheck is used to define a node or service level check
type AgentServiceCheck struct { type AgentServiceCheck struct {
Script string `json:",omitempty"` Args []string `json:"ScriptArgs,omitempty"`
Script string `json:",omitempty"` // Deprecated, use Args.
DockerContainerID string `json:",omitempty"` DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"` // Only supported for Docker. Shell string `json:",omitempty"` // Only supported for Docker.
Interval string `json:",omitempty"` Interval string `json:",omitempty"`
@ -91,6 +105,47 @@ type AgentServiceCheck struct {
} }
type AgentServiceChecks []*AgentServiceCheck type AgentServiceChecks []*AgentServiceCheck
// AgentToken is used when updating ACL tokens for an agent.
type AgentToken struct {
Token string
}
// Metrics info is used to store different types of metric values from the agent.
type MetricsInfo struct {
Timestamp string
Gauges []GaugeValue
Points []PointValue
Counters []SampledValue
Samples []SampledValue
}
// GaugeValue stores one value that is updated as time goes on, such as
// the amount of memory allocated.
type GaugeValue struct {
Name string
Value float32
Labels map[string]string
}
// PointValue holds a series of points for a metric.
type PointValue struct {
Name string
Points []float32
}
// SampledValue stores info about a metric that is incremented over time,
// such as the number of requests to an HTTP endpoint.
type SampledValue struct {
Name string
Count int
Sum float64
Min float64
Max float64
Mean float64
Stddev float64
Labels map[string]string
}
// Agent can be used to query the Agent endpoints // Agent can be used to query the Agent endpoints
type Agent struct { type Agent struct {
c *Client c *Client
@ -121,6 +176,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil return out, nil
} }
// Metrics is used to query the agent we are speaking to for
// its current internal metric data
func (a *Agent) Metrics() (*MetricsInfo, error) {
r := a.c.newRequest("GET", "/v1/agent/metrics")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out *MetricsInfo
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// Reload triggers a configuration reload for the agent we are connected to. // Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error { func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload") r := a.c.newRequest("PUT", "/v1/agent/reload")
@ -198,6 +270,28 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
return out, nil return out, nil
} }
// MembersOpts returns the known gossip members and can be passed
// additional options for WAN/segment filtering.
func (a *Agent) MembersOpts(opts MembersOpts) ([]*AgentMember, error) {
r := a.c.newRequest("GET", "/v1/agent/members")
r.params.Set("segment", opts.Segment)
if opts.WAN {
r.params.Set("wan", "1")
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out []*AgentMember
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// ServiceRegister is used to register a new service with // ServiceRegister is used to register a new service with
// the local agent // the local agent
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error { func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {
@ -441,7 +535,8 @@ func (a *Agent) DisableNodeMaintenance() error {
// Monitor returns a channel which will receive streaming logs from the agent // Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop the // Providing a non-nil stopCh can be used to close the connection and stop the
// log stream // log stream. An empty string will be sent down the given channel when there's
// nothing left to stream, after which the caller should close the stopCh.
func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) {
r := a.c.newRequest("GET", "/v1/agent/monitor") r := a.c.newRequest("GET", "/v1/agent/monitor")
r.setQueryOptions(q) r.setQueryOptions(q)
@ -466,10 +561,61 @@ func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions
default: default:
} }
if scanner.Scan() { if scanner.Scan() {
logCh <- scanner.Text() // An empty string signals to the caller that
// the scan is done, so make sure we only emit
// that when the scanner says it's done, not if
// we happen to ingest an empty line.
if text := scanner.Text(); text != "" {
logCh <- text
} else {
logCh <- " "
}
} else {
logCh <- ""
} }
} }
}() }()
return logCh, nil return logCh, nil
} }
// UpdateACLToken updates the agent's "acl_token". See updateToken for more
// details.
func (c *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_token", token, q)
}
// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken
// for more details.
func (c *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_agent_token", token, q)
}
// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See
// updateToken for more details.
func (c *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_agent_master_token", token, q)
}
// UpdateACLReplicationToken updates the agent's "acl_replication_token". See
// updateToken for more details.
func (c *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
return c.updateToken("acl_replication_token", token, q)
}
// updateToken can be used to update an agent's ACL token after the agent has
// started. The tokens are not persisted, so will need to be updated again if
// the agent is restarted.
func (c *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
r := c.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
r.setWriteOptions(q)
r.obj = &AgentToken{Token: token}
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, err
}
resp.Body.Close()
wm := &WriteMeta{RequestTime: rtt}
return wm, nil
}

View File

@ -446,6 +446,7 @@ func NewClient(config *Config) (*Client, error) {
if len(parts) == 2 { if len(parts) == 2 {
switch parts[0] { switch parts[0] {
case "http": case "http":
config.Scheme = "http"
case "https": case "https":
config.Scheme = "https" config.Scheme = "https"
case "unix": case "unix":
@ -462,10 +463,11 @@ func NewClient(config *Config) (*Client, error) {
config.Address = parts[1] config.Address = parts[1]
} }
client := &Client{ if config.Token == "" {
config: *config, config.Token = defConfig.Token
} }
return client, nil
return &Client{config: *config}, nil
} }
// NewHttpClient returns an http client configured with the given Transport and TLS // NewHttpClient returns an http client configured with the given Transport and TLS
@ -553,13 +555,20 @@ func durToMsec(dur time.Duration) string {
// serverError is a string we look for to detect 500 errors. // serverError is a string we look for to detect 500 errors.
const serverError = "Unexpected response code: 500" const serverError = "Unexpected response code: 500"
// IsServerError returns true for 500 errors from the Consul servers, these are // IsRetryableError returns true for 500 errors from the Consul servers, and
// usually retryable at a later time. // network connection errors. These are usually retryable at a later time.
func IsServerError(err error) bool { // This applies to reads but NOT to writes. This may return true for errors
// on writes that may have still gone through, so do not use this to retry
// any write operations.
func IsRetryableError(err error) bool {
if err == nil { if err == nil {
return false return false
} }
if _, ok := err.(net.Error); ok {
return true
}
// TODO (slackpad) - Make a real error type here instead of using // TODO (slackpad) - Make a real error type here instead of using
// a string check. // a string check.
return strings.Contains(err.Error(), serverError) return strings.Contains(err.Error(), serverError)
@ -652,7 +661,7 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
} }
start := time.Now() start := time.Now()
resp, err := c.config.HttpClient.Do(req) resp, err := c.config.HttpClient.Do(req)
diff := time.Now().Sub(start) diff := time.Since(start)
return diff, resp, err return diff, resp, err
} }

View File

@ -7,6 +7,7 @@ import (
// CoordinateEntry represents a node and its associated network coordinate. // CoordinateEntry represents a node and its associated network coordinate.
type CoordinateEntry struct { type CoordinateEntry struct {
Node string Node string
Segment string
Coord *coordinate.Coordinate Coord *coordinate.Coordinate
} }

View File

@ -252,7 +252,7 @@ func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOpti
if _, err := io.Copy(&buf, resp.Body); err != nil { if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err) return false, nil, fmt.Errorf("Failed to read response: %v", err)
} }
res := strings.Contains(string(buf.Bytes()), "true") res := strings.Contains(buf.String(), "true")
return res, qm, nil return res, qm, nil
} }
@ -296,7 +296,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
if _, err := io.Copy(&buf, resp.Body); err != nil { if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err) return false, nil, fmt.Errorf("Failed to read response: %v", err)
} }
res := strings.Contains(string(buf.Bytes()), "true") res := strings.Contains(buf.String(), "true")
return res, qm, nil return res, qm, nil
} }

View File

@ -180,7 +180,7 @@ WAIT:
// Handle the one-shot mode. // Handle the one-shot mode.
if l.opts.LockTryOnce && attempts > 0 { if l.opts.LockTryOnce && attempts > 0 {
elapsed := time.Now().Sub(start) elapsed := time.Since(start)
if elapsed > qOpts.WaitTime { if elapsed > qOpts.WaitTime {
return nil, nil return nil, nil
} }
@ -370,7 +370,7 @@ RETRY:
// by doing retries. Note that we have to attempt the retry in a non- // by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry // blocking fashion so that we have a clean place to reset the retry
// counter if service is restored. // counter if service is restored.
if retries > 0 && IsServerError(err) { if retries > 0 && IsRetryableError(err) {
time.Sleep(l.opts.MonitorRetryTime) time.Sleep(l.opts.MonitorRetryTime)
retries-- retries--
opts.WaitIndex = 0 opts.WaitIndex = 0

View File

@ -25,6 +25,10 @@ type Area struct {
// RetryJoin specifies the address of Consul servers to join to, such as // RetryJoin specifies the address of Consul servers to join to, such as
// an IPs or hostnames with an optional port number. This is optional. // an IPs or hostnames with an optional port number. This is optional.
RetryJoin []string RetryJoin []string
// UseTLS specifies whether gossip over this area should be encrypted with TLS
// if possible.
UseTLS bool
} }
// AreaJoinResponse is returned when a join occurs and gives the result for each // AreaJoinResponse is returned when a join occurs and gives the result for each
@ -100,6 +104,27 @@ func (op *Operator) AreaCreate(area *Area, q *WriteOptions) (string, *WriteMeta,
return out.ID, wm, nil return out.ID, wm, nil
} }
// AreaUpdate will update the configuration of the network area with the given ID.
func (op *Operator) AreaUpdate(areaID string, area *Area, q *WriteOptions) (string, *WriteMeta, error) {
r := op.c.newRequest("PUT", "/v1/operator/area/"+areaID)
r.setWriteOptions(q)
r.obj = area
rtt, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
wm := &WriteMeta{}
wm.RequestTime = rtt
var out struct{ ID string }
if err := decodeBody(resp, &out); err != nil {
return "", nil, err
}
return out.ID, wm, nil
}
// AreaGet returns a single network area. // AreaGet returns a single network area.
func (op *Operator) AreaGet(areaID string, q *QueryOptions) ([]*Area, *QueryMeta, error) { func (op *Operator) AreaGet(areaID string, q *QueryOptions) ([]*Area, *QueryMeta, error) {
var out []*Area var out []*Area

View File

@ -196,7 +196,7 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
if _, err := io.Copy(&buf, resp.Body); err != nil { if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, fmt.Errorf("Failed to read response: %v", err) return false, fmt.Errorf("Failed to read response: %v", err)
} }
res := strings.Contains(string(buf.Bytes()), "true") res := strings.Contains(buf.String(), "true")
return res, nil return res, nil
} }

View File

@ -13,6 +13,9 @@ type KeyringResponse struct {
// The datacenter name this request corresponds to // The datacenter name this request corresponds to
Datacenter string Datacenter string
// Segment has the network segment this request corresponds to.
Segment string
// A map of the encryption keys to the number of nodes they're installed on // A map of the encryption keys to the number of nodes they're installed on
Keys map[string]int Keys map[string]int

View File

@ -17,6 +17,9 @@ type RaftServer struct {
// Leader is true if this server is the current cluster leader. // Leader is true if this server is the current cluster leader.
Leader bool Leader bool
// Protocol version is the raft protocol version used by the server
ProtocolVersion string
// Voter is true if this server has a vote in the cluster. This might // Voter is true if this server has a vote in the cluster. This might
// be false if the server is staging and still coming online, or if // be false if the server is staging and still coming online, or if
// it's a non-voting server, which will be added in a future release of // it's a non-voting server, which will be added in a future release of
@ -24,7 +27,7 @@ type RaftServer struct {
Voter bool Voter bool
} }
// RaftConfigration is returned when querying for the current Raft configuration. // RaftConfiguration is returned when querying for the current Raft configuration.
type RaftConfiguration struct { type RaftConfiguration struct {
// Servers has the list of servers in the Raft configuration. // Servers has the list of servers in the Raft configuration.
Servers []*RaftServer Servers []*RaftServer

View File

@ -0,0 +1,11 @@
package api
// SegmentList returns all the available LAN segments.
func (op *Operator) SegmentList(q *QueryOptions) ([]string, *QueryMeta, error) {
var out []string
qm, err := op.c.query("/v1/operator/segment", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}

View File

@ -198,7 +198,7 @@ WAIT:
// Handle the one-shot mode. // Handle the one-shot mode.
if s.opts.SemaphoreTryOnce && attempts > 0 { if s.opts.SemaphoreTryOnce && attempts > 0 {
elapsed := time.Now().Sub(start) elapsed := time.Since(start)
if elapsed > qOpts.WaitTime { if elapsed > qOpts.WaitTime {
return nil, nil return nil, nil
} }
@ -492,7 +492,7 @@ RETRY:
// by doing retries. Note that we have to attempt the retry in a non- // by doing retries. Note that we have to attempt the retry in a non-
// blocking fashion so that we have a clean place to reset the retry // blocking fashion so that we have a clean place to reset the retry
// counter if service is restored. // counter if service is restored.
if retries > 0 && IsServerError(err) { if retries > 0 && IsRetryableError(err) {
time.Sleep(s.opts.MonitorRetryTime) time.Sleep(s.opts.MonitorRetryTime)
retries-- retries--
opts.WaitIndex = 0 opts.WaitIndex = 0

View File

@ -0,0 +1,139 @@
// Package freeport provides a helper for allocating free ports across multiple
// processes on the same machine.
package freeport
import (
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/mitchellh/go-testing-interface"
)
const (
// blockSize is the size of the allocated port block. ports are given out
// consecutively from that block with roll-over for the lifetime of the
// application/test run.
blockSize = 500
// maxBlocks is the number of available port blocks.
// lowPort + maxBlocks * blockSize must be less than 65535.
maxBlocks = 30
// lowPort is the lowest port number that should be used.
lowPort = 10000
// attempts is how often we try to allocate a port block
// before giving up.
attempts = 10
)
var (
// firstPort is the first port of the allocated block.
firstPort int
// lockLn is the system-wide mutex for the port block.
lockLn net.Listener
// mu guards nextPort
mu sync.Mutex
// once is used to do the initialization on the first call to retrieve free
// ports
once sync.Once
// port is the last allocated port.
port int
)
// initialize is used to initialize freeport.
func initialize() {
if lowPort+maxBlocks*blockSize > 65535 {
panic("freeport: block size too big or too many blocks requested")
}
rand.Seed(time.Now().UnixNano())
firstPort, lockLn = alloc()
}
// alloc reserves a port block for exclusive use for the lifetime of the
// application. lockLn serves as a system-wide mutex for the port block and is
// implemented as a TCP listener which is bound to the firstPort and which will
// be automatically released when the application terminates.
func alloc() (int, net.Listener) {
for i := 0; i < attempts; i++ {
block := int(rand.Int31n(int32(maxBlocks)))
firstPort := lowPort + block*blockSize
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", firstPort))
if err != nil {
continue
}
// log.Printf("[DEBUG] freeport: allocated port block %d (%d-%d)", block, firstPort, firstPort+blockSize-1)
return firstPort, ln
}
panic("freeport: cannot allocate port block")
}
func tcpAddr(ip string, port int) *net.TCPAddr {
return &net.TCPAddr{IP: net.ParseIP(ip), Port: port}
}
// Get wraps the Free function and panics on any failure retrieving ports.
func Get(n int) (ports []int) {
ports, err := Free(n)
if err != nil {
panic(err)
}
return ports
}
// GetT is suitable for use when retrieving unused ports in tests. If there is
// an error retrieving free ports, the test will be failed.
func GetT(t testing.T, n int) (ports []int) {
ports, err := Free(n)
if err != nil {
t.Fatalf("Failed retrieving free port: %v", err)
}
return ports
}
// Free returns a list of free ports from the allocated port block. It is safe
// to call this method concurrently. Ports have been tested to be available on
// 127.0.0.1 TCP but there is no guarantee that they will remain free in the
// future.
func Free(n int) (ports []int, err error) {
mu.Lock()
defer mu.Unlock()
if n > blockSize-1 {
return nil, fmt.Errorf("freeport: block size too small")
}
// Reserve a port block
once.Do(initialize)
for len(ports) < n {
port++
// roll-over the port
if port < firstPort+1 || port >= firstPort+blockSize {
port = firstPort + 1
}
// if the port is in use then skip it
ln, err := net.ListenTCP("tcp", tcpAddr("127.0.0.1", port))
if err != nil {
// log.Println("[DEBUG] freeport: port already in use: ", port)
continue
}
ln.Close()
ports = append(ports, port)
}
// log.Println("[DEBUG] freeport: free ports:", ports)
return ports, nil
}

View File

@ -27,6 +27,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
@ -47,9 +48,6 @@ type TestPortConfig struct {
SerfLan int `json:"serf_lan,omitempty"` SerfLan int `json:"serf_lan,omitempty"`
SerfWan int `json:"serf_wan,omitempty"` SerfWan int `json:"serf_wan,omitempty"`
Server int `json:"server,omitempty"` Server int `json:"server,omitempty"`
// Deprecated
RPC int `json:"rpc,omitempty"`
} }
// TestAddressConfig contains the bind addresses for various // TestAddressConfig contains the bind addresses for various
@ -58,6 +56,14 @@ type TestAddressConfig struct {
HTTP string `json:"http,omitempty"` HTTP string `json:"http,omitempty"`
} }
// TestNetworkSegment contains the configuration for a network segment.
type TestNetworkSegment struct {
Name string `json:"name"`
Bind string `json:"bind"`
Port int `json:"port"`
Advertise string `json:"advertise"`
}
// TestServerConfig is the main server configuration struct. // TestServerConfig is the main server configuration struct.
type TestServerConfig struct { type TestServerConfig struct {
NodeName string `json:"node_name"` NodeName string `json:"node_name"`
@ -68,6 +74,7 @@ type TestServerConfig struct {
Server bool `json:"server,omitempty"` Server bool `json:"server,omitempty"`
DataDir string `json:"data_dir,omitempty"` DataDir string `json:"data_dir,omitempty"`
Datacenter string `json:"datacenter,omitempty"` Datacenter string `json:"datacenter,omitempty"`
Segments []TestNetworkSegment `json:"segments"`
DisableCheckpoint bool `json:"disable_update_check"` DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"` LogLevel string `json:"log_level,omitempty"`
Bind string `json:"bind_addr,omitempty"` Bind string `json:"bind_addr,omitempty"`
@ -104,8 +111,9 @@ func defaultServerConfig() *TestServerConfig {
panic(err) panic(err)
} }
ports := freeport.Get(6)
return &TestServerConfig{ return &TestServerConfig{
NodeName: fmt.Sprintf("node%d", randomPort()), NodeName: "node-" + nodeID,
NodeID: nodeID, NodeID: nodeID,
DisableCheckpoint: true, DisableCheckpoint: true,
Performance: &TestPerformanceConfig{ Performance: &TestPerformanceConfig{
@ -117,28 +125,17 @@ func defaultServerConfig() *TestServerConfig {
Bind: "127.0.0.1", Bind: "127.0.0.1",
Addresses: &TestAddressConfig{}, Addresses: &TestAddressConfig{},
Ports: &TestPortConfig{ Ports: &TestPortConfig{
DNS: randomPort(), DNS: ports[0],
HTTP: randomPort(), HTTP: ports[1],
HTTPS: randomPort(), HTTPS: ports[2],
SerfLan: randomPort(), SerfLan: ports[3],
SerfWan: randomPort(), SerfWan: ports[4],
Server: randomPort(), Server: ports[5],
RPC: randomPort(),
}, },
ReadyTimeout: 10 * time.Second, ReadyTimeout: 10 * time.Second,
} }
} }
// randomPort asks the kernel for a random port to use.
func randomPort() int {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
// TestService is used to serialize a service definition. // TestService is used to serialize a service definition.
type TestService struct { type TestService struct {
ID string `json:",omitempty"` ID string `json:",omitempty"`
@ -191,15 +188,7 @@ func NewTestServerConfig(cb ServerConfigCallback) (*TestServer, error) {
// configuring or starting the server, the server will NOT be running when the // configuring or starting the server, the server will NOT be running when the
// function returns (thus you do not need to stop it). // function returns (thus you do not need to stop it).
func NewTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, error) { func NewTestServerConfigT(t *testing.T, cb ServerConfigCallback) (*TestServer, error) {
var server *TestServer return newTestServerConfigT(t, cb)
retry.Run(t, func(r *retry.R) {
var err error
server, err = newTestServerConfigT(t, cb)
if err != nil {
r.Fatalf("failed starting test server: %v", err)
}
})
return server, nil
} }
// newTestServerConfigT is the internal helper for NewTestServerConfigT. // newTestServerConfigT is the internal helper for NewTestServerConfigT.

View File

@ -25,13 +25,13 @@ const (
// JoinLAN is used to join local datacenters together. // JoinLAN is used to join local datacenters together.
func (s *TestServer) JoinLAN(t *testing.T, addr string) { func (s *TestServer) JoinLAN(t *testing.T, addr string) {
resp := s.get(t, "/v1/agent/join/"+addr) resp := s.put(t, "/v1/agent/join/"+addr, nil)
defer resp.Body.Close() defer resp.Body.Close()
} }
// JoinWAN is used to join remote datacenters together. // JoinWAN is used to join remote datacenters together.
func (s *TestServer) JoinWAN(t *testing.T, addr string) { func (s *TestServer) JoinWAN(t *testing.T, addr string) {
resp := s.get(t, "/v1/agent/join/"+addr+"?wan=1") resp := s.put(t, "/v1/agent/join/"+addr+"?wan=1", nil)
resp.Body.Close() resp.Body.Close()
} }

34
vendor/vendor.json vendored
View File

@ -1421,29 +1421,35 @@
"revisionTime": "2016-11-07T20:49:10Z" "revisionTime": "2016-11-07T20:49:10Z"
}, },
{ {
"checksumSHA1": "IYuLg7xUzsf/P9rMpdEh1n9rbIY=", "checksumSHA1": "4hc6jGp9/1m7dp5ACRcGnspjO5E=",
"comment": "v0.6.3-28-g3215b87", "comment": "v0.6.3-28-g3215b87",
"path": "github.com/hashicorp/consul/api", "path": "github.com/hashicorp/consul/api",
"revision": "b79d951ced8c5f18fe73d35b2806f3435e40cd64", "revision": "610f3c86a089817b5bd5729a3b8c2db33a9ae2b0",
"revisionTime": "2017-07-20T03:19:26Z", "revisionTime": "2017-10-26T17:59:57Z",
"version": "v0.9.0", "version": "master",
"versionExact": "v0.9.0" "versionExact": "master"
}, },
{ {
"checksumSHA1": "++0PVBxbpylmllyCxSa7cdc6dDc=", "checksumSHA1": "hDJiPli3EEGJE4vAezMi05oOC7o=",
"path": "github.com/hashicorp/consul/lib/freeport",
"revision": "610f3c86a089817b5bd5729a3b8c2db33a9ae2b0",
"revisionTime": "2017-10-26T17:59:57Z"
},
{
"checksumSHA1": "gjN5oVYQR1K7hCbUq2ED4KBWuys=",
"path": "github.com/hashicorp/consul/testutil", "path": "github.com/hashicorp/consul/testutil",
"revision": "b79d951ced8c5f18fe73d35b2806f3435e40cd64", "revision": "610f3c86a089817b5bd5729a3b8c2db33a9ae2b0",
"revisionTime": "2017-07-20T03:19:26Z", "revisionTime": "2017-10-26T17:59:57Z",
"version": "v0.9.0", "version": "master",
"versionExact": "v0.9.0" "versionExact": "master"
}, },
{ {
"checksumSHA1": "J8TTDc84MvAyXE/FrfgS+xc/b6s=", "checksumSHA1": "J8TTDc84MvAyXE/FrfgS+xc/b6s=",
"path": "github.com/hashicorp/consul/testutil/retry", "path": "github.com/hashicorp/consul/testutil/retry",
"revision": "b79d951ced8c5f18fe73d35b2806f3435e40cd64", "revision": "610f3c86a089817b5bd5729a3b8c2db33a9ae2b0",
"revisionTime": "2017-07-20T03:19:26Z", "revisionTime": "2017-10-26T17:59:57Z",
"version": "v0.9.0", "version": "master",
"versionExact": "v0.9.0" "versionExact": "master"
}, },
{ {
"checksumSHA1": "cdOCt0Yb+hdErz8NAQqayxPmRsY=", "checksumSHA1": "cdOCt0Yb+hdErz8NAQqayxPmRsY=",