rpc/dataconn/stream: fix goroutine leaks & transitive buffer leaks

fixes #174
This commit is contained in:
Christian Schwarz 2019-09-07 22:22:05 +02:00
parent e5f944c2f8
commit 3bfe0c16d0
2 changed files with 18 additions and 2 deletions

View File

@ -7,6 +7,7 @@ import (
"io" "io"
"net" "net"
"strings" "strings"
"sync/atomic"
"unicode/utf8" "unicode/utf8"
"github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/logger"
@ -81,8 +82,10 @@ func doWriteStream(ctx context.Context, c *heartbeatconn.Conn, stream io.Reader,
} }
reads := make(chan read, 5) reads := make(chan read, 5)
var stopReading uint32
go func() { go func() {
for { defer close(reads)
for atomic.LoadUint32(&stopReading) == 0 {
buffer := bufpool.Get(1 << FramePayloadShift) buffer := bufpool.Get(1 << FramePayloadShift)
bufferBytes := buffer.Bytes() bufferBytes := buffer.Bytes()
n, err := io.ReadFull(stream, bufferBytes) n, err := io.ReadFull(stream, bufferBytes)
@ -97,12 +100,21 @@ func doWriteStream(ctx context.Context, c *heartbeatconn.Conn, stream io.Reader,
} }
if err != nil { if err != nil {
reads <- read{err: err} // RULE1 reads <- read{err: err} // RULE1
close(reads)
return return
} }
} }
}() }()
defer func() {
// stop reading
atomic.StoreUint32(&stopReading, 1)
// drain in-flight reads
for read := range reads {
debug("doWriteStream: drain read channel")
read.buf.Free()
}
}()
for read := range reads { for read := range reads {
if read.err == nil { if read.err == nil {
// RULE 1: read.buf is valid // RULE 1: read.buf is valid

View File

@ -189,5 +189,9 @@ func (c *Conn) SendStream(ctx context.Context, src zfs.StreamCopier, frameType u
func (c *Conn) Close() error { func (c *Conn) Close() error {
err := c.hc.Shutdown() err := c.hc.Shutdown()
<-c.waitReadFramesDone <-c.waitReadFramesDone
for read := range c.frameReads {
debug("Conn.Close() draining queued read")
read.f.Buffer.Free()
}
return err return err
} }