// Package sftp provides a filesystem interface using github.com/pkg/sftp //go:build !plan9 // +build !plan9 package sftp import ( "bytes" "context" "errors" "fmt" "io" "io/ioutil" "os" "path" "regexp" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/pkg/sftp" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/config/obscure" "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/env" "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/readers" sshagent "github.com/xanzy/ssh-agent" "golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh/knownhosts" ) const ( defaultShellType = "unix" shellTypeNotSupported = "none" hashCommandNotSupported = "none" minSleep = 100 * time.Millisecond maxSleep = 2 * time.Second decayConstant = 2 // bigger for slower decay, exponential keepAliveInterval = time.Minute // send keepalives every this long while running commands ) var ( currentUser = env.CurrentUser() posixWinAbsPathRegex = regexp.MustCompile(`^/[a-zA-Z]\:($|/)`) // E.g. "/C:" or anything starting with "/C:/" unixShellEscapeRegex = regexp.MustCompile("[^A-Za-z0-9_.,:/\\@\u0080-\uFFFFFFFF\n-]") ) func init() { fsi := &fs.RegInfo{ Name: "sftp", Description: "SSH/SFTP", NewFs: NewFs, Options: []fs.Option{{ Name: "host", Help: "SSH host to connect to.\n\nE.g. \"example.com\".", Required: true, }, { Name: "user", Help: "SSH username.", Default: currentUser, }, { Name: "port", Help: "SSH port number.", Default: 22, }, { Name: "pass", Help: "SSH password, leave blank to use ssh-agent.", IsPassword: true, }, { Name: "key_pem", Help: "Raw PEM-encoded private key.\n\nIf specified, will override key_file parameter.", }, { Name: "key_file", Help: "Path to PEM-encoded private key file.\n\nLeave blank or set key-use-agent to use ssh-agent." + env.ShellExpandHelp, }, { Name: "key_file_pass", Help: `The passphrase to decrypt the PEM-encoded private key file. Only PEM encrypted key files (old OpenSSH format) are supported. Encrypted keys in the new OpenSSH format can't be used.`, IsPassword: true, }, { Name: "pubkey_file", Help: `Optional path to public key file. Set this if you have a signed certificate you want to use for authentication.` + env.ShellExpandHelp, }, { Name: "known_hosts_file", Help: `Optional path to known_hosts file. Set this value to enable server host key validation.` + env.ShellExpandHelp, Advanced: true, Examples: []fs.OptionExample{{ Value: "~/.ssh/known_hosts", Help: "Use OpenSSH's known_hosts file.", }}, }, { Name: "key_use_agent", Help: `When set forces the usage of the ssh-agent. When key-file is also set, the ".pub" file of the specified key-file is read and only the associated key is requested from the ssh-agent. This allows to avoid ` + "`Too many authentication failures for *username*`" + ` errors when the ssh-agent contains many keys.`, Default: false, }, { Name: "use_insecure_cipher", Help: `Enable the use of insecure ciphers and key exchange methods. This enables the use of the following insecure ciphers and key exchange methods: - aes128-cbc - aes192-cbc - aes256-cbc - 3des-cbc - diffie-hellman-group-exchange-sha256 - diffie-hellman-group-exchange-sha1 Those algorithms are insecure and may allow plaintext data to be recovered by an attacker.`, Default: false, Examples: []fs.OptionExample{ { Value: "false", Help: "Use default Cipher list.", }, { Value: "true", Help: "Enables the use of the aes128-cbc cipher and diffie-hellman-group-exchange-sha256, diffie-hellman-group-exchange-sha1 key exchange.", }, }, }, { Name: "disable_hashcheck", Default: false, Help: "Disable the execution of SSH commands to determine if remote file hashing is available.\n\nLeave blank or set to false to enable hashing (recommended), set to true to disable hashing.", }, { Name: "ask_password", Default: false, Help: `Allow asking for SFTP password when needed. If this is set and no password is supplied then rclone will: - ask for a password - not contact the ssh agent `, Advanced: true, }, { Name: "path_override", Default: "", Help: `Override path used by SSH shell commands. This allows checksum calculation when SFTP and SSH paths are different. This issue affects among others Synology NAS boxes. E.g. if shared folders can be found in directories representing volumes: rclone sync /home/local/directory remote:/directory --sftp-path-override /volume2/directory E.g. if home directory can be found in a shared folder called "home": rclone sync /home/local/directory remote:/home/directory --sftp-path-override /volume1/homes/USER/directory`, Advanced: true, }, { Name: "set_modtime", Default: true, Help: "Set the modified time on the remote if set.", Advanced: true, }, { Name: "shell_type", Default: "", Help: "The type of SSH shell on remote server, if any.\n\nLeave blank for autodetect.", Advanced: true, Examples: []fs.OptionExample{ { Value: shellTypeNotSupported, Help: "No shell access", }, { Value: "unix", Help: "Unix shell", }, { Value: "powershell", Help: "PowerShell", }, { Value: "cmd", Help: "Windows Command Prompt", }, }, }, { Name: "md5sum_command", Default: "", Help: "The command used to read md5 hashes.\n\nLeave blank for autodetect.", Advanced: true, }, { Name: "sha1sum_command", Default: "", Help: "The command used to read sha1 hashes.\n\nLeave blank for autodetect.", Advanced: true, }, { Name: "skip_links", Default: false, Help: "Set to skip any symlinks and any other non regular files.", Advanced: true, }, { Name: "subsystem", Default: "sftp", Help: "Specifies the SSH2 subsystem on the remote host.", Advanced: true, }, { Name: "server_command", Default: "", Help: `Specifies the path or command to run a sftp server on the remote host. The subsystem option is ignored when server_command is defined.`, Advanced: true, }, { Name: "use_fstat", Default: false, Help: `If set use fstat instead of stat. Some servers limit the amount of open files and calling Stat after opening the file will throw an error from the server. Setting this flag will call Fstat instead of Stat which is called on an already open file handle. It has been found that this helps with IBM Sterling SFTP servers which have "extractability" level set to 1 which means only 1 file can be opened at any given time. `, Advanced: true, }, { Name: "disable_concurrent_reads", Default: false, Help: `If set don't use concurrent reads. Normally concurrent reads are safe to use and not using them will degrade performance, so this option is disabled by default. Some servers limit the amount number of times a file can be downloaded. Using concurrent reads can trigger this limit, so if you have a server which returns Failed to copy: file does not exist Then you may need to enable this flag. If concurrent reads are disabled, the use_fstat option is ignored. `, Advanced: true, }, { Name: "disable_concurrent_writes", Default: false, Help: `If set don't use concurrent writes. Normally rclone uses concurrent writes to upload files. This improves the performance greatly, especially for distant servers. This option disables concurrent writes should that be necessary. `, Advanced: true, }, { Name: "idle_timeout", Default: fs.Duration(60 * time.Second), Help: `Max time before closing idle connections. If no connections have been returned to the connection pool in the time given, rclone will empty the connection pool. Set to 0 to keep connections indefinitely. `, Advanced: true, }, { Name: "chunk_size", Help: `Upload and download chunk size. This controls the maximum packet size used in the SFTP protocol. The RFC limits this to 32768 bytes (32k), however a lot of servers support larger sizes and setting it larger will increase transfer speed dramatically on high latency links. Only use a setting higher than 32k if you always connect to the same server or after sufficiently broad testing. For example using the value of 252k with OpenSSH works well with its maximum packet size of 256k. If you get the error "failed to send packet header: EOF" when copying a large file, try lowering this number. `, Default: 32 * fs.Kibi, Advanced: true, }, { Name: "concurrency", Help: `The maximum number of outstanding requests for one file This controls the maximum number of outstanding requests for one file. Increasing it will increase throughput on high latency links at the cost of using more memory. `, Default: 64, Advanced: true, }, { Name: "set_env", Default: fs.SpaceSepList{}, Help: `Environment variables to pass to sftp and commands Set environment variables in the form: VAR=value to be passed to the sftp client and to any commands run (eg md5sum). Pass multiple variables space separated, eg VAR1=value VAR2=value and pass variables with spaces in in quotes, eg "VAR3=value with space" "VAR4=value with space" VAR5=nospacehere `, Advanced: true, }}, } fs.Register(fsi) } // Options defines the configuration for this backend type Options struct { Host string `config:"host"` User string `config:"user"` Port string `config:"port"` Pass string `config:"pass"` KeyPem string `config:"key_pem"` KeyFile string `config:"key_file"` KeyFilePass string `config:"key_file_pass"` PubKeyFile string `config:"pubkey_file"` KnownHostsFile string `config:"known_hosts_file"` KeyUseAgent bool `config:"key_use_agent"` UseInsecureCipher bool `config:"use_insecure_cipher"` DisableHashCheck bool `config:"disable_hashcheck"` AskPassword bool `config:"ask_password"` PathOverride string `config:"path_override"` SetModTime bool `config:"set_modtime"` ShellType string `config:"shell_type"` Md5sumCommand string `config:"md5sum_command"` Sha1sumCommand string `config:"sha1sum_command"` SkipLinks bool `config:"skip_links"` Subsystem string `config:"subsystem"` ServerCommand string `config:"server_command"` UseFstat bool `config:"use_fstat"` DisableConcurrentReads bool `config:"disable_concurrent_reads"` DisableConcurrentWrites bool `config:"disable_concurrent_writes"` IdleTimeout fs.Duration `config:"idle_timeout"` ChunkSize fs.SizeSuffix `config:"chunk_size"` Concurrency int `config:"concurrency"` SetEnv fs.SpaceSepList `config:"set_env"` } // Fs stores the interface to the remote SFTP files type Fs struct { name string root string absRoot string shellRoot string shellType string opt Options // parsed options ci *fs.ConfigInfo // global config m configmap.Mapper // config features *fs.Features // optional features config *ssh.ClientConfig url string mkdirLock *stringLock cachedHashes *hash.Set poolMu sync.Mutex pool []*conn drain *time.Timer // used to drain the pool when we stop using the connections pacer *fs.Pacer // pacer for operations savedpswd string sessions int32 // count in use sessions } // Object is a remote SFTP file that has been stat'd (so it exists, but is not necessarily open for reading) type Object struct { fs *Fs remote string size int64 // size of the object modTime time.Time // modification time of the object mode os.FileMode // mode bits from the file md5sum *string // Cached MD5 checksum sha1sum *string // Cached SHA1 checksum } // dial starts a client connection to the given SSH server. It is a // convenience function that connects to the given network address, // initiates the SSH handshake, and then sets up a Client. func (f *Fs) dial(ctx context.Context, network, addr string, sshConfig *ssh.ClientConfig) (*ssh.Client, error) { dialer := fshttp.NewDialer(ctx) conn, err := dialer.Dial(network, addr) if err != nil { return nil, err } c, chans, reqs, err := ssh.NewClientConn(conn, addr, sshConfig) if err != nil { return nil, err } fs.Debugf(f, "New connection %s->%s to %q", c.LocalAddr(), c.RemoteAddr(), c.ServerVersion()) return ssh.NewClient(c, chans, reqs), nil } // conn encapsulates an ssh client and corresponding sftp client type conn struct { sshClient *ssh.Client sftpClient *sftp.Client err chan error } // Wait for connection to close func (c *conn) wait() { c.err <- c.sshClient.Conn.Wait() } // Send a keepalive over the ssh connection func (c *conn) sendKeepAlive() { _, _, err := c.sshClient.SendRequest("keepalive@openssh.com", true, nil) if err != nil { fs.Debugf(nil, "Failed to send keep alive: %v", err) } } // Send keepalives every interval over the ssh connection until done is closed func (c *conn) sendKeepAlives(interval time.Duration) (done chan struct{}) { done = make(chan struct{}) go func() { t := time.NewTicker(interval) defer t.Stop() for { select { case <-t.C: c.sendKeepAlive() case <-done: return } } }() return done } // Closes the connection func (c *conn) close() error { sftpErr := c.sftpClient.Close() sshErr := c.sshClient.Close() if sftpErr != nil { return sftpErr } return sshErr } // Returns an error if closed func (c *conn) closed() error { select { case err := <-c.err: return err default: } return nil } // Show that we are using an ssh session // // Call removeSession() when done func (f *Fs) addSession() { atomic.AddInt32(&f.sessions, 1) } // Show the ssh session is no longer in use func (f *Fs) removeSession() { atomic.AddInt32(&f.sessions, -1) } // getSessions shows whether there are any sessions in use func (f *Fs) getSessions() int32 { return atomic.LoadInt32(&f.sessions) } // Open a new connection to the SFTP server. func (f *Fs) sftpConnection(ctx context.Context) (c *conn, err error) { // Rate limit rate of new connections c = &conn{ err: make(chan error, 1), } c.sshClient, err = f.dial(ctx, "tcp", f.opt.Host+":"+f.opt.Port, f.config) if err != nil { return nil, fmt.Errorf("couldn't connect SSH: %w", err) } c.sftpClient, err = f.newSftpClient(c.sshClient) if err != nil { _ = c.sshClient.Close() return nil, fmt.Errorf("couldn't initialise SFTP: %w", err) } go c.wait() return c, nil } // Set any environment variables on the ssh.Session func (f *Fs) setEnv(s *ssh.Session) error { for _, env := range f.opt.SetEnv { equal := strings.IndexRune(env, '=') if equal < 0 { return fmt.Errorf("no = found in env var %q", env) } // fs.Debugf(f, "Setting env %q = %q", env[:equal], env[equal+1:]) err := s.Setenv(env[:equal], env[equal+1:]) if err != nil { return fmt.Errorf("failed to set env var %q: %w", env[:equal], err) } } return nil } // Creates a new SFTP client on conn, using the specified subsystem // or sftp server, and zero or more option functions func (f *Fs) newSftpClient(conn *ssh.Client, opts ...sftp.ClientOption) (*sftp.Client, error) { s, err := conn.NewSession() if err != nil { return nil, err } err = f.setEnv(s) if err != nil { return nil, err } pw, err := s.StdinPipe() if err != nil { return nil, err } pr, err := s.StdoutPipe() if err != nil { return nil, err } if f.opt.ServerCommand != "" { if err := s.Start(f.opt.ServerCommand); err != nil { return nil, err } } else { if err := s.RequestSubsystem(f.opt.Subsystem); err != nil { return nil, err } } opts = opts[:len(opts):len(opts)] // make sure we don't overwrite the callers opts opts = append(opts, sftp.UseFstat(f.opt.UseFstat), sftp.UseConcurrentReads(!f.opt.DisableConcurrentReads), sftp.UseConcurrentWrites(!f.opt.DisableConcurrentWrites), sftp.MaxPacketUnchecked(int(f.opt.ChunkSize)), sftp.MaxConcurrentRequestsPerFile(f.opt.Concurrency), ) return sftp.NewClientPipe(pr, pw, opts...) } // Get an SFTP connection from the pool, or open a new one func (f *Fs) getSftpConnection(ctx context.Context) (c *conn, err error) { accounting.LimitTPS(ctx) f.poolMu.Lock() for len(f.pool) > 0 { c = f.pool[0] f.pool = f.pool[1:] err := c.closed() if err == nil { break } fs.Errorf(f, "Discarding closed SSH connection: %v", err) c = nil } f.poolMu.Unlock() if c != nil { return c, nil } err = f.pacer.Call(func() (bool, error) { c, err = f.sftpConnection(ctx) if err != nil { return true, err } return false, nil }) return c, err } // Return an SFTP connection to the pool // // It nils the pointed to connection out so it can't be reused // // if err is not nil then it checks the connection is alive using a // Getwd request func (f *Fs) putSftpConnection(pc **conn, err error) { c := *pc *pc = nil if err != nil { // work out if this is an expected error isRegularError := false var statusErr *sftp.StatusError var pathErr *os.PathError switch { case errors.Is(err, os.ErrNotExist): isRegularError = true case errors.As(err, &statusErr): isRegularError = true case errors.As(err, &pathErr): isRegularError = true } // If not a regular SFTP error code then check the connection if !isRegularError { _, nopErr := c.sftpClient.Getwd() if nopErr != nil { fs.Debugf(f, "Connection failed, closing: %v", nopErr) _ = c.close() return } fs.Debugf(f, "Connection OK after error: %v", err) } } f.poolMu.Lock() f.pool = append(f.pool, c) if f.opt.IdleTimeout > 0 { f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer } f.poolMu.Unlock() } // Drain the pool of any connections func (f *Fs) drainPool(ctx context.Context) (err error) { f.poolMu.Lock() defer f.poolMu.Unlock() if sessions := f.getSessions(); sessions != 0 { fs.Debugf(f, "Not closing %d unused connections as %d sessions active", len(f.pool), sessions) if f.opt.IdleTimeout > 0 { f.drain.Reset(time.Duration(f.opt.IdleTimeout)) // nudge on the pool emptying timer } return nil } if f.opt.IdleTimeout > 0 { f.drain.Stop() } if len(f.pool) != 0 { fs.Debugf(f, "Closing %d unused connections", len(f.pool)) } for i, c := range f.pool { if cErr := c.closed(); cErr == nil { cErr = c.close() if cErr != nil { err = cErr } } f.pool[i] = nil } f.pool = nil return err } // NewFs creates a new Fs object from the name and root. It connects to // the host specified in the config file. func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) { // This will hold the Fs object. We need to create it here // so we can refer to it in the SSH callback, but it's populated // in NewFsWithConnection f := &Fs{ ci: fs.GetConfig(ctx), } // Parse config into Options struct opt := new(Options) err := configstruct.Set(m, opt) if err != nil { return nil, err } if opt.User == "" { opt.User = currentUser } if opt.Port == "" { opt.Port = "22" } sshConfig := &ssh.ClientConfig{ User: opt.User, Auth: []ssh.AuthMethod{}, HostKeyCallback: ssh.InsecureIgnoreHostKey(), Timeout: f.ci.ConnectTimeout, ClientVersion: "SSH-2.0-" + f.ci.UserAgent, } if opt.KnownHostsFile != "" { hostcallback, err := knownhosts.New(env.ShellExpand(opt.KnownHostsFile)) if err != nil { return nil, fmt.Errorf("couldn't parse known_hosts_file: %w", err) } sshConfig.HostKeyCallback = hostcallback } if opt.UseInsecureCipher { sshConfig.Config.SetDefaults() sshConfig.Config.Ciphers = append(sshConfig.Config.Ciphers, "aes128-cbc", "aes192-cbc", "aes256-cbc", "3des-cbc") sshConfig.Config.KeyExchanges = append(sshConfig.Config.KeyExchanges, "diffie-hellman-group-exchange-sha1", "diffie-hellman-group-exchange-sha256") } keyFile := env.ShellExpand(opt.KeyFile) pubkeyFile := env.ShellExpand(opt.PubKeyFile) //keyPem := env.ShellExpand(opt.KeyPem) // Add ssh agent-auth if no password or file or key PEM specified if (opt.Pass == "" && keyFile == "" && !opt.AskPassword && opt.KeyPem == "") || opt.KeyUseAgent { sshAgentClient, _, err := sshagent.New() if err != nil { return nil, fmt.Errorf("couldn't connect to ssh-agent: %w", err) } signers, err := sshAgentClient.Signers() if err != nil { return nil, fmt.Errorf("couldn't read ssh agent signers: %w", err) } if keyFile != "" { pubBytes, err := ioutil.ReadFile(keyFile + ".pub") if err != nil { return nil, fmt.Errorf("failed to read public key file: %w", err) } pub, _, _, _, err := ssh.ParseAuthorizedKey(pubBytes) if err != nil { return nil, fmt.Errorf("failed to parse public key file: %w", err) } pubM := pub.Marshal() found := false for _, s := range signers { if bytes.Equal(pubM, s.PublicKey().Marshal()) { sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(s)) found = true break } } if !found { return nil, errors.New("private key not found in the ssh-agent") } } else { sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(signers...)) } } // Load key file if specified if keyFile != "" || opt.KeyPem != "" { var key []byte if opt.KeyPem == "" { key, err = ioutil.ReadFile(keyFile) if err != nil { return nil, fmt.Errorf("failed to read private key file: %w", err) } } else { // wrap in quotes because the config is a coming as a literal without them. opt.KeyPem, err = strconv.Unquote("\"" + opt.KeyPem + "\"") if err != nil { return nil, fmt.Errorf("pem key not formatted properly: %w", err) } key = []byte(opt.KeyPem) } clearpass := "" if opt.KeyFilePass != "" { clearpass, err = obscure.Reveal(opt.KeyFilePass) if err != nil { return nil, err } } var signer ssh.Signer if clearpass == "" { signer, err = ssh.ParsePrivateKey(key) } else { signer, err = ssh.ParsePrivateKeyWithPassphrase(key, []byte(clearpass)) } if err != nil { return nil, fmt.Errorf("failed to parse private key file: %w", err) } // If a public key has been specified then use that if pubkeyFile != "" { certfile, err := ioutil.ReadFile(pubkeyFile) if err != nil { return nil, fmt.Errorf("unable to read cert file: %w", err) } pk, _, _, _, err := ssh.ParseAuthorizedKey(certfile) if err != nil { return nil, fmt.Errorf("unable to parse cert file: %w", err) } // And the signer for this, which includes the private key signer // This is what we'll pass to the ssh client. // Normally the ssh client will use the public key built // into the private key, but we need to tell it to use the user // specified public key cert. This signer is specific to the // cert and will include the private key signer. Now ssh // knows everything it needs. cert, ok := pk.(*ssh.Certificate) if !ok { return nil, errors.New("public key file is not a certificate file: " + pubkeyFile) } pubsigner, err := ssh.NewCertSigner(cert, signer) if err != nil { return nil, fmt.Errorf("error generating cert signer: %w", err) } sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(pubsigner)) } else { sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(signer)) } } // Auth from password if specified if opt.Pass != "" { clearpass, err := obscure.Reveal(opt.Pass) if err != nil { return nil, err } sshConfig.Auth = append(sshConfig.Auth, ssh.Password(clearpass), ssh.KeyboardInteractive(func(user, instruction string, questions []string, echos []bool) ([]string, error) { return f.keyboardInteractiveReponse(user, instruction, questions, echos, clearpass) }), ) } // Config for password if none was defined and we're allowed to // We don't ask now; we ask if the ssh connection succeeds if opt.Pass == "" && opt.AskPassword { sshConfig.Auth = append(sshConfig.Auth, ssh.PasswordCallback(f.getPass), ssh.KeyboardInteractive(func(user, instruction string, questions []string, echos []bool) ([]string, error) { pass, _ := f.getPass() return f.keyboardInteractiveReponse(user, instruction, questions, echos, pass) }), ) } return NewFsWithConnection(ctx, f, name, root, m, opt, sshConfig) } // Do the keyboard interactive challenge // // Just send the password back for all questions func (f *Fs) keyboardInteractiveReponse(user, instruction string, questions []string, echos []bool, pass string) ([]string, error) { fs.Debugf(f, "Keyboard interactive auth requested") answers := make([]string, len(questions)) for i := range answers { answers[i] = pass } return answers, nil } // If we're in password mode and ssh connection succeeds then this // callback is called. First time around we ask the user, and then // save it so on reconnection we give back the previous string. // This removes the ability to let the user correct a mistaken entry, // but means that reconnects are transparent. // We'll re-use config.Pass for this, 'cos we know it's not been // specified. func (f *Fs) getPass() (string, error) { for f.savedpswd == "" { _, _ = fmt.Fprint(os.Stderr, "Enter SFTP password: ") f.savedpswd = config.ReadPassword() } return f.savedpswd, nil } // NewFsWithConnection creates a new Fs object from the name and root and an ssh.ClientConfig. It connects to // the host specified in the ssh.ClientConfig func NewFsWithConnection(ctx context.Context, f *Fs, name string, root string, m configmap.Mapper, opt *Options, sshConfig *ssh.ClientConfig) (fs.Fs, error) { // Populate the Filesystem Object f.name = name f.root = root f.absRoot = root f.shellRoot = root f.opt = *opt f.m = m f.config = sshConfig f.url = "sftp://" + opt.User + "@" + opt.Host + ":" + opt.Port + "/" + root f.mkdirLock = newStringLock() f.pacer = fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))) f.savedpswd = "" // set the pool drainer timer going if f.opt.IdleTimeout > 0 { f.drain = time.AfterFunc(time.Duration(f.opt.IdleTimeout), func() { _ = f.drainPool(ctx) }) } f.features = (&fs.Features{ CanHaveEmptyDirectories: true, SlowHash: true, }).Fill(ctx, f) // Make a connection and pool it to return errors early c, err := f.getSftpConnection(ctx) if err != nil { return nil, fmt.Errorf("NewFs: %w", err) } // Check remote shell type, try to auto-detect if not configured and save to config for later if f.opt.ShellType != "" { f.shellType = f.opt.ShellType fs.Debugf(f, "Shell type %q from config", f.shellType) } else { session, err := c.sshClient.NewSession() if err != nil { f.shellType = shellTypeNotSupported fs.Debugf(f, "Failed to get shell session for shell type detection command: %v", err) } else { var stdout, stderr bytes.Buffer session.Stdout = &stdout session.Stderr = &stderr shellCmd := "echo ${ShellId}%ComSpec%" fs.Debugf(f, "Running shell type detection remote command: %s", shellCmd) err = session.Run(shellCmd) _ = session.Close() if err != nil { f.shellType = defaultShellType fs.Debugf(f, "Remote command failed: %v (stdout=%v) (stderr=%v)", err, bytes.TrimSpace(stdout.Bytes()), bytes.TrimSpace(stderr.Bytes())) } else { outBytes := stdout.Bytes() fs.Debugf(f, "Remote command result: %s", outBytes) outString := string(bytes.TrimSpace(stdout.Bytes())) if strings.HasPrefix(outString, "Microsoft.PowerShell") { // If PowerShell: "Microsoft.PowerShell%ComSpec%" f.shellType = "powershell" } else if !strings.HasSuffix(outString, "%ComSpec%") { // If Command Prompt: "${ShellId}C:\WINDOWS\system32\cmd.exe" f.shellType = "cmd" } else { // If Unix: "%ComSpec%" f.shellType = "unix" } } } // Save permanently in config to avoid the extra work next time fs.Debugf(f, "Shell type %q detected (set option shell_type to override)", f.shellType) f.m.Set("shell_type", f.shellType) } // Ensure we have absolute path to root // It appears that WS FTP doesn't like relative paths, // and the openssh sftp tool also uses absolute paths. if !path.IsAbs(f.root) { path, err := c.sftpClient.RealPath(f.root) if err != nil { fs.Debugf(f, "Failed to resolve path - using relative paths: %v", err) } else { f.absRoot = path fs.Debugf(f, "Relative path resolved to %q", f.absRoot) } } f.putSftpConnection(&c, err) if root != "" { // Check to see if the root is actually an existing file, // and if so change the filesystem root to its parent directory. oldAbsRoot := f.absRoot remote := path.Base(root) f.root = path.Dir(root) f.absRoot = path.Dir(f.absRoot) if f.root == "." { f.root = "" } _, err = f.NewObject(ctx, remote) if err != nil { if err != fs.ErrorObjectNotFound && err != fs.ErrorIsDir { return nil, err } // File doesn't exist so keep the old f f.root = root f.absRoot = oldAbsRoot err = nil } else { // File exists so change fs to point to the parent and return it with an error err = fs.ErrorIsFile } } else { err = nil } fs.Debugf(f, "Using root directory %q", f.absRoot) return f, err } // Name returns the configured name of the file system func (f *Fs) Name() string { return f.name } // Root returns the root for the filesystem func (f *Fs) Root() string { return f.root } // String returns the URL for the filesystem func (f *Fs) String() string { return f.url } // Features returns the optional features of this Fs func (f *Fs) Features() *fs.Features { return f.features } // Precision is the remote sftp file system's modtime precision, which we have no way of knowing. We estimate at 1s func (f *Fs) Precision() time.Duration { return time.Second } // NewObject creates a new remote sftp file object func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { o := &Object{ fs: f, remote: remote, } err := o.stat(ctx) if err != nil { return nil, err } return o, nil } // dirExists returns true,nil if the directory exists, false, nil if // it doesn't or false, err func (f *Fs) dirExists(ctx context.Context, dir string) (bool, error) { if dir == "" { dir = "." } c, err := f.getSftpConnection(ctx) if err != nil { return false, fmt.Errorf("dirExists: %w", err) } info, err := c.sftpClient.Stat(dir) f.putSftpConnection(&c, err) if err != nil { if os.IsNotExist(err) { return false, nil } return false, fmt.Errorf("dirExists stat failed: %w", err) } if !info.IsDir() { return false, fs.ErrorIsFile } return true, nil } // List the objects and directories in dir into entries. The // entries can be returned in any order but should be for a // complete directory. // // dir should be "" to list the root, and should not have // trailing slashes. // // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { root := path.Join(f.absRoot, dir) ok, err := f.dirExists(ctx, root) if err != nil { return nil, fmt.Errorf("List failed: %w", err) } if !ok { return nil, fs.ErrorDirNotFound } sftpDir := root if sftpDir == "" { sftpDir = "." } c, err := f.getSftpConnection(ctx) if err != nil { return nil, fmt.Errorf("List: %w", err) } infos, err := c.sftpClient.ReadDir(sftpDir) f.putSftpConnection(&c, err) if err != nil { return nil, fmt.Errorf("error listing %q: %w", dir, err) } for _, info := range infos { remote := path.Join(dir, info.Name()) // If file is a symlink (not a regular file is the best cross platform test we can do), do a stat to // pick up the size and type of the destination, instead of the size and type of the symlink. if !info.Mode().IsRegular() && !info.IsDir() { if f.opt.SkipLinks { // skip non regular file if SkipLinks is set continue } oldInfo := info info, err = f.stat(ctx, remote) if err != nil { if !os.IsNotExist(err) { fs.Errorf(remote, "stat of non-regular file failed: %v", err) } info = oldInfo } } if info.IsDir() { d := fs.NewDir(remote, info.ModTime()) entries = append(entries, d) } else { o := &Object{ fs: f, remote: remote, } o.setMetadata(info) entries = append(entries, o) } } return entries, nil } // Put data from into a new remote sftp file object described by and func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { err := f.mkParentDir(ctx, src.Remote()) if err != nil { return nil, fmt.Errorf("Put mkParentDir failed: %w", err) } // Temporary object under construction o := &Object{ fs: f, remote: src.Remote(), } err = o.Update(ctx, in, src, options...) if err != nil { return nil, err } return o, nil } // PutStream uploads to the remote path with the modTime given of indeterminate size func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { return f.Put(ctx, in, src, options...) } // mkParentDir makes the parent of remote if necessary and any // directories above that func (f *Fs) mkParentDir(ctx context.Context, remote string) error { parent := path.Dir(remote) return f.mkdir(ctx, path.Join(f.absRoot, parent)) } // mkdir makes the directory and parents using native paths func (f *Fs) mkdir(ctx context.Context, dirPath string) error { f.mkdirLock.Lock(dirPath) defer f.mkdirLock.Unlock(dirPath) if dirPath == "." || dirPath == "/" { return nil } ok, err := f.dirExists(ctx, dirPath) if err != nil { return fmt.Errorf("mkdir dirExists failed: %w", err) } if ok { return nil } parent := path.Dir(dirPath) err = f.mkdir(ctx, parent) if err != nil { return err } c, err := f.getSftpConnection(ctx) if err != nil { return fmt.Errorf("mkdir: %w", err) } err = c.sftpClient.Mkdir(dirPath) f.putSftpConnection(&c, err) if err != nil { return fmt.Errorf("mkdir %q failed: %w", dirPath, err) } return nil } // Mkdir makes the root directory of the Fs object func (f *Fs) Mkdir(ctx context.Context, dir string) error { root := path.Join(f.absRoot, dir) return f.mkdir(ctx, root) } // Rmdir removes the root directory of the Fs object func (f *Fs) Rmdir(ctx context.Context, dir string) error { // Check to see if directory is empty as some servers will // delete recursively with RemoveDirectory entries, err := f.List(ctx, dir) if err != nil { return fmt.Errorf("Rmdir: %w", err) } if len(entries) != 0 { return fs.ErrorDirectoryNotEmpty } // Remove the directory root := path.Join(f.absRoot, dir) c, err := f.getSftpConnection(ctx) if err != nil { return fmt.Errorf("Rmdir: %w", err) } err = c.sftpClient.RemoveDirectory(root) f.putSftpConnection(&c, err) return err } // Move renames a remote sftp file object func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { srcObj, ok := src.(*Object) if !ok { fs.Debugf(src, "Can't move - not same remote type") return nil, fs.ErrorCantMove } err := f.mkParentDir(ctx, remote) if err != nil { return nil, fmt.Errorf("Move mkParentDir failed: %w", err) } c, err := f.getSftpConnection(ctx) if err != nil { return nil, fmt.Errorf("Move: %w", err) } err = c.sftpClient.Rename( srcObj.path(), path.Join(f.absRoot, remote), ) f.putSftpConnection(&c, err) if err != nil { return nil, fmt.Errorf("Move Rename failed: %w", err) } dstObj, err := f.NewObject(ctx, remote) if err != nil { return nil, fmt.Errorf("Move NewObject failed: %w", err) } return dstObj, nil } // DirMove moves src, srcRemote to this remote at dstRemote // using server-side move operations. // // Will only be called if src.Fs().Name() == f.Name() // // If it isn't possible then return fs.ErrorCantDirMove // // If destination exists then return fs.ErrorDirExists func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { srcFs, ok := src.(*Fs) if !ok { fs.Debugf(srcFs, "Can't move directory - not same remote type") return fs.ErrorCantDirMove } srcPath := path.Join(srcFs.absRoot, srcRemote) dstPath := path.Join(f.absRoot, dstRemote) // Check if destination exists ok, err := f.dirExists(ctx, dstPath) if err != nil { return fmt.Errorf("DirMove dirExists dst failed: %w", err) } if ok { return fs.ErrorDirExists } // Make sure the parent directory exists err = f.mkdir(ctx, path.Dir(dstPath)) if err != nil { return fmt.Errorf("DirMove mkParentDir dst failed: %w", err) } // Do the move c, err := f.getSftpConnection(ctx) if err != nil { return fmt.Errorf("DirMove: %w", err) } err = c.sftpClient.Rename( srcPath, dstPath, ) f.putSftpConnection(&c, err) if err != nil { return fmt.Errorf("DirMove Rename(%q,%q) failed: %w", srcPath, dstPath, err) } return nil } // run runds cmd on the remote end returning standard output func (f *Fs) run(ctx context.Context, cmd string) ([]byte, error) { f.addSession() // Show session in use defer f.removeSession() c, err := f.getSftpConnection(ctx) if err != nil { return nil, fmt.Errorf("run: get SFTP connection: %w", err) } defer f.putSftpConnection(&c, err) // Send keepalives while the connection is open defer close(c.sendKeepAlives(keepAliveInterval)) session, err := c.sshClient.NewSession() if err != nil { return nil, fmt.Errorf("run: get SFTP session: %w", err) } err = f.setEnv(session) if err != nil { return nil, err } defer func() { _ = session.Close() }() var stdout, stderr bytes.Buffer session.Stdout = &stdout session.Stderr = &stderr fs.Debugf(f, "Running remote command: %s", cmd) err = session.Run(cmd) if err != nil { return nil, fmt.Errorf("failed to run %q: %s: %w", cmd, bytes.TrimSpace(stderr.Bytes()), err) } fs.Debugf(f, "Remote command result: %s", bytes.TrimSpace(stdout.Bytes())) return stdout.Bytes(), nil } // Hashes returns the supported hash types of the filesystem func (f *Fs) Hashes() hash.Set { ctx := context.TODO() if f.cachedHashes != nil { return *f.cachedHashes } hashSet := hash.NewHashSet() f.cachedHashes = &hashSet if f.opt.DisableHashCheck || f.shellType == shellTypeNotSupported { return hashSet } // look for a hash command which works checkHash := func(hashType hash.Type, commands []struct{ hashFile, hashEmpty string }, expected string, hashCommand *string, changed *bool) bool { if *hashCommand == hashCommandNotSupported { return false } if *hashCommand != "" { return true } fs.Debugf(f, "Checking default %v hash commands", hashType) *changed = true for _, command := range commands { output, err := f.run(ctx, command.hashEmpty) if err != nil { fs.Debugf(f, "Hash command skipped: %v", err) continue } output = bytes.TrimSpace(output) if parseHash(output) == expected { *hashCommand = command.hashFile fs.Debugf(f, "Hash command accepted") return true } fs.Debugf(f, "Hash command skipped: Wrong output") } *hashCommand = hashCommandNotSupported return false } changed := false md5Commands := []struct { hashFile, hashEmpty string }{ {"md5sum", "md5sum"}, {"md5 -r", "md5 -r"}, {"rclone md5sum", "rclone md5sum"}, } sha1Commands := []struct { hashFile, hashEmpty string }{ {"sha1sum", "sha1sum"}, {"sha1 -r", "sha1 -r"}, {"rclone sha1sum", "rclone sha1sum"}, } if f.shellType == "powershell" { md5Commands = append(md5Commands, struct { hashFile, hashEmpty string }{ "&{param($Path);Get-FileHash -Algorithm MD5 -LiteralPath $Path -ErrorAction Stop|Select-Object -First 1 -ExpandProperty Hash|ForEach-Object{\"$($_.ToLower()) ${Path}\"}}", "Get-FileHash -Algorithm MD5 -InputStream ([System.IO.MemoryStream]::new()) -ErrorAction Stop|Select-Object -First 1 -ExpandProperty Hash|ForEach-Object{$_.ToLower()}", }) sha1Commands = append(sha1Commands, struct { hashFile, hashEmpty string }{ "&{param($Path);Get-FileHash -Algorithm SHA1 -LiteralPath $Path -ErrorAction Stop|Select-Object -First 1 -ExpandProperty Hash|ForEach-Object{\"$($_.ToLower()) ${Path}\"}}", "Get-FileHash -Algorithm SHA1 -InputStream ([System.IO.MemoryStream]::new()) -ErrorAction Stop|Select-Object -First 1 -ExpandProperty Hash|ForEach-Object{$_.ToLower()}", }) } md5Works := checkHash(hash.MD5, md5Commands, "d41d8cd98f00b204e9800998ecf8427e", &f.opt.Md5sumCommand, &changed) sha1Works := checkHash(hash.SHA1, sha1Commands, "da39a3ee5e6b4b0d3255bfef95601890afd80709", &f.opt.Sha1sumCommand, &changed) if changed { // Save permanently in config to avoid the extra work next time fs.Debugf(f, "Setting hash command for %v to %q (set sha1sum_command to override)", hash.MD5, f.opt.Md5sumCommand) f.m.Set("md5sum_command", f.opt.Md5sumCommand) fs.Debugf(f, "Setting hash command for %v to %q (set md5sum_command to override)", hash.SHA1, f.opt.Sha1sumCommand) f.m.Set("sha1sum_command", f.opt.Sha1sumCommand) } if sha1Works { hashSet.Add(hash.SHA1) } if md5Works { hashSet.Add(hash.MD5) } return hashSet } // About gets usage stats func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { // If server implements the vendor-specific VFS statistics extension prefer that // (OpenSSH implements it on using syscall.Statfs on Linux and API function GetDiskFreeSpace on Windows) c, err := f.getSftpConnection(ctx) if err != nil { return nil, err } var vfsStats *sftp.StatVFS if _, found := c.sftpClient.HasExtension("statvfs@openssh.com"); found { fs.Debugf(f, "Server has VFS statistics extension") aboutPath := f.absRoot if aboutPath == "" { aboutPath = "/" } fs.Debugf(f, "About path %q", aboutPath) vfsStats, err = c.sftpClient.StatVFS(aboutPath) } f.putSftpConnection(&c, err) // Return to pool asap, if running shell command below it will be re-used if vfsStats != nil { total := vfsStats.TotalSpace() free := vfsStats.FreeSpace() used := total - free return &fs.Usage{ Total: fs.NewUsageValue(int64(total)), Used: fs.NewUsageValue(int64(used)), Free: fs.NewUsageValue(int64(free)), }, nil } else if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, err } fs.Debugf(f, "Failed to retrieve VFS statistics, trying shell command instead: %v", err) } else { fs.Debugf(f, "Server does not have the VFS statistics extension, trying shell command instead") } // Fall back to shell command method if possible if f.shellType == shellTypeNotSupported || f.shellType == "cmd" { fs.Debugf(f, "About shell command is not available for shell type %q (set option shell_type to override)", f.shellType) return nil, fmt.Errorf("not supported with shell type %q", f.shellType) } aboutShellPath := f.remoteShellPath("") if aboutShellPath == "" { aboutShellPath = "/" } fs.Debugf(f, "About path %q", aboutShellPath) aboutShellPathArg, err := f.quoteOrEscapeShellPath(aboutShellPath) if err != nil { return nil, err } // PowerShell if f.shellType == "powershell" { shellCmd := "Get-Item " + aboutShellPathArg + " -ErrorAction Stop|Select-Object -First 1 -ExpandProperty PSDrive|ForEach-Object{\"$($_.Used) $($_.Free)\"}" fs.Debugf(f, "About using shell command for shell type %q", f.shellType) stdout, err := f.run(ctx, shellCmd) if err != nil { fs.Debugf(f, "About shell command for shell type %q failed (set option shell_type to override): %v", f.shellType, err) return nil, fmt.Errorf("powershell command failed: %w", err) } split := strings.Fields(string(stdout)) usage := &fs.Usage{} if len(split) == 2 { usedValue, usedErr := strconv.ParseInt(split[0], 10, 64) if usedErr == nil { usage.Used = fs.NewUsageValue(usedValue) } freeValue, freeErr := strconv.ParseInt(split[1], 10, 64) if freeErr == nil { usage.Free = fs.NewUsageValue(freeValue) if usedErr == nil { usage.Total = fs.NewUsageValue(usedValue + freeValue) } } } return usage, nil } // Unix/default shell shellCmd := "df -k " + aboutShellPathArg fs.Debugf(f, "About using shell command for shell type %q", f.shellType) stdout, err := f.run(ctx, shellCmd) if err != nil { fs.Debugf(f, "About shell command for shell type %q failed (set option shell_type to override): %v", f.shellType, err) return nil, fmt.Errorf("your remote may not have the required df utility: %w", err) } usageTotal, usageUsed, usageAvail := parseUsage(stdout) usage := &fs.Usage{} if usageTotal >= 0 { usage.Total = fs.NewUsageValue(usageTotal) } if usageUsed >= 0 { usage.Used = fs.NewUsageValue(usageUsed) } if usageAvail >= 0 { usage.Free = fs.NewUsageValue(usageAvail) } return usage, nil } // Shutdown the backend, closing any background tasks and any // cached connections. func (f *Fs) Shutdown(ctx context.Context) error { return f.drainPool(ctx) } // Fs is the filesystem this remote sftp file object is located within func (o *Object) Fs() fs.Info { return o.fs } // String returns the URL to the remote SFTP file func (o *Object) String() string { if o == nil { return "" } return o.remote } // Remote the name of the remote SFTP file, relative to the fs root func (o *Object) Remote() string { return o.remote } // Hash returns the selected checksum of the file // If no checksum is available it returns "" func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) { if o.fs.opt.DisableHashCheck { return "", nil } _ = o.fs.Hashes() var hashCmd string if r == hash.MD5 { if o.md5sum != nil { return *o.md5sum, nil } hashCmd = o.fs.opt.Md5sumCommand } else if r == hash.SHA1 { if o.sha1sum != nil { return *o.sha1sum, nil } hashCmd = o.fs.opt.Sha1sumCommand } else { return "", hash.ErrUnsupported } if hashCmd == "" || hashCmd == hashCommandNotSupported { return "", hash.ErrUnsupported } shellPathArg, err := o.fs.quoteOrEscapeShellPath(o.shellPath()) if err != nil { return "", fmt.Errorf("failed to calculate %v hash: %w", r, err) } outBytes, err := o.fs.run(ctx, hashCmd+" "+shellPathArg) if err != nil { return "", fmt.Errorf("failed to calculate %v hash: %w", r, err) } hashString := parseHash(outBytes) fs.Debugf(o, "Parsed hash: %s", hashString) if r == hash.MD5 { o.md5sum = &hashString } else if r == hash.SHA1 { o.sha1sum = &hashString } return hashString, nil } // quoteOrEscapeShellPath makes path a valid string argument in configured shell // and also ensures it cannot cause unintended behavior. func quoteOrEscapeShellPath(shellType string, shellPath string) (string, error) { // PowerShell if shellType == "powershell" { return "'" + strings.ReplaceAll(shellPath, "'", "''") + "'", nil } // Windows Command Prompt if shellType == "cmd" { if strings.Contains(shellPath, "\"") { return "", fmt.Errorf("path is not valid in shell type %s: %s", shellType, shellPath) } return "\"" + shellPath + "\"", nil } // Unix shell safe := unixShellEscapeRegex.ReplaceAllString(shellPath, `\$0`) return strings.ReplaceAll(safe, "\n", "'\n'"), nil } // quoteOrEscapeShellPath makes path a valid string argument in configured shell func (f *Fs) quoteOrEscapeShellPath(shellPath string) (string, error) { return quoteOrEscapeShellPath(f.shellType, shellPath) } // remotePath returns the native SFTP path of the file or directory at the remote given func (f *Fs) remotePath(remote string) string { return path.Join(f.absRoot, remote) } // remoteShellPath returns the SSH shell path of the file or directory at the remote given func (f *Fs) remoteShellPath(remote string) string { if f.opt.PathOverride != "" { shellPath := path.Join(f.opt.PathOverride, remote) fs.Debugf(f, "Shell path redirected to %q with option path_override", shellPath) return shellPath } shellPath := path.Join(f.absRoot, remote) if f.shellType == "powershell" || f.shellType == "cmd" { // If remote shell is powershell or cmd, then server is probably Windows. // The sftp package converts everything to POSIX paths: Forward slashes, and // absolute paths starts with a slash. An absolute path on a Windows server will // then look like this "/C:/Windows/System32". We must remove the "/" prefix // to make this a valid path for shell commands. In case of PowerShell there is a // possibility that it is a Unix server, with PowerShell Core shell, but assuming // root folders with names such as "C:" are rare, we just take this risk, // and option path_override can always be used to work around corner cases. if posixWinAbsPathRegex.MatchString(shellPath) { shellPath = strings.TrimPrefix(shellPath, "/") fs.Debugf(f, "Shell path adjusted to %q (set option path_override to override)", shellPath) return shellPath } } fs.Debugf(f, "Shell path %q", shellPath) return shellPath } // Converts a byte array from the SSH session returned by // an invocation of md5sum/sha1sum to a hash string // as expected by the rest of this application func parseHash(bytes []byte) string { // For strings with backslash *sum writes a leading \ // https://unix.stackexchange.com/q/313733/94054 return strings.ToLower(strings.Split(strings.TrimLeft(string(bytes), "\\"), " ")[0]) // Split at hash / filename separator / all convert to lowercase } // Parses the byte array output from the SSH session // returned by an invocation of df into // the disk size, used space, and available space on the disk, in that order. // Only works when `df` has output info on only one disk func parseUsage(bytes []byte) (spaceTotal int64, spaceUsed int64, spaceAvail int64) { spaceTotal, spaceUsed, spaceAvail = -1, -1, -1 lines := strings.Split(string(bytes), "\n") if len(lines) < 2 { return } split := strings.Fields(lines[1]) if len(split) < 6 { return } spaceTotal, err := strconv.ParseInt(split[1], 10, 64) if err != nil { spaceTotal = -1 } spaceUsed, err = strconv.ParseInt(split[2], 10, 64) if err != nil { spaceUsed = -1 } spaceAvail, err = strconv.ParseInt(split[3], 10, 64) if err != nil { spaceAvail = -1 } return spaceTotal * 1024, spaceUsed * 1024, spaceAvail * 1024 } // Size returns the size in bytes of the remote sftp file func (o *Object) Size() int64 { return o.size } // ModTime returns the modification time of the remote sftp file func (o *Object) ModTime(ctx context.Context) time.Time { return o.modTime } // path returns the native SFTP path of the object func (o *Object) path() string { return o.fs.remotePath(o.remote) } // shellPath returns the SSH shell path of the object func (o *Object) shellPath() string { return o.fs.remoteShellPath(o.remote) } // setMetadata updates the info in the object from the stat result passed in func (o *Object) setMetadata(info os.FileInfo) { o.modTime = info.ModTime() o.size = info.Size() o.mode = info.Mode() } // statRemote stats the file or directory at the remote given func (f *Fs) stat(ctx context.Context, remote string) (info os.FileInfo, err error) { c, err := f.getSftpConnection(ctx) if err != nil { return nil, fmt.Errorf("stat: %w", err) } absPath := path.Join(f.absRoot, remote) info, err = c.sftpClient.Stat(absPath) f.putSftpConnection(&c, err) return info, err } // stat updates the info in the Object func (o *Object) stat(ctx context.Context) error { info, err := o.fs.stat(ctx, o.remote) if err != nil { if os.IsNotExist(err) { return fs.ErrorObjectNotFound } return fmt.Errorf("stat failed: %w", err) } if info.IsDir() { return fs.ErrorIsDir } o.setMetadata(info) return nil } // SetModTime sets the modification and access time to the specified time // // it also updates the info field func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { if !o.fs.opt.SetModTime { return nil } c, err := o.fs.getSftpConnection(ctx) if err != nil { return fmt.Errorf("SetModTime: %w", err) } err = c.sftpClient.Chtimes(o.path(), modTime, modTime) o.fs.putSftpConnection(&c, err) if err != nil { return fmt.Errorf("SetModTime failed: %w", err) } err = o.stat(ctx) if err != nil { return fmt.Errorf("SetModTime stat failed: %w", err) } return nil } // Storable returns whether the remote sftp file is a regular file (not a directory, symbolic link, block device, character device, named pipe, etc.) func (o *Object) Storable() bool { return o.mode.IsRegular() } // objectReader represents a file open for reading on the SFTP server type objectReader struct { f *Fs sftpFile *sftp.File pipeReader *io.PipeReader done chan struct{} } func (f *Fs) newObjectReader(sftpFile *sftp.File) *objectReader { pipeReader, pipeWriter := io.Pipe() file := &objectReader{ f: f, sftpFile: sftpFile, pipeReader: pipeReader, done: make(chan struct{}), } // Show connection in use f.addSession() go func() { // Use sftpFile.WriteTo to pump data so that it gets a // chance to build the window up. _, err := sftpFile.WriteTo(pipeWriter) // Close the pipeWriter so the pipeReader fails with // the same error or EOF if err == nil _ = pipeWriter.CloseWithError(err) // signal that we've finished close(file.done) }() return file } // Read from a remote sftp file object reader func (file *objectReader) Read(p []byte) (n int, err error) { n, err = file.pipeReader.Read(p) return n, err } // Close a reader of a remote sftp file func (file *objectReader) Close() (err error) { // Close the sftpFile - this will likely cause the WriteTo to error err = file.sftpFile.Close() // Close the pipeReader so writes to the pipeWriter fail _ = file.pipeReader.Close() // Wait for the background process to finish <-file.done // Show connection no longer in use file.f.removeSession() return err } // Open a remote sftp file object for reading. Seek is supported func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { var offset, limit int64 = 0, -1 for _, option := range options { switch x := option.(type) { case *fs.SeekOption: offset = x.Offset case *fs.RangeOption: offset, limit = x.Decode(o.Size()) default: if option.Mandatory() { fs.Logf(o, "Unsupported mandatory option: %v", option) } } } c, err := o.fs.getSftpConnection(ctx) if err != nil { return nil, fmt.Errorf("Open: %w", err) } sftpFile, err := c.sftpClient.Open(o.path()) o.fs.putSftpConnection(&c, err) if err != nil { return nil, fmt.Errorf("Open failed: %w", err) } if offset > 0 { off, err := sftpFile.Seek(offset, io.SeekStart) if err != nil || off != offset { return nil, fmt.Errorf("Open Seek failed: %w", err) } } in = readers.NewLimitedReadCloser(o.fs.newObjectReader(sftpFile), limit) return in, nil } type sizeReader struct { io.Reader size int64 } // Size returns the expected size of the stream // // It is used in sftpFile.ReadFrom as a hint to work out the // concurrency needed func (sr *sizeReader) Size() int64 { return sr.size } // Update a remote sftp file using the data and ModTime from func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { o.fs.addSession() // Show session in use defer o.fs.removeSession() // Clear the hash cache since we are about to update the object o.md5sum = nil o.sha1sum = nil c, err := o.fs.getSftpConnection(ctx) if err != nil { return fmt.Errorf("Update: %w", err) } file, err := c.sftpClient.OpenFile(o.path(), os.O_WRONLY|os.O_CREATE|os.O_TRUNC) o.fs.putSftpConnection(&c, err) if err != nil { return fmt.Errorf("Update Create failed: %w", err) } // remove the file if upload failed remove := func() { c, removeErr := o.fs.getSftpConnection(ctx) if removeErr != nil { fs.Debugf(src, "Failed to open new SSH connection for delete: %v", removeErr) return } removeErr = c.sftpClient.Remove(o.path()) o.fs.putSftpConnection(&c, removeErr) if removeErr != nil { fs.Debugf(src, "Failed to remove: %v", removeErr) } else { fs.Debugf(src, "Removed after failed upload: %v", err) } } _, err = file.ReadFrom(&sizeReader{Reader: in, size: src.Size()}) if err != nil { remove() return fmt.Errorf("Update ReadFrom failed: %w", err) } err = file.Close() if err != nil { remove() return fmt.Errorf("Update Close failed: %w", err) } // Set the mod time - this stats the object if o.fs.opt.SetModTime == true err = o.SetModTime(ctx, src.ModTime(ctx)) if err != nil { return fmt.Errorf("Update SetModTime failed: %w", err) } // Stat the file after the upload to read its stats back if o.fs.opt.SetModTime == false if !o.fs.opt.SetModTime { err = o.stat(ctx) if err == fs.ErrorObjectNotFound { // In the specific case of o.fs.opt.SetModTime == false // if the object wasn't found then don't return an error fs.Debugf(o, "Not found after upload with set_modtime=false so returning best guess") o.modTime = src.ModTime(ctx) o.size = src.Size() o.mode = os.FileMode(0666) // regular file } else if err != nil { return fmt.Errorf("Update stat failed: %w", err) } } return nil } // Remove a remote sftp file object func (o *Object) Remove(ctx context.Context) error { c, err := o.fs.getSftpConnection(ctx) if err != nil { return fmt.Errorf("Remove: %w", err) } err = c.sftpClient.Remove(o.path()) o.fs.putSftpConnection(&c, err) return err } // Check the interfaces are satisfied var ( _ fs.Fs = &Fs{} _ fs.PutStreamer = &Fs{} _ fs.Mover = &Fs{} _ fs.DirMover = &Fs{} _ fs.Abouter = &Fs{} _ fs.Shutdowner = &Fs{} _ fs.Object = &Object{} )