Lighthouse performance pass (#418)
lighthouse.go
@@ -1,6 +1,7 @@
 package nebula
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"net"
@@ -10,19 +11,38 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/rcrowley/go-metrics"
 	"github.com/sirupsen/logrus"
 	"github.com/slackhq/nebula/cert"
 )
 
+//TODO: if the pb code for ipv6 used a fixed data type we could save more work
 //TODO: nodes are roaming lighthouses, this is bad. How are they learning?
 //TODO: as a lh client, ignore any address within my nebula network?????
 
+var ErrHostNotKnown = errors.New("host not known")
+
+// The maximum number of ip addresses to store for a given vpnIp per address family
+const maxAddrs = 10
+
+type ip4And6 struct {
+	//TODO: adding a lock here could allow us to release the lock on lh.addrMap quicker
+
+	// v4 and v6 store addresses that have been self reported by the client
+	v4 []*Ip4AndPort
+	v6 []*Ip6AndPort
+
+	// Learned addresses are ones that a client does not know about but a lighthouse learned from as a result of the received packet
+	learnedV4 []*Ip4AndPort
+	learnedV6 []*Ip6AndPort
+}
+
 type LightHouse struct {
+	//TODO: We need a timer wheel to kick out vpnIps that haven't reported in a long time
 	sync.RWMutex //Because we concurrently read and write to our maps
 	amLighthouse bool
 	myIp         uint32
 	punchConn    *udpConn
 
 	// Local cache of answers from light houses
-	addrMap map[uint32][]*udpAddr
+	addrMap map[uint32]*ip4And6
 
 	// filters remote addresses allowed for each host
 	// - When we are a lighthouse, this filters what addresses we store and
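The hunk above is the heart of the change: the flat `addrMap map[uint32][]*udpAddr` becomes a `map[uint32]*ip4And6`, splitting addresses the client reported about itself from addresses the lighthouse learned off the wire. A minimal runnable sketch of that shape, with a stand-in struct for the protobuf-generated `Ip4AndPort` (assumed fields, not the generated code):

// Sketch: the two-tier cache in miniature. Ip4AndPort here is a stand-in
// struct; the real one is generated from the protobuf definitions.
package main

import "fmt"

type Ip4AndPort struct {
	Ip   uint32
	Port uint32
}

type ip4And6 struct {
	v4        []*Ip4AndPort // addresses the client self-reported
	learnedV4 []*Ip4AndPort // addresses the lighthouse observed on the wire
}

func main() {
	addrMap := make(map[uint32]*ip4And6)

	// A client reports a private address; the lighthouse also learns the
	// NAT'd source address the update packet actually arrived from.
	addrMap[42] = &ip4And6{
		v4:        []*Ip4AndPort{{Ip: 0x0a000001, Port: 4242}},
		learnedV4: []*Ip4AndPort{{Ip: 0xc0a80101, Port: 61000}},
	}

	// Answering a query coalesces both tiers, newest information and all.
	am := addrMap[42]
	all := append(append([]*Ip4AndPort{}, am.v4...), am.learnedV4...)
	fmt.Println(len(all), "candidate addresses for vpn ip 42")
}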
@@ -53,14 +73,13 @@ type LightHouse struct {
 
 type EncWriter interface {
 	SendMessageToVpnIp(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte)
 	SendMessageToAll(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte)
 }
 
 func NewLightHouse(l *logrus.Logger, amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort uint32, pc *udpConn, punchBack bool, punchDelay time.Duration, metricsEnabled bool) *LightHouse {
 	h := LightHouse{
 		amLighthouse: amLighthouse,
 		myIp:         myIp,
-		addrMap:      make(map[uint32][]*udpAddr),
+		addrMap:      make(map[uint32]*ip4And6),
 		nebulaPort:   nebulaPort,
 		lighthouses:  make(map[uint32]struct{}),
 		staticList:   make(map[uint32]struct{}),
@@ -110,13 +129,14 @@ func (lh *LightHouse) ValidateLHStaticEntries() error {
 }
 
 func (lh *LightHouse) Query(ip uint32, f EncWriter) ([]*udpAddr, error) {
+	//TODO: we need to hold the lock through the next func
 	if !lh.IsLighthouseIP(ip) {
 		lh.QueryServer(ip, f)
 	}
 	lh.RLock()
 	if v, ok := lh.addrMap[ip]; ok {
 		lh.RUnlock()
-		return v, nil
+		return TransformLHReplyToUdpAddrs(v), nil
 	}
 	lh.RUnlock()
 	return nil, ErrHostNotKnown
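Note the TODO in `Query`: the entry is found under `RLock`, but the lock is dropped before `TransformLHReplyToUdpAddrs` walks it. A sketch of the pattern the TODO points at, copying out of the map while the read lock is still held (illustrative names, not the nebula API):

// Sketch: snapshot a map entry under the read lock so it cannot be
// mutated mid-read. Assumed shape, not nebula's actual cache type.
package main

import "sync"

type cache struct {
	sync.RWMutex
	m map[uint32][]string
}

func (c *cache) snapshot(key uint32) []string {
	c.RLock()
	defer c.RUnlock() // hold the read lock through the copy

	v, ok := c.m[key]
	if !ok {
		return nil
	}
	out := make([]string, len(v))
	copy(out, v)
	return out
}

func main() {
	c := &cache{m: map[uint32][]string{1: {"10.0.0.1:4242"}}}
	_ = c.snapshot(1)
}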
@@ -141,17 +161,29 @@ func (lh *LightHouse) QueryServer(ip uint32, f EncWriter) {
 	}
 }
 
 // Query our local lighthouse cached results
 func (lh *LightHouse) QueryCache(ip uint32) []*udpAddr {
 	//TODO: we need to hold the lock through the next func
 	lh.RLock()
 	if v, ok := lh.addrMap[ip]; ok {
 		lh.RUnlock()
-		return v
+		return TransformLHReplyToUdpAddrs(v)
 	}
 	lh.RUnlock()
 	return nil
 }
 
+//
+func (lh *LightHouse) queryAndPrepMessage(ip uint32, f func(*ip4And6) (int, error)) (bool, int, error) {
+	lh.RLock()
+	if v, ok := lh.addrMap[ip]; ok {
+		n, err := f(v)
+		lh.RUnlock()
+		return true, n, err
+	}
+	lh.RUnlock()
+	return false, 0, nil
+}
+
 func (lh *LightHouse) DeleteVpnIP(vpnIP uint32) {
 	// First we check the static mapping
 	// and do nothing if it is there
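`queryAndPrepMessage` inverts the usual lookup: instead of returning the entry, it runs the caller's function while the read lock is still held, so a reply can be marshaled straight out of the live cache entry with no intermediate copy. The same shape in a self-contained sketch (illustrative types):

// Sketch: run a callback against a live map entry under the read lock.
package main

import (
	"fmt"
	"sync"
)

type entry struct{ addrs []string }

type store struct {
	sync.RWMutex
	m map[uint32]*entry
}

// withEntry runs f against the entry while the read lock is held,
// returning whether the key existed plus f's results.
func (s *store) withEntry(key uint32, f func(*entry) (int, error)) (bool, int, error) {
	s.RLock()
	defer s.RUnlock()
	e, ok := s.m[key]
	if !ok {
		return false, 0, nil
	}
	n, err := f(e)
	return true, n, err
}

func main() {
	s := &store{m: map[uint32]*entry{7: {addrs: []string{"a", "b"}}}}
	found, n, _ := s.withEntry(7, func(e *entry) (int, error) {
		// stands in for marshaling into a preallocated buffer and
		// returning the number of bytes written
		return len(e.addrs), nil
	})
	fmt.Println(found, n)
}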
@@ -161,11 +193,46 @@ func (lh *LightHouse) DeleteVpnIP(vpnIP uint32) {
 	lh.Lock()
 	//l.Debugln(lh.addrMap)
 	delete(lh.addrMap, vpnIP)
-	lh.l.Debugf("deleting %s from lighthouse.", IntIp(vpnIP))
+
+	if lh.l.Level >= logrus.DebugLevel {
+		lh.l.Debugf("deleting %s from lighthouse.", IntIp(vpnIP))
+	}
+
 	lh.Unlock()
 }
 
-func (lh *LightHouse) AddRemote(vpnIP uint32, toIp *udpAddr, static bool) {
+// AddRemote is correct way for non LightHouse members to add an address. toAddr will be placed in the learned map
+// static means this is a static host entry from the config file, it should only be used on start up
+func (lh *LightHouse) AddRemote(vpnIP uint32, toAddr *udpAddr, static bool) {
+	if ipv4 := toAddr.IP.To4(); ipv4 != nil {
+		lh.addRemoteV4(vpnIP, NewIp4AndPort(ipv4, uint32(toAddr.Port)), static, true)
+	} else {
+		lh.addRemoteV6(vpnIP, NewIp6AndPort(toAddr.IP, uint32(toAddr.Port)), static, true)
+	}
+
+	//TODO: if we do not add due to a config filter we may end up not having any addresses here
+	if static {
+		lh.staticList[vpnIP] = struct{}{}
+	}
+}
+
+// unsafeGetAddrs assumes you have the lh lock
+func (lh *LightHouse) unsafeGetAddrs(vpnIP uint32) *ip4And6 {
+	am, ok := lh.addrMap[vpnIP]
+	if !ok {
+		am = &ip4And6{
+			v4:        make([]*Ip4AndPort, 0),
+			v6:        make([]*Ip6AndPort, 0),
+			learnedV4: make([]*Ip4AndPort, 0),
+			learnedV6: make([]*Ip6AndPort, 0),
+		}
+		lh.addrMap[vpnIP] = am
+	}
+	return am
+}
+
+// addRemoteV4 is a lighthouse internal method that prepends a remote if it is allowed by the allow list and not duplicated
+func (lh *LightHouse) addRemoteV4(vpnIP uint32, to *Ip4AndPort, static bool, learned bool) {
 	// First we check if the sender thinks this is a static entry
 	// and do nothing if it is not, but should be considered static
 	if static == false {
@@ -176,24 +243,108 @@ func (lh *LightHouse) AddRemote(vpnIP uint32, toIp *udpAddr, static bool) {
 
 	lh.Lock()
 	defer lh.Unlock()
-	for _, v := range lh.addrMap[vpnIP] {
-		if v.Equals(toIp) {
+	am := lh.unsafeGetAddrs(vpnIP)
+
+	if learned {
+		if !lh.unlockedShouldAddV4(am.learnedV4, to) {
 			return
 		}
+		am.learnedV4 = prependAndLimitV4(am.learnedV4, to)
+	} else {
+		if !lh.unlockedShouldAddV4(am.v4, to) {
+			return
+		}
+		am.v4 = prependAndLimitV4(am.v4, to)
 	}
+}
+
+func prependAndLimitV4(cache []*Ip4AndPort, to *Ip4AndPort) []*Ip4AndPort {
+	cache = append(cache, nil)
+	copy(cache[1:], cache)
+	cache[0] = to
+	if len(cache) > MaxRemotes {
+		cache = cache[:maxAddrs]
+	}
+	return cache
+}
+
+// unlockedShouldAddV4 checks if to is allowed by our allow list and is not already present in the cache
+func (lh *LightHouse) unlockedShouldAddV4(am []*Ip4AndPort, to *Ip4AndPort) bool {
+	ip := int2ip(to.Ip)
+	allow := lh.remoteAllowList.Allow(ip)
+	if lh.l.Level >= logrus.DebugLevel {
+		lh.l.WithField("remoteIp", ip).WithField("allow", allow).Debug("remoteAllowList.Allow")
+	}
+
+	if !allow {
+		return false
+	}
+
+	for _, v := range am {
+		if v.Ip == to.Ip && v.Port == to.Port {
+			return false
+		}
+	}
+
+	return true
+}
+
+// addRemoteV6 is a lighthouse internal method that prepends a remote if it is allowed by the allow list and not duplicated
+func (lh *LightHouse) addRemoteV6(vpnIP uint32, to *Ip6AndPort, static bool, learned bool) {
+	// First we check if the sender thinks this is a static entry
+	// and do nothing if it is not, but should be considered static
+	if static == false {
+		if _, ok := lh.staticList[vpnIP]; ok {
+			return
+		}
+	}
 
-	allow := lh.remoteAllowList.Allow(toIp.IP)
-	lh.l.WithField("remoteIp", toIp).WithField("allow", allow).Debug("remoteAllowList.Allow")
+	lh.Lock()
+	defer lh.Unlock()
+	am := lh.unsafeGetAddrs(vpnIP)
+
+	if learned {
+		if !lh.unlockedShouldAddV6(am.learnedV6, to) {
+			return
+		}
+		am.learnedV6 = prependAndLimitV6(am.learnedV6, to)
+	} else {
+		if !lh.unlockedShouldAddV6(am.v6, to) {
+			return
+		}
+		am.v6 = prependAndLimitV6(am.v6, to)
+	}
+}
+
+func prependAndLimitV6(cache []*Ip6AndPort, to *Ip6AndPort) []*Ip6AndPort {
+	cache = append(cache, nil)
+	copy(cache[1:], cache)
+	cache[0] = to
+	if len(cache) > MaxRemotes {
+		cache = cache[:maxAddrs]
+	}
+	return cache
+}
+
+// unlockedShouldAddV6 checks if to is allowed by our allow list and is not already present in the cache
+func (lh *LightHouse) unlockedShouldAddV6(am []*Ip6AndPort, to *Ip6AndPort) bool {
+	ip := net.IP(to.Ip)
+	allow := lh.remoteAllowList.Allow(ip)
+	if lh.l.Level >= logrus.DebugLevel {
+		lh.l.WithField("remoteIp", ip).WithField("allow", allow).Debug("remoteAllowList.Allow")
+	}
 
 	if !allow {
-		return
+		return false
 	}
 
-	//l.Debugf("Adding reply of %s as %s\n", IntIp(vpnIP), toIp)
-	if static {
-		lh.staticList[vpnIP] = struct{}{}
+	for _, v := range am {
+		if bytes.Equal(v.Ip, to.Ip) && v.Port == to.Port {
+			return false
+		}
 	}
 
-	lh.addrMap[vpnIP] = append(lh.addrMap[vpnIP], toIp.Copy())
+	return true
 }
 
 func (lh *LightHouse) AddRemoteAndReset(vpnIP uint32, toIp *udpAddr) {
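`prependAndLimitV4`/`V6` use the append-nil/copy idiom: grow the slice by one (reusing spare capacity when there is any), shift everything right, write the newest entry at the head, then truncate the tail. A standalone sketch with ints standing in for the address pointers:

// Sketch of the prepend-and-limit slice idiom.
package main

import "fmt"

const limit = 3 // stands in for the maxAddrs/MaxRemotes cap

func prependAndLimit(cache []int, v int) []int {
	cache = append(cache, 0) // grow by one slot (amortized allocation)
	copy(cache[1:], cache)   // shift existing entries right by one
	cache[0] = v             // newest entry goes first
	if len(cache) > limit {
		cache = cache[:limit] // drop the oldest entries off the tail
	}
	return cache
}

func main() {
	var c []int
	for i := 1; i <= 5; i++ {
		c = prependAndLimit(c, i)
	}
	fmt.Println(c) // [5 4 3]
}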
@@ -201,7 +352,6 @@ func (lh *LightHouse) AddRemoteAndReset(vpnIP uint32, toIp *udpAddr) {
 		lh.DeleteVpnIP(vpnIP)
 		lh.AddRemote(vpnIP, toIp, false)
 	}
-
 }
 
 func (lh *LightHouse) IsLighthouseIP(vpnIP uint32) bool {
@@ -220,32 +370,20 @@ func NewLhQueryByInt(VpnIp uint32) *NebulaMeta {
 	}
 }
 
-type ip4Or6 struct {
-	v4 IpAndPort
-	v6 Ip6AndPort
+func NewIp4AndPort(ip net.IP, port uint32) *Ip4AndPort {
+	ipp := Ip4AndPort{Port: port}
+	ipp.Ip = ip2int(ip)
+	return &ipp
 }
 
-func NewIpAndPort(ip net.IP, port uint32) ip4Or6 {
-	ipp := ip4Or6{}
-
-	if ipv4 := ip.To4(); ipv4 != nil {
-		ipp.v4 = IpAndPort{Port: port}
-		ipp.v4.Ip = ip2int(ip)
-
-	} else {
-		ipp.v6 = Ip6AndPort{Port: port}
-		ipp.v6.Ip = make([]byte, len(ip))
-		copy(ipp.v6.Ip, ip)
-	}
-
-	return ipp
+func NewIp6AndPort(ip net.IP, port uint32) *Ip6AndPort {
+	ipp := Ip6AndPort{Port: port}
+	ipp.Ip = make([]byte, len(ip))
+	copy(ipp.Ip, ip)
+	return &ipp
 }
 
-func NewIpAndPortFromUDPAddr(addr *udpAddr) ip4Or6 {
-	return NewIpAndPort(addr.IP, uint32(addr.Port))
-}
-
-func NewUDPAddrFromLH4(ipp *IpAndPort) *udpAddr {
+func NewUDPAddrFromLH4(ipp *Ip4AndPort) *udpAddr {
 	ip := ipp.Ip
 	return NewUDPAddr(
 		net.IPv4(byte(ip&0xff000000>>24), byte(ip&0x00ff0000>>16), byte(ip&0x0000ff00>>8), byte(ip&0x000000ff)),
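`NewIp4AndPort` leans on nebula's `ip2int` helper, and `NewUDPAddrFromLH4` reverses it with the byte masks above, which implies a big-endian packing of the IPv4 address into a `uint32`. A sketch of that round trip under that assumption (the helper bodies here are stand-ins, not nebula's own):

// Sketch: big-endian IPv4 <-> uint32 conversion, assumed to match the
// masking in NewUDPAddrFromLH4.
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

func ip2int(ip net.IP) uint32 {
	if v4 := ip.To4(); v4 != nil {
		return binary.BigEndian.Uint32(v4)
	}
	return 0 // not representable as IPv4
}

func int2ip(n uint32) net.IP {
	ip := make(net.IP, 4)
	binary.BigEndian.PutUint32(ip, n)
	return ip
}

func main() {
	ip := net.ParseIP("10.1.2.3")
	n := ip2int(ip)
	fmt.Printf("%08x -> %s\n", n, int2ip(n)) // 0a010203 -> 10.1.2.3
}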
@@ -269,26 +407,26 @@ func (lh *LightHouse) LhUpdateWorker(f EncWriter) {
 }
 
 func (lh *LightHouse) SendUpdate(f EncWriter) {
-	var v4 []*IpAndPort
+	var v4 []*Ip4AndPort
 	var v6 []*Ip6AndPort
 
 	for _, e := range *localIps(lh.l, lh.localAllowList) {
-		// Only add IPs that aren't my VPN/tun IP
-		if ip2int(e) != lh.myIp {
-			ipp := NewIpAndPort(e, lh.nebulaPort)
-			if len(ipp.v6.Ip) > 0 {
-				v6 = append(v6, &ipp.v6)
-			} else {
-				v4 = append(v4, &ipp.v4)
-			}
+		if ip2int(e) == lh.myIp {
+			continue
 		}
+
+		// Only add IPs that aren't my VPN/tun IP
+		if ip := e.To4(); ip != nil {
+			v4 = append(v4, NewIp4AndPort(e, lh.nebulaPort))
+		} else {
+			v6 = append(v6, NewIp6AndPort(e, lh.nebulaPort))
+		}
 	}
 	m := &NebulaMeta{
 		Type: NebulaMeta_HostUpdateNotification,
 		Details: &NebulaMetaDetails{
 			VpnIp:       lh.myIp,
-			IpAndPorts:  v4,
+			Ip4AndPorts: v4,
 			Ip6AndPorts: v6,
 		},
 	}
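`SendUpdate` now splits local addresses by family with `net.IP.To4`, which returns non-nil for anything representable as IPv4, including 4-in-6 mapped addresses. A small demonstration of that partitioning:

// Sketch: partition a list of IPs into v4 and v6 buckets via To4.
package main

import (
	"fmt"
	"net"
)

func main() {
	ips := []net.IP{
		net.ParseIP("192.168.1.5"),
		net.ParseIP("fd00::5"),
		net.ParseIP("::ffff:10.0.0.9"), // 4-in-6 mapped, To4 succeeds
	}

	var v4, v6 []net.IP
	for _, e := range ips {
		if ip := e.To4(); ip != nil {
			v4 = append(v4, ip)
		} else {
			v6 = append(v6, e)
		}
	}
	fmt.Println(len(v4), "v4 addresses,", len(v6), "v6 addresses") // 2 v4, 1 v6
}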
@@ -298,7 +436,7 @@ func (lh *LightHouse) SendUpdate(f EncWriter) {
 	out := make([]byte, mtu)
 	for vpnIp := range lh.lighthouses {
 		mm, err := proto.Marshal(m)
-		if err != nil {
+		if err != nil && lh.l.Level >= logrus.DebugLevel {
 			lh.l.Debugf("Invalid marshal to update")
 		}
 		//l.Error("LIGHTHOUSE PACKET SEND", mm)
@@ -311,9 +449,9 @@ type LightHouseHandler struct {
 	lh   *LightHouse
 	nb   []byte
 	out  []byte
+	pb   []byte
 	meta *NebulaMeta
-	iap  []ip4Or6
-	iapp []*ip4Or6
+	l    *logrus.Logger
 }
 
 func (lh *LightHouse) NewRequestHandler() *LightHouseHandler {
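The handler grows a `pb` scratch buffer to marshal replies into, alongside the existing `nb`/`out` buffers, and `resetMeta` (later in the diff) reuses the message structs between packets instead of reallocating them. A sketch of that reuse pattern with illustrative types (not the generated protobuf API):

// Sketch: one handler owns fixed scratch buffers and a reusable message,
// so steady-state request handling does not allocate.
package main

import "fmt"

type meta struct {
	addrs []string
}

type handler struct {
	pb   []byte // scratch buffer a reply is marshaled into
	meta *meta  // reused message, reset between packets
}

func newHandler() *handler {
	return &handler{pb: make([]byte, 1500), meta: &meta{}}
}

// resetMeta clears the message but keeps the backing array around.
func (h *handler) resetMeta() *meta {
	h.meta.addrs = h.meta.addrs[:0]
	return h.meta
}

func (h *handler) handle(payload string) int {
	m := h.resetMeta()
	m.addrs = append(m.addrs, payload)
	n := copy(h.pb, payload) // stands in for m.MarshalTo(h.pb)
	return n                 // caller sends h.pb[:n]
}

func main() {
	h := newHandler()
	fmt.Println(h.handle("10.0.0.1:4242"))
	fmt.Println(h.handle("10.0.0.2:4242")) // buffers and meta are reused
}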
@@ -321,288 +459,283 @@ func (lh *LightHouse) NewRequestHandler() *LightHouseHandler {
 		lh:  lh,
 		nb:  make([]byte, 12, 12),
 		out: make([]byte, mtu),
+		l:   lh.l,
+		pb:  make([]byte, mtu),
+
 		meta: &NebulaMeta{
 			Details: &NebulaMetaDetails{},
 		},
 	}
 
-	lhh.resizeIpAndPorts(10)
-
 	return lhh
 }
 
-func (lh *LightHouse) metricRx(t NebulaMeta_MessageType, i int64) {
-	lh.metrics.Rx(NebulaMessageType(t), 0, i)
-}
-
-func (lh *LightHouse) metricTx(t NebulaMeta_MessageType, i int64) {
-	lh.metrics.Tx(NebulaMessageType(t), 0, i)
-}
-
 // This method is similar to Reset(), but it re-uses the pointer structs
 // so that we don't have to re-allocate them
 func (lhh *LightHouseHandler) resetMeta() *NebulaMeta {
 	details := lhh.meta.Details
 
 	details.Reset()
 	lhh.meta.Reset()
+
+	// Keep the array memory around
+	details.Ip4AndPorts = details.Ip4AndPorts[:0]
+	details.Ip6AndPorts = details.Ip6AndPorts[:0]
 	lhh.meta.Details = details
 
 	return lhh.meta
 }
 
-func (lhh *LightHouseHandler) resizeIpAndPorts(n int) {
-	if cap(lhh.iap) < n {
-		lhh.iap = make([]ip4Or6, n)
-		lhh.iapp = make([]*ip4Or6, n)
-
-		for i := range lhh.iap {
-			lhh.iapp[i] = &lhh.iap[i]
-		}
-	}
-	lhh.iap = lhh.iap[:n]
-	lhh.iapp = lhh.iapp[:n]
-}
-
-func (lhh *LightHouseHandler) setIpAndPortsFromNetIps(ips []*udpAddr) []*ip4Or6 {
-	lhh.resizeIpAndPorts(len(ips))
-	for i, e := range ips {
-		lhh.iap[i] = NewIpAndPortFromUDPAddr(e)
-	}
-	return lhh.iapp
-}
-
-func (lhh *LightHouseHandler) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *cert.NebulaCertificate, f EncWriter) {
-	lh := lhh.lh
+//TODO: do we need c here?
+func (lhh *LightHouseHandler) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, w EncWriter) {
 	n := lhh.resetMeta()
-	err := proto.UnmarshalMerge(p, n)
+	err := n.Unmarshal(p)
 	if err != nil {
-		lh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).WithField("udpAddr", rAddr).
+		lhh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).WithField("udpAddr", rAddr).
 			Error("Failed to unmarshal lighthouse packet")
 		//TODO: send recv_error?
 		return
 	}
 
 	if n.Details == nil {
-		lh.l.WithField("vpnIp", IntIp(vpnIp)).WithField("udpAddr", rAddr).
+		lhh.l.WithField("vpnIp", IntIp(vpnIp)).WithField("udpAddr", rAddr).
 			Error("Invalid lighthouse update")
 		//TODO: send recv_error?
 		return
 	}
 
-	lh.metricRx(n.Type, 1)
+	lhh.lh.metricRx(n.Type, 1)
 
 	switch n.Type {
 	case NebulaMeta_HostQuery:
-		// Exit if we don't answer queries
-		if !lh.amLighthouse {
-			lh.l.Debugln("I don't answer queries, but received from: ", rAddr)
-			return
-		}
-
-		//l.Debugln("Got Query")
-		ips, err := lh.Query(n.Details.VpnIp, f)
-		if err != nil {
-			//l.Debugf("Can't answer query %s from %s because error: %s", IntIp(n.Details.VpnIp), rAddr, err)
-			return
-		} else {
-			reqVpnIP := n.Details.VpnIp
-			n = lhh.resetMeta()
-			n.Type = NebulaMeta_HostQueryReply
-			n.Details.VpnIp = reqVpnIP
-
-			v4s := make([]*IpAndPort, 0)
-			v6s := make([]*Ip6AndPort, 0)
-			for _, v := range lhh.setIpAndPortsFromNetIps(ips) {
-				if len(v.v6.Ip) > 0 {
-					v6s = append(v6s, &v.v6)
-				} else {
-					v4s = append(v4s, &v.v4)
-				}
-			}
-
-			if len(v4s) > 0 {
-				n.Details.IpAndPorts = v4s
-			}
-
-			if len(v6s) > 0 {
-				n.Details.Ip6AndPorts = v6s
-			}
-
-			reply, err := proto.Marshal(n)
-			if err != nil {
-				lh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply")
-				return
-			}
-			lh.metricTx(NebulaMeta_HostQueryReply, 1)
-			f.SendMessageToVpnIp(lightHouse, 0, vpnIp, reply, lhh.nb, lhh.out[:0])
-
-			// This signals the other side to punch some zero byte udp packets
-			ips, err = lh.Query(vpnIp, f)
-			if err != nil {
-				lh.l.WithField("vpnIp", IntIp(vpnIp)).Debugln("Can't notify host to punch")
-				return
-			} else {
-				//l.Debugln("Notify host to punch", iap)
-				n = lhh.resetMeta()
-				n.Type = NebulaMeta_HostPunchNotification
-				n.Details.VpnIp = vpnIp
-
-				v4s := make([]*IpAndPort, 0)
-				v6s := make([]*Ip6AndPort, 0)
-				for _, v := range lhh.setIpAndPortsFromNetIps(ips) {
-					if len(v.v6.Ip) > 0 {
-						v6s = append(v6s, &v.v6)
-					} else {
-						v4s = append(v4s, &v.v4)
-					}
-				}
-
-				if len(v4s) > 0 {
-					n.Details.IpAndPorts = v4s
-				}
-
-				if len(v6s) > 0 {
-					n.Details.Ip6AndPorts = v6s
-				}
-
-				reply, _ := proto.Marshal(n)
-				lh.metricTx(NebulaMeta_HostPunchNotification, 1)
-				f.SendMessageToVpnIp(lightHouse, 0, reqVpnIP, reply, lhh.nb, lhh.out[:0])
-			}
-			//fmt.Println(reply, remoteaddr)
-		}
+		lhh.handleHostQuery(n, vpnIp, rAddr, w)
 
 	case NebulaMeta_HostQueryReply:
-		if !lh.IsLighthouseIP(vpnIp) {
-			return
-		}
-
-		for _, a := range n.Details.IpAndPorts {
-			ans := NewUDPAddrFromLH4(a)
-			if ans != nil {
-				lh.AddRemote(n.Details.VpnIp, ans, false)
-			}
-		}
-
-		for _, a := range n.Details.Ip6AndPorts {
-			ans := NewUDPAddrFromLH6(a)
-			if ans != nil {
-				lh.AddRemote(n.Details.VpnIp, ans, false)
-			}
-		}
-
-		// Non-blocking attempt to trigger, skip if it would block
-		select {
-		case lh.handshakeTrigger <- n.Details.VpnIp:
-		default:
-		}
+		lhh.handleHostQueryReply(n, vpnIp)
 
 	case NebulaMeta_HostUpdateNotification:
-		//Simple check that the host sent this not someone else
-		if n.Details.VpnIp != vpnIp {
-			lh.l.WithField("vpnIp", IntIp(vpnIp)).WithField("answer", IntIp(n.Details.VpnIp)).Debugln("Host sent invalid update")
-			return
-		}
-
-		for _, a := range n.Details.IpAndPorts {
-			ans := NewUDPAddrFromLH4(a)
-			if ans != nil {
-				lh.AddRemote(n.Details.VpnIp, ans, false)
-			}
-		}
-
-		for _, a := range n.Details.Ip6AndPorts {
-			ans := NewUDPAddrFromLH6(a)
-			if ans != nil {
-				lh.AddRemote(n.Details.VpnIp, ans, false)
-			}
-		}
+		lhh.handleHostUpdateNotification(n, vpnIp)
 
 	case NebulaMeta_HostMovedNotification:
 	case NebulaMeta_HostPunchNotification:
-		if !lh.IsLighthouseIP(vpnIp) {
+		lhh.handleHostPunchNotification(n, vpnIp, w)
 	}
 }
 
+func (lhh *LightHouseHandler) handleHostQuery(n *NebulaMeta, vpnIp uint32, addr *udpAddr, w EncWriter) {
+	// Exit if we don't answer queries
+	if !lhh.lh.amLighthouse {
+		if lhh.l.Level >= logrus.DebugLevel {
+			lhh.l.Debugln("I don't answer queries, but received from: ", addr)
+		}
+		return
+	}
+
+	//TODO: we can DRY this further
+	reqVpnIP := n.Details.VpnIp
+	//TODO: Maybe instead of marshalling into n we marshal into a new `r` to not nuke our current request data
+	//TODO: If we use a lock on cache we can avoid holding it on lh.addrMap and keep things moving better
+	found, ln, err := lhh.lh.queryAndPrepMessage(n.Details.VpnIp, func(cache *ip4And6) (int, error) {
+		n = lhh.resetMeta()
+		n.Type = NebulaMeta_HostQueryReply
+		n.Details.VpnIp = reqVpnIP
+
+		lhh.coalesceAnswers(cache, n)
+
+		return n.MarshalTo(lhh.pb)
+	})
+
+	if !found {
+		return
+	}
+
+	if err != nil {
+		lhh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply")
+		return
+	}
+
+	lhh.lh.metricTx(NebulaMeta_HostQueryReply, 1)
+	w.SendMessageToVpnIp(lightHouse, 0, vpnIp, lhh.pb[:ln], lhh.nb, lhh.out[:0])
+
+	// This signals the other side to punch some zero byte udp packets
+	found, ln, err = lhh.lh.queryAndPrepMessage(vpnIp, func(cache *ip4And6) (int, error) {
+		n = lhh.resetMeta()
+		n.Type = NebulaMeta_HostPunchNotification
+		n.Details.VpnIp = vpnIp
+
+		lhh.coalesceAnswers(cache, n)
+
+		return n.MarshalTo(lhh.pb)
+	})
+
+	if !found {
+		return
+	}
+
+	if err != nil {
+		lhh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host was queried for")
+		return
+	}
+
+	lhh.lh.metricTx(NebulaMeta_HostPunchNotification, 1)
+	w.SendMessageToVpnIp(lightHouse, 0, reqVpnIP, lhh.pb[:ln], lhh.nb, lhh.out[:0])
+}
+
+func (lhh *LightHouseHandler) coalesceAnswers(cache *ip4And6, n *NebulaMeta) {
+	n.Details.Ip4AndPorts = append(n.Details.Ip4AndPorts, cache.v4...)
+	n.Details.Ip4AndPorts = append(n.Details.Ip4AndPorts, cache.learnedV4...)
+
+	n.Details.Ip6AndPorts = append(n.Details.Ip6AndPorts, cache.v6...)
+	n.Details.Ip6AndPorts = append(n.Details.Ip6AndPorts, cache.learnedV6...)
+}
+
+func (lhh *LightHouseHandler) handleHostQueryReply(n *NebulaMeta, vpnIp uint32) {
+	if !lhh.lh.IsLighthouseIP(vpnIp) {
+		return
+	}
+
+	// We can't just slam the responses in as they may come from multiple lighthouses and we should coalesce the answers
+	for _, to := range n.Details.Ip4AndPorts {
+		lhh.lh.addRemoteV4(n.Details.VpnIp, to, false, false)
+	}
+
+	for _, to := range n.Details.Ip6AndPorts {
+		lhh.lh.addRemoteV6(n.Details.VpnIp, to, false, false)
+	}
+
+	// Non-blocking attempt to trigger, skip if it would block
+	select {
+	case lhh.lh.handshakeTrigger <- n.Details.VpnIp:
+	default:
+	}
+}
+
+func (lhh *LightHouseHandler) handleHostUpdateNotification(n *NebulaMeta, vpnIp uint32) {
+	if !lhh.lh.amLighthouse {
+		if lhh.l.Level >= logrus.DebugLevel {
+			lhh.l.Debugln("I am not a lighthouse, do not take host updates: ", vpnIp)
+		}
+		return
+	}
+
+	//Simple check that the host sent this not someone else
+	if n.Details.VpnIp != vpnIp {
+		if lhh.l.Level >= logrus.DebugLevel {
+			lhh.l.WithField("vpnIp", IntIp(vpnIp)).WithField("answer", IntIp(n.Details.VpnIp)).Debugln("Host sent invalid update")
+		}
+		return
+	}
+
+	lhh.lh.Lock()
+	defer lhh.lh.Unlock()
+	am := lhh.lh.unsafeGetAddrs(vpnIp)
+
+	//TODO: other note on a lock for am so we can release more quickly and lock our real unit of change which is far less contended
+	//TODO: we are not filtering by local or remote allowed addrs here, is this an ok change to make?
+
+	// We don't accumulate addresses being told to us
+	am.v4 = am.v4[:0]
+	am.v6 = am.v6[:0]
+
+	for _, v := range n.Details.Ip4AndPorts {
+		if lhh.lh.unlockedShouldAddV4(am.v4, v) {
+			am.v4 = append(am.v4, v)
+		}
+	}
+
+	for _, v := range n.Details.Ip6AndPorts {
+		if lhh.lh.unlockedShouldAddV6(am.v6, v) {
+			am.v6 = append(am.v6, v)
+		}
+	}
+
+	// We prefer the first n addresses if we got too big
+	if len(am.v4) > MaxRemotes {
+		am.v4 = am.v4[:MaxRemotes]
+	}
+
+	if len(am.v6) > MaxRemotes {
+		am.v6 = am.v6[:MaxRemotes]
+	}
+}
+
+func (lhh *LightHouseHandler) handleHostPunchNotification(n *NebulaMeta, vpnIp uint32, w EncWriter) {
+	if !lhh.lh.IsLighthouseIP(vpnIp) {
 		return
 	}
 
-	empty := []byte{0}
-	for _, a := range n.Details.IpAndPorts {
-		vpnPeer := NewUDPAddrFromLH4(a)
-		if vpnPeer == nil {
-			continue
-		}
-
-		go func() {
-			time.Sleep(lh.punchDelay)
-			lh.metricHolepunchTx.Inc(1)
-			lh.punchConn.WriteTo(empty, vpnPeer)
-
-		}()
-
-		if lh.l.Level >= logrus.DebugLevel {
-			//TODO: lacking the ip we are actually punching on, old: l.Debugf("Punching %s on %d for %s", IntIp(a.Ip), a.Port, IntIp(n.Details.VpnIp))
-			lh.l.Debugf("Punching on %d for %s", a.Port, IntIp(n.Details.VpnIp))
-		}
-	}
-
-	for _, a := range n.Details.Ip6AndPorts {
-		vpnPeer := NewUDPAddrFromLH6(a)
-		if vpnPeer == nil {
-			continue
-		}
-
-		go func() {
-			time.Sleep(lh.punchDelay)
-			lh.metricHolepunchTx.Inc(1)
-			lh.punchConn.WriteTo(empty, vpnPeer)
-
-		}()
-
-		if lh.l.Level >= logrus.DebugLevel {
-			//TODO: lacking the ip we are actually punching on, old: l.Debugf("Punching %s on %d for %s", IntIp(a.Ip), a.Port, IntIp(n.Details.VpnIp))
-			lh.l.Debugf("Punching on %d for %s", a.Port, IntIp(n.Details.VpnIp))
-		}
+	empty := []byte{0}
+	punch := func(vpnPeer *udpAddr) {
+		if vpnPeer == nil {
+			return
+		}
+
+		go func() {
+			time.Sleep(lhh.lh.punchDelay)
+			lhh.lh.metricHolepunchTx.Inc(1)
+			lhh.lh.punchConn.WriteTo(empty, vpnPeer)
+		}()
+
+		if lhh.l.Level >= logrus.DebugLevel {
+			//TODO: lacking the ip we are actually punching on, old: l.Debugf("Punching %s on %d for %s", IntIp(a.Ip), a.Port, IntIp(n.Details.VpnIp))
+			lhh.l.Debugf("Punching on %d for %s", vpnPeer.Port, IntIp(n.Details.VpnIp))
+		}
+	}
+
+	for _, a := range n.Details.Ip4AndPorts {
+		punch(NewUDPAddrFromLH4(a))
+	}
+
+	for _, a := range n.Details.Ip6AndPorts {
+		punch(NewUDPAddrFromLH6(a))
 	}
 
 	// This sends a nebula test packet to the host trying to contact us. In the case
 	// of a double nat or other difficult scenario, this may help establish
 	// a tunnel.
-	if lh.punchBack {
+	if lhh.lh.punchBack {
 		go func() {
 			time.Sleep(time.Second * 5)
-			lh.l.Debugf("Sending a nebula test packet to vpn ip %s", IntIp(n.Details.VpnIp))
-			// TODO we have to allocate a new output buffer here since we are spawning a new goroutine
+			if lhh.l.Level >= logrus.DebugLevel {
+				lhh.l.Debugf("Sending a nebula test packet to vpn ip %s", IntIp(n.Details.VpnIp))
+			}
+			//NOTE: we have to allocate a new output buffer here since we are spawning a new goroutine
 			// for each punchBack packet. We should move this into a timerwheel or a single goroutine
 			// managed by a channel.
-			f.SendMessageToVpnIp(test, testRequest, n.Details.VpnIp, []byte(""), make([]byte, 12, 12), make([]byte, mtu))
+			w.SendMessageToVpnIp(test, testRequest, n.Details.VpnIp, []byte(""), make([]byte, 12, 12), make([]byte, mtu))
 		}()
 	}
 }
+
+func (lh *LightHouse) metricRx(t NebulaMeta_MessageType, i int64) {
+	lh.metrics.Rx(NebulaMessageType(t), 0, i)
+}
+
+func (lh *LightHouse) metricTx(t NebulaMeta_MessageType, i int64) {
+	lh.metrics.Tx(NebulaMessageType(t), 0, i)
+}
+
+func TransformLHReplyToUdpAddrs(ips *ip4And6) []*udpAddr {
+	addrs := make([]*udpAddr, len(ips.v4)+len(ips.v6)+len(ips.learnedV4)+len(ips.learnedV6))
+	i := 0
+
+	for _, v := range ips.learnedV4 {
+		addrs[i] = NewUDPAddrFromLH4(v)
+		i++
+	}
+
+	for _, v := range ips.v4 {
+		addrs[i] = NewUDPAddrFromLH4(v)
+		i++
+	}
+
+	for _, v := range ips.learnedV6 {
+		addrs[i] = NewUDPAddrFromLH6(v)
+		i++
+	}
+
+	for _, v := range ips.v6 {
+		addrs[i] = NewUDPAddrFromLH6(v)
+		i++
+	}
+
+	return addrs
+}
-
-/*
-func (f *Interface) sendPathCheck(ci *ConnectionState, endpoint *net.UDPAddr, counter int) {
-	c := ci.messageCounter
-	b := HeaderEncode(nil, Version, uint8(path_check), 0, ci.remoteIndex, c)
-	ci.messageCounter++
-
-	if ci.eKey != nil {
-		msg := ci.eKey.EncryptDanger(b, nil, []byte(strconv.Itoa(counter)), c)
-		//msg := ci.eKey.EncryptDanger(b, nil, []byte(fmt.Sprintf("%d", counter)), c)
-		f.outside.WriteTo(msg, endpoint)
-		l.Debugf("path_check sent, remote index: %d, pathCounter %d", ci.remoteIndex, counter)
-	}
-}
-
-func (f *Interface) sendPathCheckReply(ci *ConnectionState, endpoint *net.UDPAddr, counter []byte) {
-	c := ci.messageCounter
-	b := HeaderEncode(nil, Version, uint8(path_check_reply), 0, ci.remoteIndex, c)
-	ci.messageCounter++
-
-	if ci.eKey != nil {
-		msg := ci.eKey.EncryptDanger(b, nil, counter, c)
-		f.outside.WriteTo(msg, endpoint)
-		l.Debugln("path_check sent, remote index: ", ci.remoteIndex)
-	}
-}
-*/
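`handleHostQueryReply` nudges the handshake manager through a buffered channel with a non-blocking send: if the channel is full, the trigger is dropped instead of stalling the lighthouse handler. The select/default idiom in isolation:

// Sketch: non-blocking channel send; drop the value if it would block.
package main

import "fmt"

func main() {
	trigger := make(chan uint32, 1)

	for _, vpnIp := range []uint32{1, 2, 3} {
		select {
		case trigger <- vpnIp: // handed off to the consumer
		default: // would block: drop it, a later packet will retry
		}
	}

	fmt.Println(len(trigger), "queued trigger(s)") // 1, the rest were dropped
}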