s3: Implement paged listing interface ListP

This commit is contained in:
Nick Craig-Wood 2024-11-25 12:50:27 +00:00
parent 37120ef7bd
commit fad579c4a2

View File

@ -4481,7 +4481,7 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *types.Ob
} }
// listDir lists files and directories to out // listDir lists files and directories to out
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool, callback func(fs.DirEntry) error) (err error) {
// List the objects and directories // List the objects and directories
err = f.list(ctx, listOpt{ err = f.list(ctx, listOpt{
bucket: bucket, bucket: bucket,
@ -4497,16 +4497,16 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB
return err return err
} }
if entry != nil { if entry != nil {
entries = append(entries, entry) return callback(entry)
} }
return nil return nil
}) })
if err != nil { if err != nil {
return nil, err return err
} }
// bucket must be present if listing succeeded // bucket must be present if listing succeeded
f.cache.MarkOK(bucket) f.cache.MarkOK(bucket)
return entries, nil return nil
} }
// listBuckets lists the buckets to out // listBuckets lists the buckets to out
@ -4539,14 +4539,46 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error)
// This should return ErrDirNotFound if the directory isn't // This should return ErrDirNotFound if the directory isn't
// found. // found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
return list.WithListP(ctx, dir, f)
}
// ListP lists the objects and directories of the Fs starting
// from dir non recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
func (f *Fs) ListP(ctx context.Context, dir string, callback fs.ListRCallback) error {
list := list.NewHelper(callback)
bucket, directory := f.split(dir) bucket, directory := f.split(dir)
if bucket == "" { if bucket == "" {
if directory != "" { if directory != "" {
return nil, fs.ErrorListBucketRequired return fs.ErrorListBucketRequired
}
entries, err := f.listBuckets(ctx)
if err != nil {
return err
}
for _, entry := range entries {
err = list.Add(entry)
if err != nil {
return err
}
}
} else {
err := f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", list.Add)
if err != nil {
return err
} }
return f.listBuckets(ctx)
} }
return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") return list.Flush()
} }
// ListR lists the objects and directories of the Fs starting // ListR lists the objects and directories of the Fs starting
@ -6843,6 +6875,7 @@ var (
_ fs.Copier = &Fs{} _ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{} _ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{} _ fs.ListRer = &Fs{}
_ fs.ListPer = &Fs{}
_ fs.Commander = &Fs{} _ fs.Commander = &Fs{}
_ fs.CleanUpper = &Fs{} _ fs.CleanUpper = &Fs{}
_ fs.OpenChunkWriter = &Fs{} _ fs.OpenChunkWriter = &Fs{}