Do non-blocking connect().

This way we don't freeze the entire proxy when someone tries to connect to a
nonexistent IP address (oops).
This commit is contained in:
Avery Pennarun 2010-05-02 02:43:10 -04:00
parent 81c89ce9be
commit 9b23fd2c01

View File

@ -46,13 +46,15 @@ def _try_peername(sock):
class SockWrapper: class SockWrapper:
def __init__(self, rsock, wsock, peername=None): def __init__(self, rsock, wsock, connect_to=None, peername=None):
self.exc = None self.exc = None
self.rsock = rsock self.rsock = rsock
self.wsock = wsock self.wsock = wsock
self.shut_read = self.shut_write = False self.shut_read = self.shut_write = False
self.buf = [] self.buf = []
self.connect_to = connect_to
self.peername = peername or _try_peername(self.rsock) self.peername = peername or _try_peername(self.rsock)
self.try_connect()
def __del__(self): def __del__(self):
debug1('%r: deleting\n' % self) debug1('%r: deleting\n' % self)
@ -66,6 +68,23 @@ class SockWrapper:
if not self.exc: if not self.exc:
self.exc = e self.exc = e
def try_connect(self):
if not self.connect_to:
return # already connected
self.rsock.setsockopt(socket.SOL_IP, socket.IP_TTL, 42)
self.rsock.setblocking(False)
try:
self.rsock.connect(self.connect_to)
self.connect_to = None
except socket.error, e:
if e.args[0] in [errno.EINPROGRESS, errno.EALREADY]:
pass # not connected yet
elif e.args[0] in [errno.ECONNREFUSED, errno.ETIMEDOUT]:
# a "normal" kind of error
self.seterr(e)
else:
raise # error we've never heard of?! barf completely.
def noread(self): def noread(self):
if not self.shut_read: if not self.shut_read:
debug2('%r: done reading\n' % self) debug2('%r: done reading\n' % self)
@ -82,6 +101,8 @@ class SockWrapper:
self.seterr(e) self.seterr(e)
def uwrite(self, buf): def uwrite(self, buf):
if self.connect_to:
return 0 # still connecting
self.wsock.setblocking(False) self.wsock.setblocking(False)
try: try:
return _nb_clean(os.write, self.wsock.fileno(), buf) return _nb_clean(os.write, self.wsock.fileno(), buf)
@ -97,6 +118,8 @@ class SockWrapper:
return self.uwrite(buf) return self.uwrite(buf)
def uread(self): def uread(self):
if self.connect_to:
return None # still connecting
if self.shut_read: if self.shut_read:
return return
self.rsock.setblocking(False) self.rsock.setblocking(False)
@ -154,16 +177,23 @@ class Proxy(Handler):
self.wrap2 = wrap2 self.wrap2 = wrap2
def pre_select(self, r, w, x): def pre_select(self, r, w, x):
if self.wrap1.buf: if self.wrap1.connect_to:
w.add(self.wrap1.rsock)
elif self.wrap1.buf:
w.add(self.wrap2.wsock) w.add(self.wrap2.wsock)
elif not self.wrap1.shut_read: elif not self.wrap1.shut_read:
r.add(self.wrap1.rsock) r.add(self.wrap1.rsock)
if self.wrap2.buf:
if self.wrap2.connect_to:
w.add(self.wrap2.rsock)
elif self.wrap2.buf:
w.add(self.wrap1.wsock) w.add(self.wrap1.wsock)
elif not self.wrap2.shut_read: elif not self.wrap2.shut_read:
r.add(self.wrap2.rsock) r.add(self.wrap2.rsock)
def callback(self): def callback(self):
self.wrap1.try_connect()
self.wrap2.try_connect()
self.wrap1.fill() self.wrap1.fill()
self.wrap2.fill() self.wrap2.fill()
self.wrap1.copy_to(self.wrap2) self.wrap1.copy_to(self.wrap2)
@ -324,14 +354,6 @@ class MuxWrapper(SockWrapper):
def connect_dst(ip, port): def connect_dst(ip, port):
debug2('Connecting to %s:%d\n' % (ip, port)) debug2('Connecting to %s:%d\n' % (ip, port))
outsock = socket.socket() outsock = socket.socket()
outsock.setsockopt(socket.SOL_IP, socket.IP_TTL, 42) return SockWrapper(outsock, outsock,
e = None connect_to = (ip,port),
try: peername = '%s:%d' % (ip,port))
outsock.connect((ip,port))
except socket.error, e:
if e.args[0] not in [errno.ECONNREFUSED]:
raise
sw = SockWrapper(outsock, outsock, peername = '%s:%d' % (ip,port))
if e:
sw.seterr(e)
return sw