Implement the delegate interface on Node

Implementing on Node makes a bit more sense than implementing on Cluster
even if the difference in code is insignificant.
This commit is contained in:
kaiyou 2020-05-09 16:34:12 +02:00 committed by Leo Antunes
parent f715cfa787
commit faf1c35d96
2 changed files with 24 additions and 11 deletions

View File

@ -21,6 +21,7 @@ const KeyLen = 32
type Cluster struct { type Cluster struct {
LocalName string // used to avoid LocalNode(); should not change LocalName string // used to avoid LocalNode(); should not change
ml *memberlist.Memberlist ml *memberlist.Memberlist
mlConfig *memberlist.Config
localNode common.Node localNode common.Node
state *State state *State
events chan memberlist.NodeEvent events chan memberlist.NodeEvent
@ -57,15 +58,13 @@ func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsNam
cluster := Cluster{ cluster := Cluster{
LocalName: ml.LocalNode().Name, LocalName: ml.LocalNode().Name,
ml: ml, ml: ml,
mlConfig: mlConfig,
// The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23 // The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23
// More than this many simultaneous events will deadlock cluster.members() // More than this many simultaneous events will deadlock cluster.members()
events: make(chan memberlist.NodeEvent, 100), events: make(chan memberlist.NodeEvent, 100),
state: state, state: state,
} }
mlConfig.Conflict = &cluster cluster.setupDelegate()
mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: cluster.events}
mlConfig.Delegate = &cluster
return &cluster, nil return &cluster, nil
} }
@ -101,6 +100,7 @@ func (c *Cluster) Leave() {
// configuration // configuration
func (c *Cluster) Update(localNode common.Node) { func (c *Cluster) Update(localNode common.Node) {
c.localNode = localNode c.localNode = localNode
c.setupDelegate()
c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation
} }
@ -144,6 +144,13 @@ func (c *Cluster) Members() <-chan []common.Node {
return changes return changes
} }
func (c *Cluster) setupDelegate() {
delegate := delegateNode{&c.localNode}
c.mlConfig.Conflict = &delegate
c.mlConfig.Delegate = &delegate
c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events}
}
func computeClusterKey(state *State, clusterKey []byte) ([]byte, error) { func computeClusterKey(state *State, clusterKey []byte) ([]byte, error) {
if len(clusterKey) == 0 { if len(clusterKey) == 0 {
clusterKey = state.ClusterKey clusterKey = state.ClusterKey

View File

@ -1,20 +1,26 @@
package cluster package cluster
import ( import (
"github.com/costela/wesher/common"
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// DelegateNode implements the memberlist delegation interface
type delegateNode struct {
*common.Node
}
// NotifyConflict implements the memberlist deletage interface // NotifyConflict implements the memberlist deletage interface
func (c *Cluster) NotifyConflict(node, other *memberlist.Node) { func (n *delegateNode) NotifyConflict(node, other *memberlist.Node) {
logrus.Errorf("node name conflict detected: %s", other.Name) logrus.Errorf("node name conflict detected: %s", other.Name)
} }
// NodeMeta implements the memberlist deletage interface // NodeMeta implements the memberlist deletage interface
// Metadata is provided by the local node settings, encoding is handled // Metadata is provided by the local node settings, encoding is handled
// by the node implementation directly // by the node implementation directly
func (c *Cluster) NodeMeta(limit int) []byte { func (n *delegateNode) NodeMeta(limit int) []byte {
encoded, err := c.localNode.Encode(limit) encoded, err := n.Encode(limit)
if err != nil { if err != nil {
logrus.Errorf("failed to encode local node: %s", err) logrus.Errorf("failed to encode local node: %s", err)
return nil return nil
@ -23,13 +29,13 @@ func (c *Cluster) NodeMeta(limit int) []byte {
} }
// NotifyMsg implements the memberlist deletage interface // NotifyMsg implements the memberlist deletage interface
func (c *Cluster) NotifyMsg([]byte) {} func (n *delegateNode) NotifyMsg([]byte) {}
// GetBroadcasts implements the memberlist deletage interface // GetBroadcasts implements the memberlist deletage interface
func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte { return nil } func (n *delegateNode) GetBroadcasts(overhead, limit int) [][]byte { return nil }
// LocalState implements the memberlist deletage interface // LocalState implements the memberlist deletage interface
func (c *Cluster) LocalState(join bool) []byte { return nil } func (n *delegateNode) LocalState(join bool) []byte { return nil }
// MergeRemoteState implements the memberlist deletage interface // MergeRemoteState implements the memberlist deletage interface
func (c *Cluster) MergeRemoteState(buf []byte, join bool) {} func (n *delegateNode) MergeRemoteState(buf []byte, join bool) {}