diff --git a/cluster/cluster.go b/cluster/cluster.go index c8a0904..5077df5 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -21,6 +21,7 @@ const KeyLen = 32 type Cluster struct { LocalName string // used to avoid LocalNode(); should not change ml *memberlist.Memberlist + mlConfig *memberlist.Config localNode common.Node state *State events chan memberlist.NodeEvent @@ -57,15 +58,13 @@ func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsNam cluster := Cluster{ LocalName: ml.LocalNode().Name, ml: ml, + mlConfig: mlConfig, // The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23 // More than this many simultaneous events will deadlock cluster.members() events: make(chan memberlist.NodeEvent, 100), state: state, } - mlConfig.Conflict = &cluster - mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: cluster.events} - mlConfig.Delegate = &cluster - + cluster.setupDelegate() return &cluster, nil } @@ -101,6 +100,7 @@ func (c *Cluster) Leave() { // configuration func (c *Cluster) Update(localNode common.Node) { c.localNode = localNode + c.setupDelegate() c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation } @@ -144,6 +144,13 @@ func (c *Cluster) Members() <-chan []common.Node { return changes } +func (c *Cluster) setupDelegate() { + delegate := delegateNode{&c.localNode} + c.mlConfig.Conflict = &delegate + c.mlConfig.Delegate = &delegate + c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events} +} + func computeClusterKey(state *State, clusterKey []byte) ([]byte, error) { if len(clusterKey) == 0 { clusterKey = state.ClusterKey diff --git a/cluster/delegate.go b/cluster/delegate.go index fa78b09..7c55577 100644 --- a/cluster/delegate.go +++ b/cluster/delegate.go @@ -1,20 +1,26 @@ package cluster import ( + "github.com/costela/wesher/common" "github.com/hashicorp/memberlist" "github.com/sirupsen/logrus" ) +// DelegateNode implements the memberlist delegation interface +type delegateNode struct { + *common.Node +} + // NotifyConflict implements the memberlist deletage interface -func (c *Cluster) NotifyConflict(node, other *memberlist.Node) { +func (n *delegateNode) NotifyConflict(node, other *memberlist.Node) { logrus.Errorf("node name conflict detected: %s", other.Name) } // NodeMeta implements the memberlist deletage interface // Metadata is provided by the local node settings, encoding is handled // by the node implementation directly -func (c *Cluster) NodeMeta(limit int) []byte { - encoded, err := c.localNode.Encode(limit) +func (n *delegateNode) NodeMeta(limit int) []byte { + encoded, err := n.Encode(limit) if err != nil { logrus.Errorf("failed to encode local node: %s", err) return nil @@ -23,13 +29,13 @@ func (c *Cluster) NodeMeta(limit int) []byte { } // NotifyMsg implements the memberlist deletage interface -func (c *Cluster) NotifyMsg([]byte) {} +func (n *delegateNode) NotifyMsg([]byte) {} // GetBroadcasts implements the memberlist deletage interface -func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte { return nil } +func (n *delegateNode) GetBroadcasts(overhead, limit int) [][]byte { return nil } // LocalState implements the memberlist deletage interface -func (c *Cluster) LocalState(join bool) []byte { return nil } +func (n *delegateNode) LocalState(join bool) []byte { return nil } // MergeRemoteState implements the memberlist deletage interface -func (c *Cluster) MergeRemoteState(buf []byte, join bool) {} +func (n *delegateNode) MergeRemoteState(buf []byte, join bool) {}