httpie-cli/tests/test_downloads.py

193 lines
6.7 KiB
Python
Raw Normal View History

2014-04-24 14:07:31 +02:00
import os
import tempfile
2014-04-24 14:07:31 +02:00
import time
from unittest import mock
2019-08-29 08:53:56 +02:00
from urllib.request import urlopen
2014-04-24 14:07:31 +02:00
2014-04-24 17:08:40 +02:00
import pytest
2014-04-24 15:48:01 +02:00
from requests.structures import CaseInsensitiveDict
2014-04-24 14:07:31 +02:00
from httpie.downloads import (
2014-04-24 18:20:23 +02:00
parse_content_range, filename_from_content_disposition, filename_from_url,
2016-02-29 08:00:17 +01:00
get_unique_filename, ContentRangeError, Downloader,
2014-04-24 14:07:31 +02:00
)
from .utils import http, MockEnvironment
2014-04-24 14:07:31 +02:00
class Response:
    """Minimal stand-in for a response object, used by the download tests.

    It exposes only the three attributes assigned in ``__init__``:
    ``url``, ``headers`` and ``status_code``.
    """

    def __init__(self, url, headers=None, status_code=200):
        """Store the given response attributes.

        :param url: value exposed as ``self.url``.
        :param headers: optional mapping of header names to values, used to
            build a ``CaseInsensitiveDict``; ``None`` (the default) means
            no headers.
        :param status_code: value exposed as ``self.status_code``.
        """
        self.url = url
        # A ``None`` sentinel replaces the former ``headers={}`` default so
        # that no mutable object is shared between calls.
        self.headers = CaseInsensitiveDict({} if headers is None else headers)
        self.status_code = status_code
2014-04-24 14:07:31 +02:00
class TestDownloadUtils:
    """Tests for the helper functions imported from ``httpie.downloads``."""

    def test_Content_Range_parsing(self):
        """Check ``parse_content_range`` on accepted and rejected inputs."""
        # Accepted headers and the value each one is expected to yield.
        assert parse_content_range('bytes 100-199/200', 100) == 200
        assert parse_content_range('bytes 100-199/*', 100) == 200

        # single byte
        assert parse_content_range('bytes 100-100/*', 100) == 101

        # Argument tuples that must each raise ``ContentRangeError``.
        rejected_calls = [
            # missing
            (None, 100),
            # syntax error
            ('beers 100-199/*', 100),
            # unexpected range
            ('bytes 100-199/*', 99),
            # invalid instance-length
            ('bytes 100-199/199', 100),
            # invalid byte-range-resp-spec
            ('bytes 100-99/199', 100),
        ]
        for call_args in rejected_calls:
            with pytest.raises(ContentRangeError):
                parse_content_range(*call_args)

    @pytest.mark.parametrize('header, expected_filename', [
        ('attachment; filename=hello-WORLD_123.txt', 'hello-WORLD_123.txt'),
        ('attachment; filename=".hello-WORLD_123.txt"', 'hello-WORLD_123.txt'),
        ('attachment; filename="white space.txt"', 'white space.txt'),
        (r'attachment; filename="\"quotes\".txt"', '"quotes".txt'),
        ('attachment; filename=/etc/hosts', 'hosts'),
        ('attachment; filename=', None)
    ])
    def test_Content_Disposition_parsing(self, header, expected_filename):
        """Each ``Content-Disposition`` header maps to the expected name."""
        parsed_filename = filename_from_content_disposition(header)
        assert parsed_filename == expected_filename

    def test_filename_from_url(self):
        """Check the filename derived from one URL for several content types."""
        url = 'http://example.org/foo'
        # (content type passed in, filename expected back)
        cases = [
            ('text/plain', 'foo.txt'),
            ('text/html; charset=utf8', 'foo.html'),
            (None, 'foo'),
            ('x-foo/bar', 'foo'),
        ]
        for content_type, wanted in cases:
            assert wanted == filename_from_url(
                url=url,
                content_type=content_type
            )

    @pytest.mark.parametrize(
        'orig_name, unique_on_attempt, expected',
        [
            # Simple
            ('foo.bar', 0, 'foo.bar'),
            ('foo.bar', 1, 'foo.bar-1'),
            ('foo.bar', 10, 'foo.bar-10'),
            # Trim
            ('A' * 20, 0, 'A' * 10),
            ('A' * 20, 1, 'A' * 8 + '-1'),
            ('A' * 20, 10, 'A' * 7 + '-10'),
            # Trim before ext
            ('A' * 20 + '.txt', 0, 'A' * 6 + '.txt'),
            ('A' * 20 + '.txt', 1, 'A' * 4 + '.txt-1'),
            # Trim at the end
            ('foo.' + 'A' * 20, 0, 'foo.' + 'A' * 6),
            ('foo.' + 'A' * 20, 1, 'foo.' + 'A' * 4 + '-1'),
            ('foo.' + 'A' * 20, 10, 'foo.' + 'A' * 3 + '-10'),
        ]
    )
    @mock.patch('httpie.downloads.get_filename_max_length')
    def test_unique_filename(self, get_filename_max_length,
                             orig_name, unique_on_attempt,
                             expected):
        """Check ``get_unique_filename`` with the max length mocked to 10.

        The fake existence check reports a collision for the first
        ``unique_on_attempt`` calls and "does not exist" afterwards.
        """
        get_filename_max_length.return_value = 10
        collisions_left = unique_on_attempt

        # noinspection PyUnusedLocal
        def exists(filename):
            nonlocal collisions_left
            if collisions_left == 0:
                return False
            collisions_left -= 1
            return True

        actual = get_unique_filename(orig_name, exists)
        assert expected == actual
2014-04-24 14:07:31 +02:00
class TestDownloads:
    """Tests for ``http --download`` and for ``Downloader`` driven directly."""
    # TODO: more tests

    def test_actual_download(self, httpbin_both, httpbin):
        """Download ``/robots.txt`` and compare it with a direct fetch."""
        robots_txt = '/robots.txt'
        # Fetch the reference body inside ``with`` so the HTTP response is
        # closed instead of being leaked.
        with urlopen(httpbin + robots_txt) as response:
            body = response.read().decode()
        env = MockEnvironment(stdin_isatty=True, stdout_isatty=False)
        r = http('--download', httpbin_both.url + robots_txt, env=env)
        assert 'Downloading' in r.stderr
        assert '[K' in r.stderr
        assert 'Done' in r.stderr
        assert body == r

    def test_download_with_Content_Length(self, httpbin_both):
        """All 10 announced bytes arrive, so the download is not interrupted."""
        with open(os.devnull, 'w') as devnull:
            downloader = Downloader(output_file=devnull, progress_file=devnull)
            downloader.start(
                initial_url='/',
                final_response=Response(
                    url=httpbin_both.url + '/',
                    headers={'Content-Length': 10}
                )
            )
            time.sleep(1.1)
            downloader.chunk_downloaded(b'12345')
            time.sleep(1.1)
            downloader.chunk_downloaded(b'12345')
            downloader.finish()
            try:
                assert not downloader.interrupted
            finally:
                # Join the progress reporter even when the assertion fails;
                # otherwise the join is skipped and ``devnull`` is closed
                # without waiting for it.
                downloader._progress_reporter.join()

    def test_download_no_Content_Length(self, httpbin_both):
        """Without ``Content-Length`` the download is not interrupted."""
        with open(os.devnull, 'w') as devnull:
            downloader = Downloader(output_file=devnull, progress_file=devnull)
            downloader.start(
                final_response=Response(url=httpbin_both.url + '/'),
                initial_url='/'
            )
            time.sleep(1.1)
            downloader.chunk_downloaded(b'12345')
            downloader.finish()
            try:
                assert not downloader.interrupted
            finally:
                # Join the progress reporter even when the assertion fails;
                # otherwise the join is skipped and ``devnull`` is closed
                # without waiting for it.
                downloader._progress_reporter.join()

    def test_download_interrupted(self, httpbin_both):
        """Only 4 of the 5 announced bytes arrive, so it is interrupted."""
        with open(os.devnull, 'w') as devnull:
            downloader = Downloader(output_file=devnull, progress_file=devnull)
            downloader.start(
                final_response=Response(
                    url=httpbin_both.url + '/',
                    headers={'Content-Length': 5}
                ),
                initial_url='/'
            )
            downloader.chunk_downloaded(b'1234')
            downloader.finish()
            try:
                assert downloader.interrupted
            finally:
                # Join the progress reporter even when the assertion fails;
                # otherwise the join is skipped and ``devnull`` is closed
                # without waiting for it.
                downloader._progress_reporter.join()

    def test_download_with_redirect_original_url_used_for_filename(self, httpbin):
        """A redirected download leaves exactly one file, ``1.json``."""
        # Redirect from `/redirect/1` to `/get`.
        expected_filename = '1.json'
        orig_cwd = os.getcwd()
        with tempfile.TemporaryDirectory() as tmp_dirname:
            os.chdir(tmp_dirname)
            try:
                assert os.listdir('.') == []
                http('--download', httpbin.url + '/redirect/1')
                assert os.listdir('.') == [expected_filename]
            finally:
                # Leave the temporary directory before it is removed.
                os.chdir(orig_cwd)