Refactor remotes and handshaking to give every address a fair shot (#437)
@@ -12,12 +12,8 @@ import (
 )
 
 const (
-    // Total time to try a handshake = sequence of HandshakeTryInterval * HandshakeRetries
-    // With 100ms interval and 20 retries is 23.5 seconds
-    DefaultHandshakeTryInterval = time.Millisecond * 100
-    DefaultHandshakeRetries     = 20
-    // DefaultHandshakeWaitRotation is the number of handshake attempts to do before starting to use other ips addresses
-    DefaultHandshakeWaitRotation = 5
+    DefaultHandshakeTryInterval   = time.Millisecond * 100
+    DefaultHandshakeRetries       = 10
     DefaultHandshakeTriggerBuffer = 64
 )
 
@@ -25,7 +21,6 @@ var (
     defaultHandshakeConfig = HandshakeConfig{
         tryInterval:   DefaultHandshakeTryInterval,
         retries:       DefaultHandshakeRetries,
-        waitRotation:  DefaultHandshakeWaitRotation,
         triggerBuffer: DefaultHandshakeTriggerBuffer,
     }
 )
@@ -33,45 +28,36 @@ var (
 type HandshakeConfig struct {
     tryInterval   time.Duration
     retries       int
-    waitRotation  int
     triggerBuffer int
 
     messageMetrics *MessageMetrics
 }
 
 type HandshakeManager struct {
-    pendingHostMap *HostMap
-    mainHostMap    *HostMap
-    lightHouse     *LightHouse
-    outside        *udpConn
-    config         HandshakeConfig
+    pendingHostMap         *HostMap
+    mainHostMap            *HostMap
+    lightHouse             *LightHouse
+    outside                *udpConn
+    config                 HandshakeConfig
+    OutboundHandshakeTimer *SystemTimerWheel
+    messageMetrics         *MessageMetrics
+    l                      *logrus.Logger
 
     // can be used to trigger outbound handshake for the given vpnIP
     trigger chan uint32
-
-    OutboundHandshakeTimer *SystemTimerWheel
-    InboundHandshakeTimer  *SystemTimerWheel
-
-    messageMetrics *MessageMetrics
-    l              *logrus.Logger
 }
 
 func NewHandshakeManager(l *logrus.Logger, tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainHostMap *HostMap, lightHouse *LightHouse, outside *udpConn, config HandshakeConfig) *HandshakeManager {
     return &HandshakeManager{
-        pendingHostMap: NewHostMap(l, "pending", tunCidr, preferredRanges),
-        mainHostMap:    mainHostMap,
-        lightHouse:     lightHouse,
-        outside:        outside,
-
-        config: config,
-
-        trigger: make(chan uint32, config.triggerBuffer),
-
-        OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
-        InboundHandshakeTimer:  NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
-
-        messageMetrics: config.messageMetrics,
-        l:              l,
+        pendingHostMap:         NewHostMap(l, "pending", tunCidr, preferredRanges),
+        mainHostMap:            mainHostMap,
+        lightHouse:             lightHouse,
+        outside:                outside,
+        config:                 config,
+        trigger:                make(chan uint32, config.triggerBuffer),
+        OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
+        messageMetrics:         config.messageMetrics,
+        l:                      l,
     }
 }
 
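The buffered trigger channel (DefaultHandshakeTriggerBuffer, 64 entries) is how other code asks for an immediate handshake attempt instead of waiting for the next timer-wheel tick. A minimal sketch of the non-blocking send a caller might use; the requestHandshake helper here is hypothetical and not part of this change:

package main

import "fmt"

// requestHandshake shows the non-blocking send pattern: if the buffered
// channel is full, the request is simply dropped and the timer wheel
// retries on its normal schedule.
func requestHandshake(trigger chan uint32, vpnIP uint32) bool {
	select {
	case trigger <- vpnIP:
		return true
	default:
		return false // buffer full, fall back to the timer wheel
	}
}

func main() {
	trigger := make(chan uint32, 64) // mirrors DefaultHandshakeTriggerBuffer
	queued := requestHandshake(trigger, 0x0a000001)
	fmt.Println("queued:", queued, "pending:", len(trigger))
}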
@@ -84,7 +70,6 @@ func (c *HandshakeManager) Run(f EncWriter) {
             c.handleOutbound(vpnIP, f, true)
         case now := <-clockSource:
             c.NextOutboundHandshakeTimerTick(now, f)
-            c.NextInboundHandshakeTimerTick(now)
         }
     }
 }
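With the inbound wheel gone, Run has only two wake-ups: a triggered vpnIP or a clock tick that advances the outbound wheel. A rough, self-contained sketch of that select pattern, with illustrative names rather than the real types:

package main

import (
	"fmt"
	"time"
)

// run blocks on either an explicit trigger or the periodic clock, mirroring
// the two cases in the Run loop above.
func run(trigger <-chan uint32, clock <-chan time.Time, onTrigger func(uint32), onTick func(time.Time)) {
	for {
		select {
		case vpnIP := <-trigger:
			onTrigger(vpnIP) // immediate attempt for this vpn ip
		case now := <-clock:
			onTick(now) // advance the timer wheel and retry whatever is due
		}
	}
}

func main() {
	trigger := make(chan uint32, 64)
	clock := time.Tick(100 * time.Millisecond)
	go run(trigger, clock,
		func(ip uint32) { fmt.Println("triggered attempt for", ip) },
		func(now time.Time) { fmt.Println("tick at", now.Format(time.StampMilli)) },
	)
	trigger <- 1
	time.Sleep(250 * time.Millisecond)
}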
@@ -109,84 +94,84 @@ func (c *HandshakeManager) handleOutbound(vpnIP uint32, f EncWriter, lighthouseT
     hostinfo.Lock()
     defer hostinfo.Unlock()
 
-    // If we haven't finished the handshake and we haven't hit max retries, query
-    // lighthouse and then send the handshake packet again.
-    if hostinfo.HandshakeCounter < c.config.retries && !hostinfo.HandshakeComplete {
-        if hostinfo.remote == nil {
-            // We continue to query the lighthouse because hosts may
-            // come online during handshake retries. If the query
-            // succeeds (no error), add the lighthouse info to hostinfo
-            ips := c.lightHouse.QueryCache(vpnIP)
-            // If we have no responses yet, or only one IP (the host hadn't
-            // finished reporting its own IPs yet), then send another query to
-            // the LH.
-            if len(ips) <= 1 {
-                ips, err = c.lightHouse.Query(vpnIP, f)
-            }
-            if err == nil {
-                for _, ip := range ips {
-                    hostinfo.AddRemote(ip)
-                }
-                hostinfo.ForcePromoteBest(c.mainHostMap.preferredRanges)
-            }
-        } else if lighthouseTriggered {
-            // We were triggered by a lighthouse HostQueryReply packet, but
-            // we have already picked a remote for this host (this can happen
-            // if we are configured with multiple lighthouses). So we can skip
-            // this trigger and let the timerwheel handle the rest of the
-            // process
-            return
-        }
-
-        hostinfo.HandshakeCounter++
-
-        // We want to use the "best" calculated ip for the first 5 attempts, after that we just blindly rotate through
-        // all the others until we can stand up a connection.
-        if hostinfo.HandshakeCounter > c.config.waitRotation {
-            hostinfo.rotateRemote()
-        }
-
-        // Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation
-        if hostinfo.HandshakeReady && hostinfo.remote != nil {
-            c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1)
-            err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote)
-            if err != nil {
-                hostinfo.logger(c.l).WithField("udpAddr", hostinfo.remote).
-                    WithField("initiatorIndex", hostinfo.localIndexId).
-                    WithField("remoteIndex", hostinfo.remoteIndexId).
-                    WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
-                    WithError(err).Error("Failed to send handshake message")
-            } else {
-                //TODO: this log line is assuming a lot of stuff around the cached stage 0 handshake packet, we should
-                // keep the real packet struct around for logging purposes
-                hostinfo.logger(c.l).WithField("udpAddr", hostinfo.remote).
-                    WithField("initiatorIndex", hostinfo.localIndexId).
-                    WithField("remoteIndex", hostinfo.remoteIndexId).
-                    WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
-                    Info("Handshake message sent")
-            }
-        }
-
-        // Readd to the timer wheel so we continue trying wait HandshakeTryInterval * counter longer for next try
-        if !lighthouseTriggered {
-            //l.Infoln("Interval: ", HandshakeTryInterval*time.Duration(hostinfo.HandshakeCounter))
-            c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval*time.Duration(hostinfo.HandshakeCounter))
-        }
-    } else {
-        // We may have raced to completion but now that we have a lock we should ensure we have not yet completed.
-        if hostinfo.HandshakeComplete {
-            // Ensure we don't exist in the pending hostmap anymore since we have completed
-            c.pendingHostMap.DeleteHostInfo(hostinfo)
-            return
-        }
-    }
-}
-
-func (c *HandshakeManager) NextInboundHandshakeTimerTick(now time.Time) {
-    c.InboundHandshakeTimer.advance(now)
-    for {
-        ep := c.InboundHandshakeTimer.Purge()
-        if ep == nil {
-            break
-        }
-        index := ep.(uint32)
-
-        c.pendingHostMap.DeleteIndex(index)
-    }
-}
+    // Check if we have a handshake packet to transmit yet
+    if !hostinfo.HandshakeReady {
+        // There is currently a slight race in getOrHandshake due to ConnectionState not being part of the HostInfo directly
+        // Our hostinfo here was added to the pending map and the wheel may have ticked to us before we created ConnectionState
+        c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval*time.Duration(hostinfo.HandshakeCounter))
+        return
+    }
+
+    // If we are out of time, clean up
+    if hostinfo.HandshakeCounter >= c.config.retries {
+        hostinfo.logger(c.l).WithField("udpAddrs", hostinfo.remotes.CopyAddrs(c.pendingHostMap.preferredRanges)).
+            WithField("initiatorIndex", hostinfo.localIndexId).
+            WithField("remoteIndex", hostinfo.remoteIndexId).
+            WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
+            WithField("durationNs", time.Since(hostinfo.handshakeStart).Nanoseconds()).
+            Info("Handshake timed out")
+        //TODO: emit metrics
+        c.pendingHostMap.DeleteHostInfo(hostinfo)
+        return
+    }
+
+    // We only care about a lighthouse trigger before the first handshake transmit attempt. This is a very specific
+    // optimization for a fast lighthouse reply
+    //TODO: it would feel better to do this once, anytime, as our delay increases over time
+    if lighthouseTriggered && hostinfo.HandshakeCounter > 0 {
+        // If we didn't return here a lighthouse could cause us to aggressively send handshakes
+        return
+    }
+
+    // Get a remotes object if we don't already have one.
+    // This is mainly to protect us as this should never be the case
+    if hostinfo.remotes == nil {
+        hostinfo.remotes = c.lightHouse.QueryCache(vpnIP)
+    }
+
+    //TODO: this will generate a load of queries for hosts with only 1 ip (i'm not using a lighthouse, static mapped)
+    if hostinfo.remotes.Len(c.pendingHostMap.preferredRanges) <= 1 {
+        // If we only have 1 remote it is highly likely our query raced with the other host registered within the lighthouse
+        // Our vpnIP here has a tunnel with a lighthouse but has yet to send a host update packet there so we only know about
+        // the learned public ip for them. Query again to short circuit the promotion counter
+        c.lightHouse.QueryServer(vpnIP, f)
+    }
+
+    // Send a the handshake to all known ips, stage 2 takes care of assigning the hostinfo.remote based on the first to reply
+    var sentTo []*udpAddr
+    hostinfo.remotes.ForEach(c.pendingHostMap.preferredRanges, func(addr *udpAddr, _ bool) {
+        c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1)
+        err = c.outside.WriteTo(hostinfo.HandshakePacket[0], addr)
+        if err != nil {
+            hostinfo.logger(c.l).WithField("udpAddr", addr).
+                WithField("initiatorIndex", hostinfo.localIndexId).
+                WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
+                WithError(err).Error("Failed to send handshake message")
+
+        } else {
+            sentTo = append(sentTo, addr)
+        }
+    })
+
+    hostinfo.logger(c.l).WithField("udpAddrs", sentTo).
+        WithField("initiatorIndex", hostinfo.localIndexId).
+        WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
+        Info("Handshake message sent")
+
+    // Increment the counter to increase our delay, linear backoff
+    hostinfo.HandshakeCounter++
+
+    // If a lighthouse triggered this attempt then we are still in the timer wheel and do not need to re-add
+    if !lighthouseTriggered {
+        //TODO: feel like we dupe handshake real fast in a tight loop, why?
+        c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval*time.Duration(hostinfo.HandshakeCounter))
+    }
+}
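The rewritten handleOutbound above no longer picks a single "best" remote and rotates after DefaultHandshakeWaitRotation attempts; it sends the cached stage-0 packet to every known address and lets whichever host replies first decide hostinfo.remote. A simplified, self-contained sketch of that fan-out, using a plain net.UDPConn and a fixed address list in place of the real remotes/ForEach machinery:

package main

import (
	"fmt"
	"net"
)

// sendToAll writes one packet to every candidate address and reports which
// sends succeeded; the real code then waits for whichever peer answers first.
func sendToAll(conn *net.UDPConn, packet []byte, addrs []*net.UDPAddr) []*net.UDPAddr {
	var sentTo []*net.UDPAddr
	for _, addr := range addrs {
		if _, err := conn.WriteToUDP(packet, addr); err != nil {
			fmt.Println("send failed:", addr, err)
			continue
		}
		sentTo = append(sentTo, addr)
	}
	return sentTo
}

func main() {
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero, Port: 0})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Hypothetical candidate addresses for one peer.
	addrs := []*net.UDPAddr{
		{IP: net.IPv4(127, 0, 0, 1), Port: 4242},
		{IP: net.IPv4(127, 0, 0, 1), Port: 4243},
	}
	sent := sendToAll(conn, []byte("stage-0 handshake"), addrs)
	fmt.Println("handshake sent to", len(sent), "addresses")
}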
@@ -194,6 +179,7 @@ func (c *HandshakeManager) AddVpnIP(vpnIP uint32) *HostInfo {
     hostinfo := c.pendingHostMap.AddVpnIP(vpnIP)
     // We lock here and use an array to insert items to prevent locking the
     // main receive thread for very long by waiting to add items to the pending map
+    //TODO: what lock?
     c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval)
 
     return hostinfo
@@ -203,6 +189,7 @@ var (
     ErrExistingHostInfo    = errors.New("existing hostinfo")
     ErrAlreadySeen         = errors.New("already seen")
     ErrLocalIndexCollision = errors.New("local index collision")
+    ErrExistingHandshake   = errors.New("existing handshake")
 )
 
 // CheckAndComplete checks for any conflicts in the main and pending hostmap
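ErrExistingHandshake is new in this change; it lets a caller tell "keep our own pending handshake" apart from the other conflict cases. A hypothetical caller-side switch; the sentinels are redeclared only to keep the sketch self-contained, and the dispositions are illustrative, not the actual caller behavior:

package main

import (
	"errors"
	"fmt"
)

// Redeclared locally for the sketch; these mirror the sentinels in the diff.
var (
	ErrExistingHostInfo    = errors.New("existing hostinfo")
	ErrAlreadySeen         = errors.New("already seen")
	ErrLocalIndexCollision = errors.New("local index collision")
	ErrExistingHandshake   = errors.New("existing handshake")
)

// handleHandshakeResult shows how a caller of CheckAndComplete might branch
// on each sentinel; the returned strings are only for illustration.
func handleHandshakeResult(err error) string {
	switch {
	case err == nil:
		return "tunnel completed"
	case errors.Is(err, ErrAlreadySeen):
		return "duplicate or delayed packet, ignore"
	case errors.Is(err, ErrExistingHostInfo), errors.Is(err, ErrExistingHandshake):
		return "lost the race, keep the existing tunnel or pending handshake"
	case errors.Is(err, ErrLocalIndexCollision):
		return "index collision, drop this handshake and let the peer retry"
	default:
		return "unexpected error: " + err.Error()
	}
}

func main() {
	fmt.Println(handleHandshakeResult(ErrAlreadySeen))
}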
@@ -217,17 +204,21 @@ var (
 // ErrLocalIndexCollision if we already have an entry in the main or pending
 // hostmap for the hostinfo.localIndexId.
 func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket uint8, overwrite bool, f *Interface) (*HostInfo, error) {
-    c.pendingHostMap.RLock()
-    defer c.pendingHostMap.RUnlock()
+    c.pendingHostMap.Lock()
+    defer c.pendingHostMap.Unlock()
     c.mainHostMap.Lock()
     defer c.mainHostMap.Unlock()
 
+    // Check if we already have a tunnel with this vpn ip
     existingHostInfo, found := c.mainHostMap.Hosts[hostinfo.hostId]
     if found && existingHostInfo != nil {
+        // Is it just a delayed handshake packet?
         if bytes.Equal(hostinfo.HandshakePacket[handshakePacket], existingHostInfo.HandshakePacket[handshakePacket]) {
             return existingHostInfo, ErrAlreadySeen
         }
+
         if !overwrite {
+            // It's a new handshake and we lost the race
             return existingHostInfo, ErrExistingHostInfo
         }
     }
@@ -237,6 +228,7 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
         // We have a collision, but for a different hostinfo
         return existingIndex, ErrLocalIndexCollision
     }
+
     existingIndex, found = c.pendingHostMap.Indexes[hostinfo.localIndexId]
     if found && existingIndex != hostinfo {
         // We have a collision, but for a different hostinfo
@@ -252,7 +244,24 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
             Info("New host shadows existing host remoteIndex")
     }
 
+    // Check if we are also handshaking with this vpn ip
+    pendingHostInfo, found := c.pendingHostMap.Hosts[hostinfo.hostId]
+    if found && pendingHostInfo != nil {
+        if !overwrite {
+            // We won, let our pending handshake win
+            return pendingHostInfo, ErrExistingHandshake
+        }
+
+        // We lost, take this handshake and move any cached packets over so they get sent
+        pendingHostInfo.ConnectionState.queueLock.Lock()
+        hostinfo.packetStore = append(hostinfo.packetStore, pendingHostInfo.packetStore...)
+        c.pendingHostMap.unlockedDeleteHostInfo(pendingHostInfo)
+        pendingHostInfo.ConnectionState.queueLock.Unlock()
+        pendingHostInfo.logger(c.l).Info("Handshake race lost, replacing pending handshake with completed tunnel")
+    }
+
     if existingHostInfo != nil {
+        hostinfo.logger(c.l).Info("Race lost, taking new handshake")
         // We are going to overwrite this entry, so remove the old references
         delete(c.mainHostMap.Hosts, existingHostInfo.hostId)
         delete(c.mainHostMap.Indexes, existingHostInfo.localIndexId)
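When the local pending handshake loses the race, its cached payloads are appended onto the winning hostinfo under the queue lock so nothing buffered mid-handshake is dropped. A generic sketch of that hand-off pattern with stand-in types (not the real HostInfo/ConnectionState):

package main

import (
	"fmt"
	"sync"
)

type pendingConn struct {
	mu     sync.Mutex
	queued [][]byte // packets cached while the handshake was in flight
}

// adoptQueued moves any packets buffered on the losing attempt onto the
// winner, holding the loser's lock so nothing is enqueued mid-copy.
func adoptQueued(winner, loser *pendingConn) {
	loser.mu.Lock()
	defer loser.mu.Unlock()
	winner.queued = append(winner.queued, loser.queued...)
	loser.queued = nil
}

func main() {
	winner := &pendingConn{}
	loser := &pendingConn{queued: [][]byte{[]byte("ping"), []byte("pong")}}
	adoptQueued(winner, loser)
	fmt.Println("winner now holds", len(winner.queued), "cached packets")
}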
@@ -267,6 +276,8 @@ func (c *HandshakeManager) CheckAndComplete(hostinfo *HostInfo, handshakePacket
 // won't have a localIndexId collision because we already have an entry in the
 // pendingHostMap
 func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) {
+    c.pendingHostMap.Lock()
+    defer c.pendingHostMap.Unlock()
     c.mainHostMap.Lock()
     defer c.mainHostMap.Unlock()
 
@@ -288,6 +299,7 @@ func (c *HandshakeManager) Complete(hostinfo *HostInfo, f *Interface) {
     }
 
     c.mainHostMap.addHostInfo(hostinfo, f)
+    c.pendingHostMap.unlockedDeleteHostInfo(hostinfo)
 }
 
 // AddIndexHostInfo generates a unique localIndexId for this HostInfo
@@ -359,3 +371,7 @@ func generateIndex(l *logrus.Logger) (uint32, error) {
     }
     return index, nil
 }
+
+func hsTimeout(tries int, interval time.Duration) time.Duration {
+    return time.Duration(tries / 2 * ((2 * int(interval)) + (tries-1)*int(interval)))
+}
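hsTimeout is the closed form of the linear backoff used when re-adding to the outbound timer wheel: attempt k waits k * tryInterval, so the total window is the arithmetic-series sum interval * tries * (tries + 1) / 2. A quick check of that arithmetic with the new defaults (10 retries at 100ms gives 5.5s, versus roughly 21s by the same formula under the old 20-retry schedule):

package main

import (
	"fmt"
	"time"
)

// hsTimeout mirrors the helper added in this change: the sum
// interval*1 + interval*2 + ... + interval*tries written in closed form.
func hsTimeout(tries int, interval time.Duration) time.Duration {
	return time.Duration(tries / 2 * ((2 * int(interval)) + (tries-1)*int(interval)))
}

func main() {
	// New defaults: 10 retries, 100ms try interval.
	fmt.Println(hsTimeout(10, 100*time.Millisecond)) // 5.5s

	// Cross-check against the explicit per-attempt sum.
	var total time.Duration
	for k := 1; k <= 10; k++ {
		total += time.Duration(k) * 100 * time.Millisecond
	}
	fmt.Println(total) // 5.5s
}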