mirror of
https://github.com/rclone/rclone.git
synced 2025-01-24 15:21:08 +01:00
5d6b8141ec
As of Go 1.16, the same functionality is now provided by package io or package os, and those implementations should be preferred in new code.
207 lines
5.0 KiB
Go
207 lines
5.0 KiB
Go
package union
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/backend/union/upstream"
|
|
"github.com/rclone/rclone/fs"
|
|
)
|
|
|
|
// Object describes a union Object
|
|
//
|
|
// This is a wrapped object which returns the Union Fs as its parent
|
|
type Object struct {
|
|
*upstream.Object
|
|
fs *Fs // what this object is part of
|
|
co []upstream.Entry
|
|
}
|
|
|
|
// Directory describes a union Directory
|
|
//
|
|
// This is a wrapped object contains all candidates
|
|
type Directory struct {
|
|
*upstream.Directory
|
|
cd []upstream.Entry
|
|
}
|
|
|
|
type entry interface {
|
|
upstream.Entry
|
|
candidates() []upstream.Entry
|
|
}
|
|
|
|
// UnWrapUpstream returns the upstream Object that this Object is wrapping
|
|
func (o *Object) UnWrapUpstream() *upstream.Object {
|
|
return o.Object
|
|
}
|
|
|
|
// Fs returns the union Fs as the parent
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.fs
|
|
}
|
|
|
|
func (o *Object) candidates() []upstream.Entry {
|
|
return o.co
|
|
}
|
|
|
|
func (d *Directory) candidates() []upstream.Entry {
|
|
return d.cd
|
|
}
|
|
|
|
// Update in to the object with the modTime given of the given size
|
|
//
|
|
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
|
|
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
|
// return an error or update the object properly (rather than e.g. calling panic).
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
|
entries, err := o.fs.actionEntries(o.candidates()...)
|
|
if err == fs.ErrorPermissionDenied {
|
|
// There are no candidates in this object which can be written to
|
|
// So attempt to create a new object instead
|
|
newO, err := o.fs.put(ctx, in, src, false, options...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Update current object
|
|
*o = *newO.(*Object)
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
if len(entries) == 1 {
|
|
obj := entries[0].(*upstream.Object)
|
|
return obj.Update(ctx, in, src, options...)
|
|
}
|
|
// Multi-threading
|
|
readers, errChan := multiReader(len(entries), in)
|
|
errs := Errors(make([]error, len(entries)+1))
|
|
multithread(len(entries), func(i int) {
|
|
if o, ok := entries[i].(*upstream.Object); ok {
|
|
err := o.Update(ctx, readers[i], src, options...)
|
|
if err != nil {
|
|
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
|
|
if len(entries) > 1 {
|
|
// Drain the input buffer to allow other uploads to continue
|
|
_, _ = io.Copy(io.Discard, readers[i])
|
|
}
|
|
}
|
|
} else {
|
|
errs[i] = fs.ErrorNotAFile
|
|
}
|
|
})
|
|
errs[len(entries)] = <-errChan
|
|
return errs.Err()
|
|
}
|
|
|
|
// Remove candidate objects selected by ACTION policy
|
|
func (o *Object) Remove(ctx context.Context) error {
|
|
entries, err := o.fs.actionEntries(o.candidates()...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
errs := Errors(make([]error, len(entries)))
|
|
multithread(len(entries), func(i int) {
|
|
if o, ok := entries[i].(*upstream.Object); ok {
|
|
err := o.Remove(ctx)
|
|
if err != nil {
|
|
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
|
|
}
|
|
} else {
|
|
errs[i] = fs.ErrorNotAFile
|
|
}
|
|
})
|
|
return errs.Err()
|
|
}
|
|
|
|
// SetModTime sets the metadata on the object to set the modification date
|
|
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
|
|
entries, err := o.fs.actionEntries(o.candidates()...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var wg sync.WaitGroup
|
|
errs := Errors(make([]error, len(entries)))
|
|
multithread(len(entries), func(i int) {
|
|
if o, ok := entries[i].(*upstream.Object); ok {
|
|
err := o.SetModTime(ctx, t)
|
|
if err != nil {
|
|
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
|
|
}
|
|
} else {
|
|
errs[i] = fs.ErrorNotAFile
|
|
}
|
|
})
|
|
wg.Wait()
|
|
return errs.Err()
|
|
}
|
|
|
|
// GetTier returns storage tier or class of the Object
|
|
func (o *Object) GetTier() string {
|
|
do, ok := o.Object.Object.(fs.GetTierer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.GetTier()
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *Object) ID() string {
|
|
do, ok := o.Object.Object.(fs.IDer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.ID()
|
|
}
|
|
|
|
// MimeType returns the content type of the Object if known
|
|
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
|
|
if do, ok := o.Object.Object.(fs.MimeTyper); ok {
|
|
mimeType = do.MimeType(ctx)
|
|
}
|
|
return mimeType
|
|
}
|
|
|
|
// SetTier performs changing storage tier of the Object if
|
|
// multiple storage classes supported
|
|
func (o *Object) SetTier(tier string) error {
|
|
do, ok := o.Object.Object.(fs.SetTierer)
|
|
if !ok {
|
|
return errors.New("underlying remote does not support SetTier")
|
|
}
|
|
return do.SetTier(tier)
|
|
}
|
|
|
|
// ModTime returns the modification date of the directory
|
|
// It returns the latest ModTime of all candidates
|
|
func (d *Directory) ModTime(ctx context.Context) (t time.Time) {
|
|
entries := d.candidates()
|
|
times := make([]time.Time, len(entries))
|
|
multithread(len(entries), func(i int) {
|
|
times[i] = entries[i].ModTime(ctx)
|
|
})
|
|
for _, ti := range times {
|
|
if t.Before(ti) {
|
|
t = ti
|
|
}
|
|
}
|
|
return t
|
|
}
|
|
|
|
// Size returns the size of the directory
|
|
// It returns the sum of all candidates
|
|
func (d *Directory) Size() (s int64) {
|
|
for _, e := range d.candidates() {
|
|
s += e.Size()
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.FullObject = (*Object)(nil)
|
|
)
|