mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-22 00:13:52 +01:00
Implement chunking.
Move from rpc to separate util package.
This commit is contained in:
parent
a790f8ee89
commit
69f8e7cfc3
@ -1,26 +0,0 @@
|
||||
package rpc
|
||||
|
||||
struct Unchunker {
|
||||
In io.Reader
|
||||
internalBuffer []byte
|
||||
remainingChunkSize uint
|
||||
}
|
||||
|
||||
func NewUnchunker(conn io.Reader) {
|
||||
return Unchunker{In: conn} // TODO
|
||||
}
|
||||
|
||||
func (c Unchunker) Read(b []byte) (n int, error) {
|
||||
// read min(c.internalBuffer.len, b.len) from c.internalBuffer into b
|
||||
// fill up internalBuffer
|
||||
// 1 read up to max(remainingChunkSize,internalBuffer.len) from In
|
||||
// 2 if remainingChunkSize == 0, read next chunk size, update remainingChunkSize
|
||||
// 3 goto 1
|
||||
}
|
||||
|
||||
struct Chunker {
|
||||
In io.Reader
|
||||
MaxChunkSize uint
|
||||
|
||||
ChunkBuf []byte // length of buf determines chunk size? --> would be fixed then
|
||||
}
|
@ -80,7 +80,7 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser,
|
||||
io.Copy(&b, stderr)
|
||||
fmt.Println(b.String())
|
||||
fmt.Printf("%v\n", cmd.ProcessState)
|
||||
panic(err)
|
||||
//panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
132
util/chunking.go
Normal file
132
util/chunking.go
Normal file
@ -0,0 +1,132 @@
|
||||
package chunking
|
||||
|
||||
import (
|
||||
"io"
|
||||
"encoding/binary"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
var ChunkBufSize uint32 = 32*1024
|
||||
var ChunkHeaderByteOrder = binary.LittleEndian
|
||||
|
||||
type Unchunker struct {
|
||||
ChunkCount int
|
||||
in io.Reader
|
||||
remainingChunkBytes uint32
|
||||
}
|
||||
|
||||
func NewUnchunker(conn io.Reader) Unchunker {
|
||||
return Unchunker{
|
||||
in: conn,
|
||||
remainingChunkBytes: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Unchunker) Read(b []byte) (n int, err error) {
|
||||
|
||||
if c.remainingChunkBytes == 0 {
|
||||
|
||||
var nextChunkLen uint32
|
||||
err = binary.Read(c.in, ChunkHeaderByteOrder, &nextChunkLen)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// A chunk of len 0 indicates end of stream
|
||||
if nextChunkLen == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
c.remainingChunkBytes = nextChunkLen
|
||||
c.ChunkCount++
|
||||
|
||||
}
|
||||
|
||||
maxRead := min(int(c.remainingChunkBytes), len(b))
|
||||
if maxRead < 0 {
|
||||
panic("Cannot read negative amount of bytes")
|
||||
}
|
||||
if maxRead == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
n, err = c.in.Read(b[0:maxRead])
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
c.remainingChunkBytes -= uint32(n)
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
} else {
|
||||
return b
|
||||
}
|
||||
}
|
||||
|
||||
type Chunker struct {
|
||||
ChunkCount int
|
||||
in io.Reader
|
||||
remainingChunkBytes int
|
||||
payloadBuf []byte
|
||||
headerBuf *bytes.Buffer
|
||||
}
|
||||
|
||||
func NewChunker(conn io.Reader) Chunker {
|
||||
return NewChunkerSized(conn, ChunkBufSize)
|
||||
}
|
||||
|
||||
func NewChunkerSized(conn io.Reader, chunkSize uint32) Chunker {
|
||||
|
||||
buf := make([]byte, int(chunkSize)-binary.Size(chunkSize))
|
||||
|
||||
return Chunker{
|
||||
in: conn,
|
||||
remainingChunkBytes: 0,
|
||||
payloadBuf: buf,
|
||||
headerBuf: &bytes.Buffer{},
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (c *Chunker) Read(b []byte) (n int, err error) {
|
||||
|
||||
//fmt.Printf("chunker: c.remainingChunkBytes: %d len(b): %d\n", c.remainingChunkBytes, len(b))
|
||||
|
||||
n = 0
|
||||
if c.remainingChunkBytes == 0 {
|
||||
|
||||
newPayloadLen, err := c.in.Read(c.payloadBuf)
|
||||
|
||||
if newPayloadLen == 0 {
|
||||
return 0, io.EOF
|
||||
} else if err != nil {
|
||||
return newPayloadLen, err
|
||||
}
|
||||
|
||||
c.remainingChunkBytes = newPayloadLen
|
||||
|
||||
// Write chunk header
|
||||
c.headerBuf.Reset()
|
||||
nextChunkLen := uint32(newPayloadLen);
|
||||
headerLen := binary.Size(nextChunkLen)
|
||||
err = binary.Write(c.headerBuf, ChunkHeaderByteOrder, nextChunkLen)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
copy(b[0:headerLen], c.headerBuf.Bytes())
|
||||
n += headerLen
|
||||
c.ChunkCount++
|
||||
}
|
||||
|
||||
remainingBuf := b[n:]
|
||||
n2 := copy(remainingBuf, c.payloadBuf[:c.remainingChunkBytes])
|
||||
//fmt.Printf("chunker: written: %d\n", n+int(n2))
|
||||
c.remainingChunkBytes -= n2
|
||||
return n+int(n2), err
|
||||
}
|
Loading…
Reference in New Issue
Block a user