From 69f8e7cfc33197241d37b53047124ccb7d2938a9 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 15 Apr 2017 17:07:32 +0200 Subject: [PATCH] Implement chunking. Move from rpc to separate util package. --- rpc/bytestream.go | 26 --------- sshbytestream/ssh.go | 2 +- util/chunking.go | 132 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 27 deletions(-) delete mode 100644 rpc/bytestream.go create mode 100644 util/chunking.go diff --git a/rpc/bytestream.go b/rpc/bytestream.go deleted file mode 100644 index b8492f8..0000000 --- a/rpc/bytestream.go +++ /dev/null @@ -1,26 +0,0 @@ -package rpc - -struct Unchunker { - In io.Reader - internalBuffer []byte - remainingChunkSize uint -} - -func NewUnchunker(conn io.Reader) { - return Unchunker{In: conn} // TODO -} - -func (c Unchunker) Read(b []byte) (n int, error) { - // read min(c.internalBuffer.len, b.len) from c.internalBuffer into b - // fill up internalBuffer - // 1 read up to max(remainingChunkSize,internalBuffer.len) from In - // 2 if remainingChunkSize == 0, read next chunk size, update remainingChunkSize - // 3 goto 1 -} - -struct Chunker { - In io.Reader - MaxChunkSize uint - - ChunkBuf []byte // length of buf determines chunk size? --> would be fixed then -} \ No newline at end of file diff --git a/sshbytestream/ssh.go b/sshbytestream/ssh.go index 6238218..3023969 100644 --- a/sshbytestream/ssh.go +++ b/sshbytestream/ssh.go @@ -80,7 +80,7 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser, io.Copy(&b, stderr) fmt.Println(b.String()) fmt.Printf("%v\n", cmd.ProcessState) - panic(err) + //panic(err) } }() diff --git a/util/chunking.go b/util/chunking.go new file mode 100644 index 0000000..d34c84b --- /dev/null +++ b/util/chunking.go @@ -0,0 +1,132 @@ +package chunking + +import ( + "io" + "encoding/binary" + "bytes" +) + +var ChunkBufSize uint32 = 32*1024 +var ChunkHeaderByteOrder = binary.LittleEndian + +type Unchunker struct { + ChunkCount int + in io.Reader + remainingChunkBytes uint32 +} + +func NewUnchunker(conn io.Reader) Unchunker { + return Unchunker{ + in: conn, + remainingChunkBytes: 0, + } +} + +func (c *Unchunker) Read(b []byte) (n int, err error) { + + if c.remainingChunkBytes == 0 { + + var nextChunkLen uint32 + err = binary.Read(c.in, ChunkHeaderByteOrder, &nextChunkLen) + if err != nil { + return + } + + // A chunk of len 0 indicates end of stream + if nextChunkLen == 0 { + return 0, io.EOF + } + + c.remainingChunkBytes = nextChunkLen + c.ChunkCount++ + + } + + maxRead := min(int(c.remainingChunkBytes), len(b)) + if maxRead < 0 { + panic("Cannot read negative amount of bytes") + } + if maxRead == 0 { + return 0, nil + } + + n, err = c.in.Read(b[0:maxRead]) + if err != nil { + return n, err + } + c.remainingChunkBytes -= uint32(n) + + return + +} + +func min(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +type Chunker struct { + ChunkCount int + in io.Reader + remainingChunkBytes int + payloadBuf []byte + headerBuf *bytes.Buffer +} + +func NewChunker(conn io.Reader) Chunker { + return NewChunkerSized(conn, ChunkBufSize) +} + +func NewChunkerSized(conn io.Reader, chunkSize uint32) Chunker { + + buf := make([]byte, int(chunkSize)-binary.Size(chunkSize)) + + return Chunker{ + in: conn, + remainingChunkBytes: 0, + payloadBuf: buf, + headerBuf: &bytes.Buffer{}, + + } + +} + +func (c *Chunker) Read(b []byte) (n int, err error) { + + //fmt.Printf("chunker: c.remainingChunkBytes: %d len(b): %d\n", c.remainingChunkBytes, len(b)) + + n = 0 + if c.remainingChunkBytes == 0 { + + newPayloadLen, err := c.in.Read(c.payloadBuf) + + if newPayloadLen == 0 { + return 0, io.EOF + } else if err != nil { + return newPayloadLen, err + } + + c.remainingChunkBytes = newPayloadLen + + // Write chunk header + c.headerBuf.Reset() + nextChunkLen := uint32(newPayloadLen); + headerLen := binary.Size(nextChunkLen) + err = binary.Write(c.headerBuf, ChunkHeaderByteOrder, nextChunkLen) + if err != nil { + return n, err + } + copy(b[0:headerLen], c.headerBuf.Bytes()) + n += headerLen + c.ChunkCount++ + } + + remainingBuf := b[n:] + n2 := copy(remainingBuf, c.payloadBuf[:c.remainingChunkBytes]) + //fmt.Printf("chunker: written: %d\n", n+int(n2)) + c.remainingChunkBytes -= n2 + return n+int(n2), err +} \ No newline at end of file