httpie-cli/tests/tests.py
Jakub Roztocil e0cc63c7eb Cleanup
2014-01-25 15:09:28 +01:00

1756 lines
50 KiB
Python
Executable File

#!/usr/bin/env python
# coding=utf8
"""
Many of the test cases here use httpbin.org.
To make it run faster and offline you can::
# Install `httpbin` locally
pip install git+https://github.com/kennethreitz/httpbin.git
# Run it
httpbin
# Run the tests against it
HTTPBIN_URL=http://localhost:5000 python setup.py test
# Test all Python environments
HTTPBIN_URL=http://localhost:5000 tox
"""
import subprocess
import os
import sys
import json
#noinspection PyCompatibility
import argparse
import tempfile
import unittest
import shutil
import time
from requests.structures import CaseInsensitiveDict
try:
#noinspection PyCompatibility
from urllib.request import urlopen
except ImportError:
# noinspection PyUnresolvedReferences
#noinspection PyCompatibility
from urllib2 import urlopen
try:
    from unittest import skipIf, skip
except ImportError:
    # `unittest` without skip support (e.g. Python 2.6): provide minimal
    # stand-ins that turn a skipped test into a no-op that passes.

    # noinspection PyUnusedLocal
    def skipIf(cond, reason):
        """Return a decorator that disables the test when `cond` is true.

        A disabled test is replaced by a method that does nothing;
        otherwise the test method is returned unchanged. `reason` is
        accepted for API compatibility only.
        """
        def decorator(test_method):
            if cond:
                return lambda self: None
            return test_method
        return decorator

    def skip(msg):
        """Return a decorator that unconditionally disables the test.

        Delegates to `skipIf` so that the decorated attribute remains a
        callable no-op test instead of becoming `None`.
        """
        return skipIf(True, msg)
from requests import __version__ as requests_version
#################################################################
# Utils/setup
#################################################################
# HACK: Prepend ../ to `sys.path` so that we can import httpie from there.
# TESTS_ROOT is the absolute path of the directory containing this file.
TESTS_ROOT = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, os.path.realpath(os.path.join(TESTS_ROOT, '..')))
from httpie import ExitStatus
from httpie import input
from httpie.cli import parser
from httpie.models import Environment
from httpie.core import main
from httpie.output import BINARY_SUPPRESSED_NOTICE
from httpie.input import ParseError
from httpie.compat import is_windows, is_py26, bytes, str
from httpie.downloads import (
parse_content_range,
filename_from_content_disposition,
filename_from_url,
get_unique_filename,
ContentRangeError,
Download,
)
# Line terminator the tests expect in headers and before the body.
CRLF = '\r\n'
# Base URL of the httpbin instance under test; overridable through the
# HTTPBIN_URL environment variable. Any trailing slash is stripped so
# that paths starting with '/' can be appended directly.
HTTPBIN_URL = os.environ.get('HTTPBIN_URL',
                             'http://httpbin.org').rstrip('/')
# Status-line prefix of a successful response in uncolored output.
OK = 'HTTP/1.1 200'
# The "HTTP/1.1 200 OK" status line interleaved with ANSI escape
# sequences, for matching colorized output.
OK_COLOR = (
    'HTTP\x1b[39m\x1b[38;5;245m/\x1b[39m\x1b'
    '[38;5;37m1.1\x1b[39m\x1b[38;5;245m \x1b[39m\x1b[38;5;37m200'
    '\x1b[39m\x1b[38;5;245m \x1b[39m\x1b[38;5;136mOK'
)
# Start of an ANSI escape sequence; its presence marks colorized output.
COLOR = '\x1b['
def patharg(path):
    """Return `path` escaped for use inside an ITEM argument.

    Every backslash in `path` (e.g. in a Windows path) is replaced by
    three backslashes.
    """
    segments = path.split('\\')
    return '\\\\\\'.join(segments)
# Test files
# Absolute paths of the fixture files under <TESTS_ROOT>/fixtures.
FILE_PATH = os.path.join(TESTS_ROOT, 'fixtures', 'file.txt')
FILE2_PATH = os.path.join(TESTS_ROOT, 'fixtures', 'file2.txt')
BIN_FILE_PATH = os.path.join(TESTS_ROOT, 'fixtures', 'file.bin')
JSON_FILE_PATH = os.path.join(TESTS_ROOT, 'fixtures', 'test.json')

# The same paths escaped with `patharg()` for use in ITEM arguments.
FILE_PATH_ARG = patharg(FILE_PATH)
FILE2_PATH_ARG = patharg(FILE2_PATH)
BIN_FILE_PATH_ARG = patharg(BIN_FILE_PATH)
JSON_FILE_PATH_ARG = patharg(JSON_FILE_PATH)

# Fixture contents, loaded once at import time.
with open(FILE_PATH) as f:
    # Strip because we don't want new lines in the data so that we can
    # easily count occurrences also when embedded in JSON (where the new
    # line would be escaped).
    FILE_CONTENT = f.read().strip()
with open(BIN_FILE_PATH, 'rb') as f:
    # Raw bytes, read in binary mode.
    BIN_FILE_CONTENT = f.read()
with open(JSON_FILE_PATH, 'rb') as f:
    # Raw bytes as well; callers decode before parsing.
    JSON_FILE_CONTENT = f.read()
def httpbin(path):
    """Return the full httpbin URL for `path` (appended to HTTPBIN_URL)."""
    return HTTPBIN_URL + path
def mk_config_dir():
    """Create a fresh temporary directory for config and return its path.

    The caller is responsible for removing the directory.
    """
    config_dir = tempfile.mkdtemp(prefix='httpie_test_config_dir_')
    return config_dir
class TestEnvironment(Environment):
    """`Environment` preconfigured for the test suite.

    Colors are off and stdin/stdout are flagged as TTYs by default.
    Unless the caller passes them as keyword arguments, `stdout` and
    `stderr` are temporary files and `config_dir` is a newly created
    temporary directory, which is removed again in `__del__`.
    """
    colors = 0
    # Must be a plain bool: a trailing comma here would silently turn the
    # value into the (always truthy) tuple `(True,)`.
    stdin_isatty = True
    stdout_isatty = True
    is_windows = False

    _shutil = shutil  # we need it in __del__ (would get gc'd)

    def __init__(self, **kwargs):
        """Fill in test defaults for missing `kwargs` and pass them on."""
        # Created lazily so that no temp file is opened when the caller
        # supplies a stream of their own.
        if 'stdout' not in kwargs:
            kwargs['stdout'] = tempfile.TemporaryFile('w+b')

        if 'stderr' not in kwargs:
            kwargs['stderr'] = tempfile.TemporaryFile('w+t')

        # Only a config dir created here is ours to delete.
        self.delete_config_dir = False
        if 'config_dir' not in kwargs:
            kwargs['config_dir'] = mk_config_dir()
            self.delete_config_dir = True

        super(TestEnvironment, self).__init__(**kwargs)

    def __del__(self):
        """Remove the temporary config directory, if this instance made it."""
        if self.delete_config_dir:
            self._shutil.rmtree(self.config_dir)
def has_docutils():
    """Return True if the `docutils` package can be imported."""
    try:
        #noinspection PyUnresolvedReferences
        import docutils
    except ImportError:
        return False
    else:
        return True
def get_readme_errors():
    """Run `rst2pseudoxml.py` on ../README.rst and report problems.

    Return what the process wrote to stderr if it exited with a non-zero
    status, otherwise `None`.
    """
    readme_path = os.path.join(TESTS_ROOT, '..', 'README.rst')
    command = [
        'rst2pseudoxml.py',
        '--report=1',
        '--exit-status=1',
        readme_path
    ]
    process = subprocess.Popen(command,
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    _, err = process.communicate()
    return err if process.returncode else None
class BytesResponse(bytes):
    """Undecoded output bytes with room for the extras set by `http()`."""
    # All three default to None until assigned on the instance.
    stderr = None
    json = None
    exit_status = None
class StrResponse(str):
    """Decoded output text with room for the extras set by `http()`."""
    # All three default to None until assigned on the instance.
    stderr = None
    json = None
    exit_status = None
def http(*args, **kwargs):
    """
    Invoke `httpie.core.main()` with `args` and `kwargs`,
    and return a unicode response.

    `args` are the command-line arguments; `--traceback` is always
    prepended. If `kwargs` has no (truthy) `env`, a new `TestEnvironment`
    is created and used. The environment's `stdout` and `stderr` are
    closed before returning.

    Return a `StrResponse`, or `BytesResponse` if unable to decode the
    output as UTF-8.

    The response has the following attributes:

        `stderr`: text written to stderr
        `exit_status`: the exit status
        `json`: decoded JSON (if possible) or `None`

    Exceptions are propagated except for SystemExit, which is reported
    as `ExitStatus.ERROR`.

    """
    env = kwargs.get('env')
    if not env:
        env = kwargs['env'] = TestEnvironment()

    stdout = env.stdout
    stderr = env.stderr
    try:

        try:
            exit_status = main(args=['--traceback'] + list(args), **kwargs)
            if '--download' in args:
                # Let the progress reporter thread finish.
                time.sleep(.5)
        except Exception:
            # Rewind before reading: the stream is positioned after what
            # was just written, so reading from there yields nothing and
            # the captured diagnostics would be lost.
            stderr.seek(0)
            sys.stderr.write(stderr.read())
            raise
        except SystemExit:
            exit_status = ExitStatus.ERROR

        stdout.seek(0)
        stderr.seek(0)

        output = stdout.read()
        try:
            r = StrResponse(output.decode('utf8'))
        except UnicodeDecodeError:
            r = BytesResponse(output)
        else:
            # Colorized output is never parsed as JSON.
            if COLOR not in r:
                # De-serialize JSON body if possible.
                if r.strip().startswith('{'):
                    # The whole output is the body.
                    #noinspection PyTypeChecker
                    r.json = json.loads(r)
                elif r.count('Content-Type:') == 1 and 'application/json' in r:
                    # Single message with headers: the body is whatever
                    # follows the last blank line.
                    try:
                        j = r.strip()[r.strip().rindex('\r\n\r\n'):]
                    except ValueError:
                        pass
                    else:
                        try:
                            r.json = json.loads(j)
                        except ValueError:
                            pass

        r.stderr = stderr.read()
        r.exit_status = exit_status

        return r

    finally:
        stdout.close()
        stderr.close()
class BaseTestCase(unittest.TestCase):
    """Common base class for the test cases in this module.

    Raises `maxDiff` so that long failure diffs are shown, and supplies
    simple versions of assertion helpers when `is_py26` is set.
    """

    maxDiff = 100000

    if is_py26:
        # Fallbacks for assertion methods that `unittest.TestCase` on
        # Python 2.6 does not provide.

        def assertIn(self, member, container, msg=None):
            """Fail unless `member in container`."""
            self.assertTrue(member in container, msg)

        def assertNotIn(self, member, container, msg=None):
            """Fail unless `member not in container`."""
            self.assertTrue(member not in container, msg)

        def assertDictEqual(self, d1, d2, msg=None):
            """Fail unless both mappings hold the same key/value pairs."""
            # Compare pairs, not keys and values independently: otherwise
            # {'a': 1, 'b': 2} and {'a': 2, 'b': 1} would pass as equal.
            self.assertEqual(dict(d1), dict(d2), msg)

        def assertIsNone(self, obj, msg=None):
            """Fail unless `obj` equals `None`."""
            self.assertEqual(obj, None, msg=msg)
#################################################################
# High-level tests using httpbin.
#################################################################
class HTTPieTest(BaseTestCase):
    """Basic requests (methods, JSON/form data, stdin, headers)."""

    def test_GET(self):
        response = http('GET', httpbin('/get'))
        self.assertIn(OK, response)

    def test_DELETE(self):
        response = http('DELETE', httpbin('/delete'))
        self.assertIn(OK, response)

    def test_PUT(self):
        response = http('PUT', httpbin('/put'), 'foo=bar')
        self.assertIn(OK, response)
        self.assertIn(r'\"foo\": \"bar\"', response)

    def test_POST_JSON_data(self):
        response = http('POST', httpbin('/post'), 'foo=bar')
        self.assertIn(OK, response)
        self.assertIn(r'\"foo\": \"bar\"', response)

    def test_POST_form(self):
        response = http('--form', 'POST', httpbin('/post'), 'foo=bar')
        self.assertIn(OK, response)
        self.assertIn('"foo": "bar"', response)

    def test_POST_form_multiple_values(self):
        response = http(
            '--form', 'POST', httpbin('/post'), 'foo=bar', 'foo=baz')
        self.assertIn(OK, response)
        expected_form = {'foo': ['bar', 'baz']}
        self.assertDictEqual(response.json['form'], expected_form)

    def test_POST_stdin(self):
        # The request has to be made while the stdin file is still open.
        with open(FILE_PATH) as stdin_file:
            env = TestEnvironment(stdin=stdin_file, stdin_isatty=False)
            response = http('--form', 'POST', httpbin('/post'), env=env)
        self.assertIn(OK, response)
        self.assertIn(FILE_CONTENT, response)

    def test_headers(self):
        response = http('GET', httpbin('/headers'), 'Foo:bar')
        self.assertIn(OK, response)
        self.assertIn('"User-Agent": "HTTPie', response)
        self.assertIn('"Foo": "bar"', response)
class QuerystringTest(BaseTestCase):
    """Query string parameters given in the URL and/or as `==` items."""

    def test_query_string_params_in_url(self):
        path = '/get?a=1&b=2'
        url = httpbin(path)
        response = http('--print=Hhb', 'GET', url)
        self.assertIn(OK, response)
        self.assertIn('GET %s HTTP/1.1' % path, response)
        self.assertIn('"url": "%s"' % url, response)

    def test_query_string_params_items(self):
        response = http(
            '--print=Hhb', 'GET', httpbin('/get'), 'a==1', 'b==2')
        path = '/get?a=1&b=2'
        url = httpbin(path)
        self.assertIn(OK, response)
        self.assertIn('GET %s HTTP/1.1' % path, response)
        self.assertIn('"url": "%s"' % url, response)

    def test_query_string_params_in_url_and_items_with_duplicates(self):
        response = http(
            '--print=Hhb', 'GET', httpbin('/get?a=1&a=1'),
            'a==1', 'a==1', 'b==2',
        )
        # URL parameters come first, followed by the items in order.
        path = '/get?a=1&a=1&a=1&a=1&b=2'
        url = httpbin(path)
        self.assertIn(OK, response)
        self.assertIn('GET %s HTTP/1.1' % path, response)
        self.assertIn('"url": "%s"' % url, response)
class AutoContentTypeAndAcceptHeadersTest(BaseTestCase):
    """
    Test that Accept and Content-Type correctly default to JSON,
    but can still be overridden. The same with Content-Type when --form
    -f is used.

    """

    def test_GET_no_data_no_auto_headers(self):
        # https://github.com/jkbr/httpie/issues/62
        resp = http('GET', httpbin('/headers'))
        self.assertIn(OK, resp)
        self.assertIn('"Accept": "*/*"', resp)
        self.assertNotIn('"Content-Type": "application/json', resp)

    def test_POST_no_data_no_auto_headers(self):
        # JSON headers shouldn't be automatically set for POST with no data.
        resp = http('POST', httpbin('/post'))
        self.assertIn(OK, resp)
        self.assertIn('"Accept": "*/*"', resp)
        self.assertNotIn('"Content-Type": "application/json', resp)

    def test_POST_with_data_auto_JSON_headers(self):
        resp = http('POST', httpbin('/post'), 'a=b')
        self.assertIn(OK, resp)
        self.assertIn('"Accept": "application/json"', resp)
        self.assertIn(
            '"Content-Type": "application/json; charset=utf-8', resp)

    def test_GET_with_data_auto_JSON_headers(self):
        # JSON headers should automatically be set also for GET with data.
        # NOTE(review): despite the name, this sends a POST to /post and
        # so duplicates the test above -- confirm whether GET was meant.
        resp = http('POST', httpbin('/post'), 'a=b')
        self.assertIn(OK, resp)
        self.assertIn('"Accept": "application/json"', resp)
        self.assertIn(
            '"Content-Type": "application/json; charset=utf-8', resp)

    def test_POST_explicit_JSON_auto_JSON_accept(self):
        resp = http('--json', 'POST', httpbin('/post'))
        self.assertIn(OK, resp)
        sent_headers = resp.json['headers']
        self.assertEqual(sent_headers['Accept'], 'application/json')
        # Make sure Content-Type gets set even with no data.
        # https://github.com/jkbr/httpie/issues/137
        self.assertIn('application/json', sent_headers['Content-Type'])

    def test_GET_explicit_JSON_explicit_headers(self):
        resp = http(
            '--json', 'GET', httpbin('/headers'),
            'Accept:application/xml',
            'Content-Type:application/xml'
        )
        self.assertIn(OK, resp)
        self.assertIn('"Accept": "application/xml"', resp)
        self.assertIn('"Content-Type": "application/xml"', resp)

    def test_POST_form_auto_Content_Type(self):
        resp = http('--form', 'POST', httpbin('/post'))
        self.assertIn(OK, resp)
        expected = (
            '"Content-Type":'
            ' "application/x-www-form-urlencoded; charset=utf-8"'
        )
        self.assertIn(expected, resp)

    def test_POST_form_Content_Type_override(self):
        resp = http(
            '--form', 'POST', httpbin('/post'),
            'Content-Type:application/xml'
        )
        self.assertIn(OK, resp)
        self.assertIn('"Content-Type": "application/xml"', resp)

    def test_print_only_body_when_stdout_redirected_by_default(self):
        redirected = TestEnvironment(stdin_isatty=True, stdout_isatty=False)
        resp = http('GET', httpbin('/get'), env=redirected)
        self.assertNotIn('HTTP/', resp)

    def test_print_overridable_when_stdout_redirected(self):
        redirected = TestEnvironment(stdin_isatty=True, stdout_isatty=False)
        resp = http('--print=h', 'GET', httpbin('/get'), env=redirected)
        self.assertIn(OK, resp)
class ImplicitHTTPMethodTest(BaseTestCase):
    """Requests issued without an explicit HTTP method argument."""

    def test_implicit_GET(self):
        resp = http(httpbin('/get'))
        self.assertIn(OK, resp)

    def test_implicit_GET_with_headers(self):
        resp = http(httpbin('/headers'), 'Foo:bar')
        self.assertIn(OK, resp)
        self.assertIn('"Foo": "bar"', resp)

    def test_implicit_POST_json(self):
        resp = http(httpbin('/post'), 'hello=world')
        self.assertIn(OK, resp)
        self.assertIn(r'\"hello\": \"world\"', resp)

    def test_implicit_POST_form(self):
        resp = http('--form', httpbin('/post'), 'foo=bar')
        self.assertIn(OK, resp)
        self.assertIn('"foo": "bar"', resp)

    def test_implicit_POST_stdin(self):
        # The request has to be made while the stdin file is still open.
        with open(FILE_PATH) as stdin_file:
            env = TestEnvironment(stdin_isatty=False, stdin=stdin_file)
            resp = http('--form', httpbin('/post'), env=env)
        self.assertIn(OK, resp)
class PrettyOptionsTest(BaseTestCase):
    """Test the --pretty flag handling."""

    def test_pretty_enabled_by_default(self):
        env = TestEnvironment(colors=256)
        out = http('GET', httpbin('/get'), env=env)
        self.assertIn(COLOR, out)

    def test_pretty_enabled_by_default_unless_stdout_redirected(self):
        out = http('GET', httpbin('/get'))
        self.assertNotIn(COLOR, out)

    def test_force_pretty(self):
        env = TestEnvironment(stdout_isatty=False, colors=256)
        out = http('--pretty=all', 'GET', httpbin('/get'), env=env)
        self.assertIn(COLOR, out)

    def test_force_ugly(self):
        out = http('--pretty=none', 'GET', httpbin('/get'))
        self.assertNotIn(COLOR, out)

    def test_subtype_based_pygments_lexer_match(self):
        """Test that media subtype is used if type/subtype doesn't
        match any lexer.

        """
        out = http(
            '--print=B', '--pretty=all', httpbin('/post'),
            'Content-Type:text/foo+json', 'a=b',
            env=TestEnvironment(colors=256)
        )
        self.assertIn(COLOR, out)

    def test_colors_option(self):
        out = http(
            '--print=B', '--pretty=colors', 'GET', httpbin('/get'), 'a=b',
            env=TestEnvironment(colors=256),
        )
        #noinspection PyUnresolvedReferences
        # Tests that the JSON data isn't formatted.
        newlines = out.strip().count('\n')
        self.assertEqual(newlines, 0)
        self.assertIn(COLOR, out)

    def test_format_option(self):
        out = http(
            '--print=B', '--pretty=format', 'GET', httpbin('/get'), 'a=b',
            env=TestEnvironment(colors=256),
        )
        #noinspection PyUnresolvedReferences
        # Tests that the JSON data is formatted.
        newlines = out.strip().count('\n')
        self.assertEqual(newlines, 2)
        self.assertNotIn(COLOR, out)
class VerboseFlagTest(BaseTestCase):
    """Test that --verbose prints the request as well as the response."""

    def test_verbose(self):
        out = http(
            '--verbose', 'GET', httpbin('/get'), 'test-header:__test__')
        self.assertIn(OK, out)
        #noinspection PyUnresolvedReferences
        occurrences = out.count('__test__')
        self.assertEqual(occurrences, 2)

    def test_verbose_form(self):
        # https://github.com/jkbr/httpie/issues/53
        out = http(
            '--verbose', '--form', 'POST', httpbin('/post'),
            'foo=bar', 'baz=bar'
        )
        self.assertIn(OK, out)
        self.assertIn('foo=bar&baz=bar', out)

    def test_verbose_json(self):
        out = http(
            '--verbose', 'POST', httpbin('/post'), 'foo=bar', 'baz=bar')
        self.assertIn(OK, out)
        self.assertIn('"baz": "bar"', out)  # request
        self.assertIn(r'\"baz\": \"bar\"', out)  # response
class MultipartFormDataFileUploadTest(BaseTestCase):
    """File fields (`name@path`) sent with --form."""

    def test_non_existent_file_raises_parse_error(self):
        request_args = (
            '--form',
            'POST',
            httpbin('/post'),
            'foo@/__does_not_exist__',
        )
        self.assertRaises(ParseError, http, *request_args)

    def test_upload_ok(self):
        out = http(
            '--form', '--verbose', 'POST', httpbin('/post'),
            'test-file@%s' % FILE_PATH_ARG,
            'foo=bar'
        )
        file_part_header = (
            'Content-Disposition: form-data; name="test-file";'
            ' filename="%s"' % os.path.basename(FILE_PATH)
        )
        self.assertIn(OK, out)
        self.assertIn('Content-Disposition: form-data; name="foo"', out)
        self.assertIn(file_part_header, out)
        #noinspection PyUnresolvedReferences
        self.assertEqual(out.count(FILE_CONTENT), 2)
        self.assertIn('"foo": "bar"', out)
class BinaryRequestDataTest(BaseTestCase):
    """Binary request bodies printed back unchanged with --print=B."""

    def test_binary_stdin(self):
        # The request has to be made while the stdin file is still open.
        with open(BIN_FILE_PATH, 'rb') as stdin:
            env = TestEnvironment(
                stdin=stdin, stdin_isatty=False, stdout_isatty=False)
            out = http('--print=B', 'POST', httpbin('/post'), env=env)
        self.assertEqual(out, BIN_FILE_CONTENT)

    def test_binary_file_path(self):
        env = TestEnvironment(stdin_isatty=True, stdout_isatty=False)
        out = http(
            '--print=B', 'POST', httpbin('/post'),
            '@' + BIN_FILE_PATH_ARG,
            env=env,
        )
        self.assertEqual(out, BIN_FILE_CONTENT)

    def test_binary_file_form(self):
        env = TestEnvironment(stdin_isatty=True, stdout_isatty=False)
        out = http(
            '--print=B', '--form', 'POST', httpbin('/post'),
            'test@' + BIN_FILE_PATH_ARG,
            env=env,
        )
        self.assertIn(bytes(BIN_FILE_CONTENT), bytes(out))
class BinaryResponseDataTest(BaseTestCase):
    """Binary response bodies: suppressed or passed through verbatim."""

    url = 'http://www.google.com/favicon.ico'

    @property
    def bindata(self):
        """Bytes fetched from `url`; downloaded once per instance."""
        try:
            return self._bindata
        except AttributeError:
            self._bindata = urlopen(self.url).read()
            return self._bindata

    def test_binary_suppresses_when_terminal(self):
        out = http('GET', self.url)
        self.assertIn(BINARY_SUPPRESSED_NOTICE.decode(), out)

    def test_binary_suppresses_when_not_terminal_but_pretty(self):
        env = TestEnvironment(stdin_isatty=True, stdout_isatty=False)
        out = http('--pretty=all', 'GET', self.url, env=env)
        self.assertIn(BINARY_SUPPRESSED_NOTICE.decode(), out)

    def test_binary_included_and_correct_when_suitable(self):
        env = TestEnvironment(stdin_isatty=True, stdout_isatty=False)
        out = http('GET', self.url, env=env)
        self.assertEqual(out, self.bindata)
class RequestBodyFromFilePathTest(BaseTestCase):
    """
    `http URL @file`

    """

    def test_request_body_from_file_by_path(self):
        out = http(
            '--verbose', 'POST', httpbin('/post'), '@' + FILE_PATH_ARG)
        self.assertIn(OK, out)
        self.assertIn(FILE_CONTENT, out)
        self.assertIn('"Content-Type": "text/plain"', out)

    def test_request_body_from_file_by_path_with_explicit_content_type(self):
        out = http(
            'POST', httpbin('/post'),
            '@' + FILE_PATH_ARG,
            'Content-Type:x-foo/bar'
        )
        self.assertIn(OK, out)
        self.assertIn(FILE_CONTENT, out)
        self.assertIn('"Content-Type": "x-foo/bar"', out)

    def test_request_body_from_file_by_path_no_field_name_allowed(self):
        env = TestEnvironment(stdin_isatty=True)
        out = http(
            'POST', httpbin('/post'),
            'field-name@' + FILE_PATH_ARG,
            env=env
        )
        self.assertIn('perhaps you meant --form?', out.stderr)

    def test_request_body_from_file_by_path_no_data_items_allowed(self):
        env = TestEnvironment(stdin_isatty=False)
        out = http(
            'POST', httpbin('/post'),
            '@' + FILE_PATH_ARG,
            'foo=bar',
            env=env
        )
        self.assertIn('cannot be mixed', out.stderr)
class AuthTest(BaseTestCase):
    """Authentication via --auth, --auth-type and credentials in the URL."""

    def test_basic_auth(self):
        r = http(
            '--auth=user:password',
            'GET',
            httpbin('/basic-auth/user/password')
        )
        self.assertIn(OK, r)
        self.assertIn('"authenticated": true', r)
        self.assertIn('"user": "user"', r)

    @skipIf(requests_version == '0.13.6',
            'Redirects with prefetch=False are broken in Requests 0.13.6')
    def test_digest_auth(self):
        r = http(
            '--auth-type=digest',
            '--auth=user:password',
            'GET',
            httpbin('/digest-auth/auth/user/password')
        )
        self.assertIn(OK, r)
        self.assertIn(r'"authenticated": true', r)
        self.assertIn(r'"user": "user"', r)

    def test_password_prompt(self):
        """Only the user name is given; the password comes from the prompt.

        The prompt is stubbed out on `AuthCredentials` for the duration
        of the request and put back afterwards, so the stub cannot leak
        into other tests.
        """
        credentials_cls = input.AuthCredentials
        # Save the raw class attribute (not the looked-up method) so it
        # can be restored exactly; `missing` marks an inherited one.
        missing = object()
        saved = credentials_cls.__dict__.get('_getpass', missing)
        credentials_cls._getpass = lambda self, prompt: 'password'
        try:
            r = http(
                '--auth',
                'user',
                'GET',
                httpbin('/basic-auth/user/password')
            )
        finally:
            if saved is missing:
                del credentials_cls._getpass
            else:
                credentials_cls._getpass = saved
        self.assertIn(OK, r)
        self.assertIn('"authenticated": true', r)
        self.assertIn('"user": "user"', r)

    def test_credentials_in_url(self):
        url = httpbin('/basic-auth/user/password')
        # Re-build the URL with the credentials placed before the host.
        url = 'http://user:password@' + url.split('http://', 1)[1]
        r = http(
            'GET',
            url
        )
        self.assertIn(OK, r)
        self.assertIn('"authenticated": true', r)
        self.assertIn('"user": "user"', r)

    def test_credentials_in_url_auth_flag_has_priority(self):
        """When credentials are passed in URL and via -a at the same time,
         then the ones from -a are used."""
        url = httpbin('/basic-auth/user/password')
        url = 'http://user:wrong_password@' + url.split('http://', 1)[1]
        r = http(
            '--auth=user:password',
            'GET',
            url
        )
        self.assertIn(OK, r)
        self.assertIn('"authenticated": true', r)
        self.assertIn('"user": "user"', r)
class ExitStatusTest(BaseTestCase):
    """Exit status for various responses, with and without --check-status."""

    def test_ok_response_exits_0(self):
        res = http('GET', httpbin('/status/200'))
        self.assertIn(OK, res)
        self.assertEqual(res.exit_status, ExitStatus.OK)

    def test_error_response_exits_0_without_check_status(self):
        res = http('GET', httpbin('/status/500'))
        self.assertIn('HTTP/1.1 500', res)
        self.assertEqual(res.exit_status, ExitStatus.OK)
        self.assertTrue(not res.stderr)

    def test_timeout_exit_status(self):
        res = http('--timeout=0.5', 'GET', httpbin('/delay/1'))
        self.assertEqual(res.exit_status, ExitStatus.ERROR_TIMEOUT)

    def test_3xx_check_status_exits_3_and_stderr_when_stdout_redirected(self):
        res = http(
            '--check-status',
            '--headers',  # non-terminal, force headers
            'GET',
            httpbin('/status/301'),
            env=TestEnvironment(stdout_isatty=False)
        )
        self.assertIn('HTTP/1.1 301', res)
        self.assertEqual(res.exit_status, ExitStatus.ERROR_HTTP_3XX)
        self.assertIn('301 moved permanently', res.stderr.lower())

    @skipIf(requests_version == '0.13.6',
            'Redirects with prefetch=False are broken in Requests 0.13.6')
    def test_3xx_check_status_redirects_allowed_exits_0(self):
        res = http(
            '--check-status', '--follow', 'GET', httpbin('/status/301'))
        # The redirect will be followed so 200 is expected.
        self.assertIn('HTTP/1.1 200 OK', res)
        self.assertEqual(res.exit_status, ExitStatus.OK)

    def test_4xx_check_status_exits_4(self):
        res = http('--check-status', 'GET', httpbin('/status/401'))
        self.assertIn('HTTP/1.1 401', res)
        self.assertEqual(res.exit_status, ExitStatus.ERROR_HTTP_4XX)
        # Also stderr should be empty since stdout isn't redirected.
        self.assertTrue(not res.stderr)

    def test_5xx_check_status_exits_5(self):
        res = http('--check-status', 'GET', httpbin('/status/500'))
        self.assertIn('HTTP/1.1 500', res)
        self.assertEqual(res.exit_status, ExitStatus.ERROR_HTTP_5XX)
class WindowsOnlyTests(BaseTestCase):
    """Tests meant for a real Windows console (currently disabled)."""

    @skip('FIXME: kills the runner')
    #@skipIf(not is_windows, 'windows-only')
    def test_windows_colorized_output(self):
        # Spits out the colorized output.
        real_env = Environment()
        http(httpbin('/get'), env=real_env)
class FakeWindowsTest(BaseTestCase):
    """Windows-specific behavior, using an env with `is_windows=True`."""

    def test_output_file_pretty_not_allowed_on_windows(self):
        output_path = os.path.join(
            tempfile.gettempdir(), '__httpie_test_output__')
        res = http(
            '--output', output_path,
            '--pretty=all',
            'GET',
            httpbin('/get'),
            env=TestEnvironment(is_windows=True)
        )
        expected_error = 'Only terminal output can be colorized on Windows'
        self.assertIn(expected_error, res.stderr)
class StreamTest(BaseTestCase):
    """Test --stream with binary request data read from stdin."""
    # GET because httpbin 500s with binary POST body.

    @skipIf(is_windows, 'Pretty redirect not supported under Windows')
    def test_pretty_redirected_stream(self):
        """Test that --stream works with prettified redirected output."""
        with open(BIN_FILE_PATH, 'rb') as stdin_file:
            env = TestEnvironment(
                colors=256,
                stdin=stdin_file,
                stdin_isatty=False,
                stdout_isatty=False,
            )
            out = http(
                '--verbose', '--pretty=all', '--stream',
                'GET', httpbin('/get'),
                env=env
            )
        self.assertIn(BINARY_SUPPRESSED_NOTICE.decode(), out)
        # We get 'Bad Request' but it's okay.
        #self.assertIn(OK_COLOR, r)

    def test_encoded_stream(self):
        """Test that --stream works with non-prettified
        terminal output (stdout is not redirected)."""
        with open(BIN_FILE_PATH, 'rb') as stdin_file:
            env = TestEnvironment(stdin=stdin_file, stdin_isatty=False)
            out = http(
                '--pretty=none', '--stream', '--verbose',
                'GET', httpbin('/get'),
                env=env,
            )
        self.assertIn(BINARY_SUPPRESSED_NOTICE.decode(), out)
        # We get 'Bad Request' but it's okay.
        #self.assertIn(OK, r)

    def test_redirected_stream(self):
        """Test that --stream works with non-prettified
        redirected terminal output."""
        with open(BIN_FILE_PATH, 'rb') as stdin_file:
            env = TestEnvironment(
                stdout_isatty=False,
                stdin=stdin_file,
                stdin_isatty=False
            )
            out = http(
                '--pretty=none', '--stream', '--verbose',
                'GET', httpbin('/get'),
                env=env
            )
        # We get 'Bad Request' but it's okay.
        #self.assertIn(OK.encode(), r)
        self.assertIn(BIN_FILE_CONTENT, out)
class IgnoreStdinTest(BaseTestCase):
    """Test the --ignore-stdin option."""

    def test_ignore_stdin(self):
        with open(FILE_PATH) as stdin_file:
            env = TestEnvironment(stdin=stdin_file, stdin_isatty=False)
            out = http(
                '--ignore-stdin', '--verbose', httpbin('/get'), env=env)
        self.assertIn(OK, out)
        self.assertIn('GET /get HTTP', out)  # Don't default to POST.
        self.assertNotIn(FILE_CONTENT, out)  # Don't send stdin data.

    def test_ignore_stdin_cannot_prompt_password(self):
        out = http(
            '--ignore-stdin',
            '--auth=username-without-password',
            httpbin('/get'),
        )
        self.assertEqual(out.exit_status, ExitStatus.ERROR)
        self.assertIn('because --ignore-stdin', out.stderr)
class LineEndingsTest(BaseTestCase):
    """Test that CRLF is properly used in headers and
    as the headers/body separator."""

    def _validate_crlf(self, msg):
        """Check the line endings of `msg` and return its body.

        Every line up to the first bare CRLF line must end with CRLF, and
        that separator line must exist. Whatever follows it is the body,
        which must not contain CRLF.
        """
        remaining = iter(msg.splitlines(True))
        separator_found = False
        for line in remaining:
            if line == CRLF:
                separator_found = True
                break
            self.assertTrue(line.endswith(CRLF), repr(line))
        if not separator_found:
            self.fail('CRLF between headers and body not found in %r' % msg)
        # The iterator now yields only the lines after the separator.
        body = ''.join(remaining)
        self.assertNotIn(CRLF, body)
        return body

    def test_CRLF_headers_only(self):
        out = http('--headers', 'GET', httpbin('/get'))
        body = self._validate_crlf(out)
        self.assertFalse(body, 'Garbage after headers: %r' % out)

    def test_CRLF_ugly_response(self):
        out = http('--pretty=none', 'GET', httpbin('/get'))
        self._validate_crlf(out)

    def test_CRLF_formatted_response(self):
        out = http('--pretty=format', 'GET', httpbin('/get'))
        self.assertEqual(out.exit_status, 0)
        self._validate_crlf(out)

    def test_CRLF_ugly_request(self):
        out = http('--pretty=none', '--print=HB', 'GET', httpbin('/get'))
        self._validate_crlf(out)

    def test_CRLF_formatted_request(self):
        out = http('--pretty=format', '--print=HB', 'GET', httpbin('/get'))
        self._validate_crlf(out)
#################################################################
# CLI argument parsing related tests.
#################################################################
class ItemParsingTest(BaseTestCase):
    """Parsing of request ITEM arguments into headers/data/files/params."""

    def setUp(self):
        separators = input.SEP_GROUP_ALL_ITEMS
        self.key_value_type = input.KeyValueArgType(*separators)

    def test_invalid_items(self):
        for item in ['no-separator']:
            self.assertRaises(
                argparse.ArgumentTypeError, self.key_value_type, item)

    def test_escape(self):
        parse = self.key_value_type
        items = [
            # headers
            parse('foo\\:bar:baz'),
            parse('jack\\@jill:hill'),
            # data
            parse('baz\\=bar=foo'),
            # files
            parse('bar\\@baz@%s' % FILE_PATH_ARG)
        ]
        headers, data, files, params = input.parse_items(items)
        # `requests.structures.CaseInsensitiveDict` => `dict`
        headers = dict(headers._store.values())
        expected_headers = {
            'foo:bar': 'baz',
            'jack@jill': 'hill',
        }
        self.assertDictEqual(headers, expected_headers)
        self.assertDictEqual(data, {'baz=bar': 'foo'})
        self.assertIn('bar@baz', files)

    def test_escape_longsep(self):
        items = [self.key_value_type('bob\\:==foo')]
        headers, data, files, params = input.parse_items(items)
        self.assertDictEqual(params, {'bob:': 'foo'})

    def test_valid_items(self):
        parse = self.key_value_type
        items = [
            parse('string=value'),
            parse('header:value'),
            parse('list:=["a", 1, {}, false]'),
            parse('obj:={"a": "b"}'),
            parse('eh:'),
            parse('ed='),
            parse('bool:=true'),
            parse('file@' + FILE_PATH_ARG),
            parse('query==value'),
            parse('string-embed=@' + FILE_PATH_ARG),
            parse('raw-json-embed:=@' + JSON_FILE_PATH_ARG),
        ]
        headers, data, files, params = input.parse_items(items)

        # Parsed headers
        # `requests.structures.CaseInsensitiveDict` => `dict`
        headers = dict(headers._store.values())
        self.assertDictEqual(headers, {'header': 'value', 'eh': ''})

        # Parsed data
        raw_json_embed = data.pop('raw-json-embed')
        expected_json = json.loads(JSON_FILE_CONTENT.decode('utf8'))
        self.assertDictEqual(raw_json_embed, expected_json)
        # FILE_CONTENT is stripped too, so compare like with like.
        data['string-embed'] = data['string-embed'].strip()
        expected_data = {
            "ed": "",
            "string": "value",
            "bool": True,
            "list": ["a", 1, {}, False],
            "obj": {"a": "b"},
            "string-embed": FILE_CONTENT,
        }
        self.assertDictEqual(dict(data), expected_data)

        # Parsed query string parameters
        self.assertDictEqual(params, {'query': 'value'})

        # Parsed file fields
        self.assertIn('file', files)
        uploaded_text = files['file'][1].read().strip().decode('utf8')
        self.assertEqual(uploaded_text, FILE_CONTENT)
class CLIParserTestCase(unittest.TestCase):
    """URL handling of the CLI parser: the `:` localhost shorthand."""

    def _parsed_url(self, url_arg):
        """Parse `url_arg` as the only argument; return the resulting URL."""
        args = parser.parse_args(args=[url_arg], env=TestEnvironment())
        return args.url

    def test_expand_localhost_shorthand(self):
        self.assertEqual(self._parsed_url(':'), 'http://localhost')

    def test_expand_localhost_shorthand_with_slash(self):
        self.assertEqual(self._parsed_url(':/'), 'http://localhost/')

    def test_expand_localhost_shorthand_with_port(self):
        self.assertEqual(self._parsed_url(':3000'), 'http://localhost:3000')

    def test_expand_localhost_shorthand_with_path(self):
        self.assertEqual(self._parsed_url(':/path'), 'http://localhost/path')

    def test_expand_localhost_shorthand_with_port_and_slash(self):
        self.assertEqual(
            self._parsed_url(':3000/'), 'http://localhost:3000/')

    def test_expand_localhost_shorthand_with_port_and_path(self):
        self.assertEqual(
            self._parsed_url(':3000/path'), 'http://localhost:3000/path')

    def test_dont_expand_shorthand_ipv6_as_shorthand(self):
        self.assertEqual(self._parsed_url('::1'), 'http://::1')

    def test_dont_expand_longer_ipv6_as_shorthand(self):
        self.assertEqual(
            self._parsed_url('::ffff:c000:0280'),
            'http://::ffff:c000:0280'
        )

    def test_dont_expand_full_ipv6_as_shorthand(self):
        self.assertEqual(
            self._parsed_url('0000:0000:0000:0000:0000:0000:0000:0001'),
            'http://0000:0000:0000:0000:0000:0000:0000:0001'
        )
class ArgumentParserTestCase(unittest.TestCase):
    """Test `Parser._guess_method()` on hand-built argument namespaces."""

    def setUp(self):
        self.parser = input.Parser()

    def _guess(self, method, url, items):
        """Run `_guess_method()` on the given values; return the args.

        The parser gets a namespace holding `method`, `url`, `items` and
        `ignore_stdin=False`, plus a fresh `TestEnvironment`.
        """
        self.parser.args = argparse.Namespace(
            method=method,
            url=url,
            items=items,
            ignore_stdin=False,
        )
        self.parser.env = TestEnvironment()
        self.parser._guess_method()
        return self.parser.args

    def test_guess_when_method_set_and_valid(self):
        args = self._guess('GET', 'http://example.com/', [])
        self.assertEqual(args.method, 'GET')
        self.assertEqual(args.url, 'http://example.com/')
        self.assertEqual(args.items, [])

    def test_guess_when_method_not_set(self):
        args = self._guess(None, 'http://example.com/', [])
        self.assertEqual(args.method, 'GET')
        self.assertEqual(args.url, 'http://example.com/')
        self.assertEqual(args.items, [])

    def test_guess_when_method_set_but_invalid_and_data_field(self):
        args = self._guess('http://example.com/', 'data=field', [])
        expected_item = input.KeyValue(
            key='data', value='field', sep='=', orig='data=field')
        self.assertEqual(args.method, 'POST')
        self.assertEqual(args.url, 'http://example.com/')
        self.assertEqual(args.items, [expected_item])

    def test_guess_when_method_set_but_invalid_and_header_field(self):
        args = self._guess('http://example.com/', 'test:header', [])
        expected_item = input.KeyValue(
            key='test', value='header', sep=':', orig='test:header')
        self.assertEqual(args.method, 'GET')
        self.assertEqual(args.url, 'http://example.com/')
        self.assertEqual(args.items, [expected_item])

    def test_guess_when_method_set_but_invalid_and_item_exists(self):
        existing_items = [
            input.KeyValue(
                key='old_item', value='b', sep='=', orig='old_item=b')
        ]
        args = self._guess(
            'http://example.com/', 'new_item=a', existing_items)
        # The item recovered from the URL slot goes first.
        self.assertEqual(args.items, [
            input.KeyValue(
                key='new_item', value='a', sep='=', orig='new_item=a'),
            input.KeyValue(
                key='old_item', value='b', sep='=', orig='old_item=b'),
        ])
class TestNoOptions(BaseTestCase):
    """Test the `--no-OPTION` arguments."""

    def test_valid_no_options(self):
        out = http('--verbose', '--no-verbose', 'GET', httpbin('/get'))
        self.assertNotIn('GET /get HTTP/1.1', out)

    def test_invalid_no_options(self):
        out = http('--no-war', 'GET', httpbin('/get'))
        self.assertEqual(out.exit_status, 1)
        self.assertIn('unrecognized arguments: --no-war', out.stderr)
        self.assertNotIn('GET /get HTTP/1.1', out)
class READMETest(BaseTestCase):
    """Check README.rst with docutils, when docutils is available."""

    @skipIf(not has_docutils(), 'docutils not installed')
    def test_README_reStructuredText_valid(self):
        readme_errors = get_readme_errors()
        self.assertFalse(readme_errors, msg=readme_errors)
class SessionTest(BaseTestCase):
    """Tests for ``--session`` and ``--session-read-only``.

    ``setUp()`` creates a config directory via ``mk_config_dir()`` and
    starts a session named ``test`` in it; ``tearDown()`` removes the
    directory again. Every request in a test passes ``env=self.env`` so
    that it works against that same config directory.
    """

    @property
    def env(self):
        """Return a new ``TestEnvironment`` using this test's config dir."""
        return TestEnvironment(config_dir=self.config_dir)

    def setUp(self):
        """Start a session with a custom header, auth and a cookie."""
        # Start a full-blown session with a custom request header,
        # authorization, and response cookies.
        self.config_dir = mk_config_dir()
        r = http(
            '--follow',
            '--session=test',
            '--auth=username:password',
            'GET',
            httpbin('/cookies/set?hello=world'),
            'Hello:World',
            env=self.env
        )
        self.assertIn(OK, r)

    def tearDown(self):
        """Remove the config directory created in ``setUp()``."""
        shutil.rmtree(self.config_dir)

    def test_session_create(self):
        """The header, cookie and auth from ``setUp()`` are re-sent."""
        # Verify that the session has been created.
        r = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r)

        self.assertEqual(r.json['headers']['Hello'], 'World')
        self.assertEqual(r.json['headers']['Cookie'], 'hello=world')
        self.assertIn('Basic ', r.json['headers']['Authorization'])

    def test_session_ignored_header_prefixes(self):
        """``Content-Type`` and ``If-Unmodified-Since`` are not re-sent.

        Both headers are sent with a first session request; a follow-up
        request using the same session must not contain them.
        """
        r = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            'Content-Type: text/plain',
            'If-Unmodified-Since: Sat, 29 Oct 1994 19:43:31 GMT',
            env=self.env
        )
        self.assertIn(OK, r)

        # Reuse the config dir of the first request so that the session
        # it wrote is the one being inspected here. Without ``env`` the
        # checks below would not exercise that session at all.
        r2 = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r2)
        self.assertNotIn('Content-Type', r2.json['headers'])
        self.assertNotIn('If-Unmodified-Since', r2.json['headers'])

    def test_session_update(self):
        """A later ``--session`` request overwrites the stored data."""
        # Get a response to a request from the original session.
        r1 = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r1)

        # Make a request modifying the session data.
        r2 = http(
            '--follow',
            '--session=test',
            '--auth=username:password2',
            'GET',
            httpbin('/cookies/set?hello=world2'),
            'Hello:World2',
            env=self.env
        )
        self.assertIn(OK, r2)

        # Get a response to a request from the updated session.
        r3 = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r3)

        self.assertEqual(r3.json['headers']['Hello'], 'World2')
        self.assertEqual(r3.json['headers']['Cookie'], 'hello=world2')
        self.assertNotEqual(r1.json['headers']['Authorization'],
                            r3.json['headers']['Authorization'])

    def test_session_read_only(self):
        """``--session-read-only`` must leave the stored session as is."""
        # Get a response from the original session.
        r1 = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r1)

        # Make a request modifying the session data but
        # with --session-read-only.
        r2 = http(
            '--follow',
            '--session-read-only=test',
            '--auth=username:password2',
            'GET',
            httpbin('/cookies/set?hello=world2'),
            'Hello:World2',
            env=self.env
        )
        self.assertIn(OK, r2)

        # Query the session once more, after the read-only request.
        r3 = http(
            '--session=test',
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r3)

        # Origin can differ on Travis.
        del r1.json['origin'], r3.json['origin']

        # Should be the same as before r2.
        self.assertDictEqual(r1.json, r3.json)

    def test_session_by_path(self):
        """A session can be addressed by a file path instead of a name."""
        session_path = os.path.join(self.config_dir, 'session-by-path.json')

        r1 = http(
            '--session=' + session_path,
            'GET',
            httpbin('/get'),
            'Foo:Bar',
            env=self.env
        )
        self.assertIn(OK, r1)

        r2 = http(
            '--session=' + session_path,
            'GET',
            httpbin('/get'),
            env=self.env
        )
        self.assertIn(OK, r2)

        # The header from the first request is re-sent from the session.
        self.assertEqual(r2.json['headers']['Foo'], 'Bar')
class DownloadUtilsTest(BaseTestCase):
    """Tests for the helper functions imported from ``httpie.downloads``."""

    def test_Content_Range_parsing(self):
        """``parse_content_range``: two accepted values, six rejected."""
        parse = parse_content_range

        self.assertEqual(parse('bytes 100-199/200', 100), 200)
        self.assertEqual(parse('bytes 100-199/*', 100), 200)

        # (header value, second positional argument) pairs that must
        # raise ``ContentRangeError``.
        rejected = [
            # missing
            (None, 100),
            # syntax error
            ('beers 100-199/*', 100),
            # unexpected range
            ('bytes 100-199/*', 99),
            # invalid instance-length
            ('bytes 100-199/199', 100),
            # invalid byte-range-resp-spec
            ('bytes 100-99/199', 100),
            # invalid byte-range-resp-spec
            ('bytes 100-100/*', 100),
        ]
        for header_value, offset in rejected:
            self.assertRaises(ContentRangeError, parse, header_value, offset)

    def test_Content_Disposition_parsing(self):
        """``filename_from_content_disposition`` on assorted headers."""
        parse = filename_from_content_disposition

        # (header value, expected filename)
        expectations = [
            ('attachment; filename=hello-WORLD_123.txt',
             'hello-WORLD_123.txt'),
            ('attachment; filename=".hello-WORLD_123.txt"',
             'hello-WORLD_123.txt'),
            ('attachment; filename="white space.txt"',
             'white space.txt'),
            (r'attachment; filename="\"quotes\".txt"',
             '"quotes".txt'),
            ('attachment; filename=/etc/hosts',
             'hosts'),
        ]
        for header_value, expected_name in expectations:
            self.assertEqual(parse(header_value), expected_name)

        # An empty filename yields no result at all.
        self.assertIsNone(parse('attachment; filename='))

    def test_filename_from_url(self):
        """``filename_from_url`` for several content types."""
        # (content type, expected filename) for the same URL
        expectations = [
            ('text/plain', 'foo.txt'),
            ('text/html; charset=utf8', 'foo.html'),
            (None, 'foo'),
            ('x-foo/bar', 'foo'),
        ]
        for mime, expected_name in expectations:
            actual_name = filename_from_url(
                url='http://example.org/foo',
                content_type=mime
            )
            self.assertEqual(actual_name, expected_name)

    def test_unique_filename(self):
        """``get_unique_filename`` with a fake ``exists`` callback."""

        def make_exists(unique_on_attempt=0):
            # Build an ``exists(filename)`` stand-in that reports True
            # until it has done so ``unique_on_attempt`` times, and
            # False from then on. A one-item list holds the counter so
            # that the closure can update it on Python 2 as well.
            true_answers = [0]

            # noinspection PyUnusedLocal
            def exists(filename):
                is_taken = true_answers[0] != unique_on_attempt
                if is_taken:
                    true_answers[0] += 1
                return is_taken

            return exists

        # (``exists`` callback, expected result)
        expectations = [
            (make_exists(), 'foo.bar'),
            (make_exists(1), 'foo.bar-1'),
            (make_exists(10), 'foo.bar-10'),
        ]
        for fake_exists, expected_name in expectations:
            self.assertEqual(
                get_unique_filename('foo.bar', exists=fake_exists),
                expected_name
            )
class Response(object):
    """Minimal response stand-in for the download tests.

    It only carries the three attributes assigned in ``__init__``.

    :param url: stored unchanged as ``self.url``.
    :param headers: optional mapping of header names to values; copied
        into a ``CaseInsensitiveDict``. ``None`` (the default) means no
        headers.
    :param status_code: stored unchanged as ``self.status_code``.
    """

    def __init__(self, url, headers=None, status_code=200):
        # ``None`` replaces the former mutable ``{}`` default. The result
        # is the same empty header mapping, without a dict object shared
        # between calls.
        if headers is None:
            headers = {}
        self.url = url
        self.headers = CaseInsensitiveDict(headers)
        self.status_code = status_code
# noinspection PyTypeChecker
class DownloadTest(BaseTestCase):
    """Tests for ``--download`` and for the ``Download`` class.

    File objects and the ``urlopen()`` response are closed explicitly so
    that the tests do not leak open handles.
    """
    # TODO: more tests

    def test_actual_download(self):
        """``--download`` output equals the body fetched via ``urlopen``."""
        url = httpbin('/robots.txt')

        # try/finally instead of ``with``: the object returned by
        # Python 2's ``urllib2.urlopen`` is not a context manager.
        response = urlopen(url)
        try:
            body = response.read().decode()
        finally:
            response.close()

        r = http(
            '--download',
            url,
            env=TestEnvironment(
                stdin_isatty=True,
                stdout_isatty=False
            )
        )
        self.assertIn('Downloading', r.stderr)
        self.assertIn('[K', r.stderr)
        self.assertIn('Done', r.stderr)
        self.assertEqual(body, r)

    def test_download_with_Content_Length(self):
        """All announced bytes arrive: the download is not interrupted."""
        with open(os.devnull, 'w') as devnull:
            download = Download(output_file=devnull)
            download.start(Response(
                url=httpbin('/'),
                headers={'Content-Length': 10}
            ))
            # NOTE(review): the pauses presumably give the progress
            # reporting a chance to run between chunks -- confirm.
            time.sleep(1.1)
            download.chunk_downloaded(b'12345')
            time.sleep(1.1)
            download.chunk_downloaded(b'12345')
            download.finish()
            self.assertFalse(download.interrupted)

    def test_download_no_Content_Length(self):
        """Without ``Content-Length`` a download is not interrupted."""
        with open(os.devnull, 'w') as devnull:
            download = Download(output_file=devnull)
            download.start(Response(url=httpbin('/')))
            time.sleep(1.1)
            download.chunk_downloaded(b'12345')
            download.finish()
            self.assertFalse(download.interrupted)

    def test_download_interrupted(self):
        """Fewer bytes than ``Content-Length``: flagged as interrupted."""
        with open(os.devnull, 'w') as devnull:
            download = Download(output_file=devnull)
            download.start(Response(
                url=httpbin('/'),
                headers={'Content-Length': 5}
            ))
            # Only 4 of the 5 announced bytes are delivered.
            download.chunk_downloaded(b'1234')
            download.finish()
            self.assertTrue(download.interrupted)
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()