mirror of
https://github.com/sshuttle/sshuttle.git
synced 2025-02-17 19:01:03 +01:00
add SocketRWShim helper
This commit is contained in:
parent
c4255a23f0
commit
ace8642950
@ -2,7 +2,8 @@ import sys
|
|||||||
import socket
|
import socket
|
||||||
import errno
|
import errno
|
||||||
import os
|
import os
|
||||||
|
import threading
|
||||||
|
import traceback
|
||||||
|
|
||||||
if sys.platform != "win32":
|
if sys.platform != "win32":
|
||||||
import fcntl
|
import fcntl
|
||||||
@ -275,3 +276,47 @@ class RWPair:
|
|||||||
f.close()
|
f.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SocketRWShim:
|
||||||
|
__slots__ = ('_r', '_w', '_on_end', '_s1', '_s2', '_t1', '_t2')
|
||||||
|
|
||||||
|
def __init__(self, r, w, on_end=None) -> None:
|
||||||
|
self._r = r
|
||||||
|
self._w = w
|
||||||
|
self._on_end = on_end
|
||||||
|
|
||||||
|
self._s1, self._s2 = socket.socketpair()
|
||||||
|
debug3("[SocketShim] r=%r w=%r | s1=%r s2=%r" % (self._r, self._w, self._s1, self._s2))
|
||||||
|
|
||||||
|
def stream_reader_to_sock():
|
||||||
|
try:
|
||||||
|
for data in iter(lambda: self._r.read(16384), b''):
|
||||||
|
self._s1.sendall(data)
|
||||||
|
# debug3("[SocketRWShim] <<<<< r.read() %d %r..." % (len(data), data[:min(32, len(data))]))
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc(file=sys.stderr)
|
||||||
|
finally:
|
||||||
|
debug2("[SocketRWShim] Thread 'stream_reader_to_sock' exiting")
|
||||||
|
self._s1.close()
|
||||||
|
self._on_end and self._on_end()
|
||||||
|
|
||||||
|
def stream_sock_to_writer():
|
||||||
|
try:
|
||||||
|
for data in iter(lambda: self._s1.recv(16384), b''):
|
||||||
|
while data:
|
||||||
|
n = self._w.write(data)
|
||||||
|
data = data[n:]
|
||||||
|
# debug3("[SocketRWShim] <<<<< w.write() %d %r..." % (len(data), data[:min(32, len(data))]))
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc(file=sys.stderr)
|
||||||
|
finally:
|
||||||
|
debug2("[SocketRWShim] Thread 'stream_sock_to_writer' exiting")
|
||||||
|
self._s1.close()
|
||||||
|
self._on_end and self._on_end()
|
||||||
|
|
||||||
|
self._t1 = threading.Thread(target=stream_reader_to_sock, name='stream_reader_to_sock', daemon=True).start()
|
||||||
|
self._t2 = threading.Thread(target=stream_sock_to_writer, name='stream_sock_to_writer', daemon=True).start()
|
||||||
|
|
||||||
|
def makefiles(self):
|
||||||
|
return self._s2.makefile("rb", buffering=0), self._s2.makefile("wb", buffering=0)
|
||||||
|
@ -12,7 +12,7 @@ import ipaddress
|
|||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
import sshuttle.helpers as helpers
|
import sshuttle.helpers as helpers
|
||||||
from sshuttle.helpers import debug2, which, get_path, Fatal
|
from sshuttle.helpers import debug2, which, get_path, SocketRWShim, Fatal
|
||||||
|
|
||||||
|
|
||||||
def get_module_source(name):
|
def get_module_source(name):
|
||||||
@ -226,42 +226,14 @@ def connect(ssh_cmd, rhostport, python, stderr, add_cmd_delimiter, options):
|
|||||||
# Either to use sockets as stdio for subprocess. Or to use pipes but with a select() alternative
|
# Either to use sockets as stdio for subprocess. Or to use pipes but with a select() alternative
|
||||||
# https://stackoverflow.com/questions/4993119/redirect-io-of-process-to-windows-socket
|
# https://stackoverflow.com/questions/4993119/redirect-io-of-process-to-windows-socket
|
||||||
|
|
||||||
(s1, s2) = socket.socketpair()
|
|
||||||
pstdin = ssubprocess.PIPE
|
pstdin = ssubprocess.PIPE
|
||||||
pstdout = ssubprocess.PIPE
|
pstdout = ssubprocess.PIPE
|
||||||
|
|
||||||
preexec_fn = None
|
preexec_fn = None
|
||||||
|
|
||||||
def get_server_io():
|
def get_server_io():
|
||||||
import threading
|
shim = SocketRWShim(p.stdout, p.stdin, on_end=lambda: p.terminate())
|
||||||
|
return shim.makefiles()
|
||||||
def stream_stdout_to_sock():
|
|
||||||
try:
|
|
||||||
fd = p.stdout.fileno()
|
|
||||||
for data in iter(lambda: os.read(fd, 16384), b''):
|
|
||||||
s1.sendall(data)
|
|
||||||
# debug3("<<<<< p.stdout.read() %d %r...", len(data), data[:min(32, len(data))])
|
|
||||||
|
|
||||||
finally:
|
|
||||||
debug2("Thread 'stream_stdout_to_sock' exiting")
|
|
||||||
s1.close()
|
|
||||||
p.terminate()
|
|
||||||
|
|
||||||
def stream_sock_to_stdin():
|
|
||||||
try:
|
|
||||||
for data in iter(lambda: s1.recv(16384), b''):
|
|
||||||
# debug3("<<<<< p.stdout.write() %d %r...", len(data), data[:min(32, len(data))])
|
|
||||||
while data:
|
|
||||||
n = p.stdin.write(data)
|
|
||||||
data = data[n:]
|
|
||||||
finally:
|
|
||||||
debug2("Thread 'stream_sock_to_stdin' exiting")
|
|
||||||
s1.close()
|
|
||||||
p.terminate()
|
|
||||||
|
|
||||||
threading.Thread(target=stream_stdout_to_sock, name='stream_stdout_to_sock', daemon=True).start()
|
|
||||||
threading.Thread(target=stream_sock_to_stdin, name='stream_sock_to_stdin', daemon=True).start()
|
|
||||||
return s2.makefile("rb", buffering=0), s2.makefile("wb", buffering=0)
|
|
||||||
|
|
||||||
# See: stackoverflow.com/questions/48671215/howto-workaround-of-close-fds-true-and-redirect-stdout-stderr-on-windows
|
# See: stackoverflow.com/questions/48671215/howto-workaround-of-close-fds-true-and-redirect-stdout-stderr-on-windows
|
||||||
close_fds = False if sys.platform == 'win32' else True
|
close_fds = False if sys.platform == 'win32' else True
|
||||||
|
Loading…
Reference in New Issue
Block a user