press: second overhaul

... drop all compression algorithms except gzip
... factor code to make gzip seekable into new external library
... many small bugfixes
buengese 2020-09-03 23:41:57 +02:00
parent 5da98499ed
commit ba6b077b30
7 changed files with 170 additions and 1283 deletions

View File

@ -1,75 +0,0 @@
package press
import (
"bufio"
"io"
"github.com/klauspost/compress/gzip"
)
// AlgGzip represents gzip compression algorithm
type AlgGzip struct {
level int
blockSize uint32
}
// InitializeGzip initializes the gzip compression Algorithm
func InitializeGzip(bs uint32, level int) Algorithm {
a := new(AlgGzip)
a.blockSize = bs
a.level = level
return a
}
// GetFileExtension returns file extension
func (a *AlgGzip) GetFileExtension() string {
return ".gz"
}
// GetHeader returns the gzip compression header
func (a *AlgGzip) GetHeader() []byte {
return []byte{}
}
// GetFooter returns the gzip compression footer
func (a *AlgGzip) GetFooter() []byte {
return []byte{}
}
// CompressBlock compresses a block using gzip
func (a *AlgGzip) CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) {
// Initialize buffer
bufw := bufio.NewWriterSize(out, int(a.blockSize+(a.blockSize)>>4))
// Initialize block writer
outw, err := gzip.NewWriterLevel(bufw, a.level)
if err != nil {
return 0, 0, err
}
// Compress block
_, err = outw.Write(in)
if err != nil {
return 0, 0, err
}
// Finalize gzip file, flush buffer and return
err = outw.Close()
if err != nil {
return 0, 0, err
}
blockSize := uint32(bufw.Buffered())
err = bufw.Flush()
return blockSize, uint64(len(in)), err
}
// DecompressBlock decompresses a gzip compressed block
func (a *AlgGzip) DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) {
gzipReader, err := gzip.NewReader(in)
if err != nil {
return 0, err
}
written, err := io.Copy(out, gzipReader)
return int(written), err
}
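A minimal usage sketch (not part of the commit) of the block-level Algorithm interface implemented above: compress one block into a buffer, then feed it back through DecompressBlock. The example function name is hypothetical; only the API defined in this file is used.

package press

import (
	"bytes"
	"fmt"
)

// ExampleAlgGzip_roundTrip (hypothetical) round-trips a single block through AlgGzip.
func ExampleAlgGzip_roundTrip() {
	alg := InitializeGzip(131072, 6) // 128 KiB blocks, gzip level 6

	in := []byte("hello, seekable compression")
	var block bytes.Buffer
	compressedSize, uncompressedSize, err := alg.CompressBlock(in, &block)
	if err != nil {
		panic(err)
	}

	var out bytes.Buffer
	if _, err := alg.DecompressBlock(&block, &out, 131072); err != nil {
		panic(err)
	}
	fmt.Println(compressedSize, uncompressedSize, bytes.Equal(out.Bytes(), in))
}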

View File

@ -1,223 +0,0 @@
package press
// This file implements the LZ4 algorithm.
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math/bits"
"github.com/buengese/xxh32"
lz4 "github.com/pierrec/lz4"
)
/*
Structure of LZ4 header:
Flags:
Version = 01
Independent = 1
Block Checksum = 1
Content Size = 0
Content Checksum = 0
Reserved = 0
Dictionary ID = 0
BD byte:
Reserved = 0
Block Max Size = 101 (or 5; 256kb)
Reserved = 0000
Header checksum byte: (xxh32(flags and BD byte) >> 8) & 0xff
*/
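// Worked example (these values match the commented-out LZ4Header constant below):
// with block checksums enabled and BlockMaxSize = 256KB the header bytes are
//   magic    04 22 4d 18   (0x184D2204, little-endian)
//   flags    0x70          (version 01<<6 | independent 1<<5 | block checksum 1<<4)
//   BD byte  0x50          (block max size index 5 << 4)
//   checksum 0x84          ((xxh32 of the flags and BD bytes >> 8) & 0xff)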
// LZ4Header - Header of our LZ4 file
//var LZ4Header = []byte{0x04, 0x22, 0x4d, 0x18, 0x70, 0x50, 0x84}
// LZ4Footer - Footer of our LZ4 file
var LZ4Footer = []byte{0x00, 0x00, 0x00, 0x00} // This is just an empty block
const (
frameMagic uint32 = 0x184D2204
compressedBlockFlag = 1 << 31
compressedBlockMask = compressedBlockFlag - 1
)
// AlgLz4 is the Lz4 Compression algorithm
type AlgLz4 struct {
Header lz4.Header
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
}
// InitializeLz4 creates an Lz4 compression algorithm
func InitializeLz4(bs uint32, blockChecksum bool) Algorithm {
a := new(AlgLz4)
a.Header.Reset()
a.Header = lz4.Header{
BlockChecksum: blockChecksum,
BlockMaxSize: int(bs),
}
return a
}
// GetFileExtension returns file extension
func (a *AlgLz4) GetFileExtension() string {
return ".lz4"
}
// GetHeader returns the Lz4 compression header
func (a *AlgLz4) GetHeader() []byte {
// Size is optional.
buf := a.buf[:]
// Set the fixed size data: magic number, block max size and flags.
binary.LittleEndian.PutUint32(buf[0:], frameMagic)
flg := byte(lz4.Version << 6)
flg |= 1 << 5 // No block dependency.
if a.Header.BlockChecksum {
flg |= 1 << 4
}
if a.Header.Size > 0 {
flg |= 1 << 3
}
buf[4] = flg
buf[5] = blockSizeValueToIndex(a.Header.BlockMaxSize) << 4
// Current buffer size: magic(4) + flags(1) + block max size (1).
n := 6
if a.Header.Size > 0 {
binary.LittleEndian.PutUint64(buf[n:], a.Header.Size)
n += 8
}
// The header checksum includes the flags, block max size and optional Size.
buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF)
// Header ready, write it out.
return buf[0 : n+1]
}
// GetFooter returns the LZ4 footer (an empty block)
func (a *AlgLz4) GetFooter() []byte {
return LZ4Footer
}
// CompressBlock compresses a block using lz4
func (a *AlgLz4) CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) {
if len(in) > 0 {
n, err := a.compressBlock(in, out)
if err != nil {
return 0, 0, err
}
return n, uint64(len(in)), nil
}
return 0, 0, nil
}
// compressBlock compresses a block.
func (a *AlgLz4) compressBlock(data []byte, dst io.Writer) (uint32, error) {
zdata := make([]byte, a.Header.BlockMaxSize) // The compressed block size cannot exceed the input's.
var zn int
if level := a.Header.CompressionLevel; level != 0 {
zn, _ = lz4.CompressBlockHC(data, zdata, level)
} else {
var hashTable [1 << 16]int
zn, _ = lz4.CompressBlock(data, zdata, hashTable[:])
}
var bLen uint32
if zn > 0 && zn < len(data) {
// Compressible and compressed size smaller than uncompressed: ok!
bLen = uint32(zn)
zdata = zdata[:zn]
} else {
// Uncompressed block.
bLen = uint32(len(data)) | compressedBlockFlag
zdata = data
}
// Write the block.
if err := a.writeUint32(bLen, dst); err != nil {
return 0, err
}
_, err := dst.Write(zdata)
if err != nil {
return 0, err
}
if !a.Header.BlockChecksum {
return bLen, nil
}
checksum := xxh32.ChecksumZero(zdata)
if err := a.writeUint32(checksum, dst); err != nil {
return 0, err
}
return bLen, nil
}
// writeUint32 writes a uint32 to the underlying writer.
func (a *AlgLz4) writeUint32(x uint32, dst io.Writer) error {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, x)
_, err := dst.Write(buf)
return err
}
func blockSizeValueToIndex(size int) byte {
return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2)
}
// DecompressBlock decompresses an Lz4 compressed block
func (a *AlgLz4) DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) {
// Get our compressed data
var b bytes.Buffer
_, err = io.Copy(&b, in)
if err != nil {
return 0, err
}
zdata := b.Bytes()
bLen := binary.LittleEndian.Uint32(zdata[:4])
if bLen&compressedBlockFlag > 0 {
// Uncompressed block.
bLen &= compressedBlockMask
if bLen > BlockSize {
return 0, fmt.Errorf("lz4: invalid block size: %d", bLen)
}
data := zdata[4 : bLen+4]
if a.Header.BlockChecksum {
checksum := binary.LittleEndian.Uint32(zdata[4+bLen:])
if h := xxh32.ChecksumZero(data); h != checksum {
return 0, fmt.Errorf("lz4: invalid block checksum: got %x; expected %x", h, checksum)
}
}
_, err := out.Write(data)
return len(data), err
}
// compressed block
if bLen > BlockSize {
return 0, fmt.Errorf("lz4: invalid block size: %d", bLen)
}
if a.Header.BlockChecksum {
checksum := binary.LittleEndian.Uint32(zdata[4+bLen:])
if h := xxh32.ChecksumZero(zdata[4 : bLen+4]); h != checksum {
return 0, fmt.Errorf("lz4: invalid block checksum: got %x; expected %x", h, checksum)
}
}
data := make([]byte, BlockSize)
n, err = lz4.UncompressBlock(zdata[4:bLen+4], data)
if err != nil {
return 0, err
}
_, err = out.Write(data[:n])
return n, err
}
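Another small sketch (not part of the commit) showing the block framing CompressBlock writes: a 4-byte little-endian length word whose top bit (compressedBlockFlag) marks a block stored uncompressed, followed by the block data and, when enabled, an xxh32 checksum. The example name is hypothetical; only identifiers from this file are used.

package press

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// ExampleAlgLz4_framing (hypothetical) inspects the length word written by CompressBlock.
func ExampleAlgLz4_framing() {
	alg := InitializeLz4(262144, false) // 256 KiB blocks, no block checksums

	in := bytes.Repeat([]byte("compressible "), 1000)
	var block bytes.Buffer
	if _, _, err := alg.CompressBlock(in, &block); err != nil {
		panic(err)
	}

	word := binary.LittleEndian.Uint32(block.Bytes()[:4])
	stored := word&compressedBlockFlag != 0 // top bit set => block was stored uncompressed
	fmt.Println(word&compressedBlockMask, stored)

	var out bytes.Buffer
	if _, err := alg.DecompressBlock(&block, &out, 262144); err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(out.Bytes(), in))
}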

View File

@ -1,75 +0,0 @@
package press
import (
"bufio"
"io"
"github.com/ulikunitz/xz"
)
// AlgXZ represents the XZ compression algorithm
type AlgXZ struct {
blockSize uint32
config xz.WriterConfig
}
// InitializeXZ creates an XZ compression algorithm
func InitializeXZ(bs uint32) Algorithm {
a := new(AlgXZ)
a.blockSize = bs
a.config = xz.WriterConfig{}
return a
}
// GetFileExtension returns file extension
func (a *AlgXZ) GetFileExtension() string {
return ".xz"
}
// GetHeader returns the XZ compression header
func (a *AlgXZ) GetHeader() []byte {
return []byte{}
}
// GetFooter returns the XZ compression footer
func (a *AlgXZ) GetFooter() []byte {
return []byte{}
}
// CompressBlock compresses a block using xz
func (a *AlgXZ) CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error) {
// Initialize buffer
bufw := bufio.NewWriterSize(out, int(a.blockSize+(a.blockSize)>>4))
// Initialize block writer
outw, err := a.config.NewWriter(bufw)
if err != nil {
return 0, 0, err
}
// Compress block
_, err = outw.Write(in)
if err != nil {
return 0, 0, err
}
// Finalize xz stream, flush buffer and return
err = outw.Close()
if err != nil {
return 0, 0, err
}
blockSize := uint32(bufw.Buffered())
err = bufw.Flush()
return blockSize, uint64(len(in)), err
}
// DecompressBlock decompresses an XZ compressed block
func (a *AlgXZ) DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error) {
xzReader, err := xz.NewReader(in)
if err != nil {
return 0, err
}
written, err := io.Copy(out, xzReader)
return int(written), err
}

View File

@ -1,526 +0,0 @@
// Package press provides wrappers for Fs and Object which implement compression.
// This file is the backend implementation for seekable compression.
package press
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
)
// Compression modes
const (
Uncompressed = -1
LZ4 = 2
Gzip = 4
XZ = 8
)
// Errors
var (
ErrMetadataCorrupted = errors.New("metadata may have been corrupted")
)
// DEBUG - flag for debug mode
const DEBUG = false
// Compression is a struct containing configurable variables (what used to be constants)
type Compression struct {
CompressionMode int // Compression mode
Algorithm Algorithm
BlockSize uint32 // Size of blocks. Higher block size means better compression but more download bandwidth needed for small downloads
// ~1MB is recommended for xz, while ~128KB is recommended for gzip and lz4
HeuristicBytes int64 // Bytes to perform gzip heuristic on to determine whether a file should be compressed
NumThreads int // Number of threads to use for compression
MaxCompressionRatio float64 // Maximum compression ratio for a file to be considered compressible
BinPath string // Path to compression binary. This is used for all non-gzip compression.
}
// Algorithm is the main compression Algorithm Interface
type Algorithm interface {
GetHeader() []byte
GetFileExtension() string
CompressBlock(in []byte, out io.Writer) (compressedSize uint32, uncompressedSize uint64, err error)
DecompressBlock(in io.Reader, out io.Writer, BlockSize uint32) (n int, err error)
GetFooter() []byte
}
// NewCompressionPreset creates a Compression object with a preset mode/bs
func NewCompressionPreset(preset string) (*Compression, error) {
switch preset {
case "lz4":
alg := InitializeLz4(262144, true)
return NewCompression(LZ4, alg, 262144) // LZ4 compression (very fast)
case "gzip":
alg := InitializeGzip(131072, 6)
return NewCompression(Gzip, alg, 131070) // GZIP-default compression (medium)
case "xz":
alg := InitializeXZ(1048576)
return NewCompression(XZ, alg, 1048576) // XZ compression (strong compression)
}
return nil, errors.New("Compression mode doesn't exist")
}
// NewCompressionPresetNumber creates a Compression object with a preset mode/bs
func NewCompressionPresetNumber(preset int) (*Compression, error) {
switch preset {
case LZ4:
alg := InitializeLz4(262144, true)
return NewCompression(LZ4, alg, 262144) // LZ4 compression (very fast)
case Gzip:
alg := InitializeGzip(131072, 6)
return NewCompression(Gzip, alg, 131070) // GZIP-default compression (medium)
case XZ:
alg := InitializeXZ(1048576)
return NewCompression(XZ, alg, 1048576) // XZ compression (strong compression)
}
return nil, errors.New("Compression mode doesn't exist")
}
// NewCompression creates a Compression object with some default configuration values
func NewCompression(mode int, alg Algorithm, bs uint32) (*Compression, error) {
return NewCompressionAdvanced(mode, alg, bs, 1048576, 12, 0.9)
}
// NewCompressionAdvanced creates a Compression object
func NewCompressionAdvanced(mode int, alg Algorithm, bs uint32, hb int64, threads int, mcr float64) (c *Compression, err error) {
// Set vars
c = new(Compression)
c.Algorithm = alg
c.CompressionMode = mode
c.BlockSize = bs
c.HeuristicBytes = hb
c.NumThreads = threads
c.MaxCompressionRatio = mcr
return c, err
}
/*** UTILITY FUNCTIONS ***/
// GetFileExtension gets a file extension for current compression mode
func (c *Compression) GetFileExtension() string {
return c.Algorithm.GetFileExtension()
}
// GetFileCompressionInfo gets a file extension along with compressibility of file
func (c *Compression) GetFileCompressionInfo(reader io.Reader) (compressable bool, extension string, err error) {
// Use our compression algorithm to do a heuristic on the first few bytes
var emulatedBlock, emulatedBlockCompressed bytes.Buffer
_, err = io.CopyN(&emulatedBlock, reader, c.HeuristicBytes)
if err != nil && err != io.EOF {
return false, "", err
}
compressedSize, uncompressedSize, err := c.Algorithm.CompressBlock(emulatedBlock.Bytes(), &emulatedBlockCompressed)
if err != nil {
return false, "", err
}
compressionRatio := float64(compressedSize) / float64(uncompressedSize)
// If the data is not compressible, return so
if compressionRatio > c.MaxCompressionRatio {
return false, ".bin", nil
}
// If the file is compressible, select file extension based on compression mode
return true, c.Algorithm.GetFileExtension(), nil
}
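// For example, with the defaults set by NewCompression (HeuristicBytes = 1048576,
// MaxCompressionRatio = 0.9): a 1 MiB sample that compresses to ~950 KiB (ratio
// ~0.93) is treated as incompressible and the file gets the ".bin" extension,
// while a sample compressing to ~400 KiB (ratio ~0.39) keeps the algorithm's
// own extension and is stored compressed.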
/*** MAIN COMPRESSION INTERFACE ***/
// compressionResult represents the result of compressing a single block (produced by a single worker goroutine)
type compressionResult struct {
buffer *bytes.Buffer
n uint64
err error
}
// CompressFileReturningBlockData compresses a file returning the block data for that file.
func (c *Compression) CompressFileReturningBlockData(in io.Reader, out io.Writer) (blockData []uint32, err error) {
// Initialize buffered writer
bufw := bufio.NewWriterSize(out, int((c.BlockSize+(c.BlockSize)>>4)*uint32(c.NumThreads)))
// Get blockData, copy over header, add length of header to blockData
blockData = make([]uint32, 0)
header := c.Algorithm.GetHeader()
_, err = bufw.Write(header)
if err != nil {
return nil, err
}
blockData = append(blockData, uint32(len(header)))
// Compress blocks
for {
// Loop through threads, spawning a goroutine for each one. If we hit EOF on a thread, set eofAt to that thread and break
compressionResults := make([]chan compressionResult, c.NumThreads)
eofAt := -1
for i := 0; i < c.NumThreads; i++ {
// Create thread channel and allocate buffer to pass to thread
compressionResults[i] = make(chan compressionResult)
var inputBuffer bytes.Buffer
_, err = io.CopyN(&inputBuffer, in, int64(c.BlockSize))
if err == io.EOF {
eofAt = i
} else if err != nil {
return nil, err
}
// Run thread
go func(i int, in []byte) {
// Initialize thread writer and result struct
var res compressionResult
var buffer bytes.Buffer
// Compress block
_, n, err := c.Algorithm.CompressBlock(in, &buffer)
if err != nil && err != io.EOF { // This errored out.
res.buffer = nil
res.n = 0
res.err = err
compressionResults[i] <- res
return
}
// Pass our data back to the main thread as a compression result
res.buffer = &buffer
res.n = n
res.err = err
compressionResults[i] <- res
}(i, inputBuffer.Bytes())
// If we have reached eof, we don't need more threads
if eofAt != -1 {
break
}
}
// Process writers in order
for i := 0; i < c.NumThreads; i++ {
if compressionResults[i] != nil {
// Get current compression result, get buffer, and copy buffer over to output
res := <-compressionResults[i]
close(compressionResults[i])
if res.buffer == nil {
return nil, res.err
}
blockSize := uint32(res.buffer.Len())
_, err = io.Copy(bufw, res.buffer)
if err != nil {
return nil, err
}
if DEBUG {
fmt.Printf("%d %d\n", res.n, blockSize)
}
// Append block size to block data
blockData = append(blockData, blockSize)
// If this is the last block, add the raw size of the last block to the end of blockData and break
if eofAt == i {
if DEBUG {
log.Printf("%d %d %d\n", res.n, byte(res.n%256), byte(res.n/256))
}
blockData = append(blockData, uint32(res.n))
break
}
}
}
// All bytes written for this batch should now be in the bufio buffer, so flush it
err = bufw.Flush()
if err != nil {
return nil, err
}
// If eof happened, break
if eofAt != -1 {
if DEBUG {
log.Printf("%d", eofAt)
log.Printf("%v", blockData)
}
break
}
}
// Write footer and flush
footer := c.Algorithm.GetFooter()
_, err = bufw.Write(footer)
if err != nil {
return nil, err
}
err = bufw.Flush()
// Return
return blockData, err
}
/*** BLOCK DECOMPRESSION FUNCTIONS ***/
// Wrapper function for decompressBlock that implements multithreading
// decompressionResult represents the result of decompressing a block
type decompressionResult struct {
err error
buffer *bytes.Buffer
}
func (d *Decompressor) decompressBlockRangeMultithreaded(in io.Reader, out io.Writer, startingBlock uint32) (n int, err error) {
// First, use bufio.Reader to reduce the number of reads and bufio.Writer to reduce the number of writes
bufin := bufio.NewReader(in)
bufout := bufio.NewWriter(out)
// Decompress each block individually.
currBatch := startingBlock // Block # of start of current batch of blocks
totalBytesCopied := 0
for {
// Loop through threads
eofAt := -1
decompressionResults := make([]chan decompressionResult, d.c.NumThreads)
for i := 0; i < d.c.NumThreads; i++ {
// Get currBlock
currBlock := currBatch + uint32(i)
// Create channel
decompressionResults[i] = make(chan decompressionResult)
// Check if we've reached EOF
if currBlock >= d.numBlocks {
eofAt = i
break
}
// Get block to decompress
var compressedBlock bytes.Buffer
var err error
n, err := io.CopyN(&compressedBlock, bufin, d.blockStarts[currBlock+1]-d.blockStarts[currBlock])
if err != nil || n == 0 { // End of stream
eofAt = i
break
}
// Spawn thread to decompress block
if DEBUG {
log.Printf("Spawning %d", i)
}
go func(i int, currBlock uint32, in io.Reader) {
var block bytes.Buffer
var res decompressionResult
// Decompress block
_, res.err = d.c.Algorithm.DecompressBlock(in, &block, d.c.BlockSize)
res.buffer = &block
decompressionResults[i] <- res
}(i, currBlock, &compressedBlock)
}
if DEBUG {
log.Printf("Eof at %d", eofAt)
}
// Process results
for i := 0; i < d.c.NumThreads; i++ {
// If we got EOF, return
if eofAt == i {
return totalBytesCopied, bufout.Flush() // Flushing bufout is needed to prevent us from getting all nulls
}
// Get result and close
res := <-decompressionResults[i]
close(decompressionResults[i])
if res.err != nil {
return totalBytesCopied, res.err
}
// Copy to output and add to total bytes copied
n, err := io.Copy(bufout, res.buffer)
totalBytesCopied += int(n)
if err != nil {
return totalBytesCopied, err
}
}
// Add NumThreads to currBatch
currBatch += uint32(d.c.NumThreads)
}
}
/*** MAIN DECOMPRESSION INTERFACE ***/
// Decompressor is the ReadSeeker implementation for decompression
type Decompressor struct {
cursorPos *int64 // The current location we have seeked to
blockStarts []int64 // The start of each block. These will be recovered from the block sizes
numBlocks uint32 // Number of blocks
decompressedSize int64 // Decompressed size of the file.
in io.ReadSeeker // Input
c *Compression // Compression options
}
// Parses block data. Returns the number of blocks, the block start locations for each block, and the decompressed size of the entire file.
func parseBlockData(blockData []uint32, BlockSize uint32) (numBlocks uint32, blockStarts []int64, decompressedSize int64) {
// Parse the block data
blockDataLen := len(blockData)
numBlocks = uint32(blockDataLen - 1)
if DEBUG {
log.Printf("%v\n", blockData)
log.Printf("metadata len, numblocks = %d, %d", blockDataLen, numBlocks)
}
blockStarts = make([]int64, numBlocks+1) // Starts with start of first block (and end of header), ends with end of last block
currentBlockPosition := int64(0)
for i := uint32(0); i < numBlocks; i++ { // Loop through block data, getting starts of blocks.
currentBlockSize := blockData[i]
currentBlockPosition += int64(currentBlockSize)
blockStarts[i] = currentBlockPosition
}
blockStarts[numBlocks] = currentBlockPosition // End of last block
//log.Printf("Block Starts: %v\n", d.blockStarts)
numBlocks-- // Subtract 1 from number of blocks because our header technically isn't a block
// Get uncompressed size of last block and derive uncompressed size of file
lastBlockRawSize := blockData[blockDataLen-1]
decompressedSize = int64(numBlocks-1)*int64(BlockSize) + int64(lastBlockRawSize)
if DEBUG {
log.Printf("Decompressed size = %d", decompressedSize)
}
return numBlocks, blockStarts, decompressedSize
}
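// Worked example: blockData = [10, 1000, 800, 5000] describes a file with a
// 10-byte header, two compressed blocks of 1000 and 800 bytes, and a final block
// holding 5000 uncompressed bytes. parseBlockData then returns numBlocks = 2,
// blockStarts = [10, 1010, 1810, 1810] and, for BlockSize = 131070,
// decompressedSize = 1*131070 + 5000 = 136070.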
// Initializes decompressor with the block data specified.
func (d *Decompressor) initWithBlockData(c *Compression, in io.ReadSeeker, size int64, blockData []uint32) (err error) {
// Copy over compression object
d.c = c
// Initialize cursor position
d.cursorPos = new(int64)
// Parse the block data
d.numBlocks, d.blockStarts, d.decompressedSize = parseBlockData(blockData, d.c.BlockSize)
// Initialize cursor position value and copy over reader
*d.cursorPos = 0
_, err = in.Seek(0, io.SeekStart)
d.in = in
return err
}
// Read reads data using a decompressor
func (d Decompressor) Read(p []byte) (int, error) {
if DEBUG {
log.Printf("Cursor pos before: %d\n", *d.cursorPos)
}
// Check if we're at the end of the file or before the beginning of the file
if *d.cursorPos >= d.decompressedSize || *d.cursorPos < 0 {
if DEBUG {
log.Println("Out of bounds EOF")
}
return 0, io.EOF
}
// Get block range to read
blockNumber := *d.cursorPos / int64(d.c.BlockSize)
blockStart := d.blockStarts[blockNumber] // Start position of blocks to read
dataOffset := *d.cursorPos % int64(d.c.BlockSize) // Offset of data to read in blocks to read
bytesToRead := len(p) // Number of bytes to read
blocksToRead := (int64(bytesToRead)+dataOffset)/int64(d.c.BlockSize) + 1 // Number of blocks to read
returnEOF := false
if blockNumber+blocksToRead > int64(d.numBlocks) { // Overflowed the last block
blocksToRead = int64(d.numBlocks) - blockNumber
returnEOF = true
}
blockEnd := d.blockStarts[blockNumber+blocksToRead] // Start of the block after the last block we want to get is the end of the last block we want to get
blockLen := blockEnd - blockStart
// Read compressed block range into buffer
var compressedBlocks bytes.Buffer
_, err := d.in.Seek(blockStart, io.SeekStart)
if err != nil {
return 0, err
}
n1, err := io.CopyN(&compressedBlocks, d.in, blockLen)
if DEBUG {
log.Printf("block # = %d @ %d <- %d, len %d, copied %d bytes", blockNumber, blockStart, *d.cursorPos, blockLen, n1)
}
if err != nil {
if DEBUG {
log.Println("Copy Error")
}
return 0, err
}
// Decompress block range
var b bytes.Buffer
n, err := d.decompressBlockRangeMultithreaded(&compressedBlocks, &b, uint32(blockNumber))
if err != nil {
log.Println("Decompression error")
return n, err
}
// Calculate bytes read
readOverflow := *d.cursorPos + int64(bytesToRead) - d.decompressedSize
if readOverflow < 0 {
readOverflow = 0
}
bytesRead := int64(bytesToRead) - readOverflow
if DEBUG {
log.Printf("Read offset = %d, overflow = %d", dataOffset, readOverflow)
log.Printf("Decompressed %d bytes; read %d out of %d bytes\n", n, bytesRead, bytesToRead)
// log.Printf("%v", b.Bytes())
}
// If we read 0 bytes, we reached the end of the file
if bytesRead == 0 {
log.Println("EOF")
return 0, io.EOF
}
// Copy from buffer+offset to p
_, err = io.CopyN(ioutil.Discard, &b, dataOffset)
if err != nil {
return 0, err
}
n, err = b.Read(p) // Note: everything after bytesToRead bytes will be discarded; we are returning bytesToRead instead of n
if err != nil {
return n, err
}
// Increment cursor position and return
*d.cursorPos += bytesRead
if returnEOF {
if DEBUG {
log.Println("EOF")
}
return int(bytesRead), io.EOF
}
return int(bytesRead), nil
}
// Seek seeks to a location in compressed stream
func (d Decompressor) Seek(offset int64, whence int) (int64, error) {
// Seek to offset in cursorPos
if whence == io.SeekStart {
*d.cursorPos = offset
} else if whence == io.SeekCurrent {
*d.cursorPos += offset
} else if whence == io.SeekEnd {
*d.cursorPos = d.decompressedSize + offset
}
// Return
return offset, nil
}
// DecompressFileExtData decompresses a file using external block data. Argument "size" is very useful here.
func (c *Compression) DecompressFileExtData(in io.ReadSeeker, size int64, blockData []uint32) (FileHandle io.ReadSeeker, decompressedSize int64, err error) {
var decompressor Decompressor
err = decompressor.initWithBlockData(c, in, size, blockData)
return decompressor, decompressor.decompressedSize, err
}
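Before the test file below, a minimal sketch (not part of the commit) of the seekable interface this file provided: compress a buffer with the gzip preset, then use the returned block data to read from the middle of the stream without decompressing from the start. The example name is hypothetical; only functions defined above are used, and the test that follows exercises the same round trip more thoroughly.

package press

import (
	"bytes"
	"fmt"
	"io"
)

// ExampleCompression_seek (hypothetical) demonstrates random access via block data.
func ExampleCompression_seek() {
	comp, err := NewCompressionPreset("gzip")
	if err != nil {
		panic(err)
	}

	src := bytes.Repeat([]byte("0123456789"), 100000) // ~1 MB spread over several blocks
	var compressed bytes.Buffer
	blockData, err := comp.CompressFileReturningBlockData(bytes.NewReader(src), &compressed)
	if err != nil {
		panic(err)
	}

	// Reopen the compressed stream as a seekable handle using the block data.
	handle, size, err := comp.DecompressFileExtData(bytes.NewReader(compressed.Bytes()), int64(compressed.Len()), blockData)
	if err != nil {
		panic(err)
	}

	// Jump into the middle of the decompressed stream and read a short slice.
	if _, err := handle.Seek(500000, io.SeekStart); err != nil {
		panic(err)
	}
	buf := make([]byte, 10)
	if _, err := io.ReadFull(handle, buf); err != nil {
		panic(err)
	}
	fmt.Println(size, bytes.Equal(buf, src[500000:500010]))
}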

View File

@ -1,131 +0,0 @@
package press
import (
"bufio"
"bytes"
"crypto/md5"
"encoding/base64"
"io"
"io/ioutil"
"math/rand"
"os"
"strings"
"testing"
)
const TestStringSmall = "The quick brown fox jumps over the lazy dog."
const TestSizeLarge = 2097152 // 2 megabytes
// Tests compression and decompression for a preset
func testCompressDecompress(t *testing.T, preset string, testString string) {
// Create compression instance
comp, err := NewCompressionPreset(preset)
if err != nil {
t.Fatal(err)
}
// Open files and hashers
testFile := strings.NewReader(testString)
testFileHasher := md5.New()
if err != nil {
t.Fatal(err)
}
compressedFile, err := ioutil.TempFile(os.TempDir(), "rclone_compression_test")
if err != nil {
t.Fatal(err)
}
outHasher := md5.New()
// Compress file and hash it (size doesn't matter here)
testFileReader, testFileWriter := io.Pipe()
go func() {
_, err := io.Copy(io.MultiWriter(testFileHasher, testFileWriter), testFile)
if err != nil {
t.Fatal("Failed to write compressed file")
}
err = testFileWriter.Close()
if err != nil {
t.Log("Failed to close compressed file")
}
}()
var blockData []uint32
blockData, err = comp.CompressFileReturningBlockData(testFileReader, compressedFile)
if err != nil {
t.Fatalf("Compression failed with error: %v", err)
}
testFileHash := testFileHasher.Sum(nil)
// Get the size, seek to the beginning of the compressed file
size, err := compressedFile.Seek(0, io.SeekEnd)
if err != nil {
t.Fatal(err)
}
_, err = compressedFile.Seek(0, io.SeekStart)
if err != nil {
t.Fatal(err)
}
t.Logf("Compressed size: %d\n", size)
// Decompress file into a hasher
var FileHandle io.ReadSeeker
var decompressedSize int64
FileHandle, decompressedSize, err = comp.DecompressFileExtData(compressedFile, size, blockData)
if err != nil {
t.Fatal(err)
}
t.Logf("Decompressed size: %d\n", decompressedSize)
bufr := bufio.NewReaderSize(FileHandle, 12345678)
_, err = io.Copy(outHasher, bufr)
if err != nil && err != io.EOF {
t.Fatal(err)
}
decompressedFileHash := outHasher.Sum(nil)
// Clean up
err = compressedFile.Close()
if err != nil {
t.Log("Warning: cannot close compressed test file")
}
err = os.Remove(compressedFile.Name())
if err != nil {
t.Log("Warning: cannot remove compressed test file")
}
// Compare hashes
if !bytes.Equal(testFileHash, decompressedFileHash) {
t.Logf("Hash of original file: %x\n", testFileHash)
t.Logf("Hash of recovered file: %x\n", decompressedFileHash)
t.Fatal("Hashes do not match!")
}
}
// Tests both small and large strings for a preset
func testSmallLarge(t *testing.T, preset string) {
testStringLarge := getCompressibleString(TestSizeLarge)
t.Run("TestSmall", func(t *testing.T) {
testCompressDecompress(t, preset, TestStringSmall)
})
t.Run("TestLarge", func(t *testing.T) {
testCompressDecompress(t, preset, testStringLarge)
})
}
// Gets a compressible string
func getCompressibleString(size int) string {
// Get pseudorandom bytes
prbytes := make([]byte, size*3/4+16)
prsource := rand.New(rand.NewSource(0))
prsource.Read(prbytes)
// Encode in base64
encoding := base64.NewEncoding("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/")
return encoding.EncodeToString(prbytes)[:size]
}
func TestCompression(t *testing.T) {
testCases := []string{"lz4", "gzip", "xz"}
for _, tc := range testCases {
t.Run(tc, func(t *testing.T) {
testSmallLarge(t, tc)
})
}
}

View File

@ -13,9 +13,11 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"regexp"
"strings" "strings"
"time" "time"
"github.com/buengese/sgzip"
"github.com/gabriel-vasile/mimetype" "github.com/gabriel-vasile/mimetype"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -27,22 +29,35 @@ import (
"github.com/rclone/rclone/fs/fspath" "github.com/rclone/rclone/fs/fspath"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/operations"
// Used for Rcat
) )
// Globals // Globals
const (
initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently
maxChunkSize = 8388608 // at 256KB and 8 MB.
bufferSize = 8388608
heuristicBytes = 1048576
metaFileExt = ".meta"
uncompressedFileExt = ".bin"
)
// Compression modes
const (
Uncompressed = 0
Gzip = 2
)
var unconpressibleRegexp = regexp.MustCompile("(^(video|image|audio)/.*)|(^.*?/(x-7z-compressed|zip|gzip|x-rar-compressed|zstd|x-xz|lzip|warc))")
// Register with Fs // Register with Fs
func init() { func init() {
// Build compression mode options. Show XZ options only if they're supported on the current system. // Build compression mode options.
compressionModeOptions := []fs.OptionExample{{ // Default compression mode options compressionModeOptions := []fs.OptionExample{
Value: "lz4", { // Default compression mode options {
Help: "Fast, real-time compression with reasonable compression ratios.",
}, {
Value: "gzip", Value: "gzip",
Help: "Standard gzip compression with fastest parameters.", Help: "Standard gzip compression with fastest parameters.",
}, {
Value: "xz",
Help: "Standard xz compression with fastest parameters.",
}, },
} }
@ -60,24 +75,33 @@ func init() {
Help: "Compression mode.", Help: "Compression mode.",
Default: "gzip", Default: "gzip",
Examples: compressionModeOptions, Examples: compressionModeOptions,
}, {
Name: "compression_level",
Help: "gzip compression level -2 to 9",
Default: sgzip.DefaultCompression,
Advanced: true,
}}, }},
}) })
} }
// Constants // Options defines the configuration for this backend
const bufferSize = 8388608 // Size of buffer when compressing or decompressing the entire file. type Options struct {
// Larger size means more multithreading with larger block sizes and thread counts. Remote string `config:"remote"`
// Currently at 8MB. CompressionMode string `config:"compression_mode"`
const initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently CompressionLevel int `config:"compression_level"`
const maxChunkSize = 8388608 // at 256KB and 8 MB. }
const metaFileExt = ".meta" /*** FILESYSTEM FUNCTIONS ***/
const uncompressedFileExt = ".bin"
// newCompressionForConfig constructs a Compression object for the given config name // Fs represents a wrapped fs.Fs
func newCompressionForConfig(opt *Options) (*Compression, error) { type Fs struct {
c, err := NewCompressionPreset(opt.CompressionMode) fs.Fs
return c, err wrapper fs.Fs
name string
root string
opt Options
mode int // compression mode id
features *fs.Features // optional features
} }
// NewFs constructs an Fs from the path, container:path // NewFs constructs an Fs from the path, container:path
@ -99,16 +123,12 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote) return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote)
} }
c, err := newCompressionForConfig(opt)
if err != nil {
return nil, err
}
// Strip trailing slashes if they exist in rpath // Strip trailing slashes if they exist in rpath
rpath = strings.TrimRight(rpath, "\\/") rpath = strings.TrimRight(rpath, "\\/")
// First, check for a file // First, check for a file
// If a metadata file was found, return an error. Otherwise, check for a directory // If a metadata file was found, return an error. Otherwise, check for a directory
remotePath := fspath.JoinRootPath(wPath, generateMetadataName(rpath)) remotePath := fspath.JoinRootPath(wPath, makeMetadataName(rpath))
wrappedFs, err := wInfo.NewFs(wName, remotePath, wConfig) wrappedFs, err := wInfo.NewFs(wName, remotePath, wConfig)
if err != fs.ErrorIsFile { if err != fs.ErrorIsFile {
remotePath = fspath.JoinRootPath(wPath, rpath) remotePath = fspath.JoinRootPath(wPath, rpath)
@ -124,7 +144,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
name: name, name: name,
root: rpath, root: rpath,
opt: *opt, opt: *opt,
c: c, mode: compressionModeFromName(opt.CompressionMode),
} }
// the features here are ones we could support, and they are // the features here are ones we could support, and they are
// ANDed with the ones from wrappedFs // ANDed with the ones from wrappedFs
@ -140,6 +160,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
}).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs)
// We support reading MIME types no matter the wrapped fs // We support reading MIME types no matter the wrapped fs
f.features.ReadMimeType = true f.features.ReadMimeType = true
// We can only support putstream if we have serverside copy or move
if wrappedFs.Features().Move == nil && wrappedFs.Features().Copy == nil { if wrappedFs.Features().Move == nil && wrappedFs.Features().Copy == nil {
f.features.PutStream = nil f.features.PutStream = nil
} }
@ -147,6 +168,15 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
return f, err return f, err
} }
func compressionModeFromName(name string) int {
switch name {
case "gzip":
return Gzip
default:
return Uncompressed
}
}
// Converts an int64 to hex // Converts an int64 to hex
func int64ToHex(number int64) string { func int64ToHex(number int64) string {
intBytes := make([]byte, 8) intBytes := make([]byte, 8)
@ -192,7 +222,7 @@ func processFileName(compressedFileName string) (origFileName string, extension
} }
// Generates the file name for a metadata file // Generates the file name for a metadata file
func generateMetadataName(remote string) (newRemote string) { func makeMetadataName(remote string) (newRemote string) {
return remote + metaFileExt return remote + metaFileExt
} }
@ -202,43 +232,20 @@ func isMetadataFile(filename string) bool {
} }
// Generates the file name for a data file // Generates the file name for a data file
func (c *Compression) generateDataName(remote string, size int64, compressed bool) (newRemote string) { func makeDataName(remote string, size int64, mode int) (newRemote string) {
if compressed { if mode > 0 {
newRemote = remote + int64ToHex(size) + c.GetFileExtension() newRemote = remote + int64ToHex(size) + ".gz"
} else { } else {
newRemote = remote + uncompressedFileExt newRemote = remote + uncompressedFileExt
} }
return newRemote return newRemote
} }
// Generates the file name from a compression mode func (f *Fs) dataName(remote string, size int64, compressed bool) (name string) {
func generateDataNameFromCompressionMode(remote string, size int64, mode int) (newRemote string) { if !compressed {
if mode != Uncompressed { return makeDataName(remote, size, Uncompressed)
c, _ := NewCompressionPresetNumber(mode)
newRemote = c.generateDataName(remote, size, true)
} else {
newRemote = remote + uncompressedFileExt
} }
return newRemote return makeDataName(remote, size, f.mode)
}
// Options defines the configuration for this backend
type Options struct {
Remote string `config:"remote"`
CompressionMode string `config:"compression_mode"`
}
/*** FILESYSTEM FUNCTIONS ***/
// Fs represents a wrapped fs.Fs
type Fs struct {
fs.Fs
wrapper fs.Fs
name string
root string
opt Options
features *fs.Features // optional features
c *Compression
} }
// Get an Object from a data DirEntry // Get an Object from a data DirEntry
@ -251,7 +258,7 @@ func (f *Fs) addData(entries *fs.DirEntries, o fs.Object) {
if size == -2 { // File is uncompressed if size == -2 { // File is uncompressed
size = o.Size() size = o.Size()
} }
metaName := generateMetadataName(origFileName) metaName := makeMetadataName(origFileName)
*entries = append(*entries, f.newObjectSizeAndNameOnly(o, metaName, size)) *entries = append(*entries, f.newObjectSizeAndNameOnly(o, metaName, size))
} }
@ -271,9 +278,6 @@ func (f *Fs) processEntries(entries fs.DirEntries) (newEntries fs.DirEntries, er
for _, entry := range entries { for _, entry := range entries {
switch x := entry.(type) { switch x := entry.(type) {
case fs.Object: case fs.Object:
// if isMetadataFile(x.Remote()) {
// f.addMeta(&newEntries, x) // Only care about metadata files; non-metadata files are redundant.
// }
if !isMetadataFile(x.Remote()) { if !isMetadataFile(x.Remote()) {
f.addData(&newEntries, x) // Only care about data files for now; metadata files are redundant. f.addData(&newEntries, x) // Only care about data files for now; metadata files are redundant.
} }
@ -333,7 +337,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
// NewObject finds the Object at remote. // NewObject finds the Object at remote.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// Read metadata from metadata object // Read metadata from metadata object
mo, err := f.Fs.NewObject(ctx, generateMetadataName(remote)) mo, err := f.Fs.NewObject(ctx, makeMetadataName(remote))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -342,34 +346,32 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return nil, errors.New("error decoding metadata") return nil, errors.New("error decoding metadata")
} }
// Create our Object // Create our Object
o, err := f.Fs.NewObject(ctx, generateDataNameFromCompressionMode(remote, meta.Size, meta.CompressionMode)) o, err := f.Fs.NewObject(ctx, makeDataName(remote, meta.Size, meta.Mode))
return f.newObject(o, mo, meta), err return f.newObject(o, mo, meta), err
} }
// Checks the compressibility and mime type of a file. Returns a rewinded reader, whether the file is compressible, and an error code. // findMimeType attempts to find the mime type of the object so we can determine compressibility
func (c *Compression) checkFileCompressibilityAndType(in io.Reader) (newReader io.Reader, compressible bool, mimeType string, err error) { // returns a multireader with the bytes that were read to determine mime type
// Unwrap accounting, get compressibility of file, rewind reader, then wrap accounting back on func findMimeType(in io.Reader) (newReader io.Reader, mimeType string, err error) {
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
var b bytes.Buffer var b bytes.Buffer
_, err = io.CopyN(&b, in, c.HeuristicBytes) _, err = io.CopyN(&b, in, heuristicBytes)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return nil, false, "", err return nil, "", err
}
compressible, _, err = c.GetFileCompressionInfo(bytes.NewReader(b.Bytes()))
if err != nil {
return nil, false, "", err
} }
mime := mimetype.Detect(b.Bytes()) mime := mimetype.Detect(b.Bytes())
in = io.MultiReader(bytes.NewReader(b.Bytes()), in) in = io.MultiReader(bytes.NewReader(b.Bytes()), in)
in = wrap(in) return wrap(in), mime.String(), nil
return in, compressible, mime.String(), nil }
func isCompressible(mime string) bool {
return !unconpressibleRegexp.MatchString(mime)
} }
// Verifies an object hash // Verifies an object hash
func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) (err error) { func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.MultiHasher, ht hash.Type) error {
srcHash := hasher.Sums()[ht] srcHash := hasher.Sums()[ht]
var dstHash string dstHash, err := o.Hash(ctx, ht)
dstHash, err = o.Hash(ctx, ht)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to read destination hash") return errors.Wrap(err, "failed to read destination hash")
} }
@ -386,9 +388,9 @@ func (f *Fs) verifyObjectHash(ctx context.Context, o fs.Object, hasher *hash.Mul
type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)
type blockDataAndError struct { type compressionResult struct {
err error err error
blockData []uint32 meta sgzip.GzipMetadata
} }
// Put a compressed version of a file. Returns a wrappable object and metadata. // Put a compressed version of a file. Returns a wrappable object and metadata.
@ -401,21 +403,29 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
in = io.TeeReader(in, metaHasher) in = io.TeeReader(in, metaHasher)
// Compress the file // Compress the file
var wrappedIn io.Reader
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
compressionResult := make(chan blockDataAndError) results := make(chan compressionResult)
go func() { go func() {
blockData, err := f.c.CompressFileReturningBlockData(in, pipeWriter) gz, err := sgzip.NewWriterLevel(pipeWriter, f.opt.CompressionLevel)
if err != nil {
results <- compressionResult{err: err, meta: sgzip.GzipMetadata{}}
return
}
_, err = io.Copy(gz, in)
gzErr := gz.Close()
if gzErr != nil {
fs.Errorf(nil, "Failed to close compress: %v", gzErr)
if err == nil {
err = gzErr
}
}
closeErr := pipeWriter.Close() closeErr := pipeWriter.Close()
if closeErr != nil { if closeErr != nil {
fs.Errorf(nil, "Failed to close compression pipe: %v", err) fs.Errorf(nil, "Failed to close pipe: %v", closeErr)
if err == nil {
err = closeErr
} }
} results <- compressionResult{err: err, meta: gz.MetaData()}
compressionResult <- blockDataAndError{err: err, blockData: blockData}
}() }()
wrappedIn = wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Bufio required for multithreading wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has its own buffering
// If verifyCompressedObject is on, find a hash the destination supports to compute a hash of // If verifyCompressedObject is on, find a hash the destination supports to compute a hash of
// the compressed data. // the compressed data.
@ -435,8 +445,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
} }
// Transfer the data // Transfer the data
//o, err := put(ctx, wrappedIn, f.wrapInfo(src, f.c.generateDataName(src.Remote(), src.Size(), true), src.Size()), options...) o, err := operations.Rcat(ctx, f.Fs, makeDataName(src.Remote(), src.Size(), f.mode), ioutil.NopCloser(wrappedIn), src.ModTime(ctx))
o, err := operations.Rcat(ctx, f.Fs, f.c.generateDataName(src.Remote(), src.Size(), true), ioutil.NopCloser(wrappedIn), src.ModTime(ctx))
if err != nil { if err != nil {
if o != nil { if o != nil {
removeErr := o.Remove(ctx) removeErr := o.Remove(ctx)
@ -447,7 +456,7 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
return nil, nil, err return nil, nil, err
} }
// Check whether we got an error during compression // Check whether we got an error during compression
result := <-compressionResult result := <-results
err = result.err err = result.err
if err != nil { if err != nil {
if o != nil { if o != nil {
@ -460,13 +469,11 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
} }
// Generate metadata // Generate metadata
blockData := result.blockData meta := newMetadata(result.meta.Size, f.mode, result.meta, metaHasher.Sum(nil), mimeType)
_, _, decompressedSize := parseBlockData(blockData, f.c.BlockSize)
meta := newMetadata(decompressedSize, f.c.CompressionMode, blockData, metaHasher.Sum(nil), mimeType)
// Check the hashes of the compressed data if we were comparing them // Check the hashes of the compressed data if we were comparing them
if ht != hash.None && hasher != nil { if ht != hash.None && hasher != nil {
err := f.verifyObjectHash(ctx, o, hasher, ht) err = f.verifyObjectHash(ctx, o, hasher, ht)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -479,28 +486,23 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) {
// Unwrap the accounting, add our metadata hasher, then wrap it back on // Unwrap the accounting, add our metadata hasher, then wrap it back on
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
metaHasher := md5.New()
in = io.TeeReader(in, metaHasher) hs := hash.NewHashSet(hash.MD5)
wrappedIn := wrap(in)
// If verifyCompressedObject is on, find a hash the destination supports to compute a hash of
// the compressed data.
ht := f.Fs.Hashes().GetOne() ht := f.Fs.Hashes().GetOne()
var hasher *hash.MultiHasher if verifyCompressedObject {
var err error if !hs.Contains(ht) {
if ht != hash.None && verifyCompressedObject { hs.Add(ht)
// unwrap the accounting again }
wrappedIn, wrap = accounting.UnWrap(wrappedIn) }
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht)) metaHasher, err := hash.NewMultiHasherTypes(hs)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
// add the hasher and re-wrap the accounting in = io.TeeReader(in, metaHasher)
wrappedIn = io.TeeReader(wrappedIn, hasher) wrappedIn := wrap(in)
wrappedIn = wrap(wrappedIn)
}
// Put the object // Put the object
o, err := put(ctx, wrappedIn, f.wrapInfo(src, f.c.generateDataName(src.Remote(), src.Size(), false), src.Size()), options...) o, err := put(ctx, wrappedIn, f.wrapInfo(src, makeDataName(src.Remote(), src.Size(), Uncompressed), src.Size()), options...)
//o, err := operations.Rcat(f, f.c.generateDataName(src.Remote(), src.Size(), false), wrappedIn, src.ModTime())
if err != nil { if err != nil {
if o != nil { if o != nil {
removeErr := o.Remove(ctx) removeErr := o.Remove(ctx)
@ -511,14 +513,19 @@ func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo,
return nil, nil, err return nil, nil, err
} }
// Check the hashes of the compressed data if we were comparing them // Check the hashes of the compressed data if we were comparing them
if ht != hash.None && hasher != nil { if ht != hash.None && verifyCompressedObject {
err := f.verifyObjectHash(ctx, o, hasher, ht) err := f.verifyObjectHash(ctx, o, metaHasher, ht)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
} }
// Return our object and metadata // Return our object and metadata
return o, newMetadata(o.Size(), Uncompressed, []uint32{}, metaHasher.Sum([]byte{}), mimeType), nil sum, err := metaHasher.Sum(hash.MD5)
if err != nil {
return nil, nil, err
}
return o, newMetadata(o.Size(), Uncompressed, sgzip.GzipMetadata{}, sum, mimeType), nil
} }
// This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object. // This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object.
@ -538,7 +545,7 @@ func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.Objec
metaReader := bytes.NewReader(b.Bytes()) metaReader := bytes.NewReader(b.Bytes())
// Put the data // Put the data
mo, err = put(ctx, metaReader, f.wrapInfo(src, generateMetadataName(src.Remote()), int64(b.Len())), options...) mo, err = put(ctx, metaReader, f.wrapInfo(src, makeMetadataName(src.Remote()), int64(b.Len())), options...)
if err != nil { if err != nil {
removeErr := mo.Remove(ctx) removeErr := mo.Remove(ctx)
if removeErr != nil { if removeErr != nil {
@ -592,11 +599,11 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
o, err := f.NewObject(ctx, src.Remote()) o, err := f.NewObject(ctx, src.Remote())
if err == fs.ErrorObjectNotFound { if err == fs.ErrorObjectNotFound {
// Get our file compressibility // Get our file compressibility
in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) in, mimeType, err := findMimeType(in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType, true) return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, isCompressible(mimeType), mimeType, true)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -612,10 +619,11 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
} }
found := err == nil found := err == nil
in, compressible, mimeType, err := f.c.checkFileCompressibilityAndType(in) in, mimeType, err := findMimeType(in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
compressible := isCompressible(mimeType)
newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType, true) newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType, true)
if err != nil { if err != nil {
return nil, err return nil, err
@ -633,16 +641,18 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
moveFs, ok := f.Fs.(fs.Mover) moveFs, ok := f.Fs.(fs.Mover)
var wrapObj fs.Object var wrapObj fs.Object
if ok { if ok {
wrapObj, err = moveFs.Move(ctx, newObj.Object, f.c.generateDataName(src.Remote(), newObj.size, compressible)) wrapObj, err = moveFs.Move(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Couldn't rename streamed object.") return nil, errors.Wrap(err, "Couldn't rename streamed object.")
} }
newObj.Object = wrapObj
return newObj, nil
} }
// If we don't have move we'll need to resort to serverside copy and remove // If we don't have move we'll need to resort to serverside copy and remove
copyFs, ok := f.Fs.(fs.Copier) copyFs, ok := f.Fs.(fs.Copier)
if ok { if ok {
wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.c.generateDataName(src.Remote(), newObj.size, compressible)) wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Could't copy streamed object.") return nil, errors.Wrap(err, "Could't copy streamed object.")
} }
@ -652,9 +662,7 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
return wrapObj, errors.Wrap(err, "Couldn't remove original streamed object. Remote may be in an inconsistent state.") return wrapObj, errors.Wrap(err, "Couldn't remove original streamed object. Remote may be in an inconsistent state.")
} }
} }
newObj.Object = wrapObj newObj.Object = wrapObj
return newObj, nil return newObj, nil
} }
@ -665,15 +673,6 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
// //
// This will create a duplicate if we upload a new file without // This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that. // checking to see if there is one already - use Put() for that.
/*func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// If PutUnchecked is supported, do it.
// I'm unsure about this. With the current metadata model this might actually break things. Needs some manual testing.
do := f.Fs.Features().PutUnchecked
if do == nil {
return nil, errors.New("can't PutUnchecked")
}
return f.putWithCustomFunctions(ctx, in, src, options, do, do, false)
}*/
// Hashes returns the supported hash sets. // Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set { func (f *Fs) Hashes() hash.Set {
@ -744,13 +743,13 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
if err != nil { if err != nil {
return nil, err return nil, err
} }
newFilename := generateMetadataName(remote) newFilename := makeMetadataName(remote)
moResult, err := do(ctx, o.mo, newFilename) moResult, err := do(ctx, o.mo, newFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Copy over data // Copy over data
newFilename = generateDataNameFromCompressionMode(remote, src.Size(), o.meta.CompressionMode) newFilename = makeDataName(remote, src.Size(), o.meta.Mode)
oResult, err := do(ctx, o.Object, newFilename) oResult, err := do(ctx, o.Object, newFilename)
if err != nil { if err != nil {
return nil, err return nil, err
@ -794,14 +793,14 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
if err != nil { if err != nil {
return nil, err return nil, err
} }
newFilename := generateMetadataName(remote) newFilename := makeMetadataName(remote)
moResult, err := do(ctx, o.mo, newFilename) moResult, err := do(ctx, o.mo, newFilename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Move data // Move data
newFilename = generateDataNameFromCompressionMode(remote, src.Size(), o.meta.CompressionMode) newFilename = makeDataName(remote, src.Size(), o.meta.Mode)
oResult, err := do(ctx, o.Object, newFilename) oResult, err := do(ctx, o.Object, newFilename)
if err != nil { if err != nil {
return nil, err return nil, err
@ -908,7 +907,7 @@ func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryT
case fs.EntryObject: case fs.EntryObject:
// Note: All we really need to do to monitor the object is to check whether the metadata changed, // Note: All we really need to do to monitor the object is to check whether the metadata changed,
// as the metadata contains the hash. This will work unless there's a hash collision and the sizes stay the same. // as the metadata contains the hash. This will work unless there's a hash collision and the sizes stay the same.
wrappedPath = generateMetadataName(path) wrappedPath = makeMetadataName(path)
default: default:
fs.Errorf(path, "press ChangeNotify: ignoring unknown EntryType %d", entryType) fs.Errorf(path, "press ChangeNotify: ignoring unknown EntryType %d", entryType)
return return
@ -937,10 +936,10 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration
// ObjectMetadata describes the metadata for an Object. // ObjectMetadata describes the metadata for an Object.
type ObjectMetadata struct { type ObjectMetadata struct {
Size int64 // Uncompressed size of the file. Size int64 // Uncompressed size of the file.
CompressionMode int // Compression mode of the file. Mode int // Compression mode of the file.
BlockData []uint32 // Block indexing data for the file.
Hash []byte // MD5 hash of the file. Hash []byte // MD5 hash of the file.
MimeType string // Mime type of the file MimeType string // Mime type of the file
CompressionMetadata sgzip.GzipMetadata
} }
// Object with external metadata // Object with external metadata
@ -954,11 +953,11 @@ type Object struct {
} }
// This function generates a metadata object // This function generates a metadata object
func newMetadata(size int64, compressionMode int, blockData []uint32, hash []byte, mimeType string) *ObjectMetadata { func newMetadata(size int64, mode int, cmeta sgzip.GzipMetadata, hash []byte, mimeType string) *ObjectMetadata {
meta := new(ObjectMetadata) meta := new(ObjectMetadata)
meta.Size = size meta.Size = size
meta.CompressionMode = compressionMode meta.Mode = mode
meta.BlockData = blockData meta.CompressionMetadata = cmeta
meta.Hash = hash meta.Hash = hash
meta.MimeType = mimeType meta.MimeType = mimeType
return meta return meta
@ -1042,35 +1041,20 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.mo, o.mo.Update(ctx, in, src, options...) return o.mo, o.mo.Update(ctx, in, src, options...)
} }
// Get our file compressibility in, mimeType, err := findMimeType(in)
in, compressible, mimeType, err := o.f.c.checkFileCompressibilityAndType(in)
if err != nil { if err != nil {
return err return err
} }
compressible := isCompressible(mimeType)
// Since we're encoding the original filesize in the name we'll need to make sure that this name is updated before the actual update // Since we are storing the filesize in the name the new object may have different name than the old
// We'll make sure to delete the old object in this case
var newObject *Object var newObject *Object
origName := o.Remote() origName := o.Remote()
if o.meta.CompressionMode != Uncompressed || compressible { if o.meta.Mode != Uncompressed || compressible {
// If we aren't, we must either move-then-update or reupload-then-remove the object, and update the metadata.
// Check if this FS supports moving
moveFs, ok := o.f.Fs.(fs.Mover)
if ok { // If this fs supports moving, use move-then-update. This may help keep some versioning alive.
// First, move the object
var movedObject fs.Object
movedObject, err = moveFs.Move(ctx, o.Object, o.f.c.generateDataName(o.Remote(), src.Size(), compressible))
if err != nil {
return err
}
// Create function that updates moved object, then update
update := func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return movedObject, movedObject.Update(ctx, in, src, options...)
}
newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType, true)
} else { // If this fs does not support moving, fall back to reuploading the object then removing the old one.
newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true) newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true)
removeErr := o.Object.Remove(ctx) // Note: We must do remove later so a failed update doesn't destroy data. if newObject.Object.Remote() != o.Object.Remote() {
if removeErr != nil { if removeErr := o.Object.Remove(ctx); removeErr != nil {
return removeErr return removeErr
} }
} }
@ -1220,7 +1204,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read
return nil, err return nil, err
} }
// If we're uncompressed, just pass this to the underlying object // If we're uncompressed, just pass this to the underlying object
if o.meta.CompressionMode == Uncompressed { if o.meta.Mode == Uncompressed {
return o.Object.Open(ctx, options...) return o.Object.Open(ctx, options...)
} }
// Get offset and limit from OpenOptions, pass the rest to the underlying remote // Get offset and limit from OpenOptions, pass the rest to the underlying remote
@ -1239,27 +1223,21 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read
// Get a chunkedreader for the wrapped object // Get a chunkedreader for the wrapped object
chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize) chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize)
// Get file handle // Get file handle
c, err := NewCompressionPresetNumber(o.meta.CompressionMode) var file io.Reader
if offset != 0 {
file, err = sgzip.NewReaderAt(chunkedReader, &o.meta.CompressionMetadata, offset)
} else {
file, err = sgzip.NewReader(chunkedReader)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
FileHandle, _, err := c.DecompressFileExtData(chunkedReader, o.Object.Size(), o.meta.BlockData)
if err != nil {
return nil, err
}
// Seek and limit according to the options given
// Note: This if statement is not required anymore because all 0-size files will be uncompressed. I'm leaving this here just in case I come back here debugging.
if offset != 0 { // Note: this if statement is only required because seeking to 0 on a 0-size file makes chunkedReader complain about an "invalid seek position".
_, err = FileHandle.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
}
}
var fileReader io.Reader var fileReader io.Reader
if limit != -1 { if limit != -1 {
fileReader = io.LimitReader(FileHandle, limit) fileReader = io.LimitReader(file, limit)
} else { } else {
fileReader = FileHandle fileReader = file
} }
// Return a ReadCloser // Return a ReadCloser
return combineReaderAndCloser(fileReader, chunkedReader), nil return combineReaderAndCloser(fileReader, chunkedReader), nil
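
The net effect of the rewrite above: instead of the homegrown block index, the backend now stores an sgzip.GzipMetadata value in the metadata object and delegates seekable decompression to the external library. A rough sketch of that flow, using only the calls visible in this diff (sgzip.NewWriterLevel, MetaData, NewReaderAt, DefaultCompression); the exact signatures are assumptions inferred from the diff, not a definitive reference for the library.

package main

import (
	"bytes"
	"io"
	"log"
	"os"

	"github.com/buengese/sgzip"
)

func main() {
	src := bytes.Repeat([]byte("0123456789"), 100000)

	// Compress, keeping the metadata the writer accumulates; the backend
	// stores this in the .meta object next to the data object.
	var compressed bytes.Buffer
	gz, err := sgzip.NewWriterLevel(&compressed, sgzip.DefaultCompression)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.Copy(gz, bytes.NewReader(src)); err != nil {
		log.Fatal(err)
	}
	if err := gz.Close(); err != nil {
		log.Fatal(err)
	}
	meta := gz.MetaData()

	// Later, open the stream at an arbitrary decompressed offset using the
	// stored metadata, as Object.Open now does for ranged reads.
	r, err := sgzip.NewReaderAt(bytes.NewReader(compressed.Bytes()), &meta, 500000)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.CopyN(os.Stdout, r, 10); err != nil {
		log.Fatal(err)
	}
}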

View File

@ -6,6 +6,7 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
_ "github.com/rclone/rclone/backend/dropbox"
_ "github.com/rclone/rclone/backend/local" _ "github.com/rclone/rclone/backend/local"
"github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests" "github.com/rclone/rclone/fstest/fstests"
@ -35,37 +36,6 @@ func TestIntegration(t *testing.T) {
}) })
} }
// TestRemoteLz4 tests LZ4 compression
func TestRemoteLz4(t *testing.T) {
if *fstest.RemoteName != "" {
t.Skip("Skipping as -remote set")
}
tempdir := filepath.Join(os.TempDir(), "rclone-press-test-lz4")
name := "TestPressLz4"
fstests.Run(t, &fstests.Opt{
RemoteName: name + ":",
NilObject: (*Object)(nil),
UnimplementableFsMethods: []string{
"OpenWriterAt",
"MergeDirs",
"DirCacheFlush",
"PutUnchecked",
"PutStream",
"UserInfo",
"Disconnect",
},
UnimplementableObjectMethods: []string{
"GetTier",
"SetTier",
},
ExtraConfig: []fstests.ExtraConfigItem{
{Name: name, Key: "type", Value: "press"},
{Name: name, Key: "remote", Value: tempdir},
{Name: name, Key: "compression_mode", Value: "lz4"},
},
})
}
// TestRemoteGzip tests GZIP compression // TestRemoteGzip tests GZIP compression
func TestRemoteGzip(t *testing.T) { func TestRemoteGzip(t *testing.T) {
if *fstest.RemoteName != "" { if *fstest.RemoteName != "" {
@ -96,34 +66,3 @@ func TestRemoteGzip(t *testing.T) {
}, },
}) })
} }
// TestRemoteXz tests XZ compression
func TestRemoteXz(t *testing.T) {
if *fstest.RemoteName != "" {
t.Skip("Skipping as -remote set")
}
tempdir := filepath.Join(os.TempDir(), "rclone-press-test-xz")
name := "TestPressXz"
fstests.Run(t, &fstests.Opt{
RemoteName: name + ":",
NilObject: (*Object)(nil),
UnimplementableFsMethods: []string{
"OpenWriterAt",
"MergeDirs",
"DirCacheFlush",
"PutUnchecked",
"PutStream",
"UserInfo",
"Disconnect",
},
UnimplementableObjectMethods: []string{
"GetTier",
"SetTier",
},
ExtraConfig: []fstests.ExtraConfigItem{
{Name: name, Key: "type", Value: "press"},
{Name: name, Key: "remote", Value: tempdir},
{Name: name, Key: "compression_mode", Value: "xz"},
},
})
}