mirror of
https://github.com/rclone/rclone.git
synced 2025-01-12 09:18:38 +01:00
b33140ddeb
This adds a :writeback tag to upstreams. If set on a single upstream then it writes back objects not found into that upstream. Fixes #6934
234 lines
5.8 KiB
Go
234 lines
5.8 KiB
Go
package union
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/backend/union/upstream"
|
|
"github.com/rclone/rclone/fs"
|
|
)
|
|
|
|
// Object describes a union Object
|
|
//
|
|
// This is a wrapped object which returns the Union Fs as its parent
|
|
type Object struct {
|
|
*upstream.Object
|
|
fs *Fs // what this object is part of
|
|
co []upstream.Entry
|
|
writebackMu sync.Mutex
|
|
}
|
|
|
|
// Directory describes a union Directory
|
|
//
|
|
// This is a wrapped object contains all candidates
|
|
type Directory struct {
|
|
*upstream.Directory
|
|
cd []upstream.Entry
|
|
}
|
|
|
|
type entry interface {
|
|
upstream.Entry
|
|
candidates() []upstream.Entry
|
|
}
|
|
|
|
// Update o with the contents of newO excluding the lock
|
|
func (o *Object) update(newO *Object) {
|
|
o.Object = newO.Object
|
|
o.fs = newO.fs
|
|
o.co = newO.co
|
|
}
|
|
|
|
// UnWrapUpstream returns the upstream Object that this Object is wrapping
|
|
func (o *Object) UnWrapUpstream() *upstream.Object {
|
|
return o.Object
|
|
}
|
|
|
|
// Fs returns the union Fs as the parent
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.fs
|
|
}
|
|
|
|
func (o *Object) candidates() []upstream.Entry {
|
|
return o.co
|
|
}
|
|
|
|
func (d *Directory) candidates() []upstream.Entry {
|
|
return d.cd
|
|
}
|
|
|
|
// Update in to the object with the modTime given of the given size
|
|
//
|
|
// When called from outside an Fs by rclone, src.Size() will always be >= 0.
|
|
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
|
// return an error or update the object properly (rather than e.g. calling panic).
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
|
entries, err := o.fs.actionEntries(o.candidates()...)
|
|
if err == fs.ErrorPermissionDenied {
|
|
// There are no candidates in this object which can be written to
|
|
// So attempt to create a new object instead
|
|
newO, err := o.fs.put(ctx, in, src, false, options...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Update current object
|
|
o.update(newO.(*Object))
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
if len(entries) == 1 {
|
|
obj := entries[0].(*upstream.Object)
|
|
return obj.Update(ctx, in, src, options...)
|
|
}
|
|
// Multi-threading
|
|
readers, errChan := multiReader(len(entries), in)
|
|
errs := Errors(make([]error, len(entries)+1))
|
|
multithread(len(entries), func(i int) {
|
|
if o, ok := entries[i].(*upstream.Object); ok {
|
|
err := o.Update(ctx, readers[i], src, options...)
|
|
if err != nil {
|
|
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
|
|
if len(entries) > 1 {
|
|
// Drain the input buffer to allow other uploads to continue
|
|
_, _ = io.Copy(io.Discard, readers[i])
|
|
}
|
|
}
|
|
} else {
|
|
errs[i] = fs.ErrorNotAFile
|
|
}
|
|
})
|
|
errs[len(entries)] = <-errChan
|
|
return errs.Err()
|
|
}
|
|
|
|
// Remove candidate objects selected by ACTION policy
|
|
func (o *Object) Remove(ctx context.Context) error {
|
|
entries, err := o.fs.actionEntries(o.candidates()...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
errs := Errors(make([]error, len(entries)))
|
|
multithread(len(entries), func(i int) {
|
|
if o, ok := entries[i].(*upstream.Object); ok {
|
|
err := o.Remove(ctx)
|
|
if err != nil {
|
|
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
|
|
}
|
|
} else {
|
|
errs[i] = fs.ErrorNotAFile
|
|
}
|
|
})
|
|
return errs.Err()
|
|
}
|
|
|
|
// SetModTime sets the metadata on the object to set the modification date
|
|
func (o *Object) SetModTime(ctx context.Context, t time.Time) error {
|
|
entries, err := o.fs.actionEntries(o.candidates()...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var wg sync.WaitGroup
|
|
errs := Errors(make([]error, len(entries)))
|
|
multithread(len(entries), func(i int) {
|
|
if o, ok := entries[i].(*upstream.Object); ok {
|
|
err := o.SetModTime(ctx, t)
|
|
if err != nil {
|
|
errs[i] = fmt.Errorf("%s: %w", o.UpstreamFs().Name(), err)
|
|
}
|
|
} else {
|
|
errs[i] = fs.ErrorNotAFile
|
|
}
|
|
})
|
|
wg.Wait()
|
|
return errs.Err()
|
|
}
|
|
|
|
// GetTier returns storage tier or class of the Object
|
|
func (o *Object) GetTier() string {
|
|
do, ok := o.Object.Object.(fs.GetTierer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.GetTier()
|
|
}
|
|
|
|
// ID returns the ID of the Object if known, or "" if not
|
|
func (o *Object) ID() string {
|
|
do, ok := o.Object.Object.(fs.IDer)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return do.ID()
|
|
}
|
|
|
|
// MimeType returns the content type of the Object if known
|
|
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
|
|
if do, ok := o.Object.Object.(fs.MimeTyper); ok {
|
|
mimeType = do.MimeType(ctx)
|
|
}
|
|
return mimeType
|
|
}
|
|
|
|
// SetTier performs changing storage tier of the Object if
|
|
// multiple storage classes supported
|
|
func (o *Object) SetTier(tier string) error {
|
|
do, ok := o.Object.Object.(fs.SetTierer)
|
|
if !ok {
|
|
return errors.New("underlying remote does not support SetTier")
|
|
}
|
|
return do.SetTier(tier)
|
|
}
|
|
|
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
|
// Need some sort of locking to prevent multiple downloads
|
|
o.writebackMu.Lock()
|
|
defer o.writebackMu.Unlock()
|
|
|
|
// FIXME what if correct object is already in o.co
|
|
|
|
newObj, err := o.Object.Writeback(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if newObj != nil {
|
|
o.Object = newObj
|
|
o.co = append(o.co, newObj) // FIXME should this append or overwrite or update?
|
|
}
|
|
return o.Object.Object.Open(ctx, options...)
|
|
}
|
|
|
|
// ModTime returns the modification date of the directory
|
|
// It returns the latest ModTime of all candidates
|
|
func (d *Directory) ModTime(ctx context.Context) (t time.Time) {
|
|
entries := d.candidates()
|
|
times := make([]time.Time, len(entries))
|
|
multithread(len(entries), func(i int) {
|
|
times[i] = entries[i].ModTime(ctx)
|
|
})
|
|
for _, ti := range times {
|
|
if t.Before(ti) {
|
|
t = ti
|
|
}
|
|
}
|
|
return t
|
|
}
|
|
|
|
// Size returns the size of the directory
|
|
// It returns the sum of all candidates
|
|
func (d *Directory) Size() (s int64) {
|
|
for _, e := range d.candidates() {
|
|
s += e.Size()
|
|
}
|
|
return s
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.FullObject = (*Object)(nil)
|
|
)
|