Whoops, we were spinning in the server when the mux outbuf was full.

Once again, the buffering gets mixed up with the selecting.  Seems to be the
story of my life.
This commit is contained in:
Avery Pennarun 2010-05-02 06:17:43 -04:00
parent 6c2dc54b9e
commit ca14231aae
2 changed files with 18 additions and 8 deletions

View File

@ -35,12 +35,14 @@ def main():
handlers = filter(lambda s: s.ok, handlers)
for s in handlers:
s.pre_select(r,w,x)
debug2('Waiting: %d[%d,%d,%d]...\n'
% (len(handlers), len(r), len(w), len(x)))
debug2('Waiting: %d[%d,%d,%d] (fullness=%d/%d)...\n'
% (len(handlers), len(r), len(w), len(x),
mux.fullness, mux.too_full))
(r,w,x) = select.select(r,w,x)
#log('r=%r w=%r x=%r\n' % (r,w,x))
ready = set(r) | set(w) | set(x)
for s in handlers:
#debug2('check: %r: %r\n' % (s, s.socks & ready))
if s.socks & ready:
s.callback()
mux.check_fullness()

View File

@ -101,6 +101,9 @@ class SockWrapper:
except socket.error, e:
self.seterr(e)
def too_full(self):
return False # fullness is determined by the socket's select() state
def uwrite(self, buf):
if self.connect_to:
return 0 # still connecting
@ -181,14 +184,16 @@ class Proxy(Handler):
if self.wrap1.connect_to:
w.add(self.wrap1.rsock)
elif self.wrap1.buf:
w.add(self.wrap2.wsock)
if not self.wrap2.too_full():
w.add(self.wrap2.wsock)
elif not self.wrap1.shut_read:
r.add(self.wrap1.rsock)
if self.wrap2.connect_to:
w.add(self.wrap2.rsock)
elif self.wrap2.buf:
w.add(self.wrap1.wsock)
if not self.wrap1.too_full():
w.add(self.wrap1.wsock)
elif not self.wrap2.shut_read:
r.add(self.wrap2.rsock)
@ -232,7 +237,9 @@ class Mux(Handler):
return sum(len(b) for b in self.outbuf)
def check_fullness(self):
if self.fullness > 65536:
if self.fullness > 32768:
if not self.too_full:
self.send(0, CMD_PING, 'rttest')
self.too_full = True
#ob = []
#for b in self.outbuf:
@ -247,8 +254,6 @@ class Mux(Handler):
self.outbuf.append(p)
debug2(' > channel=%d cmd=%s len=%d (fullness=%d)\n'
% (channel, cmd_to_name[cmd], len(data), self.fullness))
if self.fullness < 32768 and self.fullness+len(data) > 32768:
self.send(0, CMD_PING, 'rttest')
self.fullness += len(data)
def got_packet(self, channel, cmd, data):
@ -274,7 +279,7 @@ class Mux(Handler):
self.wsock.setblocking(False)
if self.outbuf and self.outbuf[0]:
wrote = _nb_clean(os.write, self.wsock.fileno(), self.outbuf[0])
debug2('wrote: %d/%d\n' % (wrote, len(self.outbuf[0])))
debug2('mux wrote: %d/%d\n' % (wrote, len(self.outbuf[0])))
if wrote:
self.outbuf[0] = self.outbuf[0][wrote:]
while self.outbuf and not self.outbuf[0]:
@ -346,6 +351,9 @@ class MuxWrapper(SockWrapper):
self.shut_write = True
self.mux.send(self.channel, CMD_EOF, '')
def too_full(self):
return self.mux.too_full
def uwrite(self, buf):
if self.mux.too_full:
return 0 # too much already enqueued