EtherGuard-VPN/device/device.go

763 lines
19 KiB
Go
Raw Normal View History

2019-01-02 01:55:51 +01:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved.
*/
2019-03-03 04:04:41 +01:00
package device
import (
2021-09-21 03:15:23 +02:00
"bytes"
2021-08-20 19:32:50 +02:00
"encoding/base64"
2021-08-24 10:43:55 +02:00
"errors"
2021-12-02 18:13:48 +01:00
"fmt"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
2019-05-14 09:09:52 +02:00
2021-12-02 18:13:48 +01:00
"github.com/KusakabeSi/EtherGuard-VPN/conn"
"github.com/KusakabeSi/EtherGuard-VPN/mtypes"
"github.com/KusakabeSi/EtherGuard-VPN/path"
"github.com/KusakabeSi/EtherGuard-VPN/ratelimiter"
"github.com/KusakabeSi/EtherGuard-VPN/rwcancel"
"github.com/KusakabeSi/EtherGuard-VPN/tap"
2021-08-20 19:32:50 +02:00
fixed_time_cache "github.com/KusakabeSi/go-cache"
)
type Device struct {
state struct {
// state holds the device's state. It is accessed atomically.
// Use the device.deviceState method to read it.
// device.deviceState does not acquire the mutex, so it captures only a snapshot.
// During state transitions, the state variable is updated before the device itself.
// The state is thus either the current state of the device or
// the intended future state of the device.
// For example, while executing a call to Up, state will be deviceStateUp.
// There is no guarantee that that intended future state of the device
// will become the actual state; Up can fail.
// The device can also change state multiple times between time of check and time of use.
// Unsynchronized uses of state must therefore be advisory/best-effort only.
state uint32 // actually a deviceState, but typed uint32 for convenience
// stopping blocks until all inputs to Device have been closed.
2018-05-05 06:00:38 +02:00
stopping sync.WaitGroup
// mu protects state changes.
sync.Mutex
}
net struct {
stopping sync.WaitGroup
sync.RWMutex
bind conn.Bind // bind interface
netlinkCancel *rwcancel.RWCancel
port uint16 // listening port
fwmark uint32 // mark value (0 = disabled)
}
2018-05-13 23:14:43 +02:00
staticIdentity struct {
sync.RWMutex
privateKey NoisePrivateKey
publicKey NoisePublicKey
}
rate struct {
underLoadUntil int64
limiter ratelimiter.Ratelimiter
}
peers struct {
sync.RWMutex // protects keyMap
keyMap map[NoisePublicKey]*Peer
2021-12-02 18:13:48 +01:00
IDMap map[mtypes.Vertex]*Peer
2021-08-20 19:32:50 +02:00
SuperPeer map[NoisePublicKey]*Peer
LocalV4 net.IP
LocalV6 net.IP
}
2021-12-04 03:32:59 +01:00
state_hashes mtypes.StateHash
2021-08-20 19:32:50 +02:00
event_tryendpoint chan struct{}
2021-08-24 10:43:55 +02:00
EdgeConfigPath string
2021-12-02 18:13:48 +01:00
EdgeConfig *mtypes.EdgeConfig
2021-08-24 10:43:55 +02:00
SuperConfigPath string
2021-12-02 18:13:48 +01:00
SuperConfig *mtypes.SuperConfig
2021-08-20 19:32:50 +02:00
2021-12-05 22:36:50 +01:00
Chan_server_register chan mtypes.RegisterMsg
Chan_server_pong chan mtypes.PongMsg
Chan_save_config chan struct{}
Chan_Supernode_OK chan struct{}
Chan_SendPingStart chan struct{}
Chan_SendRegisterStart chan struct{}
Chan_HttpPostStart chan struct{}
2021-08-25 10:13:53 +02:00
2018-05-13 23:14:43 +02:00
indexTable IndexTable
cookieChecker CookieChecker
2021-12-03 23:46:58 +01:00
IsSuperNode bool
ID mtypes.Vertex
graph *path.IG
l2fib sync.Map
LogLevel mtypes.LoggerInfo
DupData fixed_time_cache.Cache
Version string
2021-08-16 20:58:15 +02:00
2021-12-02 18:13:48 +01:00
HttpPostCount uint64
JWTSecret mtypes.JWTSecret
pool struct {
messageBuffers *WaitPool
inboundElements *WaitPool
outboundElements *WaitPool
}
queue struct {
encryption *outboundQueue
decryption *inboundQueue
handshake *handshakeQueue
}
2021-08-16 20:58:15 +02:00
tap struct {
device tap.Device
mtu int32
}
ipcMutex sync.RWMutex
2021-12-02 18:13:48 +01:00
closed chan int
log *Logger
2017-06-01 21:31:30 +02:00
}
2021-10-01 10:56:42 +02:00
type IdAndTime struct {
2021-12-02 18:13:48 +01:00
ID mtypes.Vertex
2021-10-01 10:56:42 +02:00
Time time.Time
}
// deviceState represents the state of a Device.
// There are three states: down, up, closed.
// Transitions:
//
//	down -----+
//	  ↑↓      ↓
//	  up -> closed
//
// Closed is terminal: changeState ignores every request once it is reached.
type deviceState uint32
//go:generate go run golang.org/x/tools/cmd/stringer -type deviceState -trimprefix=deviceState
const (
	// deviceStateDown is the state NewDevice starts every device in.
	deviceStateDown deviceState = iota
	// deviceStateUp is stored before the device is actually brought up,
	// so it may describe an intended rather than an achieved state.
	deviceStateUp
	// deviceStateClosed is stored by Close and is never left again.
	deviceStateClosed
)
// deviceState atomically loads device.state.state and returns it as a
// deviceState. No mutex is taken, so the value is only a snapshot; see the
// comments on device.state.state for how to interpret it.
func (device *Device) deviceState() deviceState {
	raw := atomic.LoadUint32(&device.state.state)
	return deviceState(raw)
}
// isClosed reports whether the device is closed (or is closing).
// The answer is based on an unlocked snapshot of the state.
// See device.state.state comments for how to interpret this value.
func (device *Device) isClosed() bool {
	return device.deviceState() == deviceStateClosed
}
// isUp reports whether the device is up (or is attempting to come up).
// The answer is based on an unlocked snapshot of the state.
// See device.state.state comments for how to interpret this value.
func (device *Device) isUp() bool {
	return device.deviceState() == deviceStateUp
}
// Must hold device.peers.Lock()
func removePeerLocked(device *Device, peer *Peer, key NoisePublicKey) {
// stop routing and processing of packets
peer.Stop()
// remove from peer map
2021-08-20 19:32:50 +02:00
id := peer.ID
delete(device.peers.keyMap, key)
2021-12-09 08:46:15 +01:00
if id == mtypes.NodeID_SuperNode {
2021-08-20 19:32:50 +02:00
delete(device.peers.SuperPeer, key)
} else {
delete(device.peers.IDMap, id)
}
}
// changeState attempts to change the device state to match want.
func (device *Device) changeState(want deviceState) (err error) {
device.state.Lock()
defer device.state.Unlock()
old := device.deviceState()
if old == deviceStateClosed {
// once closed, always closed
device.log.Verbosef("Interface closed, ignored requested state %s", want)
return nil
}
switch want {
case old:
return nil
case deviceStateUp:
atomic.StoreUint32(&device.state.state, uint32(deviceStateUp))
err = device.upLocked()
if err == nil {
2018-02-04 16:46:24 +01:00
break
}
fallthrough // up failed; bring the device all the way back down
case deviceStateDown:
atomic.StoreUint32(&device.state.state, uint32(deviceStateDown))
errDown := device.downLocked()
if err == nil {
err = errDown
}
2018-02-04 16:46:24 +01:00
}
device.log.Verbosef("Interface state was %s, requested %s, now %s", old, want, device.deviceState())
return
}
// upLocked attempts to bring the device up and reports whether it succeeded.
// The caller must hold device.state.mu and is responsible for updating device.state.state.
func (device *Device) upLocked() error {
if err := device.BindUpdate(); err != nil {
device.log.Errorf("Unable to update bind: %v", err)
return err
}
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.Start()
if atomic.LoadUint32(&peer.persistentKeepaliveInterval) > 0 {
peer.SendKeepalive()
}
}
device.peers.RUnlock()
return nil
2017-12-29 17:42:09 +01:00
}
// downLocked attempts to bring the device down.
// The caller must hold device.state.mu and is responsible for updating device.state.state.
func (device *Device) downLocked() error {
err := device.BindClose()
if err != nil {
device.log.Errorf("Bind close failed: %v", err)
}
2017-12-29 17:42:09 +01:00
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.Stop()
}
device.peers.RUnlock()
return err
}
// Up requests the deviceStateUp state via changeState and returns its error.
func (device *Device) Up() error {
	return device.changeState(deviceStateUp)
}
func (device *Device) Down() error {
return device.changeState(deviceStateDown)
2017-12-29 17:42:09 +01:00
}
// IsUnderLoad reports whether the device is, or recently was, under load.
// The device counts as under load while the handshake queue holds at least
// QueueHandshakeSize/8 elements; each such observation pushes the
// "under load until" deadline UnderLoadAfterTime into the future.
func (device *Device) IsUnderLoad() bool {
	now := time.Now()

	// check if currently under load
	if len(device.queue.handshake.c) >= QueueHandshakeSize/8 {
		deadline := now.Add(UnderLoadAfterTime).UnixNano()
		atomic.StoreInt64(&device.rate.underLoadUntil, deadline)
		return true
	}

	// check if recently under load
	until := atomic.LoadInt64(&device.rate.underLoadUntil)
	return until > now.UnixNano()
}
2017-08-04 16:15:53 +02:00
func (device *Device) SetPrivateKey(sk NoisePrivateKey) error {
// lock required resources
device.staticIdentity.Lock()
defer device.staticIdentity.Unlock()
if sk.Equals(device.staticIdentity.privateKey) {
return nil
}
device.peers.Lock()
defer device.peers.Unlock()
2019-08-05 17:46:34 +02:00
lockedPeers := make([]*Peer, 0, len(device.peers.keyMap))
for _, peer := range device.peers.keyMap {
peer.handshake.mutex.RLock()
2019-08-05 17:46:34 +02:00
lockedPeers = append(lockedPeers, peer)
}
2017-06-24 15:34:17 +02:00
// remove peers with matching public keys
2017-08-04 16:15:53 +02:00
2021-09-21 22:03:11 +02:00
publicKey := sk.PublicKey()
for key, peer := range device.peers.keyMap {
if peer.handshake.remoteStatic.Equals(publicKey) {
peer.handshake.mutex.RUnlock()
removePeerLocked(device, peer, key)
peer.handshake.mutex.RLock()
2017-08-04 16:15:53 +02:00
}
}
2017-06-24 15:34:17 +02:00
// update key material
2018-05-13 23:14:43 +02:00
device.staticIdentity.privateKey = sk
device.staticIdentity.publicKey = publicKey
device.cookieChecker.Init(publicKey)
2017-06-24 15:34:17 +02:00
// do static-static DH pre-computations
2019-08-05 17:46:34 +02:00
expiredPeers := make([]*Peer, 0, len(device.peers.keyMap))
2020-02-04 18:08:51 +01:00
for _, peer := range device.peers.keyMap {
2018-05-14 12:27:29 +02:00
handshake := &peer.handshake
2020-02-04 18:08:51 +01:00
handshake.precomputedStaticStatic = device.staticIdentity.privateKey.sharedSecret(handshake.remoteStatic)
expiredPeers = append(expiredPeers, peer)
2017-06-23 13:41:59 +02:00
}
2017-08-04 16:15:53 +02:00
2019-08-05 17:46:34 +02:00
for _, peer := range lockedPeers {
peer.handshake.mutex.RUnlock()
}
for _, peer := range expiredPeers {
peer.ExpireCurrentKeypairs()
}
2017-08-04 16:15:53 +02:00
return nil
2017-06-23 13:41:59 +02:00
}
2021-12-02 18:13:48 +01:00
func NewDevice(tapDevice tap.Device, id mtypes.Vertex, bind conn.Bind, logger *Logger, graph *path.IG, IsSuperNode bool, configpath string, econfig *mtypes.EdgeConfig, sconfig *mtypes.SuperConfig, superevents *mtypes.SUPER_Events, version string) *Device {
device := new(Device)
device.state.state = uint32(deviceStateDown)
2021-12-02 18:13:48 +01:00
device.closed = make(chan int)
2017-11-14 18:26:28 +01:00
device.log = logger
device.net.bind = bind
2021-08-16 20:58:15 +02:00
device.tap.device = tapDevice
mtu, err := device.tap.device.MTU()
2018-04-18 16:39:14 +02:00
if err != nil {
device.log.Errorf("Trouble determining MTU, assuming default: %v", err)
2018-04-19 15:52:59 +02:00
mtu = DefaultMTU
2018-04-18 16:39:14 +02:00
}
2021-08-16 20:58:15 +02:00
device.tap.mtu = int32(mtu)
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
2021-12-02 18:13:48 +01:00
device.peers.IDMap = make(map[mtypes.Vertex]*Peer)
2021-08-21 16:54:24 +02:00
device.peers.SuperPeer = make(map[NoisePublicKey]*Peer)
2021-08-20 19:32:50 +02:00
device.IsSuperNode = IsSuperNode
2021-08-16 20:58:15 +02:00
device.ID = id
2021-08-20 19:32:50 +02:00
device.graph = graph
2021-09-20 22:20:00 +02:00
device.Version = version
2021-12-02 18:13:48 +01:00
device.JWTSecret = mtypes.ByteSlice2Byte32(mtypes.RandomBytes(32, []byte(fmt.Sprintf("%v", time.Now()))))
2021-08-20 19:32:50 +02:00
2021-12-04 03:32:59 +01:00
device.state_hashes.NhTable.Store("")
device.state_hashes.Peer.Store("")
device.state_hashes.SuperParam.Store("")
device.rate.limiter.Init()
2018-05-13 18:23:40 +02:00
device.indexTable.Init()
2018-09-22 06:29:02 +02:00
device.PopulatePools()
2021-08-20 19:32:50 +02:00
if IsSuperNode {
2021-12-03 23:46:58 +01:00
device.SuperConfigPath = configpath
device.SuperConfig = sconfig
device.EdgeConfig = &mtypes.EdgeConfig{}
2021-12-11 20:33:01 +01:00
device.EdgeConfig.Interface.MTU = DefaultMTU
2021-12-04 03:32:59 +01:00
device.Chan_server_pong = superevents.Event_server_pong
device.Chan_server_register = superevents.Event_server_register
2021-08-25 10:13:53 +02:00
device.LogLevel = sconfig.LogLevel
2021-08-20 19:32:50 +02:00
} else {
2021-08-24 10:43:55 +02:00
device.EdgeConfigPath = configpath
2021-08-21 16:54:24 +02:00
device.EdgeConfig = econfig
2021-12-03 23:46:58 +01:00
device.SuperConfig = &mtypes.SuperConfig{}
device.DupData = *fixed_time_cache.NewCache(mtypes.S2TD(econfig.DynamicRoute.DupCheckTimeout), false, mtypes.S2TD(60))
2021-08-20 19:32:50 +02:00
device.event_tryendpoint = make(chan struct{}, 1<<6)
2021-12-04 03:32:59 +01:00
device.Chan_save_config = make(chan struct{}, 1<<5)
device.Chan_Supernode_OK = make(chan struct{}, 1<<5)
device.Chan_SendPingStart = make(chan struct{}, 1<<5)
2021-12-05 22:36:50 +01:00
device.Chan_SendRegisterStart = make(chan struct{}, 1<<5)
2021-12-04 03:32:59 +01:00
device.Chan_HttpPostStart = make(chan struct{}, 1<<5)
2021-08-25 10:13:53 +02:00
device.LogLevel = econfig.LogLevel
2021-12-09 23:39:37 +01:00
device.SuperConfig.DampingResistance = device.EdgeConfig.DynamicRoute.DampingResistance
2021-12-03 23:46:58 +01:00
2021-08-20 19:32:50 +02:00
go device.RoutineSetEndpoint()
2021-10-27 03:02:44 +02:00
go device.RoutineDetectOfflineAndTryNextEndpoint()
2021-12-05 22:36:50 +01:00
go device.RoutineRegister(device.Chan_SendRegisterStart)
2021-12-04 03:32:59 +01:00
go device.RoutineSendPing(device.Chan_SendPingStart)
2021-08-23 18:39:04 +02:00
go device.RoutineSpreadAllMyNeighbor()
2021-08-24 10:43:55 +02:00
go device.RoutineResetConn()
2021-10-01 10:56:42 +02:00
go device.RoutineClearL2FIB()
2021-10-27 03:02:44 +02:00
go device.RoutineRecalculateNhTable()
2021-12-04 03:32:59 +01:00
go device.RoutinePostPeerInfo(device.Chan_HttpPostStart)
2021-08-20 19:32:50 +02:00
}
2021-12-02 18:13:48 +01:00
// create queues
device.queue.handshake = newHandshakeQueue()
device.queue.encryption = newOutboundQueue()
device.queue.decryption = newInboundQueue()
2017-07-01 23:29:22 +02:00
// start workers
2018-05-05 06:00:38 +02:00
cpus := runtime.NumCPU()
device.state.stopping.Wait()
device.queue.encryption.wg.Add(cpus) // One for each RoutineHandshake
for i := 0; i < cpus; i++ {
go device.RoutineEncryption(i + 1)
go device.RoutineDecryption(i + 1)
go device.RoutineHandshake(i + 1)
}
2017-12-01 23:37:26 +01:00
device.state.stopping.Add(1) // RoutineReadFromTUN
device.queue.encryption.wg.Add(1) // RoutineReadFromTUN
go device.RoutineReadFromTUN()
go device.RoutineTUNEventReader()
2017-12-01 23:37:26 +01:00
return device
2017-06-24 15:34:17 +02:00
}
2021-12-02 18:13:48 +01:00
func (device *Device) LookupPeerIDAtConfig(pk NoisePublicKey) (ID mtypes.Vertex, err error) {
2021-08-24 10:43:55 +02:00
if device.IsSuperNode {
2021-12-02 18:13:48 +01:00
var peerlist []mtypes.SuperPeerInfo
2021-08-24 10:43:55 +02:00
if device.SuperConfig == nil {
2021-12-09 08:46:15 +01:00
return 0, errors.New("superconfig is nil")
2021-08-24 10:43:55 +02:00
}
peerlist = device.SuperConfig.Peers
2021-09-21 03:15:23 +02:00
pkstr := pk.ToString()
2021-09-20 22:20:00 +02:00
for _, peerinfo := range peerlist {
if peerinfo.PubKey == pkstr {
return peerinfo.NodeID, nil
}
}
2021-08-24 10:43:55 +02:00
} else {
2021-12-02 18:13:48 +01:00
var peerlist []mtypes.PeerInfo
2021-08-24 10:43:55 +02:00
if device.EdgeConfig == nil {
2021-12-09 08:46:15 +01:00
return 0, errors.New("edgeconfig is nil")
2021-08-24 10:43:55 +02:00
}
peerlist = device.EdgeConfig.Peers
2021-09-21 03:15:23 +02:00
pkstr := pk.ToString()
2021-09-20 22:20:00 +02:00
for _, peerinfo := range peerlist {
if peerinfo.PubKey == pkstr {
return peerinfo.NodeID, nil
}
2021-08-24 10:43:55 +02:00
}
}
2021-09-20 22:20:00 +02:00
2021-12-09 08:46:15 +01:00
return 0, errors.New("peer not found in the config file")
}
// VPair is a pair of vertices used as the key of PSKDB.
// GetPSK always stores the smaller vertex in s and the larger one in d.
type VPair struct {
	s mtypes.Vertex
	d mtypes.Vertex
}
// PSKDB stores one preshared key per vertex pair.
type PSKDB struct {
	db sync.Map // VPair -> NoisePresharedKey
}
// GetPSK returns the preshared key for the pair (s, d), creating a random one
// on first use. The pair is normalised so that GetPSK(a, b) and GetPSK(b, a)
// yield the same key. When two callers race to create the key, LoadOrStore
// makes both of them return the key that was stored first.
func (D *PSKDB) GetPSK(s mtypes.Vertex, d mtypes.Vertex) (psk NoisePresharedKey) {
	lo, hi := s, d
	if lo > hi {
		lo, hi = hi, lo
	}
	pair := VPair{s: lo, d: hi}

	if stored, found := D.db.Load(pair); found {
		return stored.(NoisePresharedKey)
	}

	actual, _ := D.db.LoadOrStore(pair, RandomPSK())
	return actual.(NoisePresharedKey)
}
func (D *PSKDB) DelNode(n mtypes.Vertex) {
D.db.Range(func(key, value interface{}) bool {
vp := key.(VPair)
if vp.s == n || vp.d == n {
D.db.Delete(vp)
}
return true
})
2021-08-24 10:43:55 +02:00
}
2017-06-24 15:34:17 +02:00
func (device *Device) LookupPeer(pk NoisePublicKey) *Peer {
device.peers.RLock()
defer device.peers.RUnlock()
return device.peers.keyMap[pk]
2017-06-24 15:34:17 +02:00
}
2021-08-20 19:32:50 +02:00
// LookupPeerByStr returns the peer whose public key is the base64
// (standard encoding) string pks, or nil when there is no such peer.
//
// A string that is not valid base64, or that does not decode to exactly one
// public key's worth of bytes, cannot name a peer and yields nil. It is no
// longer looked up as a truncated or zero-padded key.
func (device *Device) LookupPeerByStr(pks string) *Peer {
	var pk NoisePublicKey
	raw, err := base64.StdEncoding.DecodeString(pks)
	if err != nil || len(raw) != len(pk) {
		return nil
	}
	copy(pk[:], raw)
	return device.LookupPeer(pk)
}
2021-12-09 08:46:15 +01:00
func (pk NoisePublicKey) ToString() string {
2021-09-21 03:15:23 +02:00
if bytes.Equal(pk[:], make([]byte, len(pk))) {
return ""
}
return string(base64.StdEncoding.EncodeToString(pk[:]))
2021-08-20 19:32:50 +02:00
}
2021-12-09 08:46:15 +01:00
func (pk NoisePrivateKey) ToString() (result string) {
2021-09-21 03:15:23 +02:00
if bytes.Equal(pk[:], make([]byte, len(pk))) {
return ""
}
return string(base64.StdEncoding.EncodeToString(pk[:]))
2021-08-20 19:32:50 +02:00
}
2021-12-09 08:46:15 +01:00
func (pk NoisePresharedKey) ToString() (result string) {
2021-09-21 03:15:23 +02:00
if bytes.Equal(pk[:], make([]byte, len(pk))) {
return ""
}
return string(base64.StdEncoding.EncodeToString(pk[:]))
2021-08-20 19:32:50 +02:00
}
2021-09-21 03:15:23 +02:00
func Str2PubKey(k string) (pk NoisePublicKey, err error) {
if k == "" {
2021-12-09 08:46:15 +01:00
err = errors.New("empty public key string")
2021-09-21 03:15:23 +02:00
return
}
sk_slice, err := base64.StdEncoding.DecodeString(k)
2021-08-20 19:32:50 +02:00
copy(pk[:], sk_slice)
return
}
2021-09-21 03:15:23 +02:00
func Str2PriKey(k string) (pk NoisePrivateKey, err error) {
if k == "" {
2021-12-09 08:46:15 +01:00
err = errors.New("empty private key string")
2021-09-21 03:15:23 +02:00
return
}
sk_slice, err := base64.StdEncoding.DecodeString(k)
2021-08-20 19:32:50 +02:00
copy(pk[:], sk_slice)
return
}
2021-09-21 03:15:23 +02:00
func Str2PSKey(k string) (pk NoisePresharedKey, err error) {
if k == "" {
return
}
sk_slice, err := base64.StdEncoding.DecodeString(k)
2021-08-20 19:32:50 +02:00
copy(pk[:], sk_slice)
return
}
2021-12-09 08:46:15 +01:00
// RandomKeyPair returns a private key built from 32 bytes obtained from
// mtypes.RandomBytes, together with the public key derived from it.
//
// NOTE(review): the second argument to RandomBytes is 32 zero bytes; if that
// is what RandomBytes falls back to on failure, the result would be an
// all-zero private key — confirm against mtypes.RandomBytes.
func RandomKeyPair() (pri NoisePrivateKey, pub NoisePublicKey) {
	material := mtypes.RandomBytes(32, make([]byte, 32))
	pri = mtypes.ByteSlice2Byte32(material)
	return pri, pri.PublicKey()
}
// RandomPSK returns a preshared key built from 32 bytes obtained from
// mtypes.RandomBytes.
func RandomPSK() (pk NoisePresharedKey) {
	material := mtypes.RandomBytes(32, make([]byte, 32))
	pk = mtypes.ByteSlice2Byte32(material)
	return pk
}
2021-12-02 18:13:48 +01:00
func (device *Device) GetConnurl(v mtypes.Vertex) string {
2021-09-21 22:03:11 +02:00
if peer, has := device.peers.IDMap[v]; has {
if peer.endpoint != nil {
return peer.endpoint.DstToString()
}
}
return ""
}
2021-12-02 18:13:48 +01:00
func (device *Device) RemovePeerByID(id mtypes.Vertex) {
2021-09-21 22:03:11 +02:00
device.peers.Lock()
defer device.peers.Unlock()
peer, ok := device.peers.IDMap[id]
if ok {
removePeerLocked(device, peer, peer.handshake.remoteStatic)
}
2021-08-20 19:32:50 +02:00
}
2017-06-24 15:34:17 +02:00
func (device *Device) RemovePeer(key NoisePublicKey) {
device.peers.Lock()
defer device.peers.Unlock()
// stop peer and remove from routing
peer, ok := device.peers.keyMap[key]
if ok {
removePeerLocked(device, peer, key)
}
2017-06-01 21:31:30 +02:00
}
2017-06-24 15:34:17 +02:00
func (device *Device) RemoveAllPeers() {
device.peers.Lock()
defer device.peers.Unlock()
for key, peer := range device.peers.keyMap {
removePeerLocked(device, peer, key)
2017-06-01 21:31:30 +02:00
}
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
2021-12-02 18:13:48 +01:00
device.peers.IDMap = make(map[mtypes.Vertex]*Peer)
}
func (device *Device) Close() {
device.state.Lock()
defer device.state.Unlock()
if device.isClosed() {
return
}
atomic.StoreUint32(&device.state.state, uint32(deviceStateClosed))
device.log.Verbosef("Device closing")
2021-08-16 20:58:15 +02:00
device.tap.device.Close()
device.downLocked()
// Remove peers before closing queues,
// because peers assume that queues are active.
device.RemoveAllPeers()
// We kept a reference to the encryption and decryption queues,
// in case we started any new peers that might write to them.
// No new peers are coming; we are done with these queues.
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-15 00:07:23 +01:00
device.queue.encryption.wg.Done()
device.queue.decryption.wg.Done()
device.queue.handshake.wg.Done()
device.state.stopping.Wait()
2018-02-11 22:53:39 +01:00
device.rate.limiter.Close()
device.log.Verbosef("Device closed")
close(device.closed)
}
2021-12-02 18:13:48 +01:00
// Wait returns the channel that Close closes once the device has been shut
// down; receiving from it blocks until then.
func (device *Device) Wait() chan int {
	return device.closed
}
// SendKeepalivesToPeersWithCurrentKeypair sends a keepalive to every peer
// that has a current keypair created no longer than RejectAfterTime ago.
// It does nothing unless the device is up.
func (device *Device) SendKeepalivesToPeersWithCurrentKeypair() {
	if !device.isUp() {
		return
	}

	device.peers.RLock()
	for _, p := range device.peers.keyMap {
		// Decide under the keypairs lock, send after releasing it.
		p.keypairs.RLock()
		current := p.keypairs.current
		usable := current != nil && !current.created.Add(RejectAfterTime).Before(time.Now())
		p.keypairs.RUnlock()

		if !usable {
			continue
		}
		p.SendKeepalive()
	}
	device.peers.RUnlock()
}
// closeBindLocked closes the device's net.bind.
// It cancels the netlink listener when one is set, closes the bind when one
// is set, then waits on net.stopping. The error from closing the bind, if
// any, is returned.
// The caller must hold the net mutex.
func closeBindLocked(device *Device) error {
	if cancel := device.net.netlinkCancel; cancel != nil {
		cancel.Cancel()
	}

	var closeErr error
	if bind := device.net.bind; bind != nil {
		closeErr = bind.Close()
	}

	device.net.stopping.Wait()
	return closeErr
}
// Bind returns the device's current conn.Bind, read under the net mutex.
func (device *Device) Bind() conn.Bind {
	device.net.Lock()
	defer device.net.Unlock()
	return device.net.bind
}
// BindSetMark records mark as the device's fwmark and, when the device is up
// and has a bind, applies it to that bind. Afterwards the cached source
// address of every peer endpoint is cleared. Setting the mark that is
// already in effect is a no-op.
//
// The error from the bind's SetMark is returned; in that case the new mark
// has already been recorded and the source caches are left untouched.
func (device *Device) BindSetMark(mark uint32) error {
	device.net.Lock()
	defer device.net.Unlock()

	// check if modified
	if device.net.fwmark == mark {
		return nil
	}

	// update fwmark on existing bind
	device.net.fwmark = mark
	if device.isUp() && device.net.bind != nil {
		if err := device.net.bind.SetMark(mark); err != nil {
			return err
		}
	}

	// clear cached source addresses
	device.peers.RLock()
	for _, peer := range device.peers.keyMap {
		// Unlock per iteration: a deferred unlock here would keep every
		// peer locked until the whole function returns.
		peer.Lock()
		if peer.endpoint != nil {
			peer.endpoint.ClearSrc()
		}
		peer.Unlock()
	}
	device.peers.RUnlock()

	return nil
}
// BindUpdate closes the current sockets and, if the device is up, reopens the
// bind: it opens the bind on the configured port, starts the route listener,
// applies a non-zero fwmark, clears the cached source address of every peer
// endpoint and starts one RoutineReceiveIncoming goroutine per receive
// function returned by the bind.
//
// When the device is not up, the sockets are only closed. Errors from
// closing, opening, starting the route listener or setting the mark are
// returned; after a failed open or route listener start the port is reset to 0.
func (device *Device) BindUpdate() error {
	device.net.Lock()
	defer device.net.Unlock()

	// close existing sockets
	if err := closeBindLocked(device); err != nil {
		return err
	}

	// open new sockets
	if !device.isUp() {
		return nil
	}

	// bind to new port
	var err error
	var recvFns []conn.ReceiveFunc
	netc := &device.net
	recvFns, netc.port, err = netc.bind.Open(netc.port)
	if err != nil {
		netc.port = 0
		return err
	}
	netc.netlinkCancel, err = device.startRouteListener(netc.bind)
	if err != nil {
		netc.bind.Close()
		netc.port = 0
		return err
	}

	// set fwmark
	if netc.fwmark != 0 {
		err = netc.bind.SetMark(netc.fwmark)
		if err != nil {
			return err
		}
	}

	// clear cached source addresses
	device.peers.RLock()
	for _, peer := range device.peers.keyMap {
		// Unlock per iteration: a deferred unlock here would keep every
		// peer locked while the receive goroutines below are started.
		peer.Lock()
		if peer.endpoint != nil {
			peer.endpoint.ClearSrc()
		}
		peer.Unlock()
	}
	device.peers.RUnlock()

	// start receiving routines
	device.net.stopping.Add(len(recvFns))
	device.queue.decryption.wg.Add(len(recvFns)) // each RoutineReceiveIncoming goroutine writes to device.queue.decryption
	device.queue.handshake.wg.Add(len(recvFns))  // each RoutineReceiveIncoming goroutine writes to device.queue.handshake
	for _, fn := range recvFns {
		go device.RoutineReceiveIncoming(fn)
	}

	device.log.Verbosef("UDP bind has been updated")
	return nil
}
// BindClose closes the device's bind under the net mutex and returns the
// error reported by closeBindLocked.
func (device *Device) BindClose() error {
	device.net.Lock()
	defer device.net.Unlock()
	return closeBindLocked(device)
}