From b37a91cfbc26a4e5a0f9c553c688b3aa6de0659d Mon Sep 17 00:00:00 2001 From: Wade Simmons Date: Fri, 26 Jun 2020 13:45:48 -0400 Subject: [PATCH] add meta packet statistics (#230) This change adds more metrics around "meta" (non "message" type packets). For lighthouse packets, we also record statistics around the specific lighthouse meta type. We don't keep statistics for the "message" type so that we don't slow down the fast path (and you can just look at metrics on the tun interface to find that information). --- connection_manager_test.go | 4 +- examples/config.yml | 9 ++++ handshake_ix.go | 2 + handshake_manager.go | 7 +++ hostmap.go | 9 ++++ inside.go | 9 +++- interface.go | 11 ++--- lighthouse.go | 28 ++++++++++- lighthouse_test.go | 4 +- main.go | 12 +++++ message_metrics.go | 97 ++++++++++++++++++++++++++++++++++++++ outside.go | 10 ++-- 12 files changed, 186 insertions(+), 16 deletions(-) create mode 100644 message_metrics.go diff --git a/connection_manager_test.go b/connection_manager_test.go index a59747d..b1b79de 100644 --- a/connection_manager_test.go +++ b/connection_manager_test.go @@ -28,7 +28,7 @@ func Test_NewConnectionManagerTest(t *testing.T) { rawCertificateNoKey: []byte{}, } - lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1) + lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1, false) ifce := &Interface{ hostMap: hostMap, inside: &Tun{}, @@ -91,7 +91,7 @@ func Test_NewConnectionManagerTest2(t *testing.T) { rawCertificateNoKey: []byte{}, } - lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1) + lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1, false) ifce := &Interface{ hostMap: hostMap, inside: &Tun{}, diff --git a/examples/config.yml b/examples/config.yml index b2d168e..9c43bf6 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -177,6 +177,15 @@ logging: #subsystem: nebula #interval: 10s + # enables counter metrics for meta packets + # e.g.: 
`messages.tx.handshake_ixpsk0` + # NOTE: `messages.{tx,rx}.recv_error` is always emitted + #message_metrics: false + + # enables detailed counter metrics for lighthouse packets + # e.g.: `lighthouse.rx.HostQuery` + #lighthouse_metrics: false + # Handshake Manger Settings #handshakes: # Total time to try a handshake = sequence of `try_interval * retries` diff --git a/handshake_ix.go b/handshake_ix.go index 32fbe57..e44df11 100644 --- a/handshake_ix.go +++ b/handshake_ix.go @@ -98,6 +98,7 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [ hostinfo, _ := f.handshakeManager.pendingHostMap.QueryReverseIndex(hs.Details.InitiatorIndex) if hostinfo != nil && bytes.Equal(hostinfo.HandshakePacket[0], packet[HeaderLen:]) { if msg, ok := hostinfo.HandshakePacket[2]; ok { + f.messageMetrics.Tx(handshake, NebulaMessageSubType(msg[1]), 1) err := f.outside.WriteTo(msg, addr) if err != nil { l.WithField("vpnIp", IntIp(hostinfo.hostId)).WithField("udpAddr", addr). @@ -191,6 +192,7 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [ hostinfo.HandshakePacket[2] = make([]byte, len(msg)) copy(hostinfo.HandshakePacket[2], msg) + f.messageMetrics.Tx(handshake, NebulaMessageSubType(msg[1]), 1) err := f.outside.WriteTo(msg, addr) if err != nil { l.WithField("vpnIp", IntIp(vpnIP)).WithField("udpAddr", addr). 
diff --git a/handshake_manager.go b/handshake_manager.go index 67e990a..1d23013 100644 --- a/handshake_manager.go +++ b/handshake_manager.go @@ -31,6 +31,8 @@ type HandshakeConfig struct { tryInterval time.Duration retries int waitRotation int + + messageMetrics *MessageMetrics } type HandshakeManager struct { @@ -42,6 +44,8 @@ type HandshakeManager struct { OutboundHandshakeTimer *SystemTimerWheel InboundHandshakeTimer *SystemTimerWheel + + messageMetrics *MessageMetrics } func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainHostMap *HostMap, lightHouse *LightHouse, outside *udpConn, config HandshakeConfig) *HandshakeManager { @@ -55,6 +59,8 @@ func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainH OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)), InboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)), + + messageMetrics: config.messageMetrics, } } @@ -111,6 +117,7 @@ func (c *HandshakeManager) NextOutboundHandshakeTimerTick(now time.Time, f EncWr // Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation if hostinfo.HandshakeReady && hostinfo.remote != nil { + c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1) err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote) if err != nil { hostinfo.logger().WithField("udpAddr", hostinfo.remote). 
diff --git a/hostmap.go b/hostmap.go index a93ffc5..1ecdb96 100644 --- a/hostmap.go +++ b/hostmap.go @@ -30,6 +30,7 @@ type HostMap struct { vpnCIDR *net.IPNet defaultRoute uint32 unsafeRoutes *CIDRTree + metricsEnabled bool } type HostInfo struct { @@ -384,8 +385,16 @@ func (hm *HostMap) PunchList() []*udpAddr { } func (hm *HostMap) Punchy(conn *udpConn) { + var metricsTxPunchy metrics.Counter + if hm.metricsEnabled { + metricsTxPunchy = metrics.GetOrRegisterCounter("messages.tx.punchy", nil) + } else { + metricsTxPunchy = metrics.NilCounter{} + } + for { for _, addr := range hm.PunchList() { + metricsTxPunchy.Inc(1) conn.WriteTo([]byte{1}, addr) } time.Sleep(time.Second * 30) diff --git a/inside.go b/inside.go index 7bba445..6e65559 100644 --- a/inside.go +++ b/inside.go @@ -46,7 +46,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket, dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs) if dropReason == nil { - f.send(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out) + f.sendNoMetrics(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out) if f.lightHouse != nil && *ci.messageCounter%5000 == 0 { f.lightHouse.Query(fwPacket.RemoteIP, f) } @@ -118,7 +118,7 @@ func (f *Interface) sendMessageNow(t NebulaMessageType, st NebulaMessageSubType, return } - f.send(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out) + f.sendNoMetrics(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out) if f.lightHouse != nil && *hostInfo.ConnectionState.messageCounter%5000 == 0 { f.lightHouse.Query(fp.RemoteIP, f) } @@ -175,6 +175,11 @@ func (f *Interface) sendMessageToAll(t NebulaMessageType, st NebulaMessageSubTyp } func (f *Interface) send(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) { + f.messageMetrics.Tx(t, st, 1) + f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out) +} + +func (f 
*Interface) sendNoMetrics(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) { if ci.eKey == nil { //TODO: log warning return diff --git a/interface.go b/interface.go index 0ecc0e8..9203f09 100644 --- a/interface.go +++ b/interface.go @@ -25,6 +25,7 @@ type InterfaceConfig struct { DropLocalBroadcast bool DropMulticast bool UDPBatchSize int + MessageMetrics *MessageMetrics } type Interface struct { @@ -45,9 +46,8 @@ type Interface struct { udpBatchSize int version string - metricRxRecvError metrics.Counter - metricTxRecvError metrics.Counter - metricHandshakes metrics.Histogram + metricHandshakes metrics.Histogram + messageMetrics *MessageMetrics } func NewInterface(c *InterfaceConfig) (*Interface, error) { @@ -80,9 +80,8 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) { dropMulticast: c.DropMulticast, udpBatchSize: c.UDPBatchSize, - metricRxRecvError: metrics.GetOrRegisterCounter("messages.rx.recv_error", nil), - metricTxRecvError: metrics.GetOrRegisterCounter("messages.tx.recv_error", nil), - metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)), + metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)), + messageMetrics: c.MessageMetrics, } ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval) diff --git a/lighthouse.go b/lighthouse.go index 5ccb4a6..9b5b1c7 100644 --- a/lighthouse.go +++ b/lighthouse.go @@ -7,6 +7,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/rcrowley/go-metrics" "github.com/slackhq/nebula/cert" ) @@ -37,6 +38,9 @@ type LightHouse struct { nebulaPort int punchBack bool punchDelay time.Duration + + metrics *MessageMetrics + metricHolepunchTx metrics.Counter } type EncWriter interface { @@ -44,7 +48,7 @@ type EncWriter interface { SendMessageToAll(t NebulaMessageType, st NebulaMessageSubType, 
vpnIp uint32, p, nb, out []byte) } -func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort int, pc *udpConn, punchBack bool, punchDelay time.Duration) *LightHouse { +func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort int, pc *udpConn, punchBack bool, punchDelay time.Duration, metricsEnabled bool) *LightHouse { h := LightHouse{ amLighthouse: amLighthouse, myIp: myIp, @@ -58,6 +62,14 @@ func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, n punchDelay: punchDelay, } + if metricsEnabled { + h.metrics = newLighthouseMetrics() + + h.metricHolepunchTx = metrics.GetOrRegisterCounter("messages.tx.holepunch", nil) + } else { + h.metricHolepunchTx = metrics.NilCounter{} + } + for _, ip := range ips { h.lighthouses[ip] = struct{}{} } @@ -111,6 +123,7 @@ func (lh *LightHouse) QueryServer(ip uint32, f EncWriter) { return } + lh.metricTx(NebulaMeta_HostQuery, int64(len(lh.lighthouses))) nb := make([]byte, 12, 12) out := make([]byte, mtu) for n := range lh.lighthouses { @@ -249,6 +262,7 @@ func (lh *LightHouse) LhUpdateWorker(f EncWriter) { }, } + lh.metricTx(NebulaMeta_HostUpdateNotification, int64(len(lh.lighthouses))) nb := make([]byte, 12, 12) out := make([]byte, mtu) for vpnIp := range lh.lighthouses { @@ -281,6 +295,8 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c return } + lh.metricRx(n.Type, 1) + switch n.Type { case NebulaMeta_HostQuery: // Exit if we don't answer queries @@ -308,6 +324,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply") return } + lh.metricTx(NebulaMeta_HostQueryReply, 1) f.SendMessageToVpnIp(lightHouse, 0, vpnIp, reply, make([]byte, 12, 12), make([]byte, mtu)) // This signals the other side to punch some zero byte udp packets @@ -326,6 +343,7 @@ func (lh *LightHouse) 
HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c }, } reply, _ := proto.Marshal(answer) + lh.metricTx(NebulaMeta_HostPunchNotification, 1) f.SendMessageToVpnIp(lightHouse, 0, n.Details.VpnIp, reply, make([]byte, 12, 12), make([]byte, mtu)) } //fmt.Println(reply, remoteaddr) @@ -362,6 +380,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c vpnPeer := NewUDPAddr(a.Ip, uint16(a.Port)) go func() { time.Sleep(lh.punchDelay) + lh.metricHolepunchTx.Inc(1) lh.punchConn.WriteTo(empty, vpnPeer) }() @@ -380,6 +399,13 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c } } +func (lh *LightHouse) metricRx(t NebulaMeta_MessageType, i int64) { + lh.metrics.Rx(NebulaMessageType(t), 0, i) +} +func (lh *LightHouse) metricTx(t NebulaMeta_MessageType, i int64) { + lh.metrics.Tx(NebulaMessageType(t), 0, i) +} + /* func (f *Interface) sendPathCheck(ci *ConnectionState, endpoint *net.UDPAddr, counter int) { c := ci.messageCounter diff --git a/lighthouse_test.go b/lighthouse_test.go index 02b8ca7..d5ce880 100644 --- a/lighthouse_test.go +++ b/lighthouse_test.go @@ -52,7 +52,7 @@ func Test_lhStaticMapping(t *testing.T) { udpServer, _ := NewListener("0.0.0.0", 0, true) - meh := NewLightHouse(true, 1, []uint32{ip2int(lh1IP)}, 10, 10003, udpServer, false, 1) + meh := NewLightHouse(true, 1, []uint32{ip2int(lh1IP)}, 10, 10003, udpServer, false, 1, false) meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true) err := meh.ValidateLHStaticEntries() assert.Nil(t, err) @@ -60,7 +60,7 @@ func Test_lhStaticMapping(t *testing.T) { lh2 := "10.128.0.3" lh2IP := net.ParseIP(lh2) - meh = NewLightHouse(true, 1, []uint32{ip2int(lh1IP), ip2int(lh2IP)}, 10, 10003, udpServer, false, 1) + meh = NewLightHouse(true, 1, []uint32{ip2int(lh1IP), ip2int(lh2IP)}, 10, 10003, udpServer, false, 1, false) meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true) err = meh.ValidateLHStaticEntries() 
assert.EqualError(t, err, "Lighthouse 10.128.0.3 does not have a static_host_map entry") diff --git a/main.go b/main.go index 7088173..9eb6584 100644 --- a/main.go +++ b/main.go @@ -172,6 +172,7 @@ func Main(configPath string, configTest bool, buildVersion string) { hostMap := NewHostMap("main", tunCidr, preferredRanges) hostMap.SetDefaultRoute(ip2int(net.ParseIP(config.GetString("default_route", "0.0.0.0")))) hostMap.addUnsafeRoutes(&unsafeRoutes) + hostMap.metricsEnabled = config.GetBool("stats.message_metrics", false) l.WithField("network", hostMap.vpnCIDR).WithField("preferredRanges", hostMap.preferredRanges).Info("Main HostMap created") @@ -226,6 +227,7 @@ func Main(configPath string, configTest bool, buildVersion string) { udpServer, punchy.Respond, punchy.Delay, + config.GetBool("stats.lighthouse_metrics", false), ) remoteAllowList, err := config.GetAllowList("lighthouse.remote_allow_list", false) @@ -280,10 +282,19 @@ func Main(configPath string, configTest bool, buildVersion string) { l.WithError(err).Error("Lighthouse unreachable") } + var messageMetrics *MessageMetrics + if config.GetBool("stats.message_metrics", false) { + messageMetrics = newMessageMetrics() + } else { + messageMetrics = newMessageMetricsOnlyRecvError() + } + handshakeConfig := HandshakeConfig{ tryInterval: config.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval), retries: config.GetInt("handshakes.retries", DefaultHandshakeRetries), waitRotation: config.GetInt("handshakes.wait_rotation", DefaultHandshakeWaitRotation), + + messageMetrics: messageMetrics, } handshakeManager := NewHandshakeManager(tunCidr, preferredRanges, hostMap, lightHouse, udpServer, handshakeConfig) @@ -310,6 +321,7 @@ func Main(configPath string, configTest bool, buildVersion string) { DropLocalBroadcast: config.GetBool("tun.drop_local_broadcast", false), DropMulticast: config.GetBool("tun.drop_multicast", false), UDPBatchSize: config.GetInt("listen.batch", 64), + MessageMetrics: messageMetrics, } 
switch ifConfig.Cipher { diff --git a/message_metrics.go b/message_metrics.go new file mode 100644 index 0000000..ccd0207 --- /dev/null +++ b/message_metrics.go @@ -0,0 +1,97 @@ +package nebula + +import ( + "fmt" + + "github.com/rcrowley/go-metrics" +) + +type MessageMetrics struct { + rx [][]metrics.Counter + tx [][]metrics.Counter + + rxUnknown metrics.Counter + txUnknown metrics.Counter +} + +func (m *MessageMetrics) Rx(t NebulaMessageType, s NebulaMessageSubType, i int64) { + if m != nil { + if t >= 0 && int(t) < len(m.rx) && s >= 0 && int(s) < len(m.rx[t]) { + m.rx[t][s].Inc(i) + } else if m.rxUnknown != nil { + m.rxUnknown.Inc(i) + } + } +} +func (m *MessageMetrics) Tx(t NebulaMessageType, s NebulaMessageSubType, i int64) { + if m != nil { + if t >= 0 && int(t) < len(m.tx) && s >= 0 && int(s) < len(m.tx[t]) { + m.tx[t][s].Inc(i) + } else if m.txUnknown != nil { + m.txUnknown.Inc(i) + } + } +} + +func newMessageMetrics() *MessageMetrics { + gen := func(t string) [][]metrics.Counter { + return [][]metrics.Counter{ + { + metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.handshake_ixpsk0", t), nil), + }, + nil, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.recv_error", t), nil)}, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.lighthouse", t), nil)}, + { + metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.test_request", t), nil), + metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.test_response", t), nil), + }, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.close_tunnel", t), nil)}, + } + } + return &MessageMetrics{ + rx: gen("rx"), + tx: gen("tx"), + + rxUnknown: metrics.GetOrRegisterCounter("messages.rx.other", nil), + txUnknown: metrics.GetOrRegisterCounter("messages.tx.other", nil), + } +} + +// Historically we only recorded recv_error, so this is backwards compat +func newMessageMetricsOnlyRecvError() *MessageMetrics { + gen := func(t string) [][]metrics.Counter { + return [][]metrics.Counter{ + nil, + nil, + 
{metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.recv_error", t), nil)}, + } + } + return &MessageMetrics{ + rx: gen("rx"), + tx: gen("tx"), + } +} + +func newLighthouseMetrics() *MessageMetrics { + gen := func(t string) [][]metrics.Counter { + h := make([][]metrics.Counter, len(NebulaMeta_MessageType_name)) + used := []NebulaMeta_MessageType{ + NebulaMeta_HostQuery, + NebulaMeta_HostQueryReply, + NebulaMeta_HostUpdateNotification, + NebulaMeta_HostPunchNotification, + } + for _, i := range used { + h[i] = []metrics.Counter{metrics.GetOrRegisterCounter(fmt.Sprintf("lighthouse.%s.%s", t, i.String()), nil)} + } + return h + } + return &MessageMetrics{ + rx: gen("rx"), + tx: gen("tx"), + + rxUnknown: metrics.GetOrRegisterCounter("lighthouse.rx.other", nil), + txUnknown: metrics.GetOrRegisterCounter("lighthouse.tx.other", nil), + } +} diff --git a/outside.go b/outside.go index adc91ca..add7c62 100644 --- a/outside.go +++ b/outside.go @@ -54,6 +54,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, // Fallthrough to the bottom to record incoming traffic case lightHouse: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) if !f.handleEncrypted(ci, addr, header) { return } @@ -74,6 +75,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, // Fallthrough to the bottom to record incoming traffic case test: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) if !f.handleEncrypted(ci, addr, header) { return } @@ -102,15 +104,18 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, // are unauthenticated case handshake: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) HandleIncomingHandshake(f, addr, packet, header, hostinfo) return case recvError: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) // TODO: Remove this with recv_error deprecation f.handleRecvError(addr, header) return case closeTunnel: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) if 
!f.handleEncrypted(ci, addr, header) { return } @@ -122,6 +127,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, return default: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) hostinfo.logger().Debugf("Unexpected packet received from %s", addr) return } @@ -298,7 +304,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out } func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) { - f.metricTxRecvError.Inc(1) + f.messageMetrics.Tx(recvError, 0, 1) //TODO: this should be a signed message so we can trust that we should drop the index b := HeaderEncode(make([]byte, HeaderLen), Version, uint8(recvError), 0, index, 0) @@ -311,8 +317,6 @@ func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) { } func (f *Interface) handleRecvError(addr *udpAddr, h *Header) { - f.metricRxRecvError.Inc(1) - // This flag is to stop caring about recv_error from old versions // This should go away when the old version is gone from prod if l.Level >= logrus.DebugLevel {