This commit is contained in:
Mickaël Schoentgen 2021-10-08 10:45:49 +02:00
parent 50f57f8c82
commit 279e387d86
21 changed files with 3790 additions and 2 deletions

View File

@ -75,6 +75,8 @@ class HTTPieArgumentParser(argparse.ArgumentParser):
) -> argparse.Namespace: ) -> argparse.Namespace:
self.env = env self.env = env
self.args, no_options = super().parse_known_args(args, namespace) self.args, no_options = super().parse_known_args(args, namespace)
if self.args.prompt:
return self.args
if self.args.debug: if self.args.debug:
self.args.traceback = True self.args.traceback = True
self.has_stdin_data = ( self.has_stdin_data = (

View File

@ -2,7 +2,7 @@
CLI arguments definition. CLI arguments definition.
""" """
from argparse import (FileType, OPTIONAL, SUPPRESS, ZERO_OR_MORE) from argparse import FileType, OPTIONAL, SUPPRESS, ZERO_OR_MORE
from textwrap import dedent, wrap from textwrap import dedent, wrap
from .. import __doc__, __version__ from .. import __doc__, __version__
@ -73,6 +73,7 @@ positional.add_argument(
positional.add_argument( positional.add_argument(
dest='url', dest='url',
metavar='URL', metavar='URL',
nargs=OPTIONAL,
help=''' help='''
The scheme defaults to 'http://' if the URL does not include one. The scheme defaults to 'http://' if the URL does not include one.
(You can override this with: --default-scheme=https) (You can override this with: --default-scheme=https)
@ -840,3 +841,12 @@ troubleshooting.add_argument(
''' '''
) )
troubleshooting.add_argument(
'--prompt',
action='store_true',
default=False,
help='''
Start the shell!
'''
)

View File

@ -29,6 +29,10 @@ def main(args: List[Union[str, bytes]] = sys.argv, env=Environment()) -> ExitSta
Return exit status code. Return exit status code.
""" """
if '--prompt' in args:
from .prompt.cli import cli
return cli(sys.argv[2:])
program_name, *args = args program_name, *args = args
env.program_name = os.path.basename(program_name) env.program_name = os.path.basename(program_name)
args = decode_raw_args(args, env.stdin_encoding) args = decode_raw_args(args, env.stdin_encoding)

1
httpie/prompt Submodule

@ -0,0 +1 @@
Subproject commit 8922a77156a7dc96bac9e3e94fe900bb17f976c2

View File

@ -9,6 +9,7 @@ import httpie
# Note: keep requirements here to ease distributions packaging # Note: keep requirements here to ease distributions packaging
tests_require = [ tests_require = [
'pexpect',
'pytest', 'pytest',
'pytest-httpbin>=0.0.6', 'pytest-httpbin>=0.0.6',
'responses', 'responses',
@ -20,12 +21,12 @@ dev_require = [
'flake8-deprecated', 'flake8-deprecated',
'flake8-mutable', 'flake8-mutable',
'flake8-tuple', 'flake8-tuple',
'jinja2',
'pyopenssl', 'pyopenssl',
'pytest-cov', 'pytest-cov',
'pyyaml', 'pyyaml',
'twine', 'twine',
'wheel', 'wheel',
'Jinja2'
] ]
install_requires = [ install_requires = [
'charset_normalizer>=2.0.0', 'charset_normalizer>=2.0.0',
@ -34,6 +35,11 @@ install_requires = [
'Pygments>=2.5.2', 'Pygments>=2.5.2',
'requests-toolbelt>=0.9.1', 'requests-toolbelt>=0.9.1',
'setuptools', 'setuptools',
# Prompt
'click>=5.0',
'parsimonious>=0.6.2',
'prompt-toolkit>=2.0.0,<3.0.0',
'pyyaml>=3.0',
] ]
install_requires_win_only = [ install_requires_win_only = [
'colorama>=0.2.4', 'colorama>=0.2.4',
@ -79,6 +85,7 @@ setup(
'console_scripts': [ 'console_scripts': [
'http = httpie.__main__:main', 'http = httpie.__main__:main',
'https = httpie.__main__:main', 'https = httpie.__main__:main',
'http-prompt=httpie.prompt.cli:cli',
], ],
}, },
python_requires='>=3.6', python_requires='>=3.6',

0
tests/prompt/__init__.py Normal file
View File

59
tests/prompt/base.py Normal file
View File

@ -0,0 +1,59 @@
import os
import shutil
import sys
import tempfile
import unittest
class TempAppDirTestCase(unittest.TestCase):
"""Set up temporary app data and config directories before every test
method, and delete them afterwards.
"""
def setUp(self):
# Create a temp dir that will contain data and config directories
self.temp_dir = tempfile.mkdtemp()
if sys.platform == 'win32':
self.homes = {
# subdir_name: envvar_name
'data': 'LOCALAPPDATA',
'config': 'LOCALAPPDATA'
}
else:
self.homes = {
# subdir_name: envvar_name
'data': 'XDG_DATA_HOME',
'config': 'XDG_CONFIG_HOME'
}
# Used to restore
self.orig_envvars = {}
for subdir_name, envvar_name in self.homes.items():
if envvar_name in os.environ:
self.orig_envvars[envvar_name] = os.environ[envvar_name]
os.environ[envvar_name] = os.path.join(self.temp_dir, subdir_name)
def tearDown(self):
# Restore envvar values
for name in self.homes.values():
if name in self.orig_envvars:
os.environ[name] = self.orig_envvars[name]
else:
del os.environ[name]
shutil.rmtree(self.temp_dir)
def make_tempfile(self, data='', subdir_name=''):
"""Create a file under self.temp_dir and return the path."""
full_tempdir = os.path.join(self.temp_dir, subdir_name)
if not os.path.exists(full_tempdir):
os.makedirs(full_tempdir)
if isinstance(data, str):
data = data.encode()
with tempfile.NamedTemporaryFile(dir=full_tempdir, delete=False) as f:
f.write(data)
return f.name

View File

@ -0,0 +1,161 @@
from httpie.prompt.context import Context
def test_creation():
context = Context('http://example.com')
assert context.url == 'http://example.com'
assert context.options == {}
assert context.headers == {}
assert context.querystring_params == {}
assert context.body_params == {}
assert not context.should_exit
def test_creation_with_longer_url():
context = Context('http://example.com/a/b/c/index.html')
assert context.url == 'http://example.com/a/b/c/index.html'
assert context.options == {}
assert context.headers == {}
assert context.querystring_params == {}
assert context.body_params == {}
assert not context.should_exit
def test_eq():
c1 = Context('http://localhost')
c2 = Context('http://localhost')
assert c1 == c2
c1.options['--verify'] = 'no'
assert c1 != c2
def test_copy():
c1 = Context('http://localhost')
c2 = c1.copy()
assert c1 == c2
assert c1 is not c2
def test_update():
c1 = Context('http://localhost')
c1.headers['Accept'] = 'application/json'
c1.querystring_params['flag'] = '1'
c1.body_params.update({
'name': 'John Doe',
'email': 'john@example.com'
})
c2 = Context('http://example.com')
c2.headers['Content-Type'] = 'text/html'
c2.body_params['name'] = 'John Smith'
c1.update(c2)
assert c1.url == 'http://example.com'
assert c1.headers == {
'Accept': 'application/json',
'Content-Type': 'text/html'
}
assert c1.querystring_params == {'flag': '1'}
assert c1.body_params == {
'name': 'John Smith',
'email': 'john@example.com'
}
def test_spec():
c = Context('http://localhost', spec={
'paths': {
'/users': {
'get': {
'parameters': [
{'name': 'username', 'in': 'path'},
{'name': 'since', 'in': 'query'},
{'name': 'Accept'}
]
}
},
'/orgs/{org}': {
'get': {
'parameters': [
{'name': 'org', 'in': 'path'},
{'name': 'featured', 'in': 'query'},
{'name': 'X-Foo', 'in': 'header'}
]
}
}
}
})
assert c.url == 'http://localhost'
root_children = list(sorted(c.root.children))
assert len(root_children) == 2
assert root_children[0].name == 'orgs'
assert root_children[1].name == 'users'
orgs_children = list(sorted(root_children[0].children))
assert len(orgs_children) == 1
org_children = list(sorted(list(orgs_children)[0].children))
assert len(org_children) == 2
assert org_children[0].name == 'X-Foo'
assert org_children[1].name == 'featured'
users_children = list(sorted(root_children[1].children))
assert len(users_children) == 2
assert users_children[0].name == 'Accept'
assert users_children[1].name == 'since'
def test_override():
"""Parameters can be defined at path level
"""
c = Context('http://localhost', spec={
'paths': {
'/users': {
'parameters': [
{'name': 'username', 'in': 'query'},
{'name': 'Accept', 'in': 'header'}
],
'get': {
'parameters': [
{'name': 'custom1', 'in': 'query'}
]
},
'post': {
'parameters': [
{'name': 'custom2', 'in': 'query'},
]
},
},
'/orgs': {
'parameters': [
{'name': 'username', 'in': 'query'},
{'name': 'Accept', 'in': 'header'}
],
'get': {}
}
}
})
assert c.url == 'http://localhost'
root_children = list(sorted(c.root.children))
# one path
assert len(root_children) == 2
assert root_children[0].name == 'orgs'
assert root_children[1].name == 'users'
orgs_methods = list(sorted(list(root_children)[0].children))
# path parameters are used even if no method parameter
assert len(orgs_methods) == 2
assert next(filter(lambda i: i.name == 'username', orgs_methods), None) is not None
assert next(filter(lambda i: i.name == 'Accept', orgs_methods), None) is not None
users_methods = list(sorted(list(root_children)[1].children))
# path and methods parameters are merged
assert len(users_methods) == 4
assert next(filter(lambda i: i.name == 'username', users_methods), None) is not None
assert next(filter(lambda i: i.name == 'custom1', users_methods), None) is not None
assert next(filter(lambda i: i.name == 'custom2', users_methods), None) is not None
assert next(filter(lambda i: i.name == 'Accept', users_methods), None) is not None

View File

@ -0,0 +1,162 @@
from httpie.prompt.context import Context
from httpie.prompt.context import transform as t
def test_extract_args_for_httpie_main_get():
c = Context('http://localhost/things')
c.headers.update({
'Authorization': 'ApiKey 1234',
'Accept': 'text/html'
})
c.querystring_params.update({
'page': '2',
'limit': '10'
})
args = t.extract_args_for_httpie_main(c, method='get')
assert args == ['GET', 'http://localhost/things', 'limit==10', 'page==2',
'Accept:text/html', 'Authorization:ApiKey 1234']
def test_extract_args_for_httpie_main_post():
c = Context('http://localhost/things')
c.headers.update({
'Authorization': 'ApiKey 1234',
'Accept': 'text/html'
})
c.options.update({
'--verify': 'no',
'--form': None
})
c.body_params.update({
'full name': 'Jane Doe',
'email': 'jane@example.com'
})
args = t.extract_args_for_httpie_main(c, method='post')
assert args == ['--form', '--verify', 'no',
'POST', 'http://localhost/things',
'email=jane@example.com', 'full name=Jane Doe',
'Accept:text/html', 'Authorization:ApiKey 1234']
def test_extract_raw_json_args_for_httpie_main_post():
c = Context('http://localhost/things')
c.body_json_params.update({
'enabled': True,
'items': ['foo', 'bar'],
'object': {
'id': 10,
'name': 'test'
}
})
args = t.extract_args_for_httpie_main(c, method='post')
assert args == ['POST', 'http://localhost/things',
'enabled:=true', 'items:=["foo", "bar"]',
'object:={"id": 10, "name": "test"}']
def test_format_to_httpie_get():
c = Context('http://localhost/things')
c.headers.update({
'Authorization': 'ApiKey 1234',
'Accept': 'text/html'
})
c.querystring_params.update({
'page': '2',
'limit': '10',
'name': ['alice', 'bob bob']
})
output = t.format_to_httpie(c, method='get')
assert output == ("http GET http://localhost/things "
"limit==10 name==alice 'name==bob bob' page==2 "
"Accept:text/html 'Authorization:ApiKey 1234'\n")
def test_format_to_httpie_post():
c = Context('http://localhost/things')
c.headers.update({
'Authorization': 'ApiKey 1234',
'Accept': 'text/html'
})
c.options.update({
'--verify': 'no',
'--form': None
})
c.body_params.update({
'full name': 'Jane Doe',
'email': 'jane@example.com'
})
output = t.format_to_httpie(c, method='post')
assert output == ("http --form --verify=no POST http://localhost/things "
"email=jane@example.com 'full name=Jane Doe' "
"Accept:text/html 'Authorization:ApiKey 1234'\n")
def test_format_to_http_prompt_1():
c = Context('http://localhost/things')
c.headers.update({
'Authorization': 'ApiKey 1234',
'Accept': 'text/html'
})
c.querystring_params.update({
'page': '2',
'limit': '10'
})
output = t.format_to_http_prompt(c)
assert output == ("cd http://localhost/things\n"
"limit==10\n"
"page==2\n"
"Accept:text/html\n"
"'Authorization:ApiKey 1234'\n")
def test_format_to_http_prompt_2():
c = Context('http://localhost/things')
c.headers.update({
'Authorization': 'ApiKey 1234',
'Accept': 'text/html'
})
c.options.update({
'--verify': 'no',
'--form': None
})
c.body_params.update({
'full name': 'Jane Doe',
'email': 'jane@example.com'
})
output = t.format_to_http_prompt(c)
assert output == ("--form\n"
"--verify=no\n"
"cd http://localhost/things\n"
"email=jane@example.com\n"
"'full name=Jane Doe'\n"
"Accept:text/html\n"
"'Authorization:ApiKey 1234'\n")
def test_format_raw_json_string_to_http_prompt():
c = Context('http://localhost/things')
c.body_json_params.update({
'bar': 'baz',
})
output = t.format_to_http_prompt(c)
assert output == ("cd http://localhost/things\n"
"bar:='\"baz\"'\n")
def test_extract_httpie_options():
c = Context('http://localhost')
c.options.update({
'--verify': 'no',
'--form': None
})
output = t._extract_httpie_options(c, excluded_keys=['--form'])
assert output == ['--verify', 'no']

319
tests/prompt/test_cli.py Normal file
View File

@ -0,0 +1,319 @@
import json
import os
import sys
import unittest
from unittest.mock import patch, DEFAULT
from click.testing import CliRunner
from requests.models import Response
from .base import TempAppDirTestCase
from httpie.prompt import xdg
from httpie.prompt.context import Context
from httpie.prompt.cli import cli, execute, ExecutionListener
def run_and_exit(cli_args=None, prompt_commands=None):
"""Run http-prompt executable, execute some prompt commands, and exit."""
if cli_args is None:
cli_args = []
# Make sure last command is 'exit'
if prompt_commands is None:
prompt_commands = ['exit']
else:
prompt_commands += ['exit']
# Fool cli() so that it believes we're running from CLI instead of pytest.
# We will restore it at the end of the function.
orig_argv = sys.argv
sys.argv = ['http-prompt'] + cli_args
try:
with patch.multiple('httpie.prompt.cli',
prompt=DEFAULT, execute=DEFAULT) as mocks:
mocks['execute'].side_effect = execute
# prompt() is mocked to return the command in 'prompt_commands' in
# sequence, i.e., prompt() returns prompt_commands[i-1] when it is
# called for the ith time
mocks['prompt'].side_effect = prompt_commands
result = CliRunner().invoke(cli, cli_args)
context = mocks['execute'].call_args[0][1]
return result, context
finally:
sys.argv = orig_argv
class TestCli(TempAppDirTestCase):
def test_without_args(self):
result, context = run_and_exit(['http://localhost'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://localhost')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {})
def test_incomplete_url1(self):
result, context = run_and_exit(['://example.com'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {})
def test_incomplete_url2(self):
result, context = run_and_exit(['//example.com'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {})
def test_incomplete_url3(self):
result, context = run_and_exit(['example.com'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {})
def test_httpie_oprions(self):
url = 'http://example.com'
custom_args = '--auth value: name=foo'
result, context = run_and_exit([url] + custom_args.split())
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {'--auth': 'value:'})
self.assertEqual(context.body_params, {'name': 'foo'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {})
def test_persistent_context(self):
result, context = run_and_exit(['//example.com', 'name=bob', 'id==10'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'name': 'bob'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {'id': ['10']})
result, context = run_and_exit()
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'name': 'bob'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {'id': ['10']})
def test_cli_args_bypasses_persistent_context(self):
result, context = run_and_exit(['//example.com', 'name=bob', 'id==10'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'name': 'bob'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {'id': ['10']})
result, context = run_and_exit(['//example.com', 'sex=M'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'sex': 'M'})
self.assertEqual(context.headers, {})
def test_config_file(self):
# Config file is not there at the beginning
config_path = os.path.join(xdg.get_config_dir(), 'config.py')
self.assertFalse(os.path.exists(config_path))
# After user runs it for the first time, a default config file should
# be created
result, context = run_and_exit(['//example.com'])
self.assertEqual(result.exit_code, 0)
self.assertTrue(os.path.exists(config_path))
def test_cli_arguments_with_spaces(self):
result, context = run_and_exit(['example.com', "name=John Doe",
"Authorization:Bearer API KEY"])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.querystring_params, {})
self.assertEqual(context.body_params, {'name': 'John Doe'})
self.assertEqual(context.headers, {'Authorization': 'Bearer API KEY'})
def test_spec_from_local(self):
spec_filepath = self.make_tempfile(json.dumps({
'paths': {
'/users': {},
'/orgs': {}
}
}))
result, context = run_and_exit(['example.com', "--spec",
spec_filepath])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(set([n.name for n in context.root.children]),
set(['users', 'orgs']))
def test_spec_basePath(self):
spec_filepath = self.make_tempfile(json.dumps({
'basePath': '/api/v1',
'paths': {
'/users': {},
'/orgs': {}
}
}))
result, context = run_and_exit(['example.com', "--spec",
spec_filepath])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
lv1_names = set([node.name for node in context.root.ls()])
lv2_names = set([node.name for node in context.root.ls('api')])
lv3_names = set([node.name for node in context.root.ls('api', 'v1')])
self.assertEqual(lv1_names, set(['api']))
self.assertEqual(lv2_names, set(['v1']))
self.assertEqual(lv3_names, set(['users', 'orgs']))
def test_spec_from_http(self):
spec_url = 'https://raw.githubusercontent.com/github/rest-api-description/main/descriptions/api.github.com/api.github.com.json'
result, context = run_and_exit(['https://api.github.com', '--spec',
spec_url])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'https://api.github.com')
top_level_paths = set([n.name for n in context.root.children])
self.assertIn('repos', top_level_paths)
self.assertIn('users', top_level_paths)
def test_spec_from_http_only(self):
spec_url = (
'https://api.apis.guru/v2/specs/medium.com/1.0.0/swagger.json')
result, context = run_and_exit(['--spec', spec_url])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'https://api.medium.com/v1')
lv1_names = set([node.name for node in context.root.ls()])
lv2_names = set([node.name for node in context.root.ls('v1')])
self.assertEqual(lv1_names, set(['v1']))
self.assertEqual(lv2_names, set(['me', 'publications', 'users']))
def test_spec_with_trailing_slash(self):
spec_filepath = self.make_tempfile(json.dumps({
'basePath': '/api',
'paths': {
'/': {},
'/users/': {}
}
}))
result, context = run_and_exit(['example.com', "--spec",
spec_filepath])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
lv1_names = set([node.name for node in context.root.ls()])
lv2_names = set([node.name for node in context.root.ls('api')])
self.assertEqual(lv1_names, set(['api']))
self.assertEqual(lv2_names, set(['/', 'users/']))
def test_env_only(self):
env_filepath = self.make_tempfile(
"cd http://example.com\nname=bob\nid==10")
result, context = run_and_exit(["--env", env_filepath])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'name': 'bob'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {'id': ['10']})
def test_env_with_url(self):
env_filepath = self.make_tempfile(
"cd http://example.com\nname=bob\nid==10")
result, context = run_and_exit(["--env", env_filepath,
'other_example.com'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://other_example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'name': 'bob'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {'id': ['10']})
def test_env_with_options(self):
env_filepath = self.make_tempfile(
"cd http://example.com\nname=bob\nid==10")
result, context = run_and_exit(["--env", env_filepath,
'other_example.com', 'name=alice'])
self.assertEqual(result.exit_code, 0)
self.assertEqual(context.url, 'http://other_example.com')
self.assertEqual(context.options, {})
self.assertEqual(context.body_params, {'name': 'alice'})
self.assertEqual(context.headers, {})
self.assertEqual(context.querystring_params, {'id': ['10']})
@patch('httpie.prompt.cli.prompt')
@patch('httpie.prompt.cli.execute')
def test_press_ctrl_d(self, execute_mock, prompt_mock):
prompt_mock.side_effect = EOFError
execute_mock.side_effect = execute
result = CliRunner().invoke(cli, [])
self.assertEqual(result.exit_code, 0)
class TestExecutionListenerSetCookies(unittest.TestCase):
def setUp(self):
self.listener = ExecutionListener({})
self.response = Response()
self.response.cookies.update({
'username': 'john',
'sessionid': 'abcd'
})
self.context = Context('http://localhost')
self.context.headers['Cookie'] = 'name="John Doe"; sessionid=xyz'
def test_auto(self):
self.listener.cfg['set_cookies'] = 'auto'
self.listener.response_returned(self.context, self.response)
self.assertEqual(self.context.headers['Cookie'],
'name="John Doe"; sessionid=abcd; username=john')
@patch('httpie.prompt.cli.click.confirm')
def test_ask_and_yes(self, confirm_mock):
confirm_mock.return_value = True
self.listener.cfg['set_cookies'] = 'ask'
self.listener.response_returned(self.context, self.response)
self.assertEqual(self.context.headers['Cookie'],
'name="John Doe"; sessionid=abcd; username=john')
@patch('httpie.prompt.cli.click.confirm')
def test_ask_and_no(self, confirm_mock):
confirm_mock.return_value = False
self.listener.cfg['set_cookies'] = 'ask'
self.listener.response_returned(self.context, self.response)
self.assertEqual(self.context.headers['Cookie'],
'name="John Doe"; sessionid=xyz')
def test_off(self):
self.listener.cfg['set_cookies'] = 'off'
self.listener.response_returned(self.context, self.response)
self.assertEqual(self.context.headers['Cookie'],
'name="John Doe"; sessionid=xyz')

View File

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import unittest
from prompt_toolkit.document import Document
from httpie.prompt.completer import HttpPromptCompleter
from httpie.prompt.context import Context
class TestCompleter(unittest.TestCase):
def setUp(self):
self.context = Context('http://localhost', spec={
'paths': {
'/users': {},
'/users/{username}': {},
'/users/{username}/events': {},
'/users/{username}/orgs': {},
'/orgs': {},
'/orgs/{org}': {},
'/orgs/{org}/events': {},
'/orgs/{org}/members': {}
}
})
self.completer = HttpPromptCompleter(self.context)
self.completer_event = None
def get_completions(self, command):
if not isinstance(command, str):
command = command.decode()
position = len(command)
completions = self.completer.get_completions(
Document(text=command, cursor_position=position),
self.completer_event)
return [c.text for c in completions]
def test_header_name(self):
result = self.get_completions('ctype')
self.assertEqual(result[0], 'Content-Type')
def test_header_value(self):
result = self.get_completions('Content-Type:json')
self.assertEqual(result[0], 'application/json')
def test_verify_option(self):
result = self.get_completions('--vfy')
self.assertEqual(result[0], '--verify')
def test_preview_then_action(self):
result = self.get_completions('httpie po')
self.assertEqual(result[0], 'post')
def test_rm_body_param(self):
self.context.body_params['my_name'] = 'dont_care'
result = self.get_completions('rm -b ')
self.assertEqual(result[0], 'my_name')
def test_rm_body_json_param(self):
self.context.body_json_params['number'] = 2
result = self.get_completions('rm -b ')
self.assertEqual(result[0], 'number')
def test_rm_querystring_param(self):
self.context.querystring_params['my_name'] = 'dont_care'
result = self.get_completions('rm -q ')
self.assertEqual(result[0], 'my_name')
def test_rm_header(self):
self.context.headers['Accept'] = 'dont_care'
result = self.get_completions('rm -h ')
self.assertEqual(result[0], 'Accept')
def test_rm_option(self):
self.context.options['--form'] = None
result = self.get_completions('rm -o ')
self.assertEqual(result[0], '--form')
def test_querystring_with_chinese(self):
result = self.get_completions('name==王')
self.assertFalse(result)
def test_header_with_spanish(self):
result = self.get_completions('X-Custom-Header:Jesú')
self.assertFalse(result)
def test_options_method(self):
result = self.get_completions('opt')
self.assertEqual(result[0], 'options')
def test_ls_no_path(self):
result = self.get_completions('ls ')
self.assertEqual(result, ['orgs', 'users'])
def test_ls_no_path_substring(self):
result = self.get_completions('ls o')
self.assertEqual(result, ['orgs'])
def test_ls_absolute_path(self):
result = self.get_completions('ls /users/1/')
self.assertEqual(result, ['events', 'orgs'])
def test_ls_absolute_path_substring(self):
result = self.get_completions('ls /users/1/e')
self.assertEqual(result, ['events'])
def test_ls_relative_path(self):
self.context.url = 'http://localhost/orgs'
result = self.get_completions('ls 1/')
self.assertEqual(result, ['events', 'members'])
def test_cd_no_path(self):
result = self.get_completions('cd ')
self.assertEqual(result, ['orgs', 'users'])
def test_cd_no_path_substring(self):
result = self.get_completions('cd o')
self.assertEqual(result, ['orgs'])
def test_cd_absolute_path(self):
result = self.get_completions('cd /users/1/')
self.assertEqual(result, ['events', 'orgs'])
def test_cd_absolute_path_substring(self):
result = self.get_completions('cd /users/1/e')
self.assertEqual(result, ['events'])
def test_cd_relative_path(self):
self.context.url = 'http://localhost/orgs'
result = self.get_completions('cd 1/')
self.assertEqual(result, ['events', 'members'])

View File

@ -0,0 +1,70 @@
import hashlib
import os
from .base import TempAppDirTestCase
from httpie.prompt import config
def _hash_file(path):
with open(path, 'rb') as f:
data = f.read()
return hashlib.sha1(data).hexdigest()
class TestConfig(TempAppDirTestCase):
def test_initialize(self):
# Config file doesn't exist at first
expected_path = config.get_user_config_path()
self.assertFalse(os.path.exists(expected_path))
# Config file should exist after initialization
copied, actual_path = config.initialize()
self.assertTrue(copied)
self.assertEqual(actual_path, expected_path)
self.assertTrue(os.path.exists(expected_path))
# Change config file and hash the content to see if it's changed
with open(expected_path, 'a') as f:
f.write('dont_care\n')
orig_hash = _hash_file(expected_path)
# Make sure it's fine to call config.initialize() twice
copied, actual_path = config.initialize()
self.assertFalse(copied)
self.assertEqual(actual_path, expected_path)
self.assertTrue(os.path.exists(expected_path))
# Make sure config file is unchanged
new_hash = _hash_file(expected_path)
self.assertEqual(new_hash, orig_hash)
def test_load_default(self):
cfg = config.load_default()
self.assertEqual(cfg['command_style'], 'solarized')
self.assertFalse(cfg['output_style'])
self.assertEqual(cfg['pager'], 'less')
def test_load_user(self):
copied, path = config.initialize()
self.assertTrue(copied)
with open(path, 'w') as f:
f.write("\ngreeting = 'hello!'\n")
cfg = config.load_user()
self.assertEqual(cfg, {'greeting': 'hello!'})
def test_load(self):
copied, path = config.initialize()
self.assertTrue(copied)
with open(path, 'w') as f:
f.write("pager = 'more'\n"
"greeting = 'hello!'\n")
cfg = config.load()
self.assertEqual(cfg['command_style'], 'solarized')
self.assertFalse(cfg['output_style'])
self.assertEqual(cfg['pager'], 'more')
self.assertEqual(cfg['greeting'], 'hello!')

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from .base import TempAppDirTestCase
from httpie.prompt.context import Context
from httpie.prompt.contextio import save_context, load_context
class TestContextIO(TempAppDirTestCase):
def test_save_and_load_context_non_ascii(self):
c = Context('http://localhost')
c.headers.update({
'User-Agent': 'Ö',
'Authorization': '中文'
})
save_context(c)
c = Context('http://0.0.0.0')
load_context(c)
self.assertEqual(c.url, 'http://localhost')
self.assertEqual(c.headers, {
'User-Agent': 'Ö',
'Authorization': '中文'
})

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,32 @@
"""Test if http-prompt is installed correctly."""
import subprocess
import pytest
from subprocess import PIPE
from .utils import get_http_prompt_path
from httpie.prompt import __version__
def run_http_prompt(args):
"""Run http-prompt from terminal."""
bin_path = get_http_prompt_path()
p = subprocess.Popen([bin_path] + args, stdin=PIPE, stdout=PIPE)
return p.communicate()
@pytest.mark.slow
def test_help():
out, err = run_http_prompt(['--help'])
assert out.startswith(b'Usage: http-prompt')
@pytest.mark.slow
def test_version():
out, err = run_http_prompt(['--version'])
version = __version__
if hasattr(version, 'encode'):
version = version.encode('ascii')
assert out.rstrip() == version

View File

@ -0,0 +1,79 @@
import os
import sys
import pexpect
import pytest
from .base import TempAppDirTestCase
from .utils import get_http_prompt_path
from httpie.prompt import config
class TestInteraction(TempAppDirTestCase):
def setUp(self):
super(TestInteraction, self).setUp()
# Use temporary directory as user config home.
# Will restore it in tearDown().
self.orig_config_home = os.getenv('XDG_CONFIG_HOME')
os.environ['XDG_CONFIG_HOME'] = self.temp_dir
# Make sure pexpect uses the same terminal environment
self.orig_term = os.getenv('TERM')
os.environ['TERM'] = 'screen-256color'
def tearDown(self):
super(TestInteraction, self).tearDown()
os.environ['XDG_CONFIG_HOME'] = self.orig_config_home
if self.orig_term:
os.environ['TERM'] = self.orig_term
else:
os.environ.pop('TERM', None)
def write_config(self, content):
config_path = config.get_user_config_path()
with open(config_path, 'a') as f:
f.write(content)
@pytest.mark.skipif(sys.platform == 'win32',
reason="pexpect doesn't work well on Windows")
@pytest.mark.slow
def test_interaction(self):
bin_path = get_http_prompt_path()
child = pexpect.spawn(bin_path, env=os.environ)
# TODO: Test more interaction
child.sendline('exit')
child.expect_exact('Goodbye!', timeout=20)
child.close()
@pytest.mark.skipif(sys.platform == 'win32',
reason="pexpect doesn't work well on Windows")
@pytest.mark.slow
def test_vi_mode(self):
self.write_config('vi = True\n')
bin_path = get_http_prompt_path()
child = pexpect.spawn(bin_path, env=os.environ)
child.expect_exact('http://localhost:8000>')
# Enter 'htpie', switch to command mode (ESC),
# move two chars left (hh), and insert (i) a 't'
child.send('htpie')
child.send('\x1b')
child.sendline('hhit')
child.expect_exact('http http://localhost:8000')
# Enter 'exit'
child.send('\x1b')
child.send('i')
child.sendline('exit')
child.expect_exact('Goodbye!', timeout=20)
child.close()

793
tests/prompt/test_lexer.py Normal file
View File

@ -0,0 +1,793 @@
import unittest
from pygments.token import Keyword, String, Text, Error, Name, Operator
from httpie.prompt.lexer import HttpPromptLexer
class LexerTestCase(unittest.TestCase):
def setUp(self):
self.lexer = HttpPromptLexer()
def get_tokens(self, text, filter_spaces=True):
tokens = self.lexer.get_tokens(text)
tokens = filter(lambda t: t[1], tokens)
if filter_spaces:
tokens = filter(lambda t: t[1].strip(), tokens)
return list(tokens)
class TestLexer_mutation(LexerTestCase):
def test_querystring(self):
self.assertEqual(self.get_tokens('foo==bar'), [
(Name, 'foo'),
(Operator, '=='),
(String, 'bar')
])
def test_body_param(self):
self.assertEqual(self.get_tokens('foo=bar'), [
(Name, 'foo'),
(Operator, '='),
(String, 'bar')
])
def test_header(self):
self.assertEqual(self.get_tokens('Accept:application/json'), [
(Name, 'Accept'),
(Operator, ':'),
(String, 'application/json')
])
def test_json_integer(self):
self.assertEqual(self.get_tokens('number:=1'), [
(Name, 'number'),
(Operator, ':='),
(String, '1')
])
def test_json_boolean(self):
self.assertEqual(self.get_tokens('enabled:=true'), [
(Name, 'enabled'),
(Operator, ':='),
(String, 'true')
])
def test_json_string(self):
self.assertEqual(self.get_tokens('name:="foo bar"'), [
(Name, 'name'),
(Operator, ':='),
(Text, '"'),
(String, 'foo bar'),
(Text, '"')
])
def test_json_array(self):
self.assertEqual(self.get_tokens('list:=[1,"two"]'), [
(Name, 'list'),
(Operator, ':='),
(String, '[1,"two"]'),
])
def test_json_array_quoted(self):
self.assertEqual(self.get_tokens("""list:='[1,"two"]'"""), [
(Name, 'list'),
(Operator, ':='),
(Text, "'"),
(String, '[1,"two"]'),
(Text, "'"),
])
def test_json_object(self):
self.assertEqual(self.get_tokens('object:={"id":123,"name":"foo"}'), [
(Name, 'object'),
(Operator, ':='),
(String, '{"id":123,"name":"foo"}'),
])
def test_json_object_quoted(self):
self.assertEqual(self.get_tokens("""object:='{"id": 123}'"""), [
(Name, 'object'),
(Operator, ':='),
(Text, "'"),
(String, '{"id": 123}'),
(Text, "'")
])
def test_json_escaped_colon(self):
self.assertEqual(self.get_tokens(r'where[id\:gt]:=2'), [
(Name, r'where[id\:gt]'),
(Operator, ':='),
(String, '2')
])
def test_body_param_escaped_equal(self):
self.assertEqual(self.get_tokens(r'foo\=bar=hello'), [
(Name, r'foo\=bar'),
(Operator, '='),
(String, 'hello')
])
def test_parameter_name_including_http_method_name(self):
self.assertEqual(self.get_tokens('heading==hello'), [
(Name, 'heading'),
(Operator, '=='),
(String, 'hello')
])
class TestLexer_cd(LexerTestCase):
def test_simple(self):
self.assertEqual(self.get_tokens('cd api/v1'), [
(Keyword, 'cd'),
(String, 'api/v1')
])
def test_double_quoted(self):
self.assertEqual(self.get_tokens('cd "api/v 1"'), [
(Keyword, 'cd'),
(Text, '"'),
(String, 'api/v 1'),
(Text, '"')
])
def test_single_quoted(self):
self.assertEqual(self.get_tokens("cd 'api/v 1'"), [
(Keyword, 'cd'),
(Text, "'"),
(String, 'api/v 1'),
(Text, "'")
])
def test_escape(self):
self.assertEqual(self.get_tokens(r"cd api/v\ 1"), [
(Keyword, 'cd'),
(String, r'api/v\ 1')
])
def test_second_path(self):
self.assertEqual(self.get_tokens(r"cd api v1"), [
(Keyword, 'cd'),
(String, 'api'),
(Error, 'v'),
(Error, '1')
])
def test_leading_trailing_spaces(self):
self.assertEqual(self.get_tokens(' cd api/v1 '), [
(Keyword, 'cd'),
(String, 'api/v1')
])
class TestLexer_ls(LexerTestCase):
def test_no_path(self):
self.assertEqual(self.get_tokens('ls'), [
(Keyword, 'ls')
])
def test_path(self):
self.assertEqual(self.get_tokens('ls api/v1'), [
(Keyword, 'ls'),
(String, 'api/v1')
])
def test_second_path(self):
self.assertEqual(self.get_tokens(r"ls api v1"), [
(Keyword, 'ls'),
(String, 'api'),
(Error, 'v'),
(Error, '1')
])
def test_leading_trailing_spaces(self):
self.assertEqual(self.get_tokens(' ls api/v1 '), [
(Keyword, 'ls'),
(String, 'api/v1')
])
def test_redirect(self):
self.assertEqual(self.get_tokens('ls api/v1 > endpoints.txt'), [
(Keyword, 'ls'),
(String, 'api/v1'),
(Operator, '>'),
(String, 'endpoints.txt')
])
class TestLexer_env(LexerTestCase):
def test_env_simple(self):
self.assertEqual(self.get_tokens('env'), [
(Keyword, 'env'),
])
def test_env_with_spaces(self):
self.assertEqual(self.get_tokens(' env '), [
(Keyword, 'env'),
])
def test_env_write(self):
self.assertEqual(self.get_tokens('env > /tmp/file.txt'), [
(Keyword, 'env'), (Operator, '>'),
(String, '/tmp/file.txt')
])
def test_env_append(self):
self.assertEqual(self.get_tokens('env >> /tmp/file.txt'), [
(Keyword, 'env'), (Operator, '>>'),
(String, '/tmp/file.txt')
])
def test_env_write_quoted_filename(self):
self.assertEqual(self.get_tokens('env > "/tmp/my file.txt"'), [
(Keyword, 'env'), (Operator, '>'),
(Text, '"'), (String, '/tmp/my file.txt'), (Text, '"')
])
def test_env_append_escaped_filename(self):
self.assertEqual(self.get_tokens(r'env >> /tmp/my\ file.txt'), [
(Keyword, 'env'), (Operator, '>>'),
(String, r'/tmp/my\ file.txt')
])
def test_env_pipe(self):
self.assertEqual(self.get_tokens('env | grep name'), [
(Keyword, 'env'), (Operator, '|'),
(Text, 'grep'), (Text, 'name')
])
class TestLexer_rm(LexerTestCase):
def test_header(self):
self.assertEqual(self.get_tokens('rm -h Accept'), [
(Keyword, 'rm'),
(Name, '-h'),
(String, 'Accept')
])
def test_header_escaped(self):
self.assertEqual(self.get_tokens(r'rm -h Custom\ Header'), [
(Keyword, 'rm'),
(Name, '-h'),
(String, r'Custom\ Header')
])
def test_querystring(self):
self.assertEqual(self.get_tokens('rm -q page'), [
(Keyword, 'rm'),
(Name, '-q'),
(String, 'page')
])
def test_querystring_double_quoted(self):
self.assertEqual(self.get_tokens('rm -q "page size"'), [
(Keyword, 'rm'),
(Name, '-q'),
(Text, '"'),
(String, 'page size'),
(Text, '"')
])
def test_body_param(self):
self.assertEqual(self.get_tokens('rm -b name'), [
(Keyword, 'rm'),
(Name, '-b'),
(String, 'name')
])
def test_body_param_single_quoted(self):
self.assertEqual(self.get_tokens("rm -b 'first name'"), [
(Keyword, 'rm'),
(Name, '-b'),
(Text, "'"),
(String, 'first name'),
(Text, "'")
])
def test_option(self):
self.assertEqual(self.get_tokens('rm -o --json'), [
(Keyword, 'rm'),
(Name, '-o'),
(String, '--json')
])
def test_reset(self):
self.assertEqual(self.get_tokens('rm *'), [
(Keyword, 'rm'),
(Name, '*')
])
def test_option_leading_trailing_spaces(self):
self.assertEqual(self.get_tokens(' rm -o --json '), [
(Keyword, 'rm'),
(Name, '-o'),
(String, '--json')
])
def test_invalid_type(self):
self.assertEqual(self.get_tokens('rm -a foo'), [
(Keyword, 'rm'),
(Error, '-'), (Error, 'a'),
(Error, 'f'), (Error, 'o'), (Error, 'o')
])
class TestLexer_help(LexerTestCase):
def test_help_simple(self):
self.assertEqual(self.get_tokens('help'), [
(Keyword, 'help')
])
def test_help_with_spaces(self):
self.assertEqual(self.get_tokens(' help '), [
(Keyword, 'help')
])
class TestLexer_source(LexerTestCase):
def test_source_simple_filename(self):
self.assertEqual(self.get_tokens('source file.txt'), [
(Keyword, 'source'), (String, 'file.txt')
])
def test_source_with_spaces(self):
self.assertEqual(self.get_tokens(' source file.txt '), [
(Keyword, 'source'), (String, 'file.txt')
])
def test_source_quoted_filename(self):
self.assertEqual(self.get_tokens("source '/tmp/my file.txt'"), [
(Keyword, 'source'),
(Text, "'"), (String, '/tmp/my file.txt'), (Text, "'")
])
def test_source_escaped_filename(self):
self.assertEqual(self.get_tokens(r"source /tmp/my\ file.txt"), [
(Keyword, 'source'), (String, r'/tmp/my\ file.txt')
])
class TestLexer_exec(LexerTestCase):
def test_exec_simple_filename(self):
self.assertEqual(self.get_tokens('exec file.txt'), [
(Keyword, 'exec'), (String, 'file.txt')
])
def test_exec_with_spaces(self):
self.assertEqual(self.get_tokens(' exec file.txt '), [
(Keyword, 'exec'), (String, 'file.txt')
])
def test_exec_quoted_filename(self):
self.assertEqual(self.get_tokens("exec '/tmp/my file.txt'"), [
(Keyword, 'exec'),
(Text, "'"), (String, '/tmp/my file.txt'), (Text, "'")
])
def test_exec_escaped_filename(self):
self.assertEqual(self.get_tokens(r"exec /tmp/my\ file.txt"), [
(Keyword, 'exec'), (String, r'/tmp/my\ file.txt')
])
class TestLexer_exit(LexerTestCase):
def test_exit_simple(self):
self.assertEqual(self.get_tokens('exit'), [
(Keyword, 'exit')
])
def test_exit_with_spaces(self):
self.assertEqual(self.get_tokens(' exit '), [
(Keyword, 'exit')
])
class TestLexerPreview(LexerTestCase):
def test_httpie_without_action(self):
cmd = 'httpie http://example.com name=jack'
self.assertEqual(self.get_tokens(cmd), [
(Keyword, 'httpie'),
(String, 'http://example.com'),
(Name, 'name'), (Operator, '='), (String, 'jack')
])
def test_httpie_without_action_and_url(self):
cmd = 'httpie name=jack Accept:*/*'
self.assertEqual(self.get_tokens(cmd), [
(Keyword, 'httpie'),
(Name, 'name'), (Operator, '='), (String, 'jack'),
(Name, 'Accept'), (Operator, ':'), (String, '*/*')
])
def test_httpie_absolute_url(self):
cmd = 'httpie post http://example.com name=jack'
self.assertEqual(self.get_tokens(cmd), [
(Keyword, 'httpie'), (Keyword, 'post'),
(String, 'http://example.com'),
(Name, 'name'), (Operator, '='), (String, 'jack')
])
def test_httpie_option_first(self):
self.assertEqual(self.get_tokens('httpie post --form name=jack'), [
(Keyword, 'httpie'), (Keyword, 'post'),
(Name, '--form'),
(Name, 'name'), (Operator, '='), (String, 'jack')
])
def test_httpie_body_param_first(self):
self.assertEqual(self.get_tokens('httpie post name=jack --form'), [
(Keyword, 'httpie'), (Keyword, 'post'),
(Name, 'name'), (Operator, '='), (String, 'jack'),
(Name, '--form')
])
def test_httpie_options(self):
self.assertEqual(self.get_tokens('httpie options test --body'), [
(Keyword, 'httpie'), (Keyword, 'options'),
(String, 'test'), (Name, '--body')
])
def test_httpie_relative_path(self):
tokens = self.get_tokens('httpie /api/test name==foo',
filter_spaces=False)
self.assertEqual(tokens, [
(Keyword, 'httpie'), (Text, ' '),
(String, '/api/test'), (Text, ' '),
(Name, 'name'), (Operator, '=='), (String, 'foo'),
(Text, '\n')
])
class TestShellCode(LexerTestCase):
def test_unquoted_querystring(self):
self.assertEqual(self.get_tokens('`echo name`==john'), [
(Text, '`'),
(Name.Builtin, 'echo'),
(Text, 'name'),
(Text, '`'),
(Operator, '=='),
(String, 'john')
])
self.assertEqual(self.get_tokens('name==`echo john`'), [
(Name, 'name'),
(Operator, '=='),
(Text, '`'),
(Name.Builtin, 'echo'),
(Text, 'john'),
(Text, '`')
])
def test_unquoted_bodystring(self):
self.assertEqual(self.get_tokens('`echo name`=john'), [
(Text, '`'),
(Name.Builtin, 'echo'),
(Text, 'name'),
(Text, '`'),
(Operator, '='),
(String, 'john')
])
self.assertEqual(self.get_tokens('name=`echo john`'), [
(Name, 'name'),
(Operator, '='),
(Text, '`'),
(Name.Builtin, 'echo'),
(Text, 'john'),
(Text, '`')
])
def test_header_option_value(self):
self.assertEqual(self.get_tokens('Accept:`echo "application/json"`'), [
(Name, 'Accept'),
(Operator, ':'),
(Text, '`'),
(Name.Builtin, 'echo'),
(String.Double, '"application/json"'),
(Text, '`'),
])
def test_httpie_body_param(self):
self.assertEqual(self.get_tokens('httpie post name=`echo john`'), [
(Keyword, 'httpie'),
(Keyword, 'post'),
(Name, 'name'),
(Operator, '='),
(Text, '`'),
(Name.Builtin, 'echo'),
(Text, 'john'),
(Text, '`'),
])
def test_httpie_post_pipe(self):
self.assertEqual(self.get_tokens('httpie post | tee "/tmp/test"'), [
(Keyword, 'httpie'),
(Keyword, 'post'),
(Operator, '|'),
(Text, 'tee'),
(String.Double, '"/tmp/test"'),
])
def test_post_pipe(self):
self.assertEqual(self.get_tokens('post | tee "/tmp/test"'), [
(Keyword, 'post'),
(Operator, '|'),
(Text, 'tee'),
(String.Double, '"/tmp/test"'),
])
class TestLexerPreviewRedirection(LexerTestCase):
def test_httpie_write(self):
self.assertEqual(self.get_tokens('httpie > file.txt'), [
(Keyword, 'httpie'),
(Operator, '>'), (String, 'file.txt')
])
def test_httpie_write_without_spaces(self):
self.assertEqual(self.get_tokens('httpie>file.txt'), [
(Keyword, 'httpie'),
(Operator, '>'), (String, 'file.txt')
])
def test_httpie_append(self):
self.assertEqual(self.get_tokens('httpie >> file.txt'), [
(Keyword, 'httpie'),
(Operator, '>>'), (String, 'file.txt')
])
def test_httpie_append_without_spaces(self):
self.assertEqual(self.get_tokens('httpie>>file.txt'), [
(Keyword, 'httpie'),
(Operator, '>>'), (String, 'file.txt')
])
def test_httpie_write_with_post_param(self):
self.assertEqual(self.get_tokens('httpie post name=jack > file.txt'), [
(Keyword, 'httpie'), (Keyword, 'post'),
(Name, 'name'), (Operator, '='), (String, 'jack'),
(Operator, '>'), (String, 'file.txt')
])
def test_httpie_append_with_post_param(self):
self.assertEqual(self.get_tokens('httpie post name=doe >> file.txt'), [
(Keyword, 'httpie'), (Keyword, 'post'),
(Name, 'name'), (Operator, '='), (String, 'doe'),
(Operator, '>>'), (String, 'file.txt')
])
def test_httpie_write_quoted_filename(self):
self.assertEqual(self.get_tokens("httpie > 'my file.txt'"), [
(Keyword, 'httpie'), (Operator, '>'),
(Text, "'"), (String, 'my file.txt'), (Text, "'")
])
def test_httpie_append_quoted_filename(self):
self.assertEqual(self.get_tokens('httpie >> "my file.txt"'), [
(Keyword, 'httpie'), (Operator, '>>'),
(Text, '"'), (String, 'my file.txt'), (Text, '"')
])
def test_httpie_append_with_many_params(self):
command = ("httpie post --auth user:pass --verify=no "
"name='john doe' page==2 >> file.txt")
self.assertEqual(self.get_tokens(command), [
(Keyword, 'httpie'), (Keyword, 'post'),
(Name, '--auth'), (String, 'user:pass'),
(Name, '--verify'), (Operator, '='), (String, 'no'),
(Name, 'name'), (Operator, '='),
(Text, "'"), (String, 'john doe'), (Text, "'"),
(Name, 'page'), (Operator, '=='), (String, '2'),
(Operator, '>>'), (String, 'file.txt')
])
def test_curl_write(self):
self.assertEqual(self.get_tokens('curl > file.txt'), [
(Keyword, 'curl'),
(Operator, '>'), (String, 'file.txt')
])
def test_curl_write_without_spaces(self):
self.assertEqual(self.get_tokens('curl>file.txt'), [
(Keyword, 'curl'),
(Operator, '>'), (String, 'file.txt')
])
def test_curl_append(self):
self.assertEqual(self.get_tokens('curl >> file.txt'), [
(Keyword, 'curl'),
(Operator, '>>'), (String, 'file.txt')
])
def test_curl_append_without_spaces(self):
self.assertEqual(self.get_tokens('curl>>file.txt'), [
(Keyword, 'curl'),
(Operator, '>>'), (String, 'file.txt')
])
def test_curl_write_with_post_param(self):
self.assertEqual(self.get_tokens('curl post name=jack > file.txt'), [
(Keyword, 'curl'), (Keyword, 'post'),
(Name, 'name'), (Operator, '='), (String, 'jack'),
(Operator, '>'), (String, 'file.txt')
])
def test_curl_append_with_post_param(self):
self.assertEqual(self.get_tokens('curl post name=doe >> file.txt'), [
(Keyword, 'curl'), (Keyword, 'post'),
(Name, 'name'), (Operator, '='), (String, 'doe'),
(Operator, '>>'), (String, 'file.txt')
])
def test_curl_write_quoted_filename(self):
self.assertEqual(self.get_tokens("curl > 'my file.txt'"), [
(Keyword, 'curl'), (Operator, '>'),
(Text, "'"), (String, 'my file.txt'), (Text, "'")
])
def test_curl_append_quoted_filename(self):
self.assertEqual(self.get_tokens('curl >> "my file.txt"'), [
(Keyword, 'curl'), (Operator, '>>'),
(Text, '"'), (String, 'my file.txt'), (Text, '"')
])
def test_curl_append_with_many_params(self):
command = ("curl post --auth user:pass --verify=no "
"name='john doe' page==2 >> file.txt")
self.assertEqual(self.get_tokens(command), [
(Keyword, 'curl'), (Keyword, 'post'),
(Name, '--auth'), (String, 'user:pass'),
(Name, '--verify'), (Operator, '='), (String, 'no'),
(Name, 'name'), (Operator, '='),
(Text, "'"), (String, 'john doe'), (Text, "'"),
(Name, 'page'), (Operator, '=='), (String, '2'),
(Operator, '>>'), (String, 'file.txt')
])
class TestLexerAction(LexerTestCase):
def test_get(self):
self.assertEqual(self.get_tokens('get'), [
(Keyword, 'get')
])
def test_post_with_spaces(self):
self.assertEqual(self.get_tokens(' post '), [
(Keyword, 'post')
])
def test_capital_head(self):
self.assertEqual(self.get_tokens('HEAD'), [
(Keyword, 'HEAD')
])
def test_delete_random_capitals(self):
self.assertEqual(self.get_tokens('dElETe'), [
(Keyword, 'dElETe')
])
def test_patch(self):
self.assertEqual(self.get_tokens('patch'), [
(Keyword, 'patch')
])
def test_get_with_querystring_params(self):
command = 'get page==10 id==200'
self.assertEqual(self.get_tokens(command), [
(Keyword, 'get'),
(Name, 'page'), (Operator, '=='), (String, '10'),
(Name, 'id'), (Operator, '=='), (String, '200')
])
def test_capital_get_with_querystring_params(self):
command = 'GET page==10 id==200'
self.assertEqual(self.get_tokens(command), [
(Keyword, 'GET'),
(Name, 'page'), (Operator, '=='), (String, '10'),
(Name, 'id'), (Operator, '=='), (String, '200')
])
def test_post_with_body_params(self):
command = 'post name="john doe" username=john'
self.assertEqual(self.get_tokens(command), [
(Keyword, 'post'), (Name, 'name'), (Operator, '='),
(Text, '"'), (String, 'john doe'), (Text, '"'),
(Name, 'username'), (Operator, '='), (String, 'john')
])
def test_post_with_spaces_and_body_params(self):
command = ' post name="john doe" username=john '
self.assertEqual(self.get_tokens(command), [
(Keyword, 'post'), (Name, 'name'), (Operator, '='),
(Text, '"'), (String, 'john doe'), (Text, '"'),
(Name, 'username'), (Operator, '='), (String, 'john')
])
def test_options(self):
self.assertEqual(self.get_tokens('options'), [
(Keyword, 'options')
])
def test_post_relative_path(self):
tokens = self.get_tokens('post /api/test name=foo',
filter_spaces=False)
self.assertEqual(tokens, [
(Keyword, 'post'), (Text, ' '),
(String, '/api/test'), (Text, ' '),
(Name, 'name'), (Operator, '='), (String, 'foo'),
(Text, '\n')
])
class TestLexerActionRedirection(LexerTestCase):
def test_get_write(self):
self.assertEqual(self.get_tokens('get > file.txt'), [
(Keyword, 'get'), (Operator, '>'), (String, 'file.txt')
])
def test_get_write_quoted_filename(self):
self.assertEqual(self.get_tokens('get > "/tmp/my file.txt"'), [
(Keyword, 'get'), (Operator, '>'),
(Text, '"'), (String, '/tmp/my file.txt'), (Text, '"')
])
def test_get_append(self):
self.assertEqual(self.get_tokens('get >> file.txt'), [
(Keyword, 'get'), (Operator, '>>'), (String, 'file.txt')
])
def test_get_append_escaped_filename(self):
self.assertEqual(self.get_tokens(r'get >> /tmp/my\ file.txt'), [
(Keyword, 'get'), (Operator, '>>'),
(String, r'/tmp/my\ file.txt')
])
def test_post_append_with_spaces(self):
self.assertEqual(self.get_tokens(' post >> file.txt'), [
(Keyword, 'post'), (Operator, '>>'), (String, 'file.txt')
])
def test_capital_head_write(self):
self.assertEqual(self.get_tokens('HEAD > file.txt'), [
(Keyword, 'HEAD'), (Operator, '>'), (String, 'file.txt')
])
def test_get_append_with_querystring_params(self):
command = 'get page==10 id==200 >> /tmp/file.txt'
self.assertEqual(self.get_tokens(command), [
(Keyword, 'get'),
(Name, 'page'), (Operator, '=='), (String, '10'),
(Name, 'id'), (Operator, '=='), (String, '200'),
(Operator, '>>'), (String, '/tmp/file.txt')
])
def test_post_write_escaped_filename_with_body_params(self):
command = r'post name="john doe" username=john > /tmp/my\ file.txt'
self.assertEqual(self.get_tokens(command), [
(Keyword, 'post'), (Name, 'name'), (Operator, '='),
(Text, '"'), (String, 'john doe'), (Text, '"'),
(Name, 'username'), (Operator, '='), (String, 'john'),
(Operator, '>'), (String, r'/tmp/my\ file.txt')
])
def test_post_append_with_spaces_and_body_params(self):
command = ' post name="john doe" username=john >> /tmp/file.txt '
self.assertEqual(self.get_tokens(command), [
(Keyword, 'post'), (Name, 'name'), (Operator, '='),
(Text, '"'), (String, 'john doe'), (Text, '"'),
(Name, 'username'), (Operator, '='), (String, 'john'),
(Operator, '>>'), (String, '/tmp/file.txt')
])

131
tests/prompt/test_tree.py Normal file
View File

@ -0,0 +1,131 @@
import unittest
from httpie.prompt.tree import Node
class TestNode(unittest.TestCase):
def setUp(self):
# Make a tree like this:
# root
# a h
# b d i n
# c f e g k o
# l m p
self.root = Node('root')
self.root.add_path('a', 'b', 'c')
self.root.add_path('a', 'b', 'f')
self.root.add_path('a', 'd', 'e')
self.root.add_path('a', 'd', 'g')
self.root.add_path('h', 'i', 'k', 'l')
self.root.add_path('h', 'i', 'k', 'm')
self.root.add_path('h', 'i', 'k', 'p')
self.root.add_path('h', 'n', 'o')
def test_illegal_name(self):
self.assertRaises(ValueError, Node, '.')
self.assertRaises(ValueError, Node, '..')
def test_str(self):
node = Node('my node')
self.assertEqual(str(node), 'my node')
def test_cmp_same_type(self):
a = Node('a', data={'type': 'dir'})
b = Node('b', data={'type': 'dir'})
self.assertTrue(a < b)
def test_cmp_different_type(self):
a = Node('a', data={'type': 'file'})
b = Node('b', data={'type': 'dir'})
self.assertTrue(b < a)
def test_eq(self):
a = Node('a', data={'type': 'file'})
b = Node('b', data={'type': 'dir'})
self.assertNotEqual(a, b)
a = Node('a', data={'type': 'file'})
b = Node('a', data={'type': 'file'})
self.assertEqual(a, b)
def test_add_path_and_find_child(self):
# Level 1 (root)
self.assertEqual(set(c.name for c in self.root.children), set('ah'))
# Level 2
node_a = self.root.find_child('a')
node_h = self.root.find_child('h')
self.assertEqual(set(c.name for c in node_a.children), set('bd'))
self.assertEqual(set(c.name for c in node_h.children), set('in'))
# Level 3
node_b = node_a.find_child('b')
node_i = node_h.find_child('i')
self.assertEqual(set(c.name for c in node_b.children), set('cf'))
self.assertEqual(set(c.name for c in node_i.children), set('k'))
# Level 4
node_c = node_b.find_child('c')
node_k = node_i.find_child('k')
self.assertEqual(set(c.name for c in node_c.children), set())
self.assertEqual(set(c.name for c in node_k.children), set('lmp'))
# Return None if child can't be found
self.assertFalse(node_c.find_child('x'))
def test_find_child_wildcard(self):
root = Node('root')
root.add_path('a')
root.add_path('{b}')
root.add_path('c')
self.assertEqual(root.find_child('a').name, 'a')
self.assertEqual(root.find_child('c').name, 'c')
self.assertEqual(root.find_child('x').name, '{b}')
self.assertFalse(root.find_child('x', wildcard=False))
def test_ls(self):
self.assertEqual([n.name for n in self.root.ls('a')], list('bd'))
self.assertEqual([n.name for n in self.root.ls('a', 'b')], list('cf'))
self.assertEqual([n.name for n in self.root.ls('a', 'b', 'c')], [])
self.assertEqual([n.name for n in self.root.ls('h', 'i', 'k')],
list('lmp'))
def test_ls_root(self):
self.assertEqual([n.name for n in self.root.ls()], list('ah'))
def test_ls_non_existing(self):
self.assertEqual([n.name for n in self.root.ls('x')], [])
self.assertEqual([n.name for n in self.root.ls('a', 'b', 'x')], [])
def test_ls_parent(self):
self.assertEqual([n.name for n in self.root.ls('..')], list('ah'))
self.assertEqual([n.name for n in self.root.ls('..', '..', '..')],
list('ah'))
self.assertEqual([n.name for n in self.root.ls('..', '..', 'h')],
list('in'))
self.assertEqual(
[n.name for n in self.root.ls('..', '..', 'h', '..', 'a')],
list('bd'))
def test_ls_dot(self):
self.assertEqual([n.name for n in self.root.ls('.')], list('ah'))
self.assertEqual([n.name for n in self.root.ls('.', '.', '.')],
list('ah'))
self.assertEqual([n.name for n in self.root.ls('.', 'a', 'b')],
list('cf'))
self.assertEqual([n.name for n in self.root.ls('.', 'h', '.')],
list('in'))
self.assertEqual(
[n.name for n in self.root.ls('.', 'h', '.', '.', 'n')], ['o'])
def test_ls_sort_by_types(self):
self.root.add_path('q', 'r')
self.root.add_path('q', 's', node_type='file')
self.root.add_path('q', 't', node_type='file')
self.root.add_path('q', 'u')
self.root.add_path('q', 'v', node_type='file')
self.assertEqual([n.name for n in self.root.ls('q')],
list('rustv'))

View File

@ -0,0 +1,92 @@
from httpie.prompt import utils
def test_colformat_zero_items():
assert list(utils.colformat([], terminal_width=80)) == []
def test_colformat_one_item():
assert list(utils.colformat(['hello'], terminal_width=80)) == ['hello']
def test_colformat_single_line():
items = ['hello', 'world', 'foo', 'bar']
assert list(utils.colformat(items, terminal_width=80)) == [
'hello world foo bar'
]
def test_colformat_single_column():
items = ['chap1.txt', 'chap2.txt', 'chap3.txt', 'chap4.txt',
'chap5.txt', 'chap6.txt', 'chap7.txt', 'chap8.txt']
assert list(utils.colformat(items, terminal_width=10)) == [
'chap1.txt', 'chap2.txt', 'chap3.txt', 'chap4.txt',
'chap5.txt', 'chap6.txt', 'chap7.txt', 'chap8.txt'
]
def test_colformat_multi_columns_no_remainder():
items = ['chap1.txt', 'chap2.txt', 'chap3.txt', 'chap4.txt',
'chap5.txt', 'chap6.txt', 'chap7.txt', 'chap8.txt',
'chap9.txt', 'chap10.txt', 'chap11.txt', 'chap12.txt']
assert list(utils.colformat(items, terminal_width=50)) == [
'chap1.txt chap4.txt chap7.txt chap10.txt',
'chap2.txt chap5.txt chap8.txt chap11.txt',
'chap3.txt chap6.txt chap9.txt chap12.txt'
]
def test_colformat_multi_columns_remainder_1():
items = ['chap1.txt', 'chap2.txt', 'chap3.txt', 'chap4.txt',
'chap5.txt', 'chap6.txt', 'chap7.txt', 'chap8.txt',
'chap9.txt', 'chap10.txt', 'chap11.txt', 'chap12.txt',
'chap13.txt']
assert list(utils.colformat(items, terminal_width=50)) == [
'chap1.txt chap5.txt chap9.txt chap13.txt',
'chap2.txt chap6.txt chap10.txt',
'chap3.txt chap7.txt chap11.txt',
'chap4.txt chap8.txt chap12.txt'
]
def test_colformat_multi_columns_remainder_2():
items = ['chap1.txt', 'chap2.txt', 'chap3.txt', 'chap4.txt',
'chap5.txt', 'chap6.txt', 'chap7.txt', 'chap8.txt',
'chap9.txt', 'chap10.txt', 'chap11.txt', 'chap12.txt',
'chap13.txt', 'chap14.txt']
assert list(utils.colformat(items, terminal_width=50)) == [
'chap1.txt chap5.txt chap9.txt chap13.txt',
'chap2.txt chap6.txt chap10.txt chap14.txt',
'chap3.txt chap7.txt chap11.txt',
'chap4.txt chap8.txt chap12.txt'
]
def test_colformat_wider_than_terminal():
items = ['a very long long name', '1111 2222 3333 4444 5555']
assert list(utils.colformat(items, terminal_width=10)) == [
'a very long long name',
'1111 2222 3333 4444 5555'
]
def test_colformat_long_short_mixed():
items = ['a', '1122334455667788', 'hello world', 'foo bar',
'b', '8877665544332211', 'abcd', 'yeah']
assert list(utils.colformat(items, terminal_width=50)) == [
'a foo bar abcd',
'1122334455667788 b yeah',
'hello world 8877665544332211'
]
def test_colformat_github_top_endpoints():
items = ['emojis', 'events', 'feeds', 'gists', 'gitignore', 'issues',
'legacy', 'markdown', 'meta', 'networks', 'notifications',
'orgs', 'rate_limit', 'repos', 'repositories', 'search',
'teams', 'user', 'users']
assert list(utils.colformat(items, terminal_width=136)) == [
'emojis gists legacy networks rate_limit'' search users', # noqa
'events gitignore markdown notifications repos teams', # noqa
'feeds issues meta orgs repositories user' # noqa
]

59
tests/prompt/test_xdg.py Normal file
View File

@ -0,0 +1,59 @@
import os
import stat
import sys
from .base import TempAppDirTestCase
from httpie.prompt import xdg
class TestXDG(TempAppDirTestCase):
def test_get_app_data_home(self):
path = xdg.get_data_dir()
expected_path = os.path.join(os.environ[self.homes['data']],
'http-prompt')
self.assertEqual(path, expected_path)
self.assertTrue(os.path.exists(path))
if sys.platform != 'win32':
# Make sure permission for the directory is 700
mask = stat.S_IMODE(os.stat(path).st_mode)
self.assertTrue(mask & stat.S_IRWXU)
self.assertFalse(mask & stat.S_IRWXG)
self.assertFalse(mask & stat.S_IRWXO)
def test_get_app_config_home(self):
path = xdg.get_config_dir()
expected_path = os.path.join(os.environ[self.homes['config']],
'http-prompt')
self.assertEqual(path, expected_path)
self.assertTrue(os.path.exists(path))
if sys.platform != 'win32':
# Make sure permission for the directory is 700
mask = stat.S_IMODE(os.stat(path).st_mode)
self.assertTrue(mask & stat.S_IRWXU)
self.assertFalse(mask & stat.S_IRWXG)
self.assertFalse(mask & stat.S_IRWXO)
def test_get_resource_data_dir(self):
path = xdg.get_data_dir('something')
expected_path = os.path.join(
os.environ[self.homes['data']], 'http-prompt', 'something')
self.assertEqual(path, expected_path)
self.assertTrue(os.path.exists(path))
# Make sure we can write a file to the directory
with open(os.path.join(path, 'test'), 'wb') as f:
f.write(b'hello')
def test_get_resource_config_dir(self):
path = xdg.get_config_dir('something')
expected_path = os.path.join(
os.environ[self.homes['config']], 'http-prompt', 'something')
self.assertEqual(path, expected_path)
self.assertTrue(os.path.exists(path))
# Make sure we can write a file to the directory
with open(os.path.join(path, 'test'), 'wb') as f:
f.write(b'hello')

22
tests/prompt/utils.py Normal file
View File

@ -0,0 +1,22 @@
import os
import sys
def get_http_prompt_path():
"""Get the path to http-prompt executable."""
python_dir = os.path.dirname(sys.executable)
bin_name = 'http-prompt'
if sys.platform == 'win32':
bin_name += '.exe'
paths = [
os.path.join(python_dir, bin_name),
os.path.join(python_dir, 'Scripts', bin_name), # Windows
'/usr/bin/http-prompt' # Homebrew installation
]
for path in paths:
if os.path.exists(path):
return path
raise OSError("could not locate http-prompt executable, "
"Python directory: %s" % python_dir)