Improve some debugging information to find the weird data problem.

Turns out list.pop() removes the *last* item, not the first one.  Oops.  It
all works great for queues of only one item... :)
This commit is contained in:
Avery Pennarun 2010-05-02 01:18:55 -04:00
parent 915a96b0ec
commit bfd506dcdc
3 changed files with 37 additions and 21 deletions

View File

@ -46,7 +46,8 @@ def _main(listener, listenport, use_server, remotename, subnets):
def onaccept(): def onaccept():
sock,srcip = listener.accept() sock,srcip = listener.accept()
dstip = original_dst(sock) dstip = original_dst(sock)
log('Incoming connection from %r to %r.\n' % (srcip,dstip)) log('Accept: %r:%r -> %r:%r.\n' % (srcip[0],srcip[1],
dstip[0],dstip[1]))
if dstip == sock.getsockname(): if dstip == sock.getsockname():
log("-- ignored: that's my address!\n") log("-- ignored: that's my address!\n")
sock.close() sock.close()
@ -72,11 +73,10 @@ def _main(listener, listenport, use_server, remotename, subnets):
handlers = filter(lambda s: s.ok, handlers) handlers = filter(lambda s: s.ok, handlers)
for s in handlers: for s in handlers:
s.pre_select(r,w,x) s.pre_select(r,w,x)
log('\n')
log('Waiting: %d[%d,%d,%d]...\n' log('Waiting: %d[%d,%d,%d]...\n'
% (len(handlers), len(r), len(w), len(x))) % (len(handlers), len(r), len(w), len(x)))
(r,w,x) = select.select(r,w,x) (r,w,x) = select.select(r,w,x)
log('r=%r w=%r x=%r\n' % (r,w,x)) #log('r=%r w=%r x=%r\n' % (r,w,x))
ready = set(r) | set(w) | set(x) ready = set(r) | set(w) | set(x)
for s in handlers: for s in handlers:
if s.socks & ready: if s.socks & ready:

View File

@ -32,11 +32,10 @@ def main():
handlers = filter(lambda s: s.ok, handlers) handlers = filter(lambda s: s.ok, handlers)
for s in handlers: for s in handlers:
s.pre_select(r,w,x) s.pre_select(r,w,x)
log('\n')
log('Waiting: %d[%d,%d,%d]...\n' log('Waiting: %d[%d,%d,%d]...\n'
% (len(handlers), len(r), len(w), len(x))) % (len(handlers), len(r), len(w), len(x)))
(r,w,x) = select.select(r,w,x) (r,w,x) = select.select(r,w,x)
log('r=%r w=%r x=%r\n' % (r,w,x)) #log('r=%r w=%r x=%r\n' % (r,w,x))
ready = set(r) | set(w) | set(x) ready = set(r) | set(w) | set(x)
for s in handlers: for s in handlers:
if s.socks & ready: if s.socks & ready:

View File

@ -3,6 +3,7 @@ from helpers import *
HDR_LEN = 8 HDR_LEN = 8
CMD_EXIT = 0x4200 CMD_EXIT = 0x4200
CMD_PING = 0x4201 CMD_PING = 0x4201
CMD_PONG = 0x4202 CMD_PONG = 0x4202
@ -11,12 +12,23 @@ CMD_CLOSE = 0x4204
CMD_EOF = 0x4205 CMD_EOF = 0x4205
CMD_DATA = 0x4206 CMD_DATA = 0x4206
cmd_to_name = {
CMD_EXIT: 'EXIT',
CMD_PING: 'PING',
CMD_PONG: 'PONG',
CMD_CONNECT: 'CONNECT',
CMD_CLOSE: 'CLOSE',
CMD_EOF: 'EOF',
CMD_DATA: 'DATA',
}
def _nb_clean(func, *args): def _nb_clean(func, *args):
try: try:
return func(*args) return func(*args)
except socket.error, e: except OSError, e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN): if e.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise raise
else: else:
return None return None
@ -26,7 +38,7 @@ def _try_peername(sock):
try: try:
return sock.getpeername() return sock.getpeername()
except socket.error, e: except socket.error, e:
if e.args[0] not in (errno.ENOTCONN,): if e.args[0] not in (errno.ENOTCONN, errno.ENOTSOCK):
raise raise
else: else:
return ('0.0.0.0',0) return ('0.0.0.0',0)
@ -64,8 +76,8 @@ class SockWrapper:
def uwrite(self, buf): def uwrite(self, buf):
self.wsock.setblocking(False) self.wsock.setblocking(False)
try: try:
return _nb_clean(self.wsock.send, buf) return _nb_clean(os.write, self.wsock.fileno(), buf)
except socket.error: except OSError:
# unexpected error... stream is dead # unexpected error... stream is dead
self.nowrite() self.nowrite()
self.noread() self.noread()
@ -80,8 +92,8 @@ class SockWrapper:
return return
self.rsock.setblocking(False) self.rsock.setblocking(False)
try: try:
return _nb_clean(self.rsock.recv, 65536) return _nb_clean(os.read, self.rsock.fileno(), 65536)
except socket.error: except OSError:
return '' # unexpected error... we'll call it EOF return '' # unexpected error... we'll call it EOF
def fill(self): def fill(self):
@ -98,7 +110,7 @@ class SockWrapper:
wrote = outwrap.write(self.buf[0]) wrote = outwrap.write(self.buf[0])
self.buf[0] = self.buf[0][wrote:] self.buf[0] = self.buf[0][wrote:]
while self.buf and not self.buf[0]: while self.buf and not self.buf[0]:
self.buf.pop(0) self.buf[0:1] = []
if not self.buf and self.shut_read: if not self.buf and self.shut_read:
outwrap.nowrite() outwrap.nowrite()
@ -178,11 +190,14 @@ class Mux(Handler):
assert(len(data) <= 65535) assert(len(data) <= 65535)
p = struct.pack('!ccHHH', 'S', 'S', channel, cmd, len(data)) + data p = struct.pack('!ccHHH', 'S', 'S', channel, cmd, len(data)) + data
self.outbuf.append(p) self.outbuf.append(p)
log('Mux: send queue is %d/%d\n' log(' > channel=%d cmd=%s len=%d\n'
% (len(self.outbuf), sum(len(b) for b in self.outbuf))) % (channel, cmd_to_name[cmd], len(data)))
#log('Mux: send queue is %d/%d\n'
# % (len(self.outbuf), sum(len(b) for b in self.outbuf)))
def got_packet(self, channel, cmd, data): def got_packet(self, channel, cmd, data):
log('--got-packet--\n') log('< channel=%d cmd=%s len=%d\n'
% (channel, cmd_to_name[cmd], len(data)))
if cmd == CMD_PING: if cmd == CMD_PING:
self.send(0, CMD_PONG, data) self.send(0, CMD_PONG, data)
elif cmd == CMD_PONG: elif cmd == CMD_PONG:
@ -200,15 +215,16 @@ class Mux(Handler):
def flush(self): def flush(self):
self.wsock.setblocking(False) self.wsock.setblocking(False)
if self.outbuf and self.outbuf[0]: if self.outbuf and self.outbuf[0]:
wrote = _nb_clean(self.wsock.send, self.outbuf[0]) wrote = _nb_clean(os.write, self.wsock.fileno(), self.outbuf[0])
if wrote: if wrote:
self.outbuf[0] = self.outbuf[0][wrote:] self.outbuf[0] = self.outbuf[0][wrote:]
while self.outbuf and not self.outbuf[0]: while self.outbuf and not self.outbuf[0]:
self.outbuf.pop() self.outbuf[0:1] = []
def fill(self): def fill(self):
self.rsock.setblocking(False) self.rsock.setblocking(False)
b = _nb_clean(self.rsock.recv, 32768) b = _nb_clean(os.read, self.rsock.fileno(), 32768)
#log('<<< %r\n' % b)
if b == '': # EOF if b == '': # EOF
self.ok = False self.ok = False
if b: if b:
@ -216,7 +232,8 @@ class Mux(Handler):
def handle(self): def handle(self):
self.fill() self.fill()
log('inbuf is: (%d,%d) %r\n' % (self.want, len(self.inbuf), self.inbuf)) #log('inbuf is: (%d,%d) %r\n'
# % (self.want, len(self.inbuf), self.inbuf))
while 1: while 1:
if len(self.inbuf) >= (self.want or HDR_LEN): if len(self.inbuf) >= (self.want or HDR_LEN):
(s1,s2,channel,cmd,datalen) = \ (s1,s2,channel,cmd,datalen) = \
@ -251,7 +268,7 @@ class MuxWrapper(SockWrapper):
self.mux = mux self.mux = mux
self.channel = channel self.channel = channel
self.mux.channels[channel] = self.got_packet self.mux.channels[channel] = self.got_packet
log('Created MuxWrapper on channel %d\n' % channel) log('new channel: %d\n' % channel)
def __del__(self): def __del__(self):
self.nowrite() self.nowrite()