diff --git a/sshuttle/helpers.py b/sshuttle/helpers.py index 969228c..c1f4baf 100644 --- a/sshuttle/helpers.py +++ b/sshuttle/helpers.py @@ -2,7 +2,8 @@ import sys import socket import errno import os - +import threading +import traceback if sys.platform != "win32": import fcntl @@ -275,3 +276,47 @@ class RWPair: f.close() except Exception: pass + + +class SocketRWShim: + __slots__ = ('_r', '_w', '_on_end', '_s1', '_s2', '_t1', '_t2') + + def __init__(self, r, w, on_end=None) -> None: + self._r = r + self._w = w + self._on_end = on_end + + self._s1, self._s2 = socket.socketpair() + debug3("[SocketShim] r=%r w=%r | s1=%r s2=%r" % (self._r, self._w, self._s1, self._s2)) + + def stream_reader_to_sock(): + try: + for data in iter(lambda: self._r.read(16384), b''): + self._s1.sendall(data) + # debug3("[SocketRWShim] <<<<< r.read() %d %r..." % (len(data), data[:min(32, len(data))])) + except Exception: + traceback.print_exc(file=sys.stderr) + finally: + debug2("[SocketRWShim] Thread 'stream_reader_to_sock' exiting") + self._s1.close() + self._on_end and self._on_end() + + def stream_sock_to_writer(): + try: + for data in iter(lambda: self._s1.recv(16384), b''): + while data: + n = self._w.write(data) + data = data[n:] + # debug3("[SocketRWShim] <<<<< w.write() %d %r..." % (len(data), data[:min(32, len(data))])) + except Exception: + traceback.print_exc(file=sys.stderr) + finally: + debug2("[SocketRWShim] Thread 'stream_sock_to_writer' exiting") + self._s1.close() + self._on_end and self._on_end() + + self._t1 = threading.Thread(target=stream_reader_to_sock, name='stream_reader_to_sock', daemon=True).start() + self._t2 = threading.Thread(target=stream_sock_to_writer, name='stream_sock_to_writer', daemon=True).start() + + def makefiles(self): + return self._s2.makefile("rb", buffering=0), self._s2.makefile("wb", buffering=0) diff --git a/sshuttle/ssh.py b/sshuttle/ssh.py index b8b947a..8c37fa2 100644 --- a/sshuttle/ssh.py +++ b/sshuttle/ssh.py @@ -12,7 +12,7 @@ import ipaddress from urllib.parse import urlparse import sshuttle.helpers as helpers -from sshuttle.helpers import debug2, which, get_path, Fatal +from sshuttle.helpers import debug2, which, get_path, SocketRWShim, Fatal def get_module_source(name): @@ -226,42 +226,14 @@ def connect(ssh_cmd, rhostport, python, stderr, add_cmd_delimiter, options): # Either to use sockets as stdio for subprocess. Or to use pipes but with a select() alternative # https://stackoverflow.com/questions/4993119/redirect-io-of-process-to-windows-socket - (s1, s2) = socket.socketpair() pstdin = ssubprocess.PIPE pstdout = ssubprocess.PIPE preexec_fn = None def get_server_io(): - import threading - - def stream_stdout_to_sock(): - try: - fd = p.stdout.fileno() - for data in iter(lambda: os.read(fd, 16384), b''): - s1.sendall(data) - # debug3("<<<<< p.stdout.read() %d %r...", len(data), data[:min(32, len(data))]) - - finally: - debug2("Thread 'stream_stdout_to_sock' exiting") - s1.close() - p.terminate() - - def stream_sock_to_stdin(): - try: - for data in iter(lambda: s1.recv(16384), b''): - # debug3("<<<<< p.stdout.write() %d %r...", len(data), data[:min(32, len(data))]) - while data: - n = p.stdin.write(data) - data = data[n:] - finally: - debug2("Thread 'stream_sock_to_stdin' exiting") - s1.close() - p.terminate() - - threading.Thread(target=stream_stdout_to_sock, name='stream_stdout_to_sock', daemon=True).start() - threading.Thread(target=stream_sock_to_stdin, name='stream_sock_to_stdin', daemon=True).start() - return s2.makefile("rb", buffering=0), s2.makefile("wb", buffering=0) + shim = SocketRWShim(p.stdout, p.stdin, on_end=lambda: p.terminate()) + return shim.makefiles() # See: stackoverflow.com/questions/48671215/howto-workaround-of-close-fds-true-and-redirect-stdout-stderr-on-windows close_fds = False if sys.platform == 'win32' else True