device: flush peer queues before starting device

In case some old packets snuck in there before, this flushes before
starting afresh.

Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
This commit is contained in:
Jason A. Donenfeld 2021-02-10 00:39:28 +01:00
parent 5bf8d73127
commit 484a9fd324
2 changed files with 30 additions and 24 deletions

View File

@ -83,21 +83,23 @@ func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue {
q := &autodrainingInboundQueue{ q := &autodrainingInboundQueue{
c: make(chan *QueueInboundElement, QueueInboundSize), c: make(chan *QueueInboundElement, QueueInboundSize),
} }
runtime.SetFinalizer(q, func(q *autodrainingInboundQueue) { runtime.SetFinalizer(q, device.flushInboundQueue)
for {
select {
case elem := <-q.c:
elem.Lock()
device.PutMessageBuffer(elem.buffer)
device.PutInboundElement(elem)
default:
return
}
}
})
return q return q
} }
func (device *Device) flushInboundQueue(q *autodrainingInboundQueue) {
for {
select {
case elem := <-q.c:
elem.Lock()
device.PutMessageBuffer(elem.buffer)
device.PutInboundElement(elem)
default:
return
}
}
}
type autodrainingOutboundQueue struct { type autodrainingOutboundQueue struct {
c chan *QueueOutboundElement c chan *QueueOutboundElement
} }
@ -111,17 +113,19 @@ func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue {
q := &autodrainingOutboundQueue{ q := &autodrainingOutboundQueue{
c: make(chan *QueueOutboundElement, QueueOutboundSize), c: make(chan *QueueOutboundElement, QueueOutboundSize),
} }
runtime.SetFinalizer(q, func(q *autodrainingOutboundQueue) { runtime.SetFinalizer(q, device.flushOutboundQueue)
for {
select {
case elem := <-q.c:
elem.Lock()
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
default:
return
}
}
})
return q return q
} }
func (device *Device) flushOutboundQueue(q *autodrainingOutboundQueue) {
for {
select {
case elem := <-q.c:
elem.Lock()
device.PutMessageBuffer(elem.buffer)
device.PutOutboundElement(elem)
default:
return
}
}
}

View File

@ -186,6 +186,8 @@ func (peer *Peer) Start() {
peer.timersStart() peer.timersStart()
device.flushInboundQueue(peer.queue.inbound)
device.flushOutboundQueue(peer.queue.outbound)
go peer.RoutineSequentialSender() go peer.RoutineSequentialSender()
go peer.RoutineSequentialReceiver() go peer.RoutineSequentialReceiver()