From 954a23ed5d7a79798e784d8147f53db87d5f28c6 Mon Sep 17 00:00:00 2001
From: buengese
Date: Mon, 28 Sep 2020 19:57:04 +0200
Subject: [PATCH] press: various fixups

---
 backend/press/press.go | 185 +++++++++++++++++++----------------------
 docs/content/press.md  |   2 +-
 2 files changed, 88 insertions(+), 99 deletions(-)

diff --git a/backend/press/press.go b/backend/press/press.go
index 3c42fbaf2..68c85b520 100644
--- a/backend/press/press.go
+++ b/backend/press/press.go
@@ -13,7 +13,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"regexp"
 	"strings"
 	"time"
 
@@ -36,8 +35,9 @@ const (
 	initialChunkSize = 262144  // Initial and max sizes of chunks when reading parts of the file. Currently
 	maxChunkSize     = 8388608 // at 256KB and 8 MB.
 
-	bufferSize     = 8388608
-	heuristicBytes = 1048576
+	bufferSize          = 8388608
+	heuristicBytes      = 1048576
+	minCompressionRatio = 1.1
 
 	metaFileExt         = ".meta"
 	uncompressedFileExt = ".bin"
@@ -49,8 +49,6 @@ const (
 	Gzip = 2
 )
 
-var unconpressibleRegexp = regexp.MustCompile("(^(video|image|audio)/.*)|(^.*?/(x-7z-compressed|zip|gzip|x-rar-compressed|zstd|x-xz|lzip|warc))")
-
 // Register with Fs
 func init() {
 	// Build compression mode options.
@@ -71,13 +69,21 @@ func init() {
 			Help:     "Remote to compress.",
 			Required: true,
 		}, {
-			Name:     "compression_mode",
+			Name:     "mode",
 			Help:     "Compression mode.",
 			Default:  "gzip",
 			Examples: compressionModeOptions,
 		}, {
-			Name:    "compression_level",
-			Help:    "gzip compression level -2 to 9",
+			Name: "level",
+			Help: `GZIP compression level (-2 to 9).
+
+			Generally -1 (default, equivalent to 5) is recommended.
+			Levels 1 to 9 increase compression at the cost of speed. Going past 6
+			generally offers very little return.
+
+			Level -2 uses Huffman encoding only. Only use this if you know what you
+			are doing.
+			Level 0 turns off compression.`,
 			Default:  sgzip.DefaultCompression,
 			Advanced: true,
 		}},
@@ -87,8 +93,8 @@ func init() {
 // Options defines the configuration for this backend
 type Options struct {
 	Remote           string `config:"remote"`
-	CompressionMode  string `config:"compression_mode"`
-	CompressionLevel int    `config:"compression_level"`
+	CompressionMode  string `config:"mode"`
+	CompressionLevel int    `config:"level"`
 }
 
 /*** FILESYSTEM FUNCTIONS ***/
@@ -150,7 +156,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
 	// ANDed with the ones from wrappedFs
 	f.features = (&fs.Features{
 		CaseInsensitive:         true,
-		DuplicateFiles:          true,
+		DuplicateFiles:          false,
 		ReadMimeType:            false,
 		WriteMimeType:           false,
 		GetTier:                 true,
@@ -161,8 +167,8 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
 	// We support reading MIME types no matter the wrapped fs
 	f.features.ReadMimeType = true
 	// We can only support putstream if we have serverside copy or move
-	if wrappedFs.Features().Move == nil && wrappedFs.Features().Copy == nil {
-		f.features.PutStream = nil
+	if !operations.CanServerSideMove(wrappedFs) {
+		f.features.Disable("PutStream")
 	}
 
 	return f, err
@@ -350,22 +356,43 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
 	return f.newObject(o, mo, meta), err
 }
 
-// findMimeType attempts to find the mime type of the object so we can determine compressibility
+// checkCompressAndType attempts to find the mime type of the object so we can determine compressibility
 // returns a multireader with the bytes that were read to determine mime type
-func findMimeType(in io.Reader) (newReader io.Reader, mimeType string, err error) {
+func checkCompressAndType(in io.Reader) (newReader io.Reader, compressible bool, mimeType string, err error) {
 	in, wrap := accounting.UnWrap(in)
-	var b bytes.Buffer
-	_, err = io.CopyN(&b, in, heuristicBytes)
+	buf := make([]byte, heuristicBytes)
+	n, err := in.Read(buf)
+	buf = buf[:n]
 	if err != nil && err != io.EOF {
-		return nil, "", err
+		return nil, false, "", err
 	}
-	mime := mimetype.Detect(b.Bytes())
-	in = io.MultiReader(bytes.NewReader(b.Bytes()), in)
-	return wrap(in), mime.String(), nil
+	mime := mimetype.Detect(buf)
+	compressible, err = isCompressible(bytes.NewReader(buf))
+	if err != nil {
+		return nil, false, "", err
+	}
+	in = io.MultiReader(bytes.NewReader(buf), in)
+	return wrap(in), compressible, mime.String(), nil
 }
 
-func isCompressible(mime string) bool {
-	return !unconpressibleRegexp.MatchString(mime)
+// isCompressible checks the compression ratio of the provided data and returns true if the ratio exceeds
+// the configured threshold
+func isCompressible(r io.Reader) (bool, error) {
+	var b bytes.Buffer
+	w, err := sgzip.NewWriterLevel(&b, sgzip.DefaultCompression)
+	if err != nil {
+		return false, err
+	}
+	n, err := io.Copy(w, r)
+	if err != nil {
+		return false, err
+	}
+	err = w.Close()
+	if err != nil {
+		return false, err
+	}
+	ratio := float64(n) / float64(b.Len())
+	return ratio > minCompressionRatio, nil
 }
 
 // Verifies an object hash
@@ -394,7 +417,7 @@ type compressionResult struct {
 }
 
 // Put a compressed version of a file. Returns a wrappable object and metadata.
-func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) {
+func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) {
 	// Unwrap reader accounting
 	in, wrap := accounting.UnWrap(in)
 
@@ -422,17 +445,20 @@ func (f *Fs) putCompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, o
 		closeErr := pipeWriter.Close()
 		if closeErr != nil {
 			fs.Errorf(nil, "Failed to close pipe: %v", closeErr)
+			if err == nil {
+				err = closeErr
+			}
 		}
 		results <- compressionResult{err: err, meta: gz.MetaData()}
 	}()
 	wrappedIn := wrap(bufio.NewReaderSize(pipeReader, bufferSize)) // Probably no longer needed as sgzip has its own buffering
 
-	// If verifyCompressedObject is on, find a hash the destination supports to compute a hash of
+	// Find a hash the destination supports to compute a hash of
 	// the compressed data.
 	ht := f.Fs.Hashes().GetOne()
 	var hasher *hash.MultiHasher
 	var err error
-	if ht != hash.None && verifyCompressedObject {
+	if ht != hash.None {
 		// unwrap the accounting again
 		wrappedIn, wrap = accounting.UnWrap(wrappedIn)
 		hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(ht))
@@ -483,16 +509,14 @@
 }
 
 // Put an uncompressed version of a file. Returns a wrappable object and metadata.
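[Editor's note: the compressibility check above is the core behavioural change of this patch — instead of guessing from a MIME-type blacklist, it trial-compresses the first heuristicBytes of the file and only keeps gzip when it pays for itself. Below is a self-contained sketch of the same ratio test. It substitutes the standard library's compress/gzip for the sgzip package the backend uses (an assumption made so the snippet runs outside the rclone tree); the threshold mirrors the minCompressionRatio constant added by this patch, and the main harness is purely illustrative.]

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

const minCompressionRatio = 1.1 // same threshold as the patch

// isCompressible trial-compresses r into a buffer and reports whether the
// uncompressed/compressed ratio clears the threshold.
func isCompressible(r io.Reader) (bool, error) {
	var b bytes.Buffer
	w, err := gzip.NewWriterLevel(&b, gzip.DefaultCompression)
	if err != nil {
		return false, err
	}
	n, err := io.Copy(w, r) // n = uncompressed byte count
	if err != nil {
		return false, err
	}
	if err := w.Close(); err != nil { // flush so b.Len() is the true compressed size
		return false, err
	}
	return float64(n)/float64(b.Len()) > minCompressionRatio, nil
}

func main() {
	ok, _ := isCompressible(strings.NewReader(strings.Repeat("compressible text ", 1000)))
	fmt.Println(ok) // true: repetitive text shrinks well under gzip
}
```

Already-compressed input (media, archives) lands near a ratio of 1.0 and is stored uncompressed — which is roughly what the deleted regexp used to approximate by MIME type.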
-func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string, verifyCompressedObject bool) (fs.Object, *ObjectMetadata, error) {
+func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put putFn, mimeType string) (fs.Object, *ObjectMetadata, error) {
 	// Unwrap the accounting, add our metadata hasher, then wrap it back on
 	in, wrap := accounting.UnWrap(in)
 
 	hs := hash.NewHashSet(hash.MD5)
 	ht := f.Fs.Hashes().GetOne()
-	if verifyCompressedObject {
-		if !hs.Contains(ht) {
-			hs.Add(ht)
-		}
+	if !hs.Contains(ht) {
+		hs.Add(ht)
 	}
 	metaHasher, err := hash.NewMultiHasherTypes(hs)
 	if err != nil {
@@ -513,7 +537,7 @@ func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo,
 		return nil, nil, err
 	}
 	// Check the hashes of the compressed data if we were comparing them
-	if ht != hash.None && verifyCompressedObject {
+	if ht != hash.None {
 		err := f.verifyObjectHash(ctx, o, metaHasher, ht)
 		if err != nil {
 			return nil, nil, err
@@ -529,7 +553,7 @@ func (f *Fs) putUncompress(ctx context.Context, in io.Reader, src fs.ObjectInfo,
 }
 
 // This function will write a metadata struct to a metadata Object for a src. Returns a wrappable metadata object.
-func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn, verifyCompressedObject bool) (mo fs.Object, err error) {
+func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.ObjectInfo, options []fs.OpenOption, put putFn) (mo fs.Object, err error) {
 	// Generate the metadata contents
 	var b bytes.Buffer
 	gzipWriter := gzip.NewWriter(&b)
@@ -560,21 +584,21 @@ func (f *Fs) putMetadata(ctx context.Context, meta *ObjectMetadata, src fs.Objec
 // This function will put both the data and metadata for an Object.
 // putData is the function used for data, while putMeta is the function used for metadata.
 func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.ObjectInfo, options []fs.OpenOption,
-	putData putFn, putMeta putFn, compressible bool, mimeType string, verifyCompressedObject bool) (*Object, error) {
+	putData putFn, putMeta putFn, compressible bool, mimeType string) (*Object, error) {
 	// Put file then metadata
 	var dataObject fs.Object
 	var meta *ObjectMetadata
 	var err error
 	if compressible {
-		dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType, verifyCompressedObject)
+		dataObject, meta, err = f.putCompress(ctx, in, src, options, putData, mimeType)
 	} else {
-		dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType, verifyCompressedObject)
+		dataObject, meta, err = f.putUncompress(ctx, in, src, options, putData, mimeType)
 	}
 	if err != nil {
 		return nil, err
 	}
 
-	mo, err := f.putMetadata(ctx, meta, src, options, putMeta, verifyCompressedObject)
+	mo, err := f.putMetadata(ctx, meta, src, options, putMeta)
 
 	// Metadata upload may fail. In that case we try to remove the original object
 	if err != nil {
@@ -593,17 +617,17 @@ func (f *Fs) putWithCustomFunctions(ctx context.Context, in io.Reader, src fs.Ob
 // will return the object and the error, otherwise will return
 // nil and the error
 func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
-	// If there's already an existent objects we need to make sure to explcitly update it to make sure we don't leave
+	// If there's already an existing object we need to make sure to explicitly update it to make sure we don't leave
 	// orphaned data. Alternatively we could also delete it (which would be simpler) but that has the disadvantage that it
 	// destroys all server-side versioning.
 	o, err := f.NewObject(ctx, src.Remote())
 	if err == fs.ErrorObjectNotFound {
 		// Get our file compressibility
-		in, mimeType, err := findMimeType(in)
+		in, compressible, mimeType, err := checkCompressAndType(in)
 		if err != nil {
 			return nil, err
 		}
-		return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, isCompressible(mimeType), mimeType, true)
+		return f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Put, f.Fs.Put, compressible, mimeType)
 	}
 	if err != nil {
 		return nil, err
@@ -619,50 +643,33 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
 	}
 	found := err == nil
 
-	in, mimeType, err := findMimeType(in)
+	in, compressible, mimeType, err := checkCompressAndType(in)
 	if err != nil {
 		return nil, err
 	}
-	compressible := isCompressible(mimeType)
-	newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType, true)
+	newObj, err := f.putWithCustomFunctions(ctx, in, src, options, f.Fs.Features().PutStream, f.Fs.Put, compressible, mimeType)
 	if err != nil {
 		return nil, err
 	}
 
-	// Our transfer is now complete if we allready had an object with the same name we can safely remove it now
-	// this is necessary to make sure we don't leave the remote in an inconsistent state.
-	if found {
+	// Our transfer is now complete. We have to make sure to remove the old object because our new object will
+	// have a different name except when both the old and the new object were uncompressed.
+	if found && (oldObj.(*Object).meta.Mode != Uncompressed || compressible) {
 		err = oldObj.(*Object).Object.Remove(ctx)
 		if err != nil {
 			return nil, errors.Wrap(err, "Couldn't remove original object")
 		}
 	}
 
-	moveFs, ok := f.Fs.(fs.Mover)
-	var wrapObj fs.Object
-	if ok {
-		wrapObj, err = moveFs.Move(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible))
+	// If our new object is compressed we have to rename it with the correct size.
+	// Uncompressed objects don't store the size in the name so they'll already have the correct name.
+	if compressible {
+		wrapObj, err := operations.Move(ctx, f.Fs, nil, f.dataName(src.Remote(), newObj.size, compressible), newObj.Object)
 		if err != nil {
-			return nil, errors.Wrap(err, "Couldn't rename streamed object.")
+			return nil, errors.Wrap(err, "Couldn't rename streamed object")
 		}
 		newObj.Object = wrapObj
-		return newObj, nil
 	}
-
-	// If we don't have move we'll need to resort to serverside copy and remove
-	copyFs, ok := f.Fs.(fs.Copier)
-	if ok {
-		wrapObj, err := copyFs.Copy(ctx, newObj.Object, f.dataName(src.Remote(), newObj.size, compressible))
-		if err != nil {
-			return nil, errors.Wrap(err, "Could't copy streamed object.")
-		}
-		// Remove the original
-		err = newObj.Remove(ctx)
-		if err != nil {
-			return wrapObj, errors.Wrap(err, "Couldn't remove original streamed object. Remote may be in an incositent state.")
-		}
-	}
-	newObj.Object = wrapObj
 	return newObj, nil
 }
 
@@ -935,7 +942,6 @@ func (f *Fs) PublicLink(ctx context.Context, remote string, duration fs.Duration
 
 // ObjectMetadata describes the metadata for an Object.
 type ObjectMetadata struct {
-	Size     int64  // Uncompressed size of the file.
 	Mode     int    // Compression mode of the file.
 	Hash     []byte // MD5 hash of the file.
 	MimeType string // Mime type of the file
@@ -1009,25 +1015,8 @@ func (o *Object) Remove(ctx context.Context) error {
 
 // ReadCloserWrapper combines a Reader and a Closer to a ReadCloser
 type ReadCloserWrapper struct {
-	dataSource io.Reader
-	closer     io.Closer
-}
-
-func combineReaderAndCloser(dataSource io.Reader, closer io.Closer) *ReadCloserWrapper {
-	rc := new(ReadCloserWrapper)
-	rc.dataSource = dataSource
-	rc.closer = closer
-	return rc
-}
-
-// Read function
-func (w *ReadCloserWrapper) Read(p []byte) (n int, err error) {
-	return w.dataSource.Read(p)
-}
-
-// Close function
-func (w *ReadCloserWrapper) Close() error {
-	return w.closer.Close()
+	io.Reader
+	io.Closer
 }
 
 // Update in to the object with the modTime given of the given size
@@ -1041,18 +1030,17 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		return o.mo, o.mo.Update(ctx, in, src, options...)
 	}
 
-	in, mimeType, err := findMimeType(in)
+	in, compressible, mimeType, err := checkCompressAndType(in)
 	if err != nil {
 		return err
 	}
-	compressible := isCompressible(mimeType)
 
 	// Since we are storing the filesize in the name the new object may have a different name than the old
 	// We'll make sure to delete the old object in this case
 	var newObject *Object
 	origName := o.Remote()
 	if o.meta.Mode != Uncompressed || compressible {
-		newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType, true)
+		newObject, err = o.f.putWithCustomFunctions(ctx, in, o.f.wrapInfo(src, origName, src.Size()), options, o.f.Fs.Put, updateMeta, compressible, mimeType)
 		if newObject.Object.Remote() != o.Object.Remote() {
 			if removeErr := o.Object.Remove(ctx); removeErr != nil {
 				return removeErr
@@ -1064,7 +1052,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		return o.Object, o.Object.Update(ctx, in, src, options...)
} // If we are, just update the object and metadata - newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType, true) + newObject, err = o.f.putWithCustomFunctions(ctx, in, src, options, update, updateMeta, compressible, mimeType) } if err != nil { return err @@ -1163,13 +1151,13 @@ func (o *Object) MimeType(ctx context.Context) string { // Hash returns the selected checksum of the file // If no checksum is available it returns "" func (o *Object) Hash(ctx context.Context, ht hash.Type) (string, error) { + if ht != hash.MD5 { + return "", hash.ErrUnsupported + } err := o.loadMetadataIfNotLoaded(ctx) if err != nil { return "", err } - if ht&hash.MD5 == 0 { - return "", hash.ErrUnsupported - } return hex.EncodeToString(o.meta.Hash), nil } @@ -1240,7 +1228,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read fileReader = file } // Return a ReadCloser - return combineReaderAndCloser(fileReader, chunkedReader), nil + return ReadCloserWrapper{Reader: fileReader, Closer: chunkedReader}, nil } // ObjectInfo describes a wrapped fs.ObjectInfo for being the source @@ -1305,9 +1293,6 @@ func (o *ObjectInfo) Hash(ctx context.Context, ht hash.Type) (string, error) { if ht != hash.MD5 { return "", hash.ErrUnsupported } - if o.Size() != o.src.Size() { - return "", hash.ErrUnsupported - } value, err := o.src.Hash(ctx, ht) if err == hash.ErrUnsupported { return "", hash.ErrUnsupported diff --git a/docs/content/press.md b/docs/content/press.md index 2049c0210..ad11d539c 100644 --- a/docs/content/press.md +++ b/docs/content/press.md @@ -8,7 +8,7 @@ Press (Experimental) ----------------------------------------- The `press` remote adds compression to another remote. It is best used with remotes containing -many large compressible files or on top of other remotes like crypt. +many large compressible files. Please read the [warnings](#warnings) before using this remote.
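
[Editor's note: for reference, after this patch a press remote is configured with the renamed `mode` and `level` options. A hypothetical `rclone.conf` entry — the remote name `compressed` and the wrapped `remote:path` are placeholders, not part of the patch:]

```
[compressed]
type = press
remote = remote:path
mode = gzip
level = -1
```

Leaving `level` at -1 selects sgzip's default compression, which the help text above describes as equivalent to level 5.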