Don't overfill the mux's output buffer.

Otherwise a single busy stream can ruin it for everybody.
This commit is contained in:
Avery Pennarun 2010-05-02 05:06:51 -04:00
parent da06286427
commit 5d1390927d
3 changed files with 21 additions and 4 deletions

View File

@ -87,6 +87,8 @@ def _main(listener, listenport, use_server, remotename, subnets):
for s in handlers: for s in handlers:
if s.socks & ready: if s.socks & ready:
s.callback() s.callback()
mux.callback()
mux.check_fullness()
def main(listenip, use_server, remotename, subnets): def main(listenip, use_server, remotename, subnets):

View File

@ -43,3 +43,5 @@ def main():
for s in handlers: for s in handlers:
if s.socks & ready: if s.socks & ready:
s.callback() s.callback()
mux.callback()
mux.check_fullness()

View File

@ -215,6 +215,7 @@ class Mux(Handler):
self.want = 0 self.want = 0
self.inbuf = '' self.inbuf = ''
self.outbuf = [] self.outbuf = []
self.too_full = False
self.send(0, CMD_PING, 'chicken') self.send(0, CMD_PING, 'chicken')
def next_channel(self): def next_channel(self):
@ -225,7 +226,18 @@ class Mux(Handler):
self.chani = 1 self.chani = 1
if not self.channels.get(self.chani): if not self.channels.get(self.chani):
return self.chani return self.chani
def amount_queued(self):
return sum(len(b) for b in self.outbuf)
def check_fullness(self):
self.too_full = (self.amount_queued() > 32768)
ob = []
for b in self.outbuf:
(s1,s2,c) = struct.unpack('!ccH', b[:4])
ob.append(c)
log('outbuf: %d %r\n' % (self.amount_queued(), ob,))
def send(self, channel, cmd, data): def send(self, channel, cmd, data):
data = str(data) data = str(data)
assert(len(data) <= 65535) assert(len(data) <= 65535)
@ -233,8 +245,6 @@ class Mux(Handler):
self.outbuf.append(p) self.outbuf.append(p)
debug2(' > channel=%d cmd=%s len=%d\n' debug2(' > channel=%d cmd=%s len=%d\n'
% (channel, cmd_to_name[cmd], len(data))) % (channel, cmd_to_name[cmd], len(data)))
#log('Mux: send queue is %d/%d\n'
# % (len(self.outbuf), sum(len(b) for b in self.outbuf)))
def got_packet(self, channel, cmd, data): def got_packet(self, channel, cmd, data):
debug2('< channel=%d cmd=%s len=%d\n' debug2('< channel=%d cmd=%s len=%d\n'
@ -309,6 +319,7 @@ class MuxWrapper(SockWrapper):
self.mux = mux self.mux = mux
self.channel = channel self.channel = channel
self.mux.channels[channel] = self.got_packet self.mux.channels[channel] = self.got_packet
self.socks = []
debug2('new channel: %d\n' % channel) debug2('new channel: %d\n' % channel)
def __del__(self): def __del__(self):
@ -328,8 +339,10 @@ class MuxWrapper(SockWrapper):
self.mux.send(self.channel, CMD_EOF, '') self.mux.send(self.channel, CMD_EOF, '')
def uwrite(self, buf): def uwrite(self, buf):
if len(buf) > 65535: if self.mux.too_full:
buf = buf[:32768] return 0 # too much already enqueued
if len(buf) > 2048:
buf = buf[:2048]
self.mux.send(self.channel, CMD_DATA, buf) self.mux.send(self.channel, CMD_DATA, buf)
return len(buf) return len(buf)