forked from extern/httpie-cli
1632 lines
55 KiB
Python
1632 lines
55 KiB
Python
# -*- coding: utf-8 -*-
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import shutil
|
|
import os
|
|
import sys
|
|
|
|
import pytest
|
|
|
|
from collections import namedtuple
|
|
|
|
from unittest.mock import patch
|
|
|
|
from httpie.prompt.context import Context
|
|
from httpie.prompt.executionimport execute, HTTPIE_PROGRAM_NAME
|
|
|
|
from .base import TempAppDirTestCase
|
|
|
|
|
|
class ExecutionTestCase(TempAppDirTestCase):
|
|
|
|
def setUp(self):
|
|
super(ExecutionTestCase, self).setUp()
|
|
self.patchers = [
|
|
('httpie_main', patch('http_prompt.execution.httpie_main')),
|
|
('echo_via_pager',
|
|
patch('http_prompt.output.click.echo_via_pager')),
|
|
('secho', patch('http_prompt.execution.click.secho')),
|
|
('get_terminal_size', patch('http_prompt.utils.get_terminal_size'))
|
|
]
|
|
for attr_name, patcher in self.patchers:
|
|
setattr(self, attr_name, patcher.start())
|
|
|
|
self.context = Context('http://localhost', spec={
|
|
'paths': {
|
|
'/users': {},
|
|
'/users/{username}': {},
|
|
'/users/{username}/events': {},
|
|
'/users/{username}/orgs': {},
|
|
'/orgs': {},
|
|
'/orgs/{org}': {},
|
|
'/orgs/{org}/events': {},
|
|
'/orgs/{org}/members': {}
|
|
}
|
|
})
|
|
|
|
# pytest mocks to capture stdout so we can't really get_terminal_size()
|
|
Size = namedtuple('Size', ['columns', 'rows'])
|
|
self.get_terminal_size.return_value = Size(80, 30)
|
|
|
|
def tearDown(self):
|
|
super(ExecutionTestCase, self).tearDown()
|
|
for _, patcher in self.patchers:
|
|
patcher.stop()
|
|
|
|
def assert_httpie_main_called_with(self, args):
|
|
self.assertEqual(self.httpie_main.call_args[0][0], [
|
|
HTTPIE_PROGRAM_NAME, *args])
|
|
|
|
def assert_stdout(self, expected_msg):
|
|
# Append '\n' to simulate behavior of click.echo_via_pager(),
|
|
# which we use whenever we want to output anything to stdout
|
|
printed_msg = self.echo_via_pager.call_args[0][0] + '\n'
|
|
self.assertEqual(printed_msg, expected_msg)
|
|
|
|
def assert_stdout_startswith(self, expected_prefix):
|
|
printed_msg = self.echo_via_pager.call_args[0][0]
|
|
self.assertTrue(printed_msg.startswith(expected_prefix))
|
|
|
|
def get_stdout(self):
|
|
return self.echo_via_pager.call_args[0][0]
|
|
|
|
def assert_stderr(self, expected_msg):
|
|
printed_msg = self.secho.call_args[0][0]
|
|
print_options = self.secho.call_args[1]
|
|
self.assertEqual(printed_msg, expected_msg)
|
|
self.assertEqual(print_options, {'err': True, 'fg': 'red'})
|
|
|
|
|
|
class TestExecution_noop(ExecutionTestCase):
|
|
|
|
def test_empty_string(self):
|
|
execute('', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
self.assertFalse(self.context.options)
|
|
self.assertFalse(self.context.headers)
|
|
self.assertFalse(self.context.querystring_params)
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertFalse(self.context.should_exit)
|
|
|
|
def test_spaces(self):
|
|
execute(' \t \t ', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
self.assertFalse(self.context.options)
|
|
self.assertFalse(self.context.headers)
|
|
self.assertFalse(self.context.querystring_params)
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertFalse(self.context.should_exit)
|
|
|
|
|
|
class TestExecution_env(ExecutionTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestExecution_env, self).setUp()
|
|
|
|
self.context.url = 'http://localhost:8000/api'
|
|
self.context.headers.update({
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 1234'
|
|
})
|
|
self.context.querystring_params.update({
|
|
'page': ['1'],
|
|
'limit': ['50']
|
|
})
|
|
self.context.body_params.update({
|
|
'name': 'John Doe'
|
|
})
|
|
self.context.options.update({
|
|
'--verify': 'no',
|
|
'--form': None
|
|
})
|
|
|
|
def test_env(self):
|
|
execute('env', self.context)
|
|
self.assert_stdout("--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=John Doe'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_with_spaces(self):
|
|
execute(' env ', self.context)
|
|
self.assert_stdout("--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=John Doe'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_non_ascii(self):
|
|
self.context.body_params['name'] = '許 功蓋'
|
|
execute('env', self.context)
|
|
self.assert_stdout("--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=許 功蓋'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_write_to_file(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('env > %s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
|
|
self.assertEqual(content,
|
|
"--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=John Doe'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_write_to_file_with_env_vars(self):
|
|
filename = self.make_tempfile('hello world\n', 'testenvvar')
|
|
filename_with_var = filename.replace("testenvvar", "${MYPRIVATEVAR}")
|
|
|
|
os.environ['MYPRIVATEVAR'] = 'testenvvar'
|
|
execute('env > %s' % filename_with_var, self.context)
|
|
os.environ['MYPRIVATEVAR'] = ''
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
|
|
self.assertEqual(content,
|
|
"--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=John Doe'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_non_ascii_and_write_to_file(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
self.context.body_params['name'] = '許 功蓋'
|
|
execute('env > %s' % filename, self.context)
|
|
|
|
with open(filename, encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
self.assertEqual(content,
|
|
"--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=許 功蓋'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_write_to_quoted_filename(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute("env > '%s'" % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
|
|
self.assertEqual(content,
|
|
"--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=John Doe'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
def test_env_append_to_file(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's an append
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('env >> %s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
|
|
self.assertEqual(content,
|
|
"hello world\n"
|
|
"--form\n--verify=no\n"
|
|
"cd http://localhost:8000/api\n"
|
|
"limit==50\npage==1\n"
|
|
"'name=John Doe'\n"
|
|
"Accept:text/csv\n"
|
|
"'Authorization:ApiKey 1234'\n")
|
|
|
|
|
|
class TestExecution_source_and_exec(ExecutionTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestExecution_source_and_exec, self).setUp()
|
|
|
|
self.context.url = 'http://localhost:8000/api'
|
|
self.context.headers.update({
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 1234'
|
|
})
|
|
self.context.querystring_params.update({
|
|
'page': ['1'],
|
|
'limit': ['50']
|
|
})
|
|
self.context.body_params.update({
|
|
'name': 'John Doe'
|
|
})
|
|
self.context.options.update({
|
|
'--verify': 'no',
|
|
'--form': None
|
|
})
|
|
|
|
# The file that is about to be sourced/exec'd
|
|
self.filename = self.make_tempfile(
|
|
"Language:en Authorization:'ApiKey 5678'\n"
|
|
"name='Jane Doe' username=jane limit==25\n"
|
|
"rm -o --form\n"
|
|
"cd v2/user\n")
|
|
|
|
def test_source(self):
|
|
execute('source %s' % self.filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
self.assertEqual(self.context.options, {
|
|
'--verify': 'no'
|
|
})
|
|
|
|
def test_source_with_spaces(self):
|
|
execute(' source %s ' % self.filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
self.assertEqual(self.context.options, {
|
|
'--verify': 'no'
|
|
})
|
|
|
|
def test_source_non_existing_file(self):
|
|
c = self.context.copy()
|
|
execute('source no_such_file.txt', self.context)
|
|
self.assertEqual(self.context, c)
|
|
|
|
# Expect the error message would be the same as when we open the
|
|
# non-existing file
|
|
try:
|
|
with open('no_such_file.txt'):
|
|
pass
|
|
except OSError as err:
|
|
err_msg = str(err)
|
|
else:
|
|
assert False, 'what?! no_such_file.txt exists!'
|
|
|
|
self.assert_stderr(err_msg)
|
|
|
|
def test_source_quoted_filename(self):
|
|
execute('source "%s"' % self.filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
self.assertEqual(self.context.options, {
|
|
'--verify': 'no'
|
|
})
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32',
|
|
reason="Windows doesn't use backslashes to escape")
|
|
def test_source_escaped_filename(self):
|
|
new_filename = self.filename + r' copy'
|
|
shutil.copyfile(self.filename, new_filename)
|
|
|
|
new_filename = new_filename.replace(' ', r'\ ')
|
|
|
|
execute('source %s' % new_filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
self.assertEqual(self.context.options, {
|
|
'--verify': 'no'
|
|
})
|
|
|
|
def test_exec(self):
|
|
execute('exec %s' % self.filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
|
|
def test_exec_with_spaces(self):
|
|
execute(' exec %s ' % self.filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
|
|
def test_exec_non_existing_file(self):
|
|
c = self.context.copy()
|
|
execute('exec no_such_file.txt', self.context)
|
|
self.assertEqual(self.context, c)
|
|
|
|
# Try to get the error message when opening a non-existing file
|
|
try:
|
|
with open('no_such_file.txt'):
|
|
pass
|
|
except OSError as err:
|
|
err_msg = str(err)
|
|
else:
|
|
assert False, 'what?! no_such_file.txt exists!'
|
|
|
|
self.assert_stderr(err_msg)
|
|
|
|
def test_exec_quoted_filename(self):
|
|
execute("exec '%s'" % self.filename, self.context)
|
|
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32',
|
|
reason="Windows doesn't use backslashes to escape")
|
|
def test_exec_escaped_filename(self):
|
|
new_filename = self.filename + r' copy'
|
|
shutil.copyfile(self.filename, new_filename)
|
|
|
|
new_filename = new_filename.replace(' ', r'\ ')
|
|
|
|
execute('exec %s' % new_filename, self.context)
|
|
self.assertEqual(self.context.url,
|
|
'http://localhost:8000/api/v2/user')
|
|
self.assertEqual(self.context.headers, {
|
|
'Authorization': 'ApiKey 5678',
|
|
'Language': 'en'
|
|
})
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'limit': ['25']
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jane Doe',
|
|
'username': 'jane'
|
|
})
|
|
|
|
|
|
class TestExecution_env_and_source(ExecutionTestCase):
|
|
|
|
def test_env_and_source(self):
|
|
c = Context()
|
|
c.url = 'http://localhost:8000/api'
|
|
c.headers.update({
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 1234'
|
|
})
|
|
c.querystring_params.update({
|
|
'page': ['1'],
|
|
'limit': ['50']
|
|
})
|
|
c.body_params.update({
|
|
'name': 'John Doe'
|
|
})
|
|
c.options.update({
|
|
'--verify': 'no',
|
|
'--form': None
|
|
})
|
|
|
|
c2 = c.copy()
|
|
|
|
filename = self.make_tempfile()
|
|
execute('env > %s' % filename, c)
|
|
execute('rm *', c)
|
|
|
|
self.assertFalse(c.headers)
|
|
self.assertFalse(c.querystring_params)
|
|
self.assertFalse(c.body_params)
|
|
self.assertFalse(c.options)
|
|
|
|
execute('source %s' % filename, c)
|
|
|
|
self.assertEqual(c, c2)
|
|
|
|
def test_env_and_source_non_ascii(self):
|
|
c = Context()
|
|
c.url = 'http://localhost:8000/api'
|
|
c.headers.update({
|
|
'Accept': 'text/csv',
|
|
'Authorization': 'ApiKey 1234'
|
|
})
|
|
c.querystring_params.update({
|
|
'page': ['1'],
|
|
'limit': ['50']
|
|
})
|
|
c.body_params.update({
|
|
'name': '許 功蓋'
|
|
})
|
|
c.options.update({
|
|
'--verify': 'no',
|
|
'--form': None
|
|
})
|
|
|
|
c2 = c.copy()
|
|
|
|
filename = self.make_tempfile()
|
|
execute('env > %s' % filename, c)
|
|
execute('rm *', c)
|
|
|
|
self.assertFalse(c.headers)
|
|
self.assertFalse(c.querystring_params)
|
|
self.assertFalse(c.body_params)
|
|
self.assertFalse(c.options)
|
|
|
|
execute('source %s' % filename, c)
|
|
|
|
self.assertEqual(c, c2)
|
|
|
|
|
|
class TestExecution_help(ExecutionTestCase):
|
|
|
|
def test_help(self):
|
|
execute('help', self.context)
|
|
self.assert_stdout_startswith('Commands:\n\tcd')
|
|
|
|
def test_help_with_spaces(self):
|
|
execute(' help ', self.context)
|
|
self.assert_stdout_startswith('Commands:\n\tcd')
|
|
|
|
|
|
class TestExecution_exit(ExecutionTestCase):
|
|
|
|
def test_exit(self):
|
|
execute('exit', self.context)
|
|
self.assertTrue(self.context.should_exit)
|
|
|
|
def test_exit_with_spaces(self):
|
|
execute(' exit ', self.context)
|
|
self.assertTrue(self.context.should_exit)
|
|
|
|
|
|
class TestExecution_cd(ExecutionTestCase):
|
|
|
|
def test_single_level(self):
|
|
execute('cd api', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api')
|
|
|
|
def test_many_levels(self):
|
|
execute('cd api/v2/movie/50', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/v2/movie/50')
|
|
|
|
def test_change_base(self):
|
|
execute('cd //example.com/api', self.context)
|
|
self.assertEqual(self.context.url, 'http://example.com/api')
|
|
|
|
def test_root(self):
|
|
execute('cd /api/v2', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/v2')
|
|
|
|
execute('cd /index.html', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/index.html')
|
|
|
|
def test_dot_dot(self):
|
|
execute('cd api/v1', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/v1')
|
|
|
|
execute('cd ..', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api')
|
|
|
|
# If dot-dot has a trailing slash, the resulting URL should have a
|
|
# trailing slash
|
|
execute('cd ../rest/api/', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/rest/api/')
|
|
|
|
def test_url_with_trailing_slash(self):
|
|
self.context.url = 'http://localhost/'
|
|
execute('cd api', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api')
|
|
|
|
execute('cd v2/', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/v2/')
|
|
|
|
execute('cd /objects/', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/objects/')
|
|
|
|
def test_path_with_trailing_slash(self):
|
|
execute('cd api/', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/')
|
|
|
|
execute('cd movie/1/', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/movie/1/')
|
|
|
|
def test_without_url(self):
|
|
execute('cd api/', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/')
|
|
|
|
execute('cd', self.context)
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
|
|
|
|
class TestExecution_rm(ExecutionTestCase):
|
|
|
|
def test_header(self):
|
|
self.context.headers['Content-Type'] = 'text/html'
|
|
execute('rm -h Content-Type', self.context)
|
|
self.assertFalse(self.context.headers)
|
|
|
|
def test_option(self):
|
|
self.context.options['--form'] = None
|
|
execute('rm -o --form', self.context)
|
|
self.assertFalse(self.context.options)
|
|
|
|
def test_querystring(self):
|
|
self.context.querystring_params['page'] = '1'
|
|
execute('rm -q page', self.context)
|
|
self.assertFalse(self.context.querystring_params)
|
|
|
|
def test_body_param(self):
|
|
self.context.body_params['name'] = 'alice'
|
|
execute('rm -b name', self.context)
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_body_json_param(self):
|
|
self.context.body_json_params['name'] = 'bob'
|
|
execute('rm -b name', self.context)
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_header_single_quoted(self):
|
|
self.context.headers['Content-Type'] = 'text/html'
|
|
execute("rm -h 'Content-Type'", self.context)
|
|
self.assertFalse(self.context.headers)
|
|
|
|
def test_option_double_quoted(self):
|
|
self.context.options['--form'] = None
|
|
execute('rm -o "--form"', self.context)
|
|
self.assertFalse(self.context.options)
|
|
|
|
def test_querystring_double_quoted(self):
|
|
self.context.querystring_params['page size'] = '10'
|
|
execute('rm -q "page size"', self.context)
|
|
self.assertFalse(self.context.querystring_params)
|
|
|
|
def test_body_param_double_quoted(self):
|
|
self.context.body_params['family name'] = 'Doe Doe'
|
|
execute('rm -b "family name"', self.context)
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_body_param_escaped(self):
|
|
self.context.body_params['family name'] = 'Doe Doe'
|
|
execute(r'rm -b family\ name', self.context)
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_body_json_param_escaped_colon(self):
|
|
self.context.body_json_params[r'where[id\:gt]'] = 2
|
|
execute(r'rm -b where[id\:gt]', self.context)
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_body_param_escaped_equal(self):
|
|
self.context.body_params[r'foo\=bar'] = 'hello'
|
|
execute(r'rm -b foo\=bar', self.context)
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_non_existing_key(self):
|
|
execute('rm -q abcd', self.context)
|
|
self.assert_stderr("Key 'abcd' not found")
|
|
|
|
def test_non_existing_key_unicode(self): # See #25
|
|
execute(u'rm -q abcd', self.context)
|
|
self.assert_stderr("Key 'abcd' not found")
|
|
|
|
def test_body_reset(self):
|
|
self.context.body_params.update({
|
|
'first_name': 'alice',
|
|
'last_name': 'bryne'
|
|
})
|
|
execute('rm -b *', self.context)
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_querystring_reset(self):
|
|
self.context.querystring_params.update({
|
|
'first_name': 'alice',
|
|
'last_name': 'bryne'
|
|
})
|
|
execute('rm -q *', self.context)
|
|
self.assertFalse(self.context.querystring_params)
|
|
|
|
def test_headers_reset(self):
|
|
self.context.headers.update({
|
|
'Content-Type': 'text/html',
|
|
'Accept': 'application/json'
|
|
})
|
|
execute('rm -h *', self.context)
|
|
self.assertFalse(self.context.headers)
|
|
|
|
def test_options_reset(self):
|
|
self.context.options.update({
|
|
'--form': None,
|
|
'--body': None
|
|
})
|
|
execute('rm -o *', self.context)
|
|
self.assertFalse(self.context.options)
|
|
|
|
def test_reset(self):
|
|
self.context.options.update({
|
|
'--form': None,
|
|
'--verify': 'no'
|
|
})
|
|
self.context.headers.update({
|
|
'Accept': 'dontcare',
|
|
'Content-Type': 'dontcare'
|
|
})
|
|
self.context.querystring_params.update({
|
|
'name': 'dontcare',
|
|
'email': 'dontcare'
|
|
})
|
|
self.context.body_params.update({
|
|
'name': 'dontcare',
|
|
'email': 'dontcare'
|
|
})
|
|
self.context.body_json_params.update({
|
|
'name': 'dontcare'
|
|
})
|
|
|
|
execute('rm *', self.context)
|
|
|
|
self.assertFalse(self.context.options)
|
|
self.assertFalse(self.context.headers)
|
|
self.assertFalse(self.context.querystring_params)
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
|
|
class TestExecution_ls(ExecutionTestCase):
|
|
|
|
def test_root(self):
|
|
execute('ls', self.context)
|
|
self.assert_stdout('orgs users\n')
|
|
|
|
def test_relative_path(self):
|
|
self.context.url = 'http://localhost/users'
|
|
execute('ls 101', self.context)
|
|
self.assert_stdout('events orgs\n')
|
|
|
|
def test_absolute_path(self):
|
|
self.context.url = 'http://localhost/users'
|
|
execute('ls /orgs/1', self.context)
|
|
self.assert_stdout('events members\n')
|
|
|
|
def test_redirect_write(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('ls > %s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'orgs\nusers')
|
|
|
|
def test_redirect_append(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's an append
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('ls >> %s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'hello world\norgs\nusers')
|
|
|
|
def test_grep(self):
|
|
execute('ls | grep users', self.context)
|
|
self.assert_stdout('users\n')
|
|
|
|
|
|
class TestMutation(ExecutionTestCase):
|
|
|
|
def test_simple_headers(self):
|
|
execute('Accept:text/html User-Agent:HttpPrompt', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/html',
|
|
'User-Agent': 'HttpPrompt'
|
|
})
|
|
|
|
def test_header_value_with_double_quotes(self):
|
|
execute('Accept:text/html User-Agent:"HTTP Prompt"', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/html',
|
|
'User-Agent': 'HTTP Prompt'
|
|
})
|
|
|
|
def test_header_value_with_single_quotes(self):
|
|
execute("Accept:text/html User-Agent:'HTTP Prompt'", self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/html',
|
|
'User-Agent': 'HTTP Prompt'
|
|
})
|
|
|
|
def test_header_with_double_quotes(self):
|
|
execute('Accept:text/html "User-Agent:HTTP Prompt"', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/html',
|
|
'User-Agent': 'HTTP Prompt'
|
|
})
|
|
|
|
def test_header_with_single_quotes(self):
|
|
execute("Accept:text/html 'User-Agent:HTTP Prompt'", self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Accept': 'text/html',
|
|
'User-Agent': 'HTTP Prompt'
|
|
})
|
|
|
|
def test_header_escaped_chars(self):
|
|
execute(r'X-Name:John\'s\ Doe', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'X-Name': "John's Doe"
|
|
})
|
|
|
|
def test_header_value_escaped_quote(self):
|
|
execute(r"'X-Name:John\'s Doe'", self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'X-Name': "John's Doe"
|
|
})
|
|
|
|
def test_simple_querystring(self):
|
|
execute('page==1 limit==20', self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'limit': ['20']
|
|
})
|
|
|
|
def test_querystring_with_double_quotes(self):
|
|
execute('page==1 name=="John Doe"', self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'name': ['John Doe']
|
|
})
|
|
|
|
def test_querystring_with_single_quotes(self):
|
|
execute("page==1 name=='John Doe'", self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'page': ['1'],
|
|
'name': ['John Doe']
|
|
})
|
|
|
|
def test_querystring_with_chinese(self):
|
|
execute("name==王小明", self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'name': ['王小明']
|
|
})
|
|
|
|
def test_querystring_escaped_chars(self):
|
|
execute(r'name==John\'s\ Doe', self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'name': ["John's Doe"]
|
|
})
|
|
|
|
def test_querytstring_value_escaped_quote(self):
|
|
execute(r"'name==John\'s Doe'", self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'name': ["John's Doe"]
|
|
})
|
|
|
|
def test_querystring_key_escaped_quote(self):
|
|
execute(r"'john\'s last name==Doe'", self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
"john's last name": ['Doe']
|
|
})
|
|
|
|
def test_simple_body_params(self):
|
|
execute('username=john password=123', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'username': 'john',
|
|
'password': '123'
|
|
})
|
|
|
|
def test_body_param_value_with_double_quotes(self):
|
|
execute('name="John Doe" password=123', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'John Doe',
|
|
'password': '123'
|
|
})
|
|
|
|
def test_body_param_value_with_single_quotes(self):
|
|
execute("name='John Doe' password=123", self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'John Doe',
|
|
'password': '123'
|
|
})
|
|
|
|
def test_body_param_with_double_quotes(self):
|
|
execute('"name=John Doe" password=123', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'John Doe',
|
|
'password': '123'
|
|
})
|
|
|
|
def test_body_param_with_spanish(self):
|
|
execute('name=Jesús', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'Jesús'
|
|
})
|
|
|
|
def test_body_param_escaped_chars(self):
|
|
execute(r'name=John\'s\ Doe', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': "John's Doe"
|
|
})
|
|
|
|
def test_body_param_value_escaped_quote(self):
|
|
execute(r"'name=John\'s Doe'", self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': "John's Doe"
|
|
})
|
|
|
|
def test_body_param_key_escaped_quote(self):
|
|
execute(r"'john\'s last name=Doe'", self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
"john's last name": 'Doe'
|
|
})
|
|
|
|
def test_long_option_names(self):
|
|
execute('--auth user:pass --form', self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--form': None,
|
|
'--auth': 'user:pass'
|
|
})
|
|
|
|
def test_long_option_names_with_its_prefix(self):
|
|
execute('--auth-type basic --auth user:pass --session user '
|
|
'--session-read-only user', self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--auth-type': 'basic',
|
|
'--auth': 'user:pass',
|
|
'--session-read-only': 'user',
|
|
'--session': 'user'
|
|
})
|
|
|
|
def test_long_short_option_names_mixed(self):
|
|
execute('--style=default -j --stream', self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'-j': None,
|
|
'--stream': None,
|
|
'--style': 'default'
|
|
})
|
|
|
|
def test_option_and_body_param(self):
|
|
execute('--form name="John Doe"', self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--form': None
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'John Doe'
|
|
})
|
|
|
|
def test_mixed(self):
|
|
execute(' --form name="John Doe" password=1234\\ 5678 '
|
|
'User-Agent:HTTP\\ Prompt -a \'john:1234 5678\' '
|
|
'"Accept:text/html" ', self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--form': None,
|
|
'-a': 'john:1234 5678'
|
|
})
|
|
self.assertEqual(self.context.headers, {
|
|
'User-Agent': 'HTTP Prompt',
|
|
'Accept': 'text/html'
|
|
})
|
|
self.assertEqual(self.context.options, {
|
|
'--form': None,
|
|
'-a': 'john:1234 5678'
|
|
})
|
|
self.assertEqual(self.context.body_params, {
|
|
'name': 'John Doe',
|
|
'password': '1234 5678'
|
|
})
|
|
|
|
def test_multi_querystring(self):
|
|
execute('name==john name==doe', self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'name': ['john', 'doe']
|
|
})
|
|
|
|
execute('name==jane', self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'name': ['jane']
|
|
})
|
|
|
|
def test_raw_json_object(self):
|
|
execute("""definition:={"id":819,"name":"ML"}""", self.context)
|
|
self.assertEqual(self.context.body_json_params, {
|
|
'definition': {
|
|
'id': 819,
|
|
'name': 'ML'
|
|
}
|
|
})
|
|
|
|
def test_raw_json_object_quoted(self):
|
|
execute("""definition:='{"id": 819, "name": "ML"}'""", self.context)
|
|
self.assertEqual(self.context.body_json_params, {
|
|
'definition': {
|
|
'id': 819,
|
|
'name': 'ML'
|
|
}
|
|
})
|
|
|
|
def test_raw_json_array(self):
|
|
execute("""names:=["foo","bar"]""", self.context)
|
|
self.assertEqual(self.context.body_json_params, {
|
|
'names': ["foo", "bar"]
|
|
})
|
|
|
|
def test_raw_json_array_quoted(self):
|
|
execute("""names:='["foo", "bar"]'""", self.context)
|
|
self.assertEqual(self.context.body_json_params, {
|
|
'names': ["foo", "bar"]
|
|
})
|
|
|
|
def test_raw_json_integer(self):
|
|
execute('number:=999', self.context)
|
|
self.assertEqual(self.context.body_json_params, {'number': 999})
|
|
|
|
def test_raw_json_string(self):
|
|
execute("""name:='"john doe"'""", self.context)
|
|
self.assertEqual(self.context.body_json_params, {'name': 'john doe'})
|
|
|
|
def test_escape_colon(self):
|
|
execute(r'where[id\:gt]:=2', self.context)
|
|
self.assertEqual(self.context.body_json_params, {
|
|
r'where[id\:gt]': 2
|
|
})
|
|
|
|
def test_escape_equal(self):
|
|
execute(r'foo\=bar=hello', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
r'foo\=bar': 'hello'
|
|
})
|
|
|
|
|
|
class TestHttpAction(ExecutionTestCase):
|
|
|
|
def test_get(self):
|
|
execute('get', self.context)
|
|
self.assert_httpie_main_called_with(['GET', 'http://localhost'])
|
|
|
|
def test_get_uppercase(self):
|
|
execute('GET', self.context)
|
|
self.assert_httpie_main_called_with(['GET', 'http://localhost'])
|
|
|
|
def test_get_multi_querystring(self):
|
|
execute('get foo==1 foo==2 foo==3', self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'GET', 'http://localhost', 'foo==1', 'foo==2', 'foo==3'])
|
|
|
|
def test_post(self):
|
|
execute('post page==1', self.context)
|
|
self.assert_httpie_main_called_with(['POST', 'http://localhost',
|
|
'page==1'])
|
|
self.assertFalse(self.context.querystring_params)
|
|
|
|
def test_post_with_absolute_path(self):
|
|
execute('post /api/v3 name=bob', self.context)
|
|
self.assert_httpie_main_called_with(['POST', 'http://localhost/api/v3',
|
|
'name=bob'])
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
|
|
def test_post_with_relative_path(self):
|
|
self.context.url = 'http://localhost/api/v3'
|
|
execute('post ../v2/movie id=8', self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost/api/v2/movie', 'id=8'])
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertEqual(self.context.url, 'http://localhost/api/v3')
|
|
|
|
def test_post_with_full_url(self):
|
|
execute('post http://httpbin.org/post id=9', self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://httpbin.org/post', 'id=9'])
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
|
|
def test_post_with_full_https_url(self):
|
|
execute('post https://httpbin.org/post id=9', self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'https://httpbin.org/post', 'id=9'])
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
|
|
def test_post_uppercase(self):
|
|
execute('POST content=text', self.context)
|
|
self.assert_httpie_main_called_with(['POST', 'http://localhost',
|
|
'content=text'])
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_post_raw_json_object(self):
|
|
execute("""post definition:={"id":819,"name":"ML"}""",
|
|
self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost',
|
|
"""definition:={"id": 819, "name": "ML"}"""])
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_post_raw_json_object_quoted(self):
|
|
execute("""post definition:='{"id": 819, "name": "ML"}'""",
|
|
self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost',
|
|
'definition:={"id": 819, "name": "ML"}'])
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_post_raw_json_array(self):
|
|
execute("""post hobbies:=["foo","bar"]""",
|
|
self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost',
|
|
'hobbies:=["foo", "bar"]'])
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_post_raw_json_array_quoted(self):
|
|
execute("""post hobbies:='["foo", "bar"]'""",
|
|
self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost',
|
|
'hobbies:=["foo", "bar"]'])
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_post_raw_json_integer(self):
|
|
execute('post number:=123',
|
|
self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost', 'number:=123'])
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_post_raw_json_boolean(self):
|
|
execute('post foo:=true',
|
|
self.context)
|
|
self.assert_httpie_main_called_with([
|
|
'POST', 'http://localhost', 'foo:=true'])
|
|
self.assertFalse(self.context.body_json_params)
|
|
|
|
def test_delete(self):
|
|
execute('delete', self.context)
|
|
self.assert_httpie_main_called_with(['DELETE', 'http://localhost'])
|
|
|
|
def test_delete_uppercase(self):
|
|
execute('DELETE', self.context)
|
|
self.assert_httpie_main_called_with(['DELETE', 'http://localhost'])
|
|
|
|
def test_patch(self):
|
|
execute('patch', self.context)
|
|
self.assert_httpie_main_called_with(['PATCH', 'http://localhost'])
|
|
|
|
def test_patch_uppercase(self):
|
|
execute('PATCH', self.context)
|
|
self.assert_httpie_main_called_with(['PATCH', 'http://localhost'])
|
|
|
|
def test_head(self):
|
|
execute('head', self.context)
|
|
self.assert_httpie_main_called_with(['HEAD', 'http://localhost'])
|
|
|
|
def test_head_uppercase(self):
|
|
execute('HEAD', self.context)
|
|
self.assert_httpie_main_called_with(['HEAD', 'http://localhost'])
|
|
|
|
def test_options(self):
|
|
execute('options', self.context)
|
|
self.assert_httpie_main_called_with(['OPTIONS', 'http://localhost'])
|
|
|
|
|
|
class TestHttpActionRedirection(ExecutionTestCase):
|
|
|
|
def test_get(self):
|
|
execute('get > data.json', self.context)
|
|
self.assert_httpie_main_called_with(['GET', 'http://localhost'])
|
|
|
|
env = self.httpie_main.call_args[1]['env']
|
|
self.assertFalse(env.stdout_isatty)
|
|
self.assertEqual(env.stdout.fp.name, 'data.json')
|
|
|
|
|
|
@pytest.mark.slow
|
|
class TestHttpBin(TempAppDirTestCase):
|
|
"""Send real requests to http://httpbin.org, save the responses to files,
|
|
and asserts on the file content.
|
|
"""
|
|
|
|
def setUp(self):
|
|
super(TestHttpBin, self).setUp()
|
|
|
|
# XXX: pytest doesn't allow HTTPie to read stdin while it's capturing
|
|
# stdout, so we replace stdin with a file temporarily during the test.
|
|
class MockStdin(object):
|
|
def __init__(self, fp):
|
|
self.fp = fp
|
|
|
|
def isatty(self):
|
|
return True
|
|
|
|
def __getattr__(self, name):
|
|
if name == 'isatty':
|
|
return self.isatty
|
|
return getattr(self.fp, name)
|
|
|
|
self.orig_stdin = sys.stdin
|
|
filename = self.make_tempfile()
|
|
sys.stdin = MockStdin(open(filename, 'rb'))
|
|
sys.stdin.isatty = lambda: True
|
|
|
|
# Mock echo_via_pager() so that we can catch data fed to stdout
|
|
self.patcher = patch('http_prompt.output.click.echo_via_pager')
|
|
self.echo_via_pager = self.patcher.start()
|
|
|
|
def tearDown(self):
|
|
self.patcher.stop()
|
|
|
|
sys.stdin.close()
|
|
sys.stdin = self.orig_stdin
|
|
|
|
super(TestHttpBin, self).tearDown()
|
|
|
|
def get_stdout(self):
|
|
return self.echo_via_pager.call_args[0][0]
|
|
|
|
def execute_redirection(self, command):
|
|
context = Context('http://httpbin.org')
|
|
filename = self.make_tempfile()
|
|
execute('%s > %s' % (command, filename), context)
|
|
|
|
with open(filename, 'rb') as f:
|
|
return f.read()
|
|
|
|
def execute_pipe(self, command):
|
|
context = Context('http://httpbin.org')
|
|
execute(command, context)
|
|
|
|
def test_get_image(self):
|
|
data = self.execute_redirection('get /image/png')
|
|
self.assertTrue(data)
|
|
self.assertEqual(hashlib.sha1(data).hexdigest(),
|
|
'379f5137831350c900e757b39e525b9db1426d53')
|
|
|
|
def test_get_querystring(self):
|
|
data = self.execute_redirection(
|
|
'get /get id==1234 X-Custom-Header:5678')
|
|
data = json.loads(data.decode())
|
|
self.assertEqual(data['args'], {
|
|
'id': '1234'
|
|
})
|
|
self.assertEqual(data['headers']['X-Custom-Header'], '5678')
|
|
|
|
def test_post_json(self):
|
|
data = self.execute_redirection(
|
|
'post /post id=1234 X-Custom-Header:5678')
|
|
data = json.loads(data.decode())
|
|
self.assertEqual(data['json'], {
|
|
'id': '1234'
|
|
})
|
|
self.assertEqual(data['headers']['X-Custom-Header'], '5678')
|
|
|
|
def test_post_form(self):
|
|
data = self.execute_redirection(
|
|
'post /post --form id=1234 X-Custom-Header:5678')
|
|
data = json.loads(data.decode())
|
|
self.assertEqual(data['form'], {
|
|
'id': '1234'
|
|
})
|
|
self.assertEqual(data['headers']['X-Custom-Header'], '5678')
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Unix only")
|
|
def test_get_and_tee(self):
|
|
filename = self.make_tempfile()
|
|
self.execute_pipe('get /get hello==world | tee %s' % filename)
|
|
|
|
with open(filename) as f:
|
|
data = json.load(f)
|
|
self.assertEqual(data['args'], {'hello': 'world'})
|
|
|
|
printed_msg = self.get_stdout()
|
|
data = json.loads(printed_msg)
|
|
self.assertEqual(data['args'], {'hello': 'world'})
|
|
|
|
|
|
class TestCommandPreview(ExecutionTestCase):
|
|
|
|
def test_httpie_without_args(self):
|
|
execute('httpie', self.context)
|
|
self.assert_stdout('http http://localhost\n')
|
|
|
|
def test_httpie_with_post(self):
|
|
execute('httpie post name=alice', self.context)
|
|
self.assert_stdout('http POST http://localhost name=alice\n')
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_httpie_with_absolute_path(self):
|
|
execute('httpie post /api name=alice', self.context)
|
|
self.assert_stdout('http POST http://localhost/api name=alice\n')
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_httpie_with_full_url(self):
|
|
execute('httpie POST http://httpbin.org/post name=alice', self.context)
|
|
self.assert_stdout('http POST http://httpbin.org/post name=alice\n')
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_httpie_with_full_https_url(self):
|
|
execute('httpie post https://httpbin.org/post name=alice',
|
|
self.context)
|
|
self.assert_stdout('http POST https://httpbin.org/post name=alice\n')
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
self.assertFalse(self.context.body_params)
|
|
|
|
def test_httpie_with_quotes(self):
|
|
execute(r'httpie post http://httpbin.org/post name="john doe" '
|
|
r"apikey==abc\ 123 'Authorization:ApiKey 1234'",
|
|
self.context)
|
|
self.assert_stdout(
|
|
"http POST http://httpbin.org/post 'apikey==abc 123' "
|
|
"'name=john doe' 'Authorization:ApiKey 1234'\n")
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
self.assertFalse(self.context.body_params)
|
|
self.assertFalse(self.context.querystring_params)
|
|
self.assertFalse(self.context.headers)
|
|
|
|
def test_httpie_with_multi_querystring(self):
|
|
execute('httpie get foo==1 foo==2 foo==3', self.context)
|
|
self.assert_stdout('http GET http://localhost foo==1 foo==2 foo==3\n')
|
|
self.assertEqual(self.context.url, 'http://localhost')
|
|
self.assertFalse(self.context.querystring_params)
|
|
|
|
|
|
class TestPipe(ExecutionTestCase):
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Unix only")
|
|
def test_httpie_sed(self):
|
|
execute("httpie get some==data | sed 's/data$/input/'", self.context)
|
|
self.assert_stdout('http GET http://localhost some==input\n')
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Unix only")
|
|
def test_httpie_sed_with_echo(self):
|
|
execute("httpie post | `echo \"sed 's/localhost$/127.0.0.1/'\"`",
|
|
self.context)
|
|
self.assert_stdout("http POST http://127.0.0.1\n")
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Unix only")
|
|
def test_env_grep(self):
|
|
self.context.body_params = {
|
|
'username': 'jane',
|
|
'name': 'Jane',
|
|
'password': '1234'
|
|
}
|
|
execute('env | grep name', self.context)
|
|
self.assert_stdout('name=Jane\nusername=jane\n')
|
|
|
|
|
|
class TestShellSubstitution(ExecutionTestCase):
|
|
|
|
def test_unquoted_option(self):
|
|
execute("--auth `echo user:pass`", self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--auth': 'user:pass'
|
|
})
|
|
|
|
def test_partial_unquoted_option(self):
|
|
execute("--auth user:`echo pass`", self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--auth': 'user:pass'
|
|
})
|
|
|
|
def test_partial_squoted_option(self):
|
|
execute("--auth='user:`echo pass`'", self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--auth': 'user:pass'
|
|
})
|
|
|
|
def test_partial_dquoted_option(self):
|
|
execute('--auth="user:`echo pass`"', self.context)
|
|
self.assertEqual(self.context.options, {
|
|
'--auth': 'user:pass'
|
|
})
|
|
|
|
def test_unquoted_header(self):
|
|
execute("`echo 'X-Greeting'`:`echo 'hello world'`", self.context)
|
|
if sys.platform == 'win32':
|
|
expected_key = "'X-Greeting'"
|
|
expected_value = "'hello world'"
|
|
else:
|
|
expected_key = 'X-Greeting'
|
|
expected_value = 'hello world'
|
|
|
|
self.assertEqual(self.context.headers, {
|
|
expected_key: expected_value
|
|
})
|
|
|
|
def test_full_squoted_header(self):
|
|
execute("'`echo X-Greeting`:`echo hello`'", self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'X-Greeting': 'hello'
|
|
})
|
|
|
|
def test_full_dquoted_header(self):
|
|
execute('"`echo X-Greeting`:`echo hello`"', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'X-Greeting': 'hello'
|
|
})
|
|
|
|
def test_value_squoted_header(self):
|
|
execute("`echo X-Greeting`:'`echo hello`'", self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'X-Greeting': 'hello'
|
|
})
|
|
|
|
def test_value_dquoted_header(self):
|
|
execute('`echo X-Greeting`:"`echo hello`"', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'X-Greeting': 'hello'
|
|
})
|
|
|
|
def test_partial_value_dquoted_header(self):
|
|
execute('Authorization:"Bearer `echo OAUTH TOKEN`"', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Authorization': 'Bearer OAUTH TOKEN'
|
|
})
|
|
|
|
def test_partial_full_dquoted_header(self):
|
|
execute('"Authorization:Bearer `echo OAUTH TOKEN`"', self.context)
|
|
self.assertEqual(self.context.headers, {
|
|
'Authorization': 'Bearer OAUTH TOKEN'
|
|
})
|
|
|
|
def test_unquoted_querystring(self):
|
|
execute("`echo greeting`==`echo 'hello world'`", self.context)
|
|
expected = ("'hello world'"
|
|
if sys.platform == 'win32' else 'hello world')
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'greeting': [expected]
|
|
})
|
|
|
|
def test_full_squoted_querystring(self):
|
|
execute("'`echo greeting`==`echo hello`'", self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'greeting': ['hello']
|
|
})
|
|
|
|
def test_value_squoted_querystring(self):
|
|
execute("`echo greeting`=='`echo hello`'", self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'greeting': ['hello']
|
|
})
|
|
|
|
def test_value_dquoted_querystring(self):
|
|
execute('`echo greeting`=="`echo hello`"', self.context)
|
|
self.assertEqual(self.context.querystring_params, {
|
|
'greeting': ['hello']
|
|
})
|
|
|
|
def test_unquoted_body_param(self):
|
|
execute("`echo greeting`=`echo 'hello world'`", self.context)
|
|
expected = ("'hello world'"
|
|
if sys.platform == 'win32' else 'hello world')
|
|
self.assertEqual(self.context.body_params, {
|
|
'greeting': expected
|
|
})
|
|
|
|
def test_full_squoted_body_param(self):
|
|
execute("'`echo greeting`=`echo hello`'", self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'greeting': 'hello'
|
|
})
|
|
|
|
def test_value_squoted_body_param(self):
|
|
execute("`echo greeting`='`echo hello`'", self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'greeting': 'hello'
|
|
})
|
|
|
|
def test_full_dquoted_body_param(self):
|
|
execute('"`echo greeting`=`echo hello`"', self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'greeting': 'hello'
|
|
})
|
|
|
|
def test_bad_command(self):
|
|
execute("name=`bad command test`", self.context)
|
|
self.assertEqual(self.context.body_params, {'name': ''})
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32', reason="Unix only")
|
|
def test_pipe_and_grep(self):
|
|
execute("greeting=`echo 'hello world\nhihi\n' | grep hello`",
|
|
self.context)
|
|
self.assertEqual(self.context.body_params, {
|
|
'greeting': 'hello world'
|
|
})
|
|
|
|
|
|
class TestCommandPreviewRedirection(ExecutionTestCase):
|
|
|
|
def test_httpie_redirect_write(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('httpie > %s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'http http://localhost\n')
|
|
|
|
def test_httpie_redirect_write_quoted_filename(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('httpie > "%s"' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'http http://localhost\n')
|
|
|
|
@pytest.mark.skipif(sys.platform == 'win32',
|
|
reason="Windows doesn't use backslashes to escape")
|
|
def test_httpie_redirect_write_escaped_filename(self):
|
|
filename = self.make_tempfile()
|
|
filename += r' copy'
|
|
|
|
# Write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('httpie > %s' % filename.replace(' ', r'\ '), self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'http http://localhost\n')
|
|
|
|
def test_httpie_redirect_write_with_args(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's a full overwrite
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('httpie post http://example.org name=john > %s' % filename,
|
|
self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'http POST http://example.org name=john\n')
|
|
|
|
def test_httpie_redirect_append(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's an append
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('httpie >> %s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'hello world\nhttp http://localhost\n')
|
|
|
|
def test_httpie_redirect_append_without_spaces(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's an append
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute('httpie>>%s' % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'hello world\nhttp http://localhost\n')
|
|
|
|
def test_httpie_redirect_append_quoted_filename(self):
|
|
filename = self.make_tempfile()
|
|
|
|
# Write something first to make sure it's an append
|
|
with open(filename, 'w') as f:
|
|
f.write('hello world\n')
|
|
|
|
execute("httpie >> '%s'" % filename, self.context)
|
|
|
|
with open(filename) as f:
|
|
content = f.read()
|
|
self.assertEqual(content, 'hello world\nhttp http://localhost\n')
|