press: various fixups

This commit is contained in:
buengese 2020-09-28 19:57:04 +02:00
parent 10a3129049
commit 954a23ed5d
2 changed files with 84 additions and 99 deletions

View File

@ -13,7 +13,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"regexp"
"strings" "strings"
"time" "time"
@ -36,8 +35,9 @@ const (
initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently initialChunkSize = 262144 // Initial and max sizes of chunks when reading parts of the file. Currently
maxChunkSize = 8388608 // at 256KB and 8 MB. maxChunkSize = 8388608 // at 256KB and 8 MB.
bufferSize = 8388608 bufferSize = 8388608
heuristicBytes = 1048576 heuristicBytes = 1048576
minCompressionRatio = 1.1
metaFileExt = ".meta" metaFileExt = ".meta"
uncompressedFileExt = ".bin" uncompressedFileExt = ".bin"
@ -49,8 +49,6 @@ const (
Gzip = 2 Gzip = 2
) )
var unconpressibleRegexp = regexp.MustCompile("(^(video|image|audio)/.*)|(^.*?/(x-7z-compressed|zip|gzip|x-rar-compressed|zstd|x-xz|lzip|warc))")
// Register with Fs // Register with Fs
func init() { func init() {
// Build compression mode options. // Build compression mode options.
@ -71,13 +69,21 @@ func init() {
Help: "Remote to compress.", Help: "Remote to compress.",
Required: true, Required: true,
}, { }, {
Name: "compression_mode", Name: "mode",
Help: "Compression mode.", Help: "Compression mode.",
Default: "gzip", Default: "gzip",
Examples: compressionModeOptions, Examples: compressionModeOptions,
}, { }, {
Name: "compression_level", Name: "level",
Help: "gzip compression level -2 to 9", Help: `GZIP compression level (-2 to 9).
Generally -1 (default, equivalent to 5) is recommended.
Levels 1 to 9 increase compressiong at the cost of speed.. Going past 6
generally offers very little return.
Level -2 uses Huffmann encoding only. Only use if you now what you
are doing
Level 0 turns off compression.`,
Default: sgzip.DefaultCompression, Default: sgzip.DefaultCompression,
Advanced: true, Advanced: true,
}}, }},
@ -87,8 +93,8 @@ func init() {
// Options defines the configuration for this backend // Options defines the configuration for this backend
type Options struct { type Options struct {
Remote string `config:"remote"` Remote string `config:"remote"`
CompressionMode string `config:"compression_mode"` CompressionMode string `config:"mode"`
CompressionLevel int `config:"compression_level"` CompressionLevel int `config:"level"`
} }
/*** FILESYSTEM FUNCTIONS ***/ /*** FILESYSTEM FUNCTIONS ***/
@ -150,7 +156,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
// ANDed with the ones from wrappedFs // ANDed with the ones from wrappedFs
f.features = (&fs.Features{ f.features = (&fs.Features{
CaseInsensitive: true, CaseInsensitive: true,
DuplicateFiles: true, DuplicateFiles: false,
ReadMimeType: false, ReadMimeType: false,
WriteMimeType: false, WriteMimeType: false,
GetTier: true, GetTier: true,
@ -161,8 +167,8 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
// We support reading MIME types no matter the wrapped fs // We support reading MIME types no matter the wrapped fs
f.features.ReadMimeType = true f.features.ReadMimeType = true
// We can only support putstream if we have serverside copy or move // We can only support putstream if we have serverside copy or move
if wrappedFs.Features().Move == nil && wrappedFs.Features().Copy == nil { if !operations.CanServerSideMove(wrappedFs) {
f.features.PutStream = nil f.features.Disable("PutStream")
} }
return f, err return f, err
@ -350,22 +356,39 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
return f.newObject(o, mo, meta), err return f.newObject(o, mo, meta), err
} }
// findMimeType attempts to find the mime type of the object so we can determine compressibility // checkCompressAndType attempts to find the mime type of the object so we can determine compressibility
// returns a multireader with the bytes that were read to determine mime type // returns a multireader with the bytes that were read to determine mime type
func findMimeType(in io.Reader) (newReader io.Reader, mimeType string, err error) { func checkCompressAndType(in io.Reader) (newReader io.Reader, compressible bool, mimeType string, err error) {
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
var b bytes.Buffer buf := make([]byte, heuristicBytes)
_, err = io.CopyN(&b, in, heuristicBytes) n, err := in.Read(buf)
buf = buf[:n]
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return nil, "", err return nil, false, "", err
} }
mime := mimetype.Detect(b.Bytes()) mime := mimetype.Detect(buf)
in = io.MultiReader(bytes.NewReader(b.Bytes()), in) compressible, err = isCompressible(bytes.NewReader(buf))
return wrap(in), mime.String(), nil if err != nil {
return nil, false, "", err
}
in = io.MultiReader(bytes.NewReader(buf), in)
return wrap(in), compressible, mime.String(), nil
} }
func isCompressible(mime string) bool { // isCompressible checks the compression ratio of the provided data and returns true if the ratio exceeds
return !unconpressibleRegexp.MatchString(mime) // the configured threshold
func isCompressible(r io.Reader) (bool, error) {
var b bytes.Buffer
w, err := sgzip.NewWriterLevel(&b, sgzip.DefaultCompression)
if err != nil {
return false, err
}
n, err := io.Copy(w, r)
if err != nil {
return false, err
}
ratio := float64(n) / float64(b.Len())
return ratio > minCompressionRatio, nil
} }
// Verifies an object hash // Verifies an object hash
@ -394,7 +417,7 @@ type compressionResult struct {
} }
// Put a compressed version of a file. Returns a wrappable object and metadata. // Put a compressed version of a file. Returns a wrappable object and metadata.
func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) {
// Unwrap reader accounting // Unwrap reader accounting
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
@ -422,17 +445,20 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
closeErr := pipeWriter.Close() closeErr := pipeWriter.Close()
if closeErr != nil { if closeErr != nil {
fs.Errorf(nil, "Failed to close pipe: %v", closeErr) fs.Errorf(nil, "Failed to close pipe: %v", closeErr)
if err == nil {
err = closeErr
}
} }
results <- compressionResult{err: err, meta: gz.MetaData()} results <- compressionResult{err: err, meta: gz.MetaData()}
}() }()
wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has it's own buffering wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has it's own buffering
// If verifyCompressedObject is on, find a hash the destination supports to compute a hash of // Find a hash the destination supports to compute a hash of
// the compressed data. // the compressed data.
ht := f.Fs.Hashes().GetOne() ht := f.Fs.Hashes().GetOne()
var hasher *hash.MultiHasher var hasher *hash.MultiHasher
var err error var err error
if ht != hash.None && verifyCompressedObject { if ht != hash.None {
// unwrap the accounting again // unwrap the accounting again
wrappedIn, wrap = accounting.UnWrap(wrappedIn) wrappedIn, wrap = accounting.UnWrap(wrappedIn)
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht)) hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht))
@ -483,16 +509,14 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
} }
// Put an uncompressed version of a file. Returns a wrappable object and metadata. // Put an uncompressed version of a file. Returns a wrappable object and metadata.
func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) { func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) {
// Unwrap the accounting, add our metadata hasher, then wrap it back on // Unwrap the accounting, add our metadata hasher, then wrap it back on
in, wrap := accounting.UnWrap(in) in, wrap := accounting.UnWrap(in)
hs := hash.NewHashSet(hash.MD5) hs := hash.NewHashSet(hash.MD5)
ht := f.Fs.Hashes().GetOne() ht := f.Fs.Hashes().GetOne()
if verifyCompressedObject { if !hs.Contains(ht) {
if !hs.Contains(ht) { hs.Add(ht)
hs.Add(ht)
}
} }
metaHasher, err := hash.NewMultiHasherTypes(hs) metaHasher, err := hash.NewMultiHasherTypes(hs)
if err != nil { if err != nil {
@ -513,7 +537,7 @@ func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo,
return nil, nil, err return nil, nil, err
} }
// Check the hashes of the compressed data if we were comparing them // Check the hashes of the compressed data if we were comparing them
if ht != hash.None && verifyCompressedObject { if ht != hash.None {
err := f.verifyObjectHash(ctx, o, metaHasher, ht) err := f.verifyObjectHash(ctx, o, metaHasher, ht)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -529,7 +553,7 @@ func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo,
} }
// This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object. // This function will write a metadata struct to a metadata Object for an src. Returns a wrappable metadata object.
func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn, verifyCompressedObject bool) (mo fs.Object, err error) { func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (mo fs.Object, err error) {
// Generate the metadata contents // Generate the metadata contents
var b bytes.Buffer var b bytes.Buffer
gzipWriter := gzip.NewWriter(&b) gzipWriter := gzip.NewWriter(&b)
@ -560,21 +584,21 @@ func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.Objec
// This function will put both the data and metadata for an Object. // This function will put both the data and metadata for an Object.
// putData is the function used for data, while putMeta is the function used for metadata. // putData is the function used for data, while putMeta is the function used for metadata.
func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption,
putData putFn, putMeta putFn, compressible bool, mimeType string, verifyCompressedObject bool) (*Object, error) { putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) {
// Put file then metadata // Put file then metadata
var dataObject fs.Object var dataObject fs.Object
var meta *ObjectMetadata var meta *ObjectMetadata
var err error var err error
if compressible { if compressible {
dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType, verifyCompressedObject) dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType)
} else { } else {
dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType, verifyCompressedObject) dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType)
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
mo, err := f.putMetadata(ctx, meta, src, options, putMeta, verifyCompressedObject) mo, err := f.putMetadata(ctx, meta, src, options, putMeta)
// meta data upload may fail. in this case we try to remove the original object // meta data upload may fail. in this case we try to remove the original object
if err != nil { if err != nil {
@ -593,17 +617,17 @@ func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.Ob
// will return the object and the error, otherwise will return // will return the object and the error, otherwise will return
// nil and the error // nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
// If there's already an existent objects we need to make sure to explcitly update it to make sure we don't leave // If there's already an existent objects we need to make sure to explicitly update it to make sure we don't leave
// orphaned data. Alternatively we could also deleted (which would simpler) but has the disadvantage that it // orphaned data. Alternatively we could also deleted (which would simpler) but has the disadvantage that it
// destroys all server-side versioning. // destroys all server-side versioning.
o, err := f.NewObject(ctx, src.Remote()) o, err := f.NewObject(ctx, src.Remote())
if err == fs.ErrorObjectNotFound { if err == fs.ErrorObjectNotFound {
// Get our file compressibility // Get our file compressibility
in, mimeType, err := findMimeType(in) in, compressible, mimeType, err := checkCompressAndType(in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, isCompressible(mimeType), mimeType, true) return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -619,50 +643,33 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
} }
found := err == nil found := err == nil
in, mimeType, err := findMimeType(in) in, compressible, mimeType, err := checkCompressAndType(in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
compressible := isCompressible(mimeType) newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType)
newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Our transfer is now complete if we allready had an object with the same name we can safely remove it now // Our transfer is now complete. We have to make sure to remove the old object because our new object will
// this is necessary to make sure we don't leave the remote in an inconsistent state. // have a different name except when both the old and the new object where uncompressed.
if found { if found && (oldObj.(*Object).meta.Mode != Uncompressed || compressible) {
err = oldObj.(*Object).Object.Remove(ctx) err = oldObj.(*Object).Object.Remove(ctx)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Could remove original object") return nil, errors.Wrap(err, "Could remove original object")
} }
} }
moveFs, ok := f.Fs.(fs.Mover) // If our new object is compressed we have to rename it with the correct size.
var wrapObj fs.Object // Uncompressed objects don't store the size in the name so we they'll allready have the correct name.
if ok { if compressible {
wrapObj, err = moveFs.Move(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible)) wrapObj, err := operations.Move(ctx, f.Fs, nil, f.dataName(src.Remote(), newObj.size, compressible), newObj.Object)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Couldn't rename streamed object.") return nil, errors.Wrap(err, "Couldn't rename streamed Object.")
} }
newObj.Object = wrapObj newObj.Object = wrapObj
return newObj, nil
} }
// If we don't have move we'll need to resort to serverside copy and remove
copyFs, ok := f.Fs.(fs.Copier)
if ok {
wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible))
if err != nil {
return nil, errors.Wrap(err, "Could't copy streamed object.")
}
// Remove the original
err = newObj.Remove(ctx)
if err != nil {
return wrapObj, errors.Wrap(err, "Couldn't remove original streamed object. Remote may be in an incositent state.")
}
}
newObj.Object = wrapObj
return newObj, nil return newObj, nil
} }
@ -935,7 +942,6 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration
// ObjectMetadata describes the metadata for an Object. // ObjectMetadata describes the metadata for an Object.
type ObjectMetadata struct { type ObjectMetadata struct {
Size int64 // Uncompressed size of the file.
Mode int // Compression mode of the file. Mode int // Compression mode of the file.
Hash []byte // MD5 hash of the file. Hash []byte // MD5 hash of the file.
MimeType string // Mime type of the file MimeType string // Mime type of the file
@ -1009,25 +1015,8 @@ func (o *Object) Remove(ctx context.Context) error {
// ReadCloserWrapper combines a Reader and a Closer to a ReadCloser // ReadCloserWrapper combines a Reader and a Closer to a ReadCloser
type ReadCloserWrapper struct { type ReadCloserWrapper struct {
dataSource io.Reader io.Reader
closer io.Closer io.Closer
}
func combineReaderAndCloser(dataSource io.Reader, closer io.Closer) *ReadCloserWrapper {
rc := new(ReadCloserWrapper)
rc.dataSource = dataSource
rc.closer = closer
return rc
}
// Read function
func (w *ReadCloserWrapper) Read(p []byte) (n int, err error) {
return w.dataSource.Read(p)
}
// Close function
func (w *ReadCloserWrapper) Close() error {
return w.closer.Close()
} }
// Update in to the object with the modTime given of the given size // Update in to the object with the modTime given of the given size
@ -1041,18 +1030,17 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.mo, o.mo.Update(ctx, in, src, options...) return o.mo, o.mo.Update(ctx, in, src, options...)
} }
in, mimeType, err := findMimeType(in) in, compressible, mimeType, err := checkCompressAndType(in)
if err != nil { if err != nil {
return err return err
} }
compressible := isCompressible(mimeType)
// Since we are storing the filesize in the name the new object may have different name than the old // Since we are storing the filesize in the name the new object may have different name than the old
// We'll make sure to delete the old object in this case // We'll make sure to delete the old object in this case
var newObject *Object var newObject *Object
origName := o.Remote() origName := o.Remote()
if o.meta.Mode != Uncompressed || compressible { if o.meta.Mode != Uncompressed || compressible {
newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true) newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType)
if newObject.Object.Remote() != o.Object.Remote() { if newObject.Object.Remote() != o.Object.Remote() {
if removeErr := o.Object.Remove(ctx); removeErr != nil { if removeErr := o.Object.Remove(ctx); removeErr != nil {
return removeErr return removeErr
@ -1064,7 +1052,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.Object, o.Object.Update(ctx, in, src, options...) return o.Object, o.Object.Update(ctx, in, src, options...)
} }
// If we are, just update the object and metadata // If we are, just update the object and metadata
newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType, true) newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType)
} }
if err != nil { if err != nil {
return err return err
@ -1163,13 +1151,13 @@ func (o *Object) MimeType(ctx context.Context) string {
// Hash returns the selected checksum of the file // Hash returns the selected checksum of the file
// If no checksum is available it returns "" // If no checksum is available it returns ""
func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) { func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) {
if ht != hash.MD5 {
return "", hash.ErrUnsupported
}
err := o.loadMetadataIfNotLoaded(ctx) err := o.loadMetadataIfNotLoaded(ctx)
if err != nil { if err != nil {
return "", err return "", err
} }
if ht&hash.MD5 == 0 {
return "", hash.ErrUnsupported
}
return hex.EncodeToString(o.meta.Hash), nil return hex.EncodeToString(o.meta.Hash), nil
} }
@ -1240,7 +1228,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read
fileReader = file fileReader = file
} }
// Return a ReadCloser // Return a ReadCloser
return combineReaderAndCloser(fileReader, chunkedReader), nil return ReadCloserWrapper{Reader: fileReader, Closer: chunkedReader}, nil
} }
// ObjectInfo describes a wrapped fs.ObjectInfo for being the source // ObjectInfo describes a wrapped fs.ObjectInfo for being the source
@ -1305,9 +1293,6 @@ func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) {
if ht != hash.MD5 { if ht != hash.MD5 {
return "", hash.ErrUnsupported return "", hash.ErrUnsupported
} }
if o.Size() != o.src.Size() {
return "", hash.ErrUnsupported
}
value, err := o.src.Hash(ctx, ht) value, err := o.src.Hash(ctx, ht)
if err == hash.ErrUnsupported { if err == hash.ErrUnsupported {
return "", hash.ErrUnsupported return "", hash.ErrUnsupported

View File

@ -8,7 +8,7 @@ Press (Experimental)
----------------------------------------- -----------------------------------------
The `press` remote adds compression to another remote. It is best used with remotes containing The `press` remote adds compression to another remote. It is best used with remotes containing
many large compressible files or on top of other remotes like crypt. many large compressible files.
Please read the [warnings](#warnings) before using this remote. Please read the [warnings](#warnings) before using this remote.