diff --git a/send.go b/send.go index ce545df..734c425 100644 --- a/send.go +++ b/send.go @@ -336,6 +336,16 @@ func (peer *Peer) RoutineNonce() { peer.routines.stopping.Done() }() + flush := func() { + for { + select { + case <-peer.queue.nonce: + default: + return + } + } + } + peer.routines.starting.Done() logDebug.Println(peer, ": Routine: nonce worker - started") @@ -347,15 +357,22 @@ func (peer *Peer) RoutineNonce() { case <-peer.routines.stop: return + case <-peer.signals.flushNonceQueue: + flush() + goto NextPacket + case elem, ok := <-peer.queue.nonce: if !ok { return } - // wait for key pair + // make sure to always pick the newest key for { + + // check validity of newest key pair + keypair = peer.keypairs.Current() if keypair != nil && keypair.sendNonce < RejectAfterMessages { if time.Now().Sub(keypair.created) < RejectAfterTime { @@ -364,6 +381,8 @@ func (peer *Peer) RoutineNonce() { } peer.queue.packetInNonceQueueIsAwaitingKey = true + // no suitable key pair, request for new handshake + select { case <-peer.signals.newKeypairArrived: default: @@ -371,19 +390,18 @@ func (peer *Peer) RoutineNonce() { peer.SendHandshakeInitiation(false) + // wait for key to be established + logDebug.Println(peer, ": Awaiting keypair") select { case <-peer.signals.newKeypairArrived: logDebug.Println(peer, ": Obtained awaited keypair") + case <-peer.signals.flushNonceQueue: - for { - select { - case <-peer.queue.nonce: - default: - goto NextPacket - } - } + flush() + goto NextPacket + case <-peer.routines.stop: return } @@ -394,10 +412,14 @@ func (peer *Peer) RoutineNonce() { elem.peer = peer elem.nonce = atomic.AddUint64(&keypair.sendNonce, 1) - 1 + // double check in case of race condition added by future code + if elem.nonce >= RejectAfterMessages { + atomic.StoreUint64(&keypair.sendNonce, RejectAfterMessages) goto NextPacket } + elem.keypair = keypair elem.dropped = AtomicFalse elem.mutex.Lock()