From 5d1390927d7ac323ca33acc9ed91c1a7738bf752 Mon Sep 17 00:00:00 2001 From: Avery Pennarun Date: Sun, 2 May 2010 05:06:51 -0400 Subject: [PATCH] Don't overfill the mux's output buffer. Otherwise a single busy stream can ruin it for everybody. --- client.py | 2 ++ server.py | 2 ++ ssnet.py | 21 +++++++++++++++++---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/client.py b/client.py index a48c2ef..05225a5 100644 --- a/client.py +++ b/client.py @@ -87,6 +87,8 @@ def _main(listener, listenport, use_server, remotename, subnets): for s in handlers: if s.socks & ready: s.callback() + mux.callback() + mux.check_fullness() def main(listenip, use_server, remotename, subnets): diff --git a/server.py b/server.py index b0699e6..7a1c679 100644 --- a/server.py +++ b/server.py @@ -43,3 +43,5 @@ def main(): for s in handlers: if s.socks & ready: s.callback() + mux.callback() + mux.check_fullness() diff --git a/ssnet.py b/ssnet.py index e5c78a2..964bd34 100644 --- a/ssnet.py +++ b/ssnet.py @@ -215,6 +215,7 @@ class Mux(Handler): self.want = 0 self.inbuf = '' self.outbuf = [] + self.too_full = False self.send(0, CMD_PING, 'chicken') def next_channel(self): @@ -225,7 +226,18 @@ class Mux(Handler): self.chani = 1 if not self.channels.get(self.chani): return self.chani + + def amount_queued(self): + return sum(len(b) for b in self.outbuf) + def check_fullness(self): + self.too_full = (self.amount_queued() > 32768) + ob = [] + for b in self.outbuf: + (s1,s2,c) = struct.unpack('!ccH', b[:4]) + ob.append(c) + log('outbuf: %d %r\n' % (self.amount_queued(), ob,)) + def send(self, channel, cmd, data): data = str(data) assert(len(data) <= 65535) @@ -233,8 +245,6 @@ class Mux(Handler): self.outbuf.append(p) debug2(' > channel=%d cmd=%s len=%d\n' % (channel, cmd_to_name[cmd], len(data))) - #log('Mux: send queue is %d/%d\n' - # % (len(self.outbuf), sum(len(b) for b in self.outbuf))) def got_packet(self, channel, cmd, data): debug2('< channel=%d cmd=%s len=%d\n' @@ -309,6 +319,7 @@ class MuxWrapper(SockWrapper): self.mux = mux self.channel = channel self.mux.channels[channel] = self.got_packet + self.socks = [] debug2('new channel: %d\n' % channel) def __del__(self): @@ -328,8 +339,10 @@ class MuxWrapper(SockWrapper): self.mux.send(self.channel, CMD_EOF, '') def uwrite(self, buf): - if len(buf) > 65535: - buf = buf[:32768] + if self.mux.too_full: + return 0 # too much already enqueued + if len(buf) > 2048: + buf = buf[:2048] self.mux.send(self.channel, CMD_DATA, buf) return len(buf)