Stop the progres reporter thread on error.

This commit is contained in:
Jakub Roztocil 2013-04-16 04:55:45 -03:00
parent 6c3b983c18
commit 1fc8396c4b
4 changed files with 56 additions and 53 deletions

View File

@ -139,10 +139,7 @@ def main(args=sys.argv[1:], env=Environment()):
env.stderr.write('\n') env.stderr.write('\n')
else: else:
raise raise
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
if download:
download.finish()
if traceback: if traceback:
raise raise
env.stderr.write('\n') env.stderr.write('\n')
@ -160,4 +157,8 @@ def main(args=sys.argv[1:], env=Environment()):
error('%s: %s', type(e).__name__, str(e)) error('%s: %s', type(e).__name__, str(e))
exit_status = ExitStatus.ERROR exit_status = ExitStatus.ERROR
finally:
if download and not download.finished:
download.failed()
return exit_status return exit_status

View File

@ -22,7 +22,7 @@ PARTIAL_CONTENT = 206
CLEAR_LINE = '\r\033[K' CLEAR_LINE = '\r\033[K'
PROGRESS = ( PROGRESS = (
'{percentage: 6.2f}%' '{percentage: 6.2f} %'
' {downloaded: >10}' ' {downloaded: >10}'
' {speed: >10}/s' ' {speed: >10}/s'
' {eta: >8} ETA' ' {eta: >8} ETA'
@ -160,11 +160,11 @@ class Download(object):
self._output_file = output_file self._output_file = output_file
self._resume = resume self._resume = resume
self._resumed_from = 0 self._resumed_from = 0
self._finished = False self.finished = False
self._progress = Progress() self._status = Status()
self._progress_reporter = ProgressReporter( self._progress_reporter = ProgressReporter(
progress=self._progress, status=self._status,
output=progress_file output=progress_file
) )
@ -197,7 +197,7 @@ class Download(object):
:return: RawStream, output_file :return: RawStream, output_file
""" """
assert not self._progress.time_started assert not self._status.time_started
try: try:
total_size = int(response.headers['Content-Length']) total_size = int(response.headers['Content-Length'])
@ -232,7 +232,7 @@ class Download(object):
) )
self._output_file = open(get_unique_filename(fn), mode='a+b') self._output_file = open(get_unique_filename(fn), mode='a+b')
self._progress.started( self._status.started(
resumed_from=self._resumed_from, resumed_from=self._resumed_from,
total_size=total_size total_size=total_size
) )
@ -241,9 +241,7 @@ class Download(object):
msg=HTTPResponse(response), msg=HTTPResponse(response),
with_headers=False, with_headers=False,
with_body=True, with_body=True,
on_body_chunk_downloaded=self._on_progress, on_body_chunk_downloaded=self._chunk_downloaded,
# TODO: Find the optimal chunk size.
# The smaller it is the slower it gets, but gives better feedback.
chunk_size=1024 * 8 chunk_size=1024 * 8
) )
@ -260,19 +258,23 @@ class Download(object):
return stream, self._output_file return stream, self._output_file
def finish(self): def finish(self):
assert not self._finished assert not self.finished
self._finished = True self.finished = True
self._progress.finished() self._status.finished()
def failed(self):
self.finish()
self._progress_reporter.stop()
@property @property
def interrupted(self): def interrupted(self):
return ( return (
self._finished self.finished
and self._progress.total_size and self._status.total_size
and self._progress.total_size != self._progress.downloaded and self._status.total_size != self._status.downloaded
) )
def _on_progress(self, chunk): def _chunk_downloaded(self, chunk):
""" """
A download progress callback. A download progress callback.
@ -281,10 +283,11 @@ class Download(object):
:type chunk: bytes :type chunk: bytes
""" """
self._progress.chunk_downloaded(len(chunk)) self._status.chunk_downloaded(len(chunk))
class Progress(object): class Status(object):
"""Holds details about the downland status."""
def __init__(self): def __init__(self):
self.downloaded = 0 self.downloaded = 0
@ -315,14 +318,19 @@ class Progress(object):
class ProgressReporter(object): class ProgressReporter(object):
"""
Reports download progress based on its status.
def __init__(self, progress, output, tick=.1, update_interval=1): Uses threading to periodically update the status (speed, ETA, etc.).
"""
def __init__(self, status, output, tick=.1, update_interval=1):
""" """
:type progress: Progress :type status: Status
:type output: file :type output: file
""" """
self.progress = progress self.status = status
self.output = output self.output = output
self._tick = tick self._tick = tick
self._update_interval = update_interval self._update_interval = update_interval
@ -330,9 +338,16 @@ class ProgressReporter(object):
self._status_line = '' self._status_line = ''
self._prev_bytes = 0 self._prev_bytes = 0
self._prev_time = time() self._prev_time = time()
self._stop = False
def stop(self):
"""Stop reporting on next tick."""
self._stop = True
def report(self): def report(self):
if self.progress.has_finished: if self._stop:
return
if self.status.has_finished:
self.sum_up() self.sum_up()
else: else:
self.report_speed() self.report_speed()
@ -343,31 +358,28 @@ class ProgressReporter(object):
now = time() now = time()
if now - self._prev_time >= self._update_interval: if now - self._prev_time >= self._update_interval:
downloaded = self.status.downloaded
downloaded = self.progress.downloaded
try: try:
# TODO: Use a longer interval for the speed/eta calculation?
speed = ((downloaded - self._prev_bytes) speed = ((downloaded - self._prev_bytes)
/ (now - self._prev_time)) / (now - self._prev_time))
except ZeroDivisionError: except ZeroDivisionError:
speed = 0 speed = 0
if not self.progress.total_size: if not self.status.total_size:
self._status_line = PROGRESS_NO_CONTENT_LENGTH.format( self._status_line = PROGRESS_NO_CONTENT_LENGTH.format(
downloaded=humanize_bytes(downloaded), downloaded=humanize_bytes(downloaded),
speed=humanize_bytes(speed), speed=humanize_bytes(speed),
) )
else: else:
try: try:
percentage = downloaded / self.progress.total_size * 100 percentage = downloaded / self.status.total_size * 100
except ZeroDivisionError: except ZeroDivisionError:
percentage = 0 percentage = 0
if not speed: if not speed:
eta = '-:--:--' eta = '-:--:--'
else: else:
s = int((self.progress.total_size - downloaded) / speed) s = int((self.status.total_size - downloaded) / speed)
h, s = divmod(s, 60 * 60) h, s = divmod(s, 60 * 60)
m, s = divmod(s, 60) m, s = divmod(s, 60)
eta = '{}:{:0>2}:{:0>2}'.format(h, m, s) eta = '{}:{:0>2}:{:0>2}'.format(h, m, s)
@ -396,15 +408,15 @@ class ProgressReporter(object):
else 0) else 0)
def sum_up(self): def sum_up(self):
actually_downloaded = (self.progress.downloaded actually_downloaded = (self.status.downloaded
- self.progress.resumed_from) - self.status.resumed_from)
time_taken = self.progress.time_finished - self.progress.time_started time_taken = self.status.time_finished - self.status.time_started
self.output.write(CLEAR_LINE) self.output.write(CLEAR_LINE)
self.output.write(SUMMARY.format( self.output.write(SUMMARY.format(
downloaded=humanize_bytes(actually_downloaded), downloaded=humanize_bytes(actually_downloaded),
total=(self.progress.total_size total=(self.status.total_size
and humanize_bytes(self.progress.total_size)), and humanize_bytes(self.status.total_size)),
speed=humanize_bytes(actually_downloaded / time_taken), speed=humanize_bytes(actually_downloaded / time_taken),
time=time_taken, time=time_taken,
)) ))

View File

@ -1,2 +1,3 @@
tox
httpbin httpbin
docutils docutils

View File

@ -1540,6 +1540,7 @@ class Response(object):
self.status_code = status_code self.status_code = status_code
# noinspection PyTypeChecker
class DownloadTest(BaseTestCase): class DownloadTest(BaseTestCase):
# TODO: Download tests. # TODO: Download tests.
@ -1562,31 +1563,19 @@ class DownloadTest(BaseTestCase):
def test_download_no_Content_Length(self): def test_download_no_Content_Length(self):
download = Download(output_file=open(os.devnull, 'w')) download = Download(output_file=open(os.devnull, 'w'))
download.start(Response(url=httpbin('/'))) download.start(Response(url=httpbin('/')))
download._on_progress(b'12345') download._chunk_downloaded(b'12345')
download.finish() download.finish()
self.assertFalse(download.interrupted) self.assertFalse(download.interrupted)
def test_download_with_Content_Length(self): def test_download_interrupted(self):
download = Download( download = Download(
output_file=open(os.devnull, 'w'), output_file=open(os.devnull, 'w')
progress_file=BytesIO(),
) )
download.start(Response( download.start(Response(
url=httpbin('/'), url=httpbin('/'),
headers={'Content-Length': 5} headers={'Content-Length': 5}
)) ))
download._on_progress(b'12345') download._chunk_downloaded(b'1234')
time.sleep(1.5)
download.finish()
self.assertFalse(download.interrupted)
def test_download_interrupted(self):
download = Download(output_file=open(os.devnull, 'w'))
download.start(Response(
url=httpbin('/'),
headers={'Content-Length': 5}
))
download._on_progress(b'1234')
download.finish() download.finish()
self.assertTrue(download.interrupted) self.assertTrue(download.interrupted)