Split cluster into multiple files
This commit is contained in:
parent
622cfce1ad
commit
13e1515f7d
|
@ -3,11 +3,8 @@ package cluster
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/costela/wesher/common"
|
"github.com/costela/wesher/common"
|
||||||
|
@ -19,12 +16,6 @@ import (
|
||||||
|
|
||||||
const KeyLen = 32
|
const KeyLen = 32
|
||||||
|
|
||||||
// State keeps track of information needed to rejoin the cluster
|
|
||||||
type State struct {
|
|
||||||
ClusterKey []byte
|
|
||||||
Nodes []common.Node
|
|
||||||
}
|
|
||||||
|
|
||||||
type Cluster struct {
|
type Cluster struct {
|
||||||
LocalName string // used to avoid LocalNode(); should not change
|
LocalName string // used to avoid LocalNode(); should not change
|
||||||
ml *memberlist.Memberlist
|
ml *memberlist.Memberlist
|
||||||
|
@ -33,8 +24,6 @@ type Cluster struct {
|
||||||
events chan memberlist.NodeEvent
|
events chan memberlist.NodeEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
const statePath = "/var/lib/wesher/state.json"
|
|
||||||
|
|
||||||
func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsName bool) (*Cluster, error) {
|
func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsName bool) (*Cluster, error) {
|
||||||
state := &State{}
|
state := &State{}
|
||||||
if !init {
|
if !init {
|
||||||
|
@ -76,25 +65,6 @@ func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsNam
|
||||||
return &cluster, nil
|
return &cluster, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) NotifyConflict(node, other *memberlist.Node) {
|
|
||||||
logrus.Errorf("node name conflict detected: %s", other.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Cluster) NodeMeta(limit int) []byte {
|
|
||||||
encoded, err := c.localNode.Encode(limit)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("failed to encode local node: %s", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return encoded
|
|
||||||
}
|
|
||||||
|
|
||||||
// none of these are used
|
|
||||||
func (c *Cluster) NotifyMsg([]byte) {}
|
|
||||||
func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte { return nil }
|
|
||||||
func (c *Cluster) LocalState(join bool) []byte { return nil }
|
|
||||||
func (c *Cluster) MergeRemoteState(buf []byte, join bool) {}
|
|
||||||
|
|
||||||
func (c *Cluster) Join(addrs []string) error {
|
func (c *Cluster) Join(addrs []string) error {
|
||||||
if len(addrs) == 0 {
|
if len(addrs) == 0 {
|
||||||
for _, n := range c.state.Nodes {
|
for _, n := range c.state.Nodes {
|
||||||
|
@ -176,34 +146,3 @@ func computeClusterKey(state *State, clusterKey []byte) ([]byte, error) {
|
||||||
state.ClusterKey = clusterKey
|
state.ClusterKey = clusterKey
|
||||||
return clusterKey, nil
|
return clusterKey, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cluster) saveState() error {
|
|
||||||
if err := os.MkdirAll(path.Dir(statePath), 0700); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
stateOut, err := json.MarshalIndent(c.state, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return ioutil.WriteFile(statePath, stateOut, 0600)
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadState(cs *State) {
|
|
||||||
content, err := ioutil.ReadFile(statePath)
|
|
||||||
if err != nil {
|
|
||||||
if !os.IsNotExist(err) {
|
|
||||||
logrus.Warnf("could not open state in %s: %s", statePath, err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// avoid partially unmarshalled content by using a temp var
|
|
||||||
csTmp := &State{}
|
|
||||||
if err := json.Unmarshal(content, csTmp); err != nil {
|
|
||||||
logrus.Warnf("could not decode state: %s", err)
|
|
||||||
} else {
|
|
||||||
*cs = *csTmp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/memberlist"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *Cluster) NotifyConflict(node, other *memberlist.Node) {
|
||||||
|
logrus.Errorf("node name conflict detected: %s", other.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cluster) NodeMeta(limit int) []byte {
|
||||||
|
encoded, err := c.localNode.Encode(limit)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("failed to encode local node: %s", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return encoded
|
||||||
|
}
|
||||||
|
|
||||||
|
// none of these are used
|
||||||
|
func (c *Cluster) NotifyMsg([]byte) {}
|
||||||
|
func (c *Cluster) GetBroadcasts(overhead, limit int) [][]byte { return nil }
|
||||||
|
func (c *Cluster) LocalState(join bool) []byte { return nil }
|
||||||
|
func (c *Cluster) MergeRemoteState(buf []byte, join bool) {}
|
|
@ -0,0 +1,50 @@
|
||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
|
||||||
|
"github.com/costela/wesher/common"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// State keeps track of information needed to rejoin the cluster
|
||||||
|
type State struct {
|
||||||
|
ClusterKey []byte
|
||||||
|
Nodes []common.Node
|
||||||
|
}
|
||||||
|
|
||||||
|
const statePath = "/var/lib/wesher/state.json"
|
||||||
|
|
||||||
|
func (c *Cluster) saveState() error {
|
||||||
|
if err := os.MkdirAll(path.Dir(statePath), 0700); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
stateOut, err := json.MarshalIndent(c.state, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ioutil.WriteFile(statePath, stateOut, 0600)
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadState(cs *State) {
|
||||||
|
content, err := ioutil.ReadFile(statePath)
|
||||||
|
if err != nil {
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
logrus.Warnf("could not open state in %s: %s", statePath, err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// avoid partially unmarshalled content by using a temp var
|
||||||
|
csTmp := &State{}
|
||||||
|
if err := json.Unmarshal(content, csTmp); err != nil {
|
||||||
|
logrus.Warnf("could not decode state: %s", err)
|
||||||
|
} else {
|
||||||
|
*cs = *csTmp
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue