mirror of
https://github.com/rclone/rclone.git
synced 2025-02-22 13:31:45 +01:00
smb: improve connection pooling efficiency
* Lower pacer minSleep to establish new connections faster * Use Echo requests to check whether connections are working (required an upgrade of go-smb2) * Only remount shares when needed * Use context for connection establishment * When returning a connection to the pool, only check the ones that encountered errors * Close connections in parallel
This commit is contained in:
parent
057fdb3a9d
commit
dc9c87279b
@ -2,8 +2,10 @@ package smb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
smb2 "github.com/cloudsoda/go-smb2"
|
||||
@ -11,14 +13,17 @@ import (
|
||||
"github.com/rclone/rclone/fs/accounting"
|
||||
"github.com/rclone/rclone/fs/config/obscure"
|
||||
"github.com/rclone/rclone/fs/fshttp"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// dial starts a client connection to the given SMB server. It is a
|
||||
// convenience function that connects to the given network address,
|
||||
// initiates the SMB handshake, and then sets up a Client.
|
||||
//
|
||||
// The context is only used for establishing the connection, not after.
|
||||
func (f *Fs) dial(ctx context.Context, network, addr string) (*conn, error) {
|
||||
dialer := fshttp.NewDialer(ctx)
|
||||
tconn, err := dialer.Dial(network, addr)
|
||||
tconn, err := dialer.DialContext(ctx, network, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -89,15 +94,7 @@ func (c *conn) close() (err error) {
|
||||
|
||||
// True if it's closed
|
||||
func (c *conn) closed() bool {
|
||||
var nopErr error
|
||||
if c.smbShare != nil {
|
||||
// stat the current directory
|
||||
_, nopErr = c.smbShare.Stat(".")
|
||||
} else {
|
||||
// list the shares
|
||||
_, nopErr = c.smbSession.ListSharenames()
|
||||
}
|
||||
return nopErr != nil
|
||||
return c.smbSession.Echo() != nil
|
||||
}
|
||||
|
||||
// Show that we are using a SMB session
|
||||
@ -118,23 +115,20 @@ func (f *Fs) getSessions() int32 {
|
||||
}
|
||||
|
||||
// Open a new connection to the SMB server.
|
||||
//
|
||||
// The context is only used for establishing the connection, not after.
|
||||
func (f *Fs) newConnection(ctx context.Context, share string) (c *conn, err error) {
|
||||
// As we are pooling these connections we need to decouple
|
||||
// them from the current context
|
||||
bgCtx := context.Background()
|
||||
|
||||
c, err = f.dial(bgCtx, "tcp", f.opt.Host+":"+f.opt.Port)
|
||||
c, err = f.dial(ctx, "tcp", f.opt.Host+":"+f.opt.Port)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't connect SMB: %w", err)
|
||||
}
|
||||
if share != "" {
|
||||
// mount the specified share as well if user requested
|
||||
c.smbShare, err = c.smbSession.Mount(share)
|
||||
err = c.mountShare(share)
|
||||
if err != nil {
|
||||
_ = c.smbSession.Logoff()
|
||||
return nil, fmt.Errorf("couldn't initialize SMB: %w", err)
|
||||
}
|
||||
c.smbShare = c.smbShare.WithContext(bgCtx)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
@ -192,23 +186,30 @@ func (f *Fs) getConnection(ctx context.Context, share string) (c *conn, err erro
|
||||
// Return a SMB connection to the pool
|
||||
//
|
||||
// It nils the pointed to connection out so it can't be reused
|
||||
func (f *Fs) putConnection(pc **conn) {
|
||||
c := *pc
|
||||
*pc = nil
|
||||
|
||||
var nopErr error
|
||||
if c.smbShare != nil {
|
||||
// stat the current directory
|
||||
_, nopErr = c.smbShare.Stat(".")
|
||||
} else {
|
||||
// list the shares
|
||||
_, nopErr = c.smbSession.ListSharenames()
|
||||
}
|
||||
if nopErr != nil {
|
||||
fs.Debugf(f, "Connection failed, closing: %v", nopErr)
|
||||
_ = c.close()
|
||||
//
|
||||
// if err is not nil then it checks the connection is alive using an
|
||||
// ECHO request
|
||||
func (f *Fs) putConnection(pc **conn, err error) {
|
||||
if pc == nil {
|
||||
return
|
||||
}
|
||||
c := *pc
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
*pc = nil
|
||||
if err != nil {
|
||||
// If not a regular SMB error then check the connection
|
||||
if !(errors.Is(err, os.ErrNotExist) || errors.Is(err, os.ErrExist) || errors.Is(err, os.ErrPermission)) {
|
||||
echoErr := c.smbSession.Echo()
|
||||
if echoErr != nil {
|
||||
fs.Debugf(f, "Connection failed, closing: %v", echoErr)
|
||||
_ = c.close()
|
||||
return
|
||||
}
|
||||
fs.Debugf(f, "Connection OK after error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
f.poolMu.Lock()
|
||||
f.pool = append(f.pool, c)
|
||||
@ -235,15 +236,18 @@ func (f *Fs) drainPool(ctx context.Context) (err error) {
|
||||
if len(f.pool) != 0 {
|
||||
fs.Debugf(f, "Closing %d unused connections", len(f.pool))
|
||||
}
|
||||
|
||||
g, _ := errgroup.WithContext(ctx)
|
||||
for i, c := range f.pool {
|
||||
if !c.closed() {
|
||||
cErr := c.close()
|
||||
if cErr != nil {
|
||||
err = cErr
|
||||
g.Go(func() (err error) {
|
||||
if !c.closed() {
|
||||
err = c.close()
|
||||
}
|
||||
}
|
||||
f.pool[i] = nil
|
||||
f.pool[i] = nil
|
||||
return err
|
||||
})
|
||||
}
|
||||
err = g.Wait()
|
||||
f.pool = nil
|
||||
return err
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
minSleep = 100 * time.Millisecond
|
||||
minSleep = 10 * time.Millisecond
|
||||
maxSleep = 2 * time.Second
|
||||
decayConstant = 2 // bigger for slower decay, exponential
|
||||
)
|
||||
@ -207,7 +207,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
return nil, err
|
||||
}
|
||||
stat, err := cn.smbShare.Stat(f.toSambaPath(dir))
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
if err != nil {
|
||||
// ignore stat error here
|
||||
return f, nil
|
||||
@ -268,7 +268,7 @@ func (f *Fs) findObjectSeparate(ctx context.Context, share, path string) (fs.Obj
|
||||
return nil, err
|
||||
}
|
||||
stat, err := cn.smbShare.Stat(f.toSambaPath(path))
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
if err != nil {
|
||||
return nil, translateError(err, false)
|
||||
}
|
||||
@ -290,7 +290,7 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
|
||||
return err
|
||||
}
|
||||
err = cn.smbShare.MkdirAll(f.toSambaPath(path), 0o755)
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -305,7 +305,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||
return err
|
||||
}
|
||||
err = cn.smbShare.Remove(f.toSambaPath(path))
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -375,7 +375,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (_ fs.Objec
|
||||
return nil, err
|
||||
}
|
||||
err = cn.smbShare.Rename(f.toSambaPath(srcPath), f.toSambaPath(dstPath))
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
if err != nil {
|
||||
return nil, translateError(err, false)
|
||||
}
|
||||
@ -412,7 +412,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.putConnection(&cn)
|
||||
defer f.putConnection(&cn, err)
|
||||
|
||||
_, err = cn.smbShare.Stat(dstPath)
|
||||
if os.IsNotExist(err) {
|
||||
@ -430,7 +430,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.putConnection(&cn)
|
||||
defer f.putConnection(&cn, err)
|
||||
|
||||
if share == "" {
|
||||
shares, err := cn.smbSession.ListSharenames()
|
||||
@ -474,7 +474,7 @@ func (f *Fs) About(ctx context.Context) (_ *fs.Usage, err error) {
|
||||
return nil, err
|
||||
}
|
||||
stat, err := cn.smbShare.Statfs(dir)
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -556,7 +556,7 @@ func (f *Fs) ensureDirectory(ctx context.Context, share, _path string) error {
|
||||
return err
|
||||
}
|
||||
err = cn.smbShare.MkdirAll(f.toSambaPath(dir), 0o755)
|
||||
f.putConnection(&cn)
|
||||
f.putConnection(&cn, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -604,7 +604,7 @@ func (o *Object) SetModTime(ctx context.Context, t time.Time) (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer o.fs.putConnection(&cn)
|
||||
defer o.fs.putConnection(&cn, err)
|
||||
|
||||
err = cn.smbShare.Chtimes(reqDir, t, t)
|
||||
if err != nil {
|
||||
@ -650,24 +650,25 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
||||
}
|
||||
fl, err := cn.smbShare.OpenFile(filename, os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
o.fs.putConnection(&cn)
|
||||
o.fs.putConnection(&cn, err)
|
||||
return nil, fmt.Errorf("failed to open: %w", err)
|
||||
}
|
||||
pos, err := fl.Seek(offset, io.SeekStart)
|
||||
if err != nil {
|
||||
o.fs.putConnection(&cn)
|
||||
o.fs.putConnection(&cn, err)
|
||||
return nil, fmt.Errorf("failed to seek: %w", err)
|
||||
}
|
||||
if pos != offset {
|
||||
o.fs.putConnection(&cn)
|
||||
return nil, fmt.Errorf("failed to seek: wrong position (expected=%d, reported=%d)", offset, pos)
|
||||
err = fmt.Errorf("failed to seek: wrong position (expected=%d, reported=%d)", offset, pos)
|
||||
o.fs.putConnection(&cn, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
in = readers.NewLimitedReadCloser(fl, limit)
|
||||
in = &boundReadCloser{
|
||||
rc: in,
|
||||
close: func() error {
|
||||
o.fs.putConnection(&cn)
|
||||
o.fs.putConnection(&cn, nil)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
@ -697,7 +698,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
o.fs.putConnection(&cn)
|
||||
o.fs.putConnection(&cn, err)
|
||||
}()
|
||||
|
||||
fl, err := cn.smbShare.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
|
||||
@ -757,7 +758,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
err = cn.smbShare.Remove(filename)
|
||||
o.fs.putConnection(&cn)
|
||||
o.fs.putConnection(&cn, err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -25,7 +25,7 @@ require (
|
||||
github.com/aws/smithy-go v1.22.1
|
||||
github.com/buengese/sgzip v0.1.1
|
||||
github.com/cloudinary/cloudinary-go/v2 v2.9.0
|
||||
github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6
|
||||
github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed
|
||||
github.com/colinmarc/hdfs/v2 v2.4.0
|
||||
github.com/coreos/go-semver v0.3.1
|
||||
github.com/coreos/go-systemd/v22 v22.5.0
|
||||
|
4
go.sum
4
go.sum
@ -181,8 +181,8 @@ github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vc
|
||||
github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
|
||||
github.com/cloudinary/cloudinary-go/v2 v2.9.0 h1:8C76QklmuV4qmKAC7cUnu9D68X9kCkFMuLspPikECCo=
|
||||
github.com/cloudinary/cloudinary-go/v2 v2.9.0/go.mod h1:ireC4gqVetsjVhYlwjUJwKTbZuWjEIynbR9zQTlqsvo=
|
||||
github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6 h1:mLY/79N73URZ2J/oRKTxmfhCgxThzBmjQ6XOjX5tYjI=
|
||||
github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6/go.mod h1:0aLYPsmguHbok591y6hI5yAqU0drbUzrPEO10ZpgTTw=
|
||||
github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed h1:KrdJUJWhJ1UWhvaP6SBsvG356KjqfdDjcS/4xTswAU4=
|
||||
github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed/go.mod h1:0aLYPsmguHbok591y6hI5yAqU0drbUzrPEO10ZpgTTw=
|
||||
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
|
||||
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
|
||||
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
|
||||
|
Loading…
Reference in New Issue
Block a user