diff --git a/backend/smb/connpool.go b/backend/smb/connpool.go index 2c3fb2f80..1120533a8 100644 --- a/backend/smb/connpool.go +++ b/backend/smb/connpool.go @@ -2,8 +2,10 @@ package smb import ( "context" + "errors" "fmt" "net" + "os" "time" smb2 "github.com/cloudsoda/go-smb2" @@ -11,14 +13,17 @@ import ( "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/config/obscure" "github.com/rclone/rclone/fs/fshttp" + "golang.org/x/sync/errgroup" ) // dial starts a client connection to the given SMB server. It is a // convenience function that connects to the given network address, // initiates the SMB handshake, and then sets up a Client. +// +// The context is only used for establishing the connection, not after. func (f *Fs) dial(ctx context.Context, network, addr string) (*conn, error) { dialer := fshttp.NewDialer(ctx) - tconn, err := dialer.Dial(network, addr) + tconn, err := dialer.DialContext(ctx, network, addr) if err != nil { return nil, err } @@ -89,15 +94,7 @@ func (c *conn) close() (err error) { // True if it's closed func (c *conn) closed() bool { - var nopErr error - if c.smbShare != nil { - // stat the current directory - _, nopErr = c.smbShare.Stat(".") - } else { - // list the shares - _, nopErr = c.smbSession.ListSharenames() - } - return nopErr != nil + return c.smbSession.Echo() != nil } // Show that we are using a SMB session @@ -118,23 +115,20 @@ func (f *Fs) getSessions() int32 { } // Open a new connection to the SMB server. +// +// The context is only used for establishing the connection, not after. func (f *Fs) newConnection(ctx context.Context, share string) (c *conn, err error) { - // As we are pooling these connections we need to decouple - // them from the current context - bgCtx := context.Background() - - c, err = f.dial(bgCtx, "tcp", f.opt.Host+":"+f.opt.Port) + c, err = f.dial(ctx, "tcp", f.opt.Host+":"+f.opt.Port) if err != nil { return nil, fmt.Errorf("couldn't connect SMB: %w", err) } if share != "" { // mount the specified share as well if user requested - c.smbShare, err = c.smbSession.Mount(share) + err = c.mountShare(share) if err != nil { _ = c.smbSession.Logoff() return nil, fmt.Errorf("couldn't initialize SMB: %w", err) } - c.smbShare = c.smbShare.WithContext(bgCtx) } return c, nil } @@ -192,23 +186,30 @@ func (f *Fs) getConnection(ctx context.Context, share string) (c *conn, err erro // Return a SMB connection to the pool // // It nils the pointed to connection out so it can't be reused -func (f *Fs) putConnection(pc **conn) { - c := *pc - *pc = nil - - var nopErr error - if c.smbShare != nil { - // stat the current directory - _, nopErr = c.smbShare.Stat(".") - } else { - // list the shares - _, nopErr = c.smbSession.ListSharenames() - } - if nopErr != nil { - fs.Debugf(f, "Connection failed, closing: %v", nopErr) - _ = c.close() +// +// if err is not nil then it checks the connection is alive using an +// ECHO request +func (f *Fs) putConnection(pc **conn, err error) { + if pc == nil { return } + c := *pc + if c == nil { + return + } + *pc = nil + if err != nil { + // If not a regular SMB error then check the connection + if !(errors.Is(err, os.ErrNotExist) || errors.Is(err, os.ErrExist) || errors.Is(err, os.ErrPermission)) { + echoErr := c.smbSession.Echo() + if echoErr != nil { + fs.Debugf(f, "Connection failed, closing: %v", echoErr) + _ = c.close() + return + } + fs.Debugf(f, "Connection OK after error: %v", err) + } + } f.poolMu.Lock() f.pool = append(f.pool, c) @@ -235,15 +236,18 @@ func (f *Fs) drainPool(ctx context.Context) (err error) { if len(f.pool) != 0 { fs.Debugf(f, "Closing %d unused connections", len(f.pool)) } + + g, _ := errgroup.WithContext(ctx) for i, c := range f.pool { - if !c.closed() { - cErr := c.close() - if cErr != nil { - err = cErr + g.Go(func() (err error) { + if !c.closed() { + err = c.close() } - } - f.pool[i] = nil + f.pool[i] = nil + return err + }) } + err = g.Wait() f.pool = nil return err } diff --git a/backend/smb/smb.go b/backend/smb/smb.go index 0fb1df8b9..c6911fb76 100644 --- a/backend/smb/smb.go +++ b/backend/smb/smb.go @@ -25,7 +25,7 @@ import ( ) const ( - minSleep = 100 * time.Millisecond + minSleep = 10 * time.Millisecond maxSleep = 2 * time.Second decayConstant = 2 // bigger for slower decay, exponential ) @@ -207,7 +207,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return nil, err } stat, err := cn.smbShare.Stat(f.toSambaPath(dir)) - f.putConnection(&cn) + f.putConnection(&cn, err) if err != nil { // ignore stat error here return f, nil @@ -268,7 +268,7 @@ func (f *Fs) findObjectSeparate(ctx context.Context, share, path string) (fs.Obj return nil, err } stat, err := cn.smbShare.Stat(f.toSambaPath(path)) - f.putConnection(&cn) + f.putConnection(&cn, err) if err != nil { return nil, translateError(err, false) } @@ -290,7 +290,7 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) { return err } err = cn.smbShare.MkdirAll(f.toSambaPath(path), 0o755) - f.putConnection(&cn) + f.putConnection(&cn, err) return err } @@ -305,7 +305,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error { return err } err = cn.smbShare.Remove(f.toSambaPath(path)) - f.putConnection(&cn) + f.putConnection(&cn, err) return err } @@ -375,7 +375,7 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (_ fs.Objec return nil, err } err = cn.smbShare.Rename(f.toSambaPath(srcPath), f.toSambaPath(dstPath)) - f.putConnection(&cn) + f.putConnection(&cn, err) if err != nil { return nil, translateError(err, false) } @@ -412,7 +412,7 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string if err != nil { return err } - defer f.putConnection(&cn) + defer f.putConnection(&cn, err) _, err = cn.smbShare.Stat(dstPath) if os.IsNotExist(err) { @@ -430,7 +430,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e if err != nil { return nil, err } - defer f.putConnection(&cn) + defer f.putConnection(&cn, err) if share == "" { shares, err := cn.smbSession.ListSharenames() @@ -474,7 +474,7 @@ func (f *Fs) About(ctx context.Context) (_ *fs.Usage, err error) { return nil, err } stat, err := cn.smbShare.Statfs(dir) - f.putConnection(&cn) + f.putConnection(&cn, err) if err != nil { return nil, err } @@ -556,7 +556,7 @@ func (f *Fs) ensureDirectory(ctx context.Context, share, _path string) error { return err } err = cn.smbShare.MkdirAll(f.toSambaPath(dir), 0o755) - f.putConnection(&cn) + f.putConnection(&cn, err) return err } @@ -604,7 +604,7 @@ func (o *Object) SetModTime(ctx context.Context, t time.Time) (err error) { if err != nil { return err } - defer o.fs.putConnection(&cn) + defer o.fs.putConnection(&cn, err) err = cn.smbShare.Chtimes(reqDir, t, t) if err != nil { @@ -650,24 +650,25 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read } fl, err := cn.smbShare.OpenFile(filename, os.O_RDONLY, 0) if err != nil { - o.fs.putConnection(&cn) + o.fs.putConnection(&cn, err) return nil, fmt.Errorf("failed to open: %w", err) } pos, err := fl.Seek(offset, io.SeekStart) if err != nil { - o.fs.putConnection(&cn) + o.fs.putConnection(&cn, err) return nil, fmt.Errorf("failed to seek: %w", err) } if pos != offset { - o.fs.putConnection(&cn) - return nil, fmt.Errorf("failed to seek: wrong position (expected=%d, reported=%d)", offset, pos) + err = fmt.Errorf("failed to seek: wrong position (expected=%d, reported=%d)", offset, pos) + o.fs.putConnection(&cn, err) + return nil, err } in = readers.NewLimitedReadCloser(fl, limit) in = &boundReadCloser{ rc: in, close: func() error { - o.fs.putConnection(&cn) + o.fs.putConnection(&cn, nil) return nil }, } @@ -697,7 +698,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return err } defer func() { - o.fs.putConnection(&cn) + o.fs.putConnection(&cn, err) }() fl, err := cn.smbShare.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) @@ -757,7 +758,7 @@ func (o *Object) Remove(ctx context.Context) (err error) { } err = cn.smbShare.Remove(filename) - o.fs.putConnection(&cn) + o.fs.putConnection(&cn, err) return err } diff --git a/go.mod b/go.mod index b02c39149..08a076848 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/aws/smithy-go v1.22.1 github.com/buengese/sgzip v0.1.1 github.com/cloudinary/cloudinary-go/v2 v2.9.0 - github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6 + github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed github.com/colinmarc/hdfs/v2 v2.4.0 github.com/coreos/go-semver v0.3.1 github.com/coreos/go-systemd/v22 v22.5.0 diff --git a/go.sum b/go.sum index d37d1c8e4..bb76c2a76 100644 --- a/go.sum +++ b/go.sum @@ -181,8 +181,8 @@ github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vc github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA= github.com/cloudinary/cloudinary-go/v2 v2.9.0 h1:8C76QklmuV4qmKAC7cUnu9D68X9kCkFMuLspPikECCo= github.com/cloudinary/cloudinary-go/v2 v2.9.0/go.mod h1:ireC4gqVetsjVhYlwjUJwKTbZuWjEIynbR9zQTlqsvo= -github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6 h1:mLY/79N73URZ2J/oRKTxmfhCgxThzBmjQ6XOjX5tYjI= -github.com/cloudsoda/go-smb2 v0.0.0-20241223203758-52b943b88fd6/go.mod h1:0aLYPsmguHbok591y6hI5yAqU0drbUzrPEO10ZpgTTw= +github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed h1:KrdJUJWhJ1UWhvaP6SBsvG356KjqfdDjcS/4xTswAU4= +github.com/cloudsoda/go-smb2 v0.0.0-20250124173933-e6bbeea507ed/go.mod h1:0aLYPsmguHbok591y6hI5yAqU0drbUzrPEO10ZpgTTw= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=