We now have a server that works... some of the time.

There still seem to be some weird timing and/or closing-related bugs, since
I can't load the eqldata project correctly unless I use --noserver.
This commit is contained in:
Avery Pennarun
2010-05-02 00:52:06 -04:00
parent d435c41bdb
commit 915a96b0ec
5 changed files with 146 additions and 38 deletions

104
ssnet.py
View File

@ -16,16 +16,27 @@ def _nb_clean(func, *args):
try:
return func(*args)
except socket.error, e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
else:
return None
raise
def _try_peername(sock):
try:
return sock.getpeername()
except socket.error, e:
if e.args[0] not in (errno.ENOTCONN,):
raise
else:
return ('0.0.0.0',0)
class SockWrapper:
def __init__(self, rsock, wsock):
self.rsock = rsock
self.wsock = wsock
self.peername = self.rsock.getpeername()
self.peername = _try_peername(self.rsock)
self.shut_read = self.shut_write = False
self.buf = []
@ -45,11 +56,20 @@ class SockWrapper:
if not self.shut_write:
log('%r: done writing\n' % self)
self.shut_write = True
self.wsock.shutdown(socket.SHUT_WR)
try:
self.wsock.shutdown(socket.SHUT_WR)
except socket.error:
pass
def uwrite(self, buf):
self.wsock.setblocking(False)
return _nb_clean(self.wsock.send, buf)
try:
return _nb_clean(self.wsock.send, buf)
except socket.error:
# unexpected error... stream is dead
self.nowrite()
self.noread()
return 0
def write(self, buf):
assert(buf)
@ -59,7 +79,10 @@ class SockWrapper:
if self.shut_read:
return
self.rsock.setblocking(False)
return _nb_clean(self.rsock.recv, 65536)
try:
return _nb_clean(self.rsock.recv, 65536)
except socket.error:
return '' # unexpected error... we'll call it EOF
def fill(self):
if self.buf:
@ -133,6 +156,7 @@ class Mux(Handler):
Handler.__init__(self, [rsock, wsock])
self.rsock = rsock
self.wsock = wsock
self.new_channel = None
self.channels = {}
self.chani = 0
self.want = 0
@ -160,12 +184,18 @@ class Mux(Handler):
def got_packet(self, channel, cmd, data):
log('--got-packet--\n')
if cmd == CMD_PING:
self.mux.send(0, CMD_PONG, data)
self.send(0, CMD_PONG, data)
elif cmd == CMD_PONG:
log('received PING response\n')
elif cmd == CMD_EXIT:
self.ok = False
elif cmd == CMD_CONNECT:
assert(not self.channels.get(channel))
if self.new_channel:
self.new_channel(channel, data)
else:
c = self.channels[channel]
c.got_packet(cmd, data)
callback = self.channels[channel]
callback(cmd, data)
def flush(self):
self.wsock.setblocking(False)
@ -180,28 +210,30 @@ class Mux(Handler):
self.rsock.setblocking(False)
b = _nb_clean(self.rsock.recv, 32768)
if b == '': # EOF
ok = False
self.ok = False
if b:
self.inbuf += b
def handle(self):
log('inbuf is: %r\n' % self.inbuf)
if len(self.inbuf) >= (self.want or HDR_LEN):
(s1,s2,channel,cmd,datalen) = struct.unpack('!ccHHH',
self.inbuf[:HDR_LEN])
assert(s1 == 'S')
assert(s2 == 'S')
self.want = datalen + HDR_LEN
if self.want and len(self.inbuf) >= self.want:
data = self.inbuf[HDR_LEN:self.want]
self.inbuf = self.inbuf[self.want:]
self.got_packet(channel, cmd, data)
else:
self.fill()
self.fill()
log('inbuf is: (%d,%d) %r\n' % (self.want, len(self.inbuf), self.inbuf))
while 1:
if len(self.inbuf) >= (self.want or HDR_LEN):
(s1,s2,channel,cmd,datalen) = \
struct.unpack('!ccHHH', self.inbuf[:HDR_LEN])
assert(s1 == 'S')
assert(s2 == 'S')
self.want = datalen + HDR_LEN
if self.want and len(self.inbuf) >= self.want:
data = self.inbuf[HDR_LEN:self.want]
self.inbuf = self.inbuf[self.want:]
self.want = 0
self.got_packet(channel, cmd, data)
else:
break
def pre_select(self, r, w, x):
if self.inbuf < (self.want or HDR_LEN):
r.add(self.rsock)
r.add(self.rsock)
if self.outbuf:
w.add(self.wsock)
@ -218,9 +250,16 @@ class MuxWrapper(SockWrapper):
SockWrapper.__init__(self, mux.rsock, mux.wsock)
self.mux = mux
self.channel = channel
self.mux.channels[channel] = self
self.mux.channels[channel] = self.got_packet
log('Created MuxWrapper on channel %d\n' % channel)
def __del__(self):
self.nowrite()
SockWrapper.__del__(self)
def __repr__(self):
return 'SW%r:Mux#%d' % (self.peername,self.channel)
def noread(self):
if not self.shut_read:
self.shut_read = True
@ -231,6 +270,8 @@ class MuxWrapper(SockWrapper):
self.mux.send(self.channel, CMD_EOF, '')
def uwrite(self, buf):
if len(buf) > 65535:
buf = buf[:32768]
self.mux.send(self.channel, CMD_DATA, buf)
return len(buf)
@ -251,3 +292,14 @@ class MuxWrapper(SockWrapper):
else:
raise Exception('unknown command %d (%d bytes)'
% (cmd, len(data)))
def connect_dst(ip, port):
outsock = socket.socket()
outsock.setsockopt(socket.SOL_IP, socket.IP_TTL, 42)
try:
outsock.connect((ip,port))
except socket.error, e:
if e.args[0] not in [errno.ECONNREFUSED]:
raise
return SockWrapper(outsock,outsock)