Parse metadata outside of cluster.go
Move the calls for metadata decoding from the cluster membership management loop to the main loop. This task was not directly related to the cluster, and was adding complexity, including the need for multierr structures.
This commit is contained in:
parent
366f906d5d
commit
0e799d6074
17
cluster.go
17
cluster.go
|
@ -15,7 +15,6 @@ import (
|
||||||
"github.com/mattn/go-isatty"
|
"github.com/mattn/go-isatty"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"go.uber.org/multierr"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClusterState keeps track of information needed to rejoin the cluster
|
// ClusterState keeps track of information needed to rejoin the cluster
|
||||||
|
@ -120,9 +119,8 @@ func (c *cluster) update() {
|
||||||
c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation
|
c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cluster) members() (<-chan []node, <-chan error) {
|
func (c *cluster) members() <-chan []node {
|
||||||
changes := make(chan []node)
|
changes := make(chan []node)
|
||||||
errc := make(chan error, 1)
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
event := <-c.events
|
event := <-c.events
|
||||||
|
@ -140,31 +138,22 @@ func (c *cluster) members() (<-chan []node, <-chan error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := make([]node, 0)
|
nodes := make([]node, 0)
|
||||||
var errs error
|
|
||||||
for _, n := range c.ml.Members() {
|
for _, n := range c.ml.Members() {
|
||||||
if n.Name == c.localName {
|
if n.Name == c.localName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
meta, err := decodeNodeMeta(n.Meta)
|
|
||||||
if err != nil {
|
|
||||||
errs = multierr.Append(errs, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
nodes = append(nodes, node{
|
nodes = append(nodes, node{
|
||||||
Name: n.Name,
|
Name: n.Name,
|
||||||
Addr: n.Addr,
|
Addr: n.Addr,
|
||||||
nodeMeta: meta,
|
Meta: n.Meta,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
c.state.Nodes = nodes
|
c.state.Nodes = nodes
|
||||||
changes <- nodes
|
changes <- nodes
|
||||||
if errs != nil {
|
|
||||||
errc <- errs
|
|
||||||
}
|
|
||||||
c.saveState()
|
c.saveState()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return changes, errc
|
return changes
|
||||||
}
|
}
|
||||||
|
|
||||||
func computeClusterKey(state *ClusterState, clusterKey []byte) ([]byte, error) {
|
func computeClusterKey(state *ClusterState, clusterKey []byte) ([]byte, error) {
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -16,7 +16,6 @@ require (
|
||||||
github.com/stevenroose/gonfig v0.1.5
|
github.com/stevenroose/gonfig v0.1.5
|
||||||
github.com/stretchr/testify v1.4.0 // indirect
|
github.com/stretchr/testify v1.4.0 // indirect
|
||||||
github.com/vishvananda/netlink v1.1.0
|
github.com/vishvananda/netlink v1.1.0
|
||||||
go.uber.org/multierr v1.5.0
|
|
||||||
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200324154536-ceff61240acf
|
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200324154536-ceff61240acf
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
16
main.go
16
main.go
|
@ -49,7 +49,7 @@ func main() {
|
||||||
wg.assignOverlayAddr((*net.IPNet)(config.OverlayNet), cluster.localName)
|
wg.assignOverlayAddr((*net.IPNet)(config.OverlayNet), cluster.localName)
|
||||||
cluster.update()
|
cluster.update()
|
||||||
|
|
||||||
nodec, errc := cluster.members() // avoid deadlocks by starting before join
|
nodec := cluster.members() // avoid deadlocks by starting before join
|
||||||
if err := backoff.RetryNotify(
|
if err := backoff.RetryNotify(
|
||||||
func() error { return cluster.join(config.Join) },
|
func() error { return cluster.join(config.Join) },
|
||||||
backoff.NewExponentialBackOff(),
|
backoff.NewExponentialBackOff(),
|
||||||
|
@ -65,9 +65,17 @@ func main() {
|
||||||
logrus.Debug("waiting for cluster events")
|
logrus.Debug("waiting for cluster events")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case nodes := <-nodec:
|
case rawNodes := <-nodec:
|
||||||
logrus.Info("cluster members:\n")
|
logrus.Info("cluster members:\n")
|
||||||
for _, node := range nodes {
|
nodes := make([]node, 0)
|
||||||
|
for _, node := range rawNodes {
|
||||||
|
meta, err := decodeNodeMeta(node.Meta)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Warnf("\t addr: %s, could not decode metadata", node.Addr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
node.nodeMeta = meta
|
||||||
|
nodes = append(nodes, node)
|
||||||
logrus.Infof("\taddr: %s, overlay: %s, pubkey: %s", node.Addr, node.OverlayAddr, node.PubKey)
|
logrus.Infof("\taddr: %s, overlay: %s, pubkey: %s", node.Addr, node.OverlayAddr, node.PubKey)
|
||||||
}
|
}
|
||||||
if err := wg.setUpInterface(nodes); err != nil {
|
if err := wg.setUpInterface(nodes); err != nil {
|
||||||
|
@ -79,8 +87,6 @@ func main() {
|
||||||
logrus.WithError(err).Error("could not write hosts entries")
|
logrus.WithError(err).Error("could not write hosts entries")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case errs := <-errc:
|
|
||||||
logrus.WithError(errs).Error("could not receive node info")
|
|
||||||
case <-incomingSigs:
|
case <-incomingSigs:
|
||||||
logrus.Info("terminating...")
|
logrus.Info("terminating...")
|
||||||
cluster.leave()
|
cluster.leave()
|
||||||
|
|
Loading…
Reference in New Issue