Refactoring

This commit is contained in:
Jakub Roztocil 2019-08-31 15:17:10 +02:00
parent 466df77b6b
commit aba3b1ec01
25 changed files with 1041 additions and 960 deletions

View File

@ -25,7 +25,7 @@ PACKAGES = [
def get_package_meta(package_name):
api_url = 'https://pypi.python.org/pypi/{}/json'.format(package_name)
api_url = f'https://pypi.python.org/pypi/{package_name}/json'
resp = requests.get(api_url).json()
hasher = hashlib.sha256()
for release in resp['urls']:
@ -38,8 +38,7 @@ def get_package_meta(package_name):
'sha256': hasher.hexdigest(),
}
else:
raise RuntimeError(
'{}: download not found: {}'.format(package_name, resp))
raise RuntimeError(f'{package_name}: download not found: {resp}')
def main():

0
httpie/cli/__init__.py Normal file
View File

387
httpie/cli/argparser.py Normal file
View File

@ -0,0 +1,387 @@
import argparse
import errno
import os
import re
import sys
from argparse import RawDescriptionHelpFormatter
from textwrap import dedent
from urllib.parse import urlsplit
from httpie.cli.argtypes import AuthCredentials, KeyValueArgType, parse_auth
from httpie.cli.constants import (
HTTP_GET, HTTP_POST, OUTPUT_OPTIONS, OUTPUT_OPTIONS_DEFAULT,
OUTPUT_OPTIONS_DEFAULT_STDOUT_REDIRECTED, OUT_RESP_BODY, PRETTY_MAP,
PRETTY_STDOUT_TTY_ONLY, SEPARATOR_CREDENTIALS, SEPARATOR_GROUP_ALL_ITEMS,
SEPARATOR_GROUP_DATA_ITEMS, URL_SCHEME_RE,
)
from httpie.cli.exceptions import ParseError
from httpie.cli.requestitems import RequestItems
from httpie.context import Environment
from httpie.plugins import plugin_manager
from httpie.utils import ExplicitNullAuth, get_content_type
class HTTPieHelpFormatter(RawDescriptionHelpFormatter):
"""A nicer help formatter.
Help for arguments can be indented and contain new lines.
It will be de-dented and arguments in the help
will be separated by a blank line for better readability.
"""
def __init__(self, max_help_position=6, *args, **kwargs):
# A smaller indent for args help.
kwargs['max_help_position'] = max_help_position
super().__init__(*args, **kwargs)
def _split_lines(self, text, width):
text = dedent(text).strip() + '\n\n'
return text.splitlines()
class HTTPieArgumentParser(argparse.ArgumentParser):
"""Adds additional logic to `argparse.ArgumentParser`.
Handles all input (CLI args, file args, stdin), applies defaults,
and performs extra validation.
"""
def __init__(self, *args, formatter_class=HTTPieHelpFormatter, **kwargs):
kwargs['add_help'] = False
super().__init__(*args, formatter_class=formatter_class, **kwargs)
self.env = None
self.args = None
self.has_stdin_data = False
# noinspection PyMethodOverriding
def parse_args(
self,
env: Environment,
program_name='http',
args=None,
namespace=None
) -> argparse.Namespace:
self.env = env
self.args, no_options = super().parse_known_args(args, namespace)
if self.args.debug:
self.args.traceback = True
self.has_stdin_data = (
self.env.stdin
and not self.args.ignore_stdin
and not self.env.stdin_isatty
)
# Arguments processing and environment setup.
self._apply_no_options(no_options)
self._validate_download_options()
self._setup_standard_streams()
self._process_output_options()
self._process_pretty_options()
self._guess_method()
self._parse_items()
if self.has_stdin_data:
self._body_from_file(self.env.stdin)
if not URL_SCHEME_RE.match(self.args.url):
if os.path.basename(program_name) == 'https':
scheme = 'https://'
else:
scheme = self.args.default_scheme + "://"
# See if we're using curl style shorthand for localhost (:3000/foo)
shorthand = re.match(r'^:(?!:)(\d*)(/?.*)$', self.args.url)
if shorthand:
port = shorthand.group(1)
rest = shorthand.group(2)
self.args.url = scheme + 'localhost'
if port:
self.args.url += ':' + port
self.args.url += rest
else:
self.args.url = scheme + self.args.url
self._process_auth()
return self.args
# noinspection PyShadowingBuiltins
def _print_message(self, message, file=None):
# Sneak in our stderr/stdout.
file = {
sys.stdout: self.env.stdout,
sys.stderr: self.env.stderr,
None: self.env.stderr
}.get(file, file)
if not hasattr(file, 'buffer') and isinstance(message, str):
message = message.encode(self.env.stdout_encoding)
super()._print_message(message, file)
def _setup_standard_streams(self):
"""
Modify `env.stdout` and `env.stdout_isatty` based on args, if needed.
"""
self.args.output_file_specified = bool(self.args.output_file)
if self.args.download:
# FIXME: Come up with a cleaner solution.
if not self.args.output_file and not self.env.stdout_isatty:
# Use stdout as the download output file.
self.args.output_file = self.env.stdout
# With `--download`, we write everything that would normally go to
# `stdout` to `stderr` instead. Let's replace the stream so that
# we don't have to use many `if`s throughout the codebase.
# The response body will be treated separately.
self.env.stdout = self.env.stderr
self.env.stdout_isatty = self.env.stderr_isatty
elif self.args.output_file:
# When not `--download`ing, then `--output` simply replaces
# `stdout`. The file is opened for appending, which isn't what
# we want in this case.
self.args.output_file.seek(0)
try:
self.args.output_file.truncate()
except IOError as e:
if e.errno == errno.EINVAL:
# E.g. /dev/null on Linux.
pass
else:
raise
self.env.stdout = self.args.output_file
self.env.stdout_isatty = False
def _process_auth(self):
# TODO: refactor
self.args.auth_plugin = None
default_auth_plugin = plugin_manager.get_auth_plugins()[0]
auth_type_set = self.args.auth_type is not None
url = urlsplit(self.args.url)
if self.args.auth is None and not auth_type_set:
if url.username is not None:
# Handle http://username:password@hostname/
username = url.username
password = url.password or ''
self.args.auth = AuthCredentials(
key=username,
value=password,
sep=SEPARATOR_CREDENTIALS,
orig=SEPARATOR_CREDENTIALS.join([username, password])
)
if self.args.auth is not None or auth_type_set:
if not self.args.auth_type:
self.args.auth_type = default_auth_plugin.auth_type
plugin = plugin_manager.get_auth_plugin(self.args.auth_type)()
if plugin.auth_require and self.args.auth is None:
self.error('--auth required')
plugin.raw_auth = self.args.auth
self.args.auth_plugin = plugin
already_parsed = isinstance(self.args.auth, AuthCredentials)
if self.args.auth is None or not plugin.auth_parse:
self.args.auth = plugin.get_auth()
else:
if already_parsed:
# from the URL
credentials = self.args.auth
else:
credentials = parse_auth(self.args.auth)
if (not credentials.has_password()
and plugin.prompt_password):
if self.args.ignore_stdin:
# Non-tty stdin read by now
self.error(
'Unable to prompt for passwords because'
' --ignore-stdin is set.'
)
credentials.prompt_password(url.netloc)
self.args.auth = plugin.get_auth(
username=credentials.key,
password=credentials.value,
)
if not self.args.auth and self.args.ignore_netrc:
# Set a no-op auth to force requests to ignore .netrc
# <https://github.com/psf/requests/issues/2773#issuecomment-174312831>
self.args.auth = ExplicitNullAuth()
def _apply_no_options(self, no_options):
"""For every `--no-OPTION` in `no_options`, set `args.OPTION` to
its default value. This allows for un-setting of options, e.g.,
specified in config.
"""
invalid = []
for option in no_options:
if not option.startswith('--no-'):
invalid.append(option)
continue
# --no-option => --option
inverted = '--' + option[5:]
for action in self._actions:
if inverted in action.option_strings:
setattr(self.args, action.dest, action.default)
break
else:
invalid.append(option)
if invalid:
msg = 'unrecognized arguments: %s'
self.error(msg % ' '.join(invalid))
def _body_from_file(self, fd):
"""There can only be one source of request data.
Bytes are always read.
"""
if self.args.data:
self.error('Request body (from stdin or a file) and request '
'data (key=value) cannot be mixed. Pass '
'--ignore-stdin to let key/value take priority.')
self.args.data = getattr(fd, 'buffer', fd).read()
def _guess_method(self):
"""Set `args.method` if not specified to either POST or GET
based on whether the request has data or not.
"""
if self.args.method is None:
# Invoked as `http URL'.
assert not self.args.request_items
if self.has_stdin_data:
self.args.method = HTTP_POST
else:
self.args.method = HTTP_GET
# FIXME: False positive, e.g., "localhost" matches but is a valid URL.
elif not re.match('^[a-zA-Z]+$', self.args.method):
# Invoked as `http URL item+'. The URL is now in `args.method`
# and the first ITEM is now incorrectly in `args.url`.
try:
# Parse the URL as an ITEM and store it as the first ITEM arg.
self.args.request_items.insert(0, KeyValueArgType(
*SEPARATOR_GROUP_ALL_ITEMS).__call__(self.args.url))
except argparse.ArgumentTypeError as e:
if self.args.traceback:
raise
self.error(e.args[0])
else:
# Set the URL correctly
self.args.url = self.args.method
# Infer the method
has_data = (
self.has_stdin_data
or any(
item.sep in SEPARATOR_GROUP_DATA_ITEMS
for item in self.args.request_items)
)
self.args.method = HTTP_POST if has_data else HTTP_GET
def _parse_items(self):
"""
Parse `args.request_items` into `args.headers`, `args.data`,
`args.params`, and `args.files`.
"""
try:
request_items = RequestItems.from_args(
request_item_args=self.args.request_items,
as_form=self.args.form,
)
except ParseError as e:
if self.args.traceback:
raise
self.error(e.args[0])
else:
self.args.headers = request_items.headers
self.args.data = request_items.data
self.args.files = request_items.files
self.args.params = request_items.params
if self.args.files and not self.args.form:
# `http url @/path/to/file`
file_fields = list(self.args.files.keys())
if file_fields != ['']:
self.error(
'Invalid file fields (perhaps you meant --form?): %s'
% ','.join(file_fields))
fn, fd, ct = self.args.files['']
self.args.files = {}
self._body_from_file(fd)
if 'Content-Type' not in self.args.headers:
content_type = get_content_type(fn)
if content_type:
self.args.headers['Content-Type'] = content_type
def _process_output_options(self):
"""Apply defaults to output options, or validate the provided ones.
The default output options are stdout-type-sensitive.
"""
def check_options(value, option):
unknown = set(value) - OUTPUT_OPTIONS
if unknown:
self.error('Unknown output options: {0}={1}'.format(
option,
','.join(unknown)
))
if self.args.verbose:
self.args.all = True
if self.args.output_options is None:
if self.args.verbose:
self.args.output_options = ''.join(OUTPUT_OPTIONS)
else:
self.args.output_options = (
OUTPUT_OPTIONS_DEFAULT
if self.env.stdout_isatty
else OUTPUT_OPTIONS_DEFAULT_STDOUT_REDIRECTED
)
if self.args.output_options_history is None:
self.args.output_options_history = self.args.output_options
check_options(self.args.output_options, '--print')
check_options(self.args.output_options_history, '--history-print')
if self.args.download and OUT_RESP_BODY in self.args.output_options:
# Response body is always downloaded with --download and it goes
# through a different routine, so we remove it.
self.args.output_options = str(
set(self.args.output_options) - set(OUT_RESP_BODY))
def _process_pretty_options(self):
if self.args.prettify == PRETTY_STDOUT_TTY_ONLY:
self.args.prettify = PRETTY_MAP[
'all' if self.env.stdout_isatty else 'none']
elif (self.args.prettify and self.env.is_windows
and self.args.output_file):
self.error('Only terminal output can be colorized on Windows.')
else:
# noinspection PyTypeChecker
self.args.prettify = PRETTY_MAP[self.args.prettify]
def _validate_download_options(self):
if not self.args.download:
if self.args.download_resume:
self.error('--continue only works with --download')
if self.args.download_resume and not (
self.args.download and self.args.output_file):
self.error('--continue requires --output to be specified')

180
httpie/cli/argtypes.py Normal file
View File

@ -0,0 +1,180 @@
import argparse
import getpass
import os
import sys
from httpie.cli.constants import SEPARATOR_CREDENTIALS
from httpie.sessions import VALID_SESSION_NAME_PATTERN
class KeyValueArg:
"""Base key-value pair parsed from CLI."""
def __init__(self, key, value, sep, orig):
self.key = key
self.value = value
self.sep = sep
self.orig = orig
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __repr__(self):
return repr(self.__dict__)
class SessionNameValidator:
def __init__(self, error_message):
self.error_message = error_message
def __call__(self, value):
# Session name can be a path or just a name.
if (os.path.sep not in value
and not VALID_SESSION_NAME_PATTERN.search(value)):
raise argparse.ArgumentError(None, self.error_message)
return value
class Escaped(str):
"""Represents an escaped character."""
class KeyValueArgType:
"""A key-value pair argument type used with `argparse`.
Parses a key-value arg and constructs a `KeyValuArge` instance.
Used for headers, form data, and other key-value pair types.
"""
key_value_class = KeyValueArg
def __init__(self, *separators):
self.separators = separators
self.special_characters = set('\\')
for separator in separators:
self.special_characters.update(separator)
def __call__(self, string) -> KeyValueArg:
"""Parse `string` and return `self.key_value_class()` instance.
The best of `self.separators` is determined (first found, longest).
Back slash escaped characters aren't considered as separators
(or parts thereof). Literal back slash characters have to be escaped
as well (r'\\').
"""
def tokenize(string):
r"""Tokenize `string`. There are only two token types - strings
and escaped characters:
tokenize(r'foo\=bar\\baz')
=> ['foo', Escaped('='), 'bar', Escaped('\\'), 'baz']
"""
tokens = ['']
characters = iter(string)
for char in characters:
if char == '\\':
char = next(characters, '')
if char not in self.special_characters:
tokens[-1] += '\\' + char
else:
tokens.extend([Escaped(char), ''])
else:
tokens[-1] += char
return tokens
tokens = tokenize(string)
# Sorting by length ensures that the longest one will be
# chosen as it will overwrite any shorter ones starting
# at the same position in the `found` dictionary.
separators = sorted(self.separators, key=len)
for i, token in enumerate(tokens):
if isinstance(token, Escaped):
continue
found = {}
for sep in separators:
pos = token.find(sep)
if pos != -1:
found[pos] = sep
if found:
# Starting first, longest separator found.
sep = found[min(found.keys())]
key, value = token.split(sep, 1)
# Any preceding tokens are part of the key.
key = ''.join(tokens[:i]) + key
# Any following tokens are part of the value.
value += ''.join(tokens[i + 1:])
break
else:
raise argparse.ArgumentTypeError(
u'"%s" is not a valid value' % string)
return self.key_value_class(
key=key, value=value, sep=sep, orig=string)
class AuthCredentials(KeyValueArg):
"""Represents parsed credentials."""
def _getpass(self, prompt):
# To allow mocking.
return getpass.getpass(str(prompt))
def has_password(self):
return self.value is not None
def prompt_password(self, host):
try:
self.value = self._getpass(
'http: password for %s@%s: ' % (self.key, host))
except (EOFError, KeyboardInterrupt):
sys.stderr.write('\n')
sys.exit(0)
class AuthCredentialsArgType(KeyValueArgType):
"""A key-value arg type that parses credentials."""
key_value_class = AuthCredentials
def __call__(self, string):
"""Parse credentials from `string`.
("username" or "username:password").
"""
try:
return super().__call__(string)
except argparse.ArgumentTypeError:
# No password provided, will prompt for it later.
return self.key_value_class(
key=string,
value=None,
sep=SEPARATOR_CREDENTIALS,
orig=string
)
parse_auth = AuthCredentialsArgType(SEPARATOR_CREDENTIALS)
def readable_file_arg(filename):
try:
with open(filename, 'rb'):
return filename
except IOError as ex:
raise argparse.ArgumentTypeError('%s: %s' % (filename, ex.args[1]))

102
httpie/cli/constants.py Normal file
View File

@ -0,0 +1,102 @@
"""Parsing and processing of CLI input (args, auth credentials, files, stdin).
"""
import re
import ssl
# TODO: Use MultiDict for headers once added to `requests`.
# https://github.com/jakubroztocil/httpie/issues/130
# ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
# <https://tools.ietf.org/html/rfc3986#section-3.1>
URL_SCHEME_RE = re.compile(r'^[a-z][a-z0-9.+-]*://', re.IGNORECASE)
HTTP_POST = 'POST'
HTTP_GET = 'GET'
# Various separators used in args
SEPARATOR_HEADER = ':'
SEPARATOR_HEADER_EMPTY = ';'
SEPARATOR_CREDENTIALS = ':'
SEPARATOR_PROXY = ':'
SEPARATOR_DATA_STRING = '='
SEPARATOR_DATA_RAW_JSON = ':='
SEPARATOR_FILE_UPLOAD = '@'
SEPARATOR_DATA_EMBED_FILE_CONTENTS = '=@'
SEPARATOR_DATA_EMBED_RAW_JSON_FILE = ':=@'
SEPARATOR_QUERY_PARAM = '=='
# Separators that become request data
SEPARATOR_GROUP_DATA_ITEMS = frozenset({
SEPARATOR_DATA_STRING,
SEPARATOR_DATA_RAW_JSON,
SEPARATOR_FILE_UPLOAD,
SEPARATOR_DATA_EMBED_FILE_CONTENTS,
SEPARATOR_DATA_EMBED_RAW_JSON_FILE
})
# Separators for items whose value is a filename to be embedded
SEPARATOR_GROUP_DATA_EMBED_ITEMS = frozenset({
SEPARATOR_DATA_EMBED_FILE_CONTENTS,
SEPARATOR_DATA_EMBED_RAW_JSON_FILE,
})
# Separators for raw JSON items
SEPARATOR_GROUP_RAW_JSON_ITEMS = frozenset([
SEPARATOR_DATA_RAW_JSON,
SEPARATOR_DATA_EMBED_RAW_JSON_FILE,
])
# Separators allowed in ITEM arguments
SEPARATOR_GROUP_ALL_ITEMS = frozenset({
SEPARATOR_HEADER,
SEPARATOR_HEADER_EMPTY,
SEPARATOR_QUERY_PARAM,
SEPARATOR_DATA_STRING,
SEPARATOR_DATA_RAW_JSON,
SEPARATOR_FILE_UPLOAD,
SEPARATOR_DATA_EMBED_FILE_CONTENTS,
SEPARATOR_DATA_EMBED_RAW_JSON_FILE,
})
# Output options
OUT_REQ_HEAD = 'H'
OUT_REQ_BODY = 'B'
OUT_RESP_HEAD = 'h'
OUT_RESP_BODY = 'b'
OUTPUT_OPTIONS = frozenset({
OUT_REQ_HEAD,
OUT_REQ_BODY,
OUT_RESP_HEAD,
OUT_RESP_BODY
})
# Pretty
PRETTY_MAP = {
'all': ['format', 'colors'],
'colors': ['colors'],
'format': ['format'],
'none': []
}
PRETTY_STDOUT_TTY_ONLY = object()
# Defaults
OUTPUT_OPTIONS_DEFAULT = OUT_RESP_HEAD + OUT_RESP_BODY
OUTPUT_OPTIONS_DEFAULT_STDOUT_REDIRECTED = OUT_RESP_BODY
SSL_VERSION_ARG_MAPPING = {
'ssl2.3': 'PROTOCOL_SSLv23',
'ssl3': 'PROTOCOL_SSLv3',
'tls1': 'PROTOCOL_TLSv1',
'tls1.1': 'PROTOCOL_TLSv1_1',
'tls1.2': 'PROTOCOL_TLSv1_2',
'tls1.3': 'PROTOCOL_TLSv1_3',
}
SSL_VERSION_ARG_MAPPING = {
cli_arg: getattr(ssl, ssl_constant)
for cli_arg, ssl_constant in SSL_VERSION_ARG_MAPPING.items()
if hasattr(ssl, ssl_constant)
}

View File

@ -2,52 +2,29 @@
CLI arguments definition.
"""
from argparse import (
RawDescriptionHelpFormatter, FileType,
OPTIONAL, ZERO_OR_MORE, SUPPRESS
)
from argparse import (FileType, OPTIONAL, SUPPRESS, ZERO_OR_MORE)
from textwrap import dedent, wrap
from httpie import __doc__, __version__
from httpie.input import (
HTTPieArgumentParser, KeyValueArgType,
SEP_PROXY, SEP_GROUP_ALL_ITEMS,
OUT_REQ_HEAD, OUT_REQ_BODY, OUT_RESP_HEAD,
OUT_RESP_BODY, OUTPUT_OPTIONS,
OUTPUT_OPTIONS_DEFAULT, PRETTY_MAP,
PRETTY_STDOUT_TTY_ONLY, SessionNameValidator,
readable_file_arg, SSL_VERSION_ARG_MAPPING
from httpie.cli.argparser import HTTPieArgumentParser
from httpie.cli.argtypes import (
KeyValueArgType, SessionNameValidator, readable_file_arg,
)
from httpie.cli.constants import (
OUTPUT_OPTIONS, OUTPUT_OPTIONS_DEFAULT, OUT_REQ_BODY, OUT_REQ_HEAD,
OUT_RESP_BODY, OUT_RESP_HEAD, PRETTY_MAP, PRETTY_STDOUT_TTY_ONLY,
SEPARATOR_GROUP_ALL_ITEMS, SEPARATOR_PROXY, SSL_VERSION_ARG_MAPPING,
)
from httpie.output.formatters.colors import (
AVAILABLE_STYLES, DEFAULT_STYLE, AUTO_STYLE
AUTO_STYLE, AVAILABLE_STYLES, DEFAULT_STYLE,
)
from httpie.plugins import plugin_manager
from httpie.plugins.builtin import BuiltinAuthPlugin
from httpie.sessions import DEFAULT_SESSIONS_DIR
class HTTPieHelpFormatter(RawDescriptionHelpFormatter):
"""A nicer help formatter.
Help for arguments can be indented and contain new lines.
It will be de-dented and arguments in the help
will be separated by a blank line for better readability.
"""
def __init__(self, max_help_position=6, *args, **kwargs):
# A smaller indent for args help.
kwargs['max_help_position'] = max_help_position
super().__init__(*args, **kwargs)
def _split_lines(self, text, width):
text = dedent(text).strip() + '\n\n'
return text.splitlines()
parser = HTTPieArgumentParser(
prog='http',
formatter_class=HTTPieHelpFormatter,
description='%s <http://httpie.org>' % __doc__.strip(),
epilog=dedent("""
For every --OPTION there is also a --no-OPTION that reverts OPTION
@ -60,7 +37,6 @@ parser = HTTPieArgumentParser(
"""),
)
#######################################################################
# Positional arguments.
#######################################################################
@ -74,7 +50,7 @@ positional = parser.add_argument_group(
""")
)
positional.add_argument(
'method',
dest='method',
metavar='METHOD',
nargs=OPTIONAL,
default=None,
@ -90,7 +66,7 @@ positional.add_argument(
"""
)
positional.add_argument(
'url',
dest='url',
metavar='URL',
help="""
The scheme defaults to 'http://' if the URL does not include one.
@ -104,11 +80,11 @@ positional.add_argument(
"""
)
positional.add_argument(
'items',
dest='request_items',
metavar='REQUEST_ITEM',
nargs=ZERO_OR_MORE,
default=None,
type=KeyValueArgType(*SEP_GROUP_ALL_ITEMS),
type=KeyValueArgType(*SEPARATOR_GROUP_ALL_ITEMS),
help=r"""
Optional key-value pairs to be included in the request. The separator used
determines the type:
@ -149,7 +125,6 @@ positional.add_argument(
"""
)
#######################################################################
# Content type.
#######################################################################
@ -182,7 +157,6 @@ content_type.add_argument(
"""
)
#######################################################################
# Content processing.
#######################################################################
@ -205,7 +179,6 @@ content_processing.add_argument(
"""
)
#######################################################################
# Output processing
#######################################################################
@ -251,7 +224,6 @@ output_processing.add_argument(
)
)
#######################################################################
# Output options
#######################################################################
@ -261,49 +233,40 @@ output_options.add_argument(
'--print', '-p',
dest='output_options',
metavar='WHAT',
help="""
help=f"""
String specifying what the output should contain:
'{req_head}' request headers
'{req_body}' request body
'{res_head}' response headers
'{res_body}' response body
'{OUT_REQ_HEAD}' request headers
'{OUT_REQ_BODY}' request body
'{OUT_RESP_HEAD}' response headers
'{OUT_RESP_BODY}' response body
The default behaviour is '{default}' (i.e., the response headers and body
is printed), if standard output is not redirected. If the output is piped
to another program or to a file, then only the response body is printed
by default.
The default behaviour is '{OUTPUT_OPTIONS_DEFAULT}' (i.e., the response
headers and body is printed), if standard output is not redirected.
If the output is piped to another program or to a file, then only the
response body is printed by default.
"""
.format(
req_head=OUT_REQ_HEAD,
req_body=OUT_REQ_BODY,
res_head=OUT_RESP_HEAD,
res_body=OUT_RESP_BODY,
default=OUTPUT_OPTIONS_DEFAULT,
)
)
output_options.add_argument(
'--headers', '-h',
dest='output_options',
action='store_const',
const=OUT_RESP_HEAD,
help="""
Print only the response headers. Shortcut for --print={0}.
help=f"""
Print only the response headers. Shortcut for --print={OUT_RESP_HEAD}.
"""
.format(OUT_RESP_HEAD)
)
output_options.add_argument(
'--body', '-b',
dest='output_options',
action='store_const',
const=OUT_RESP_BODY,
help="""
Print only the response body. Shortcut for --print={0}.
help=f"""
Print only the response body. Shortcut for --print={OUT_RESP_BODY}.
"""
.format(OUT_RESP_BODY)
)
output_options.add_argument(
@ -315,8 +278,7 @@ output_options.add_argument(
any intermediary requests/responses (such as redirects).
It's a shortcut for: --all --print={0}
"""
.format(''.join(OUTPUT_OPTIONS))
""".format(''.join(OUTPUT_OPTIONS))
)
output_options.add_argument(
'--all',
@ -398,13 +360,12 @@ output_options.add_argument(
"""
)
#######################################################################
# Sessions
#######################################################################
sessions = parser.add_argument_group(title='Sessions')\
.add_mutually_exclusive_group(required=False)
sessions = parser.add_argument_group(title='Sessions') \
.add_mutually_exclusive_group(required=False)
session_name_validator = SessionNameValidator(
'Session name contains invalid characters.'
@ -414,17 +375,16 @@ sessions.add_argument(
'--session',
metavar='SESSION_NAME_OR_PATH',
type=session_name_validator,
help="""
help=f"""
Create, or reuse and update a session. Within a session, custom headers,
auth credential, as well as any cookies sent by the server persist between
requests.
Session files are stored in:
{session_dir}/<HOST>/<SESSION_NAME>.json.
{DEFAULT_SESSIONS_DIR}/<HOST>/<SESSION_NAME>.json.
"""
.format(session_dir=DEFAULT_SESSIONS_DIR)
)
sessions.add_argument(
'--session-read-only',
@ -475,8 +435,7 @@ auth.add_argument(
{types}
"""
.format(default=_auth_plugins[0].auth_type, types='\n '.join(
""".format(default=_auth_plugins[0].auth_type, types='\n '.join(
'"{type}": {name}{package}{description}'.format(
type=plugin.auth_type,
name=plugin.name,
@ -513,7 +472,7 @@ network.add_argument(
default=[],
action='append',
metavar='PROTOCOL:PROXY_URL',
type=KeyValueArgType(SEP_PROXY),
type=KeyValueArgType(SEPARATOR_PROXY),
help="""
String mapping protocol to the URL of the proxy
(e.g. http:http://foo.bar:3128). You can specify multiple proxies with
@ -585,7 +544,6 @@ network.add_argument(
"""
)
#######################################################################
# SSL
#######################################################################

53
httpie/cli/dicts.py Normal file
View File

@ -0,0 +1,53 @@
from collections import OrderedDict
from requests.structures import CaseInsensitiveDict
class RequestHeadersDict(CaseInsensitiveDict):
"""
Headers are case-insensitive and multiple values are currently not supported.
"""
class RequestJSONDataDict(OrderedDict):
pass
class MultiValueOrderedDict(OrderedDict):
"""Multi-value dict for URL parameters and form data."""
def __setitem__(self, key, value):
"""
If `key` is assigned more than once, `self[key]` holds a
`list` of all the values.
This allows having multiple fields with the same name in form
data and URL params.
"""
assert not isinstance(value, list)
if key not in self:
super().__setitem__(key, value)
else:
if not isinstance(self[key], list):
super().__setitem__(key, [self[key]])
self[key].append(value)
class RequestQueryParamsDict(MultiValueOrderedDict):
pass
class RequestDataDict(MultiValueOrderedDict):
def items(self):
for key, values in super(MultiValueOrderedDict, self).items():
if not isinstance(values, list):
values = [values]
for value in values:
yield key, value
class RequestFilesDict(RequestDataDict):
pass

2
httpie/cli/exceptions.py Normal file
View File

@ -0,0 +1,2 @@
class ParseError(Exception):
pass

162
httpie/cli/requestitems.py Normal file
View File

@ -0,0 +1,162 @@
import os
from io import BytesIO
from typing import Callable, Dict, IO, List, Optional, Tuple, Union
from httpie.cli.argtypes import KeyValueArg
from httpie.cli.constants import (
SEPARATOR_DATA_EMBED_FILE_CONTENTS, SEPARATOR_DATA_EMBED_RAW_JSON_FILE,
SEPARATOR_DATA_RAW_JSON,
SEPARATOR_DATA_STRING, SEPARATOR_FILE_UPLOAD, SEPARATOR_HEADER,
SEPARATOR_HEADER_EMPTY,
SEPARATOR_QUERY_PARAM,
)
from httpie.cli.dicts import (
RequestDataDict, RequestFilesDict, RequestHeadersDict, RequestJSONDataDict,
RequestQueryParamsDict,
)
from httpie.cli.exceptions import ParseError
from httpie.utils import (get_content_type, load_json_preserve_order)
class RequestItems:
def __init__(self, as_form=False, chunked=False):
self.headers = RequestHeadersDict()
self.data = RequestDataDict() if as_form else RequestJSONDataDict()
self.files = RequestFilesDict()
self.params = RequestQueryParamsDict()
self.chunked = chunked
@classmethod
def from_args(
cls,
request_item_args: List[KeyValueArg],
as_form=False,
chunked=False
) -> 'RequestItems':
instance = RequestItems(as_form=as_form, chunked=chunked)
rules: Dict[str, Tuple[Callable, dict]] = {
SEPARATOR_HEADER: (
process_header_arg,
instance.headers,
),
SEPARATOR_HEADER_EMPTY: (
process_empty_header_arg,
instance.headers,
),
SEPARATOR_QUERY_PARAM: (
process_query_param_arg,
instance.params,
),
SEPARATOR_FILE_UPLOAD: (
process_file_upload_arg,
instance.files,
),
SEPARATOR_DATA_STRING: (
process_data_item_arg,
instance.data,
),
SEPARATOR_DATA_EMBED_FILE_CONTENTS: (
process_data_embed_file_contents_arg,
instance.data,
),
SEPARATOR_DATA_RAW_JSON: (
process_data_raw_json_embed_arg,
instance.data,
),
SEPARATOR_DATA_EMBED_RAW_JSON_FILE: (
process_data_embed_raw_json_file_arg,
instance.data,
),
}
for arg in request_item_args:
processor_func, target_dict = rules[arg.sep]
target_dict[arg.key] = processor_func(arg)
return instance
JSONType = Union[str, bool, int, list, dict]
def process_header_arg(arg: KeyValueArg) -> Optional[str]:
return arg.value or None
def process_empty_header_arg(arg: KeyValueArg) -> str:
if arg.value:
raise ParseError(
'Invalid item "%s" '
'(to specify an empty header use `Header;`)'
% arg.orig
)
return arg.value
def process_query_param_arg(arg: KeyValueArg) -> str:
return arg.value
def process_file_upload_arg(arg: KeyValueArg) -> Tuple[str, IO, str]:
filename = arg.value
try:
with open(os.path.expanduser(filename), 'rb') as f:
contents = f.read()
except IOError as e:
raise ParseError('"%s": %s' % (arg.orig, e))
return (
os.path.basename(filename),
BytesIO(contents),
get_content_type(filename),
)
def parse_file_item_chunked(arg: KeyValueArg):
fn = arg.value
try:
f = open(os.path.expanduser(fn), 'rb')
except IOError as e:
raise ParseError('"%s": %s' % (arg.orig, e))
return os.path.basename(fn), f, get_content_type(fn)
def process_data_item_arg(arg: KeyValueArg) -> str:
return arg.value
def process_data_embed_file_contents_arg(arg: KeyValueArg) -> str:
return load_text_file(arg)
def process_data_embed_raw_json_file_arg(arg: KeyValueArg) -> JSONType:
contents = load_text_file(arg)
value = load_json(arg, contents)
return value
def process_data_raw_json_embed_arg(arg: KeyValueArg) -> JSONType:
value = load_json(arg, arg.value)
return value
def load_text_file(item) -> str:
path = item.value
try:
with open(os.path.expanduser(path), 'rb') as f:
return f.read().decode('utf8')
except IOError as e:
raise ParseError('"%s": %s' % (item.orig, e))
except UnicodeDecodeError:
raise ParseError(
'"%s": cannot embed the content of "%s",'
' not a UTF8 or ASCII-encoded text file'
% (item.orig, item.value)
)
def load_json(arg: KeyValueArg, contents: str) -> JSONType:
try:
return load_json_preserve_order(contents)
except ValueError as e:
raise ParseError('"%s": %s' % (arg.orig, e))

View File

@ -9,7 +9,7 @@ from requests.structures import CaseInsensitiveDict
from httpie import sessions
from httpie import __version__
from httpie.input import SSL_VERSION_ARG_MAPPING
from httpie.cli.constants import SSL_VERSION_ARG_MAPPING
from httpie.plugins import plugin_manager
from httpie.utils import repr_dict_nice
@ -30,7 +30,7 @@ except (ImportError, AttributeError):
FORM_CONTENT_TYPE = 'application/x-www-form-urlencoded; charset=utf-8'
JSON_CONTENT_TYPE = 'application/json'
JSON_ACCEPT = '{0}, */*'.format(JSON_CONTENT_TYPE)
JSON_ACCEPT = f'{JSON_CONTENT_TYPE}, */*'
DEFAULT_UA = 'HTTPie/%s' % __version__

View File

@ -101,4 +101,4 @@ class Environment:
)
def __repr__(self):
return '<{0} {1}>'.format(type(self).__name__, str(self))
return f'<{type(self).__name__} {self}>'

View File

@ -201,7 +201,7 @@ def main(
assert level in ['error', 'warning']
env.stderr.write('\nhttp: %s: %s\n' % (level, msg))
from httpie.cli import parser
from httpie.cli.definition import parser
if env.config.default_options:
args = env.config.default_options + args

View File

@ -1,770 +0,0 @@
"""Parsing and processing of CLI input (args, auth credentials, files, stdin).
"""
import os
import ssl
import sys
import re
import errno
import mimetypes
import getpass
from io import BytesIO
from collections import namedtuple, OrderedDict
# noinspection PyCompatibility
import argparse
# TODO: Use MultiDict for headers once added to `requests`.
# https://github.com/jakubroztocil/httpie/issues/130
from urllib.parse import urlsplit
from httpie.context import Environment
from httpie.plugins import plugin_manager
from requests.structures import CaseInsensitiveDict
from httpie.sessions import VALID_SESSION_NAME_PATTERN
from httpie.utils import load_json_preserve_order, ExplicitNullAuth
# ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
# <https://tools.ietf.org/html/rfc3986#section-3.1>
URL_SCHEME_RE = re.compile(r'^[a-z][a-z0-9.+-]*://', re.IGNORECASE)
HTTP_POST = 'POST'
HTTP_GET = 'GET'
# Various separators used in args
SEP_HEADERS = ':'
SEP_HEADERS_EMPTY = ';'
SEP_CREDENTIALS = ':'
SEP_PROXY = ':'
SEP_DATA = '='
SEP_DATA_RAW_JSON = ':='
SEP_FILES = '@'
SEP_DATA_EMBED_FILE = '=@'
SEP_DATA_EMBED_RAW_JSON_FILE = ':=@'
SEP_QUERY = '=='
# Separators that become request data
SEP_GROUP_DATA_ITEMS = frozenset([
SEP_DATA,
SEP_DATA_RAW_JSON,
SEP_FILES,
SEP_DATA_EMBED_FILE,
SEP_DATA_EMBED_RAW_JSON_FILE
])
# Separators for items whose value is a filename to be embedded
SEP_GROUP_DATA_EMBED_ITEMS = frozenset([
SEP_DATA_EMBED_FILE,
SEP_DATA_EMBED_RAW_JSON_FILE,
])
# Separators for raw JSON items
SEP_GROUP_RAW_JSON_ITEMS = frozenset([
SEP_DATA_RAW_JSON,
SEP_DATA_EMBED_RAW_JSON_FILE,
])
# Separators allowed in ITEM arguments
SEP_GROUP_ALL_ITEMS = frozenset([
SEP_HEADERS,
SEP_HEADERS_EMPTY,
SEP_QUERY,
SEP_DATA,
SEP_DATA_RAW_JSON,
SEP_FILES,
SEP_DATA_EMBED_FILE,
SEP_DATA_EMBED_RAW_JSON_FILE,
])
# Output options
OUT_REQ_HEAD = 'H'
OUT_REQ_BODY = 'B'
OUT_RESP_HEAD = 'h'
OUT_RESP_BODY = 'b'
OUTPUT_OPTIONS = frozenset([
OUT_REQ_HEAD,
OUT_REQ_BODY,
OUT_RESP_HEAD,
OUT_RESP_BODY
])
# Pretty
PRETTY_MAP = {
'all': ['format', 'colors'],
'colors': ['colors'],
'format': ['format'],
'none': []
}
PRETTY_STDOUT_TTY_ONLY = object()
# Defaults
OUTPUT_OPTIONS_DEFAULT = OUT_RESP_HEAD + OUT_RESP_BODY
OUTPUT_OPTIONS_DEFAULT_STDOUT_REDIRECTED = OUT_RESP_BODY
SSL_VERSION_ARG_MAPPING = {
'ssl2.3': 'PROTOCOL_SSLv23',
'ssl3': 'PROTOCOL_SSLv3',
'tls1': 'PROTOCOL_TLSv1',
'tls1.1': 'PROTOCOL_TLSv1_1',
'tls1.2': 'PROTOCOL_TLSv1_2',
'tls1.3': 'PROTOCOL_TLSv1_3',
}
SSL_VERSION_ARG_MAPPING = {
cli_arg: getattr(ssl, ssl_constant)
for cli_arg, ssl_constant in SSL_VERSION_ARG_MAPPING.items()
if hasattr(ssl, ssl_constant)
}
class HTTPieArgumentParser(argparse.ArgumentParser):
"""Adds additional logic to `argparse.ArgumentParser`.
Handles all input (CLI args, file args, stdin), applies defaults,
and performs extra validation.
"""
def __init__(self, *args, **kwargs):
kwargs['add_help'] = False
super().__init__(*args, **kwargs)
self.env = None
self.args = None
self.has_stdin_data = False
# noinspection PyMethodOverriding
def parse_args(
self,
env: Environment,
program_name='http',
args=None,
namespace=None
) -> argparse.Namespace:
self.env = env
self.args, no_options = super().parse_known_args(args, namespace)
if self.args.debug:
self.args.traceback = True
self.has_stdin_data = (
self.env.stdin
and not self.args.ignore_stdin
and not self.env.stdin_isatty
)
# Arguments processing and environment setup.
self._apply_no_options(no_options)
self._validate_download_options()
self._setup_standard_streams()
self._process_output_options()
self._process_pretty_options()
self._guess_method()
self._parse_items()
if self.has_stdin_data:
self._body_from_file(self.env.stdin)
if not URL_SCHEME_RE.match(self.args.url):
if os.path.basename(program_name) == 'https':
scheme = 'https://'
else:
scheme = self.args.default_scheme + "://"
# See if we're using curl style shorthand for localhost (:3000/foo)
shorthand = re.match(r'^:(?!:)(\d*)(/?.*)$', self.args.url)
if shorthand:
port = shorthand.group(1)
rest = shorthand.group(2)
self.args.url = scheme + 'localhost'
if port:
self.args.url += ':' + port
self.args.url += rest
else:
self.args.url = scheme + self.args.url
self._process_auth()
return self.args
# noinspection PyShadowingBuiltins
def _print_message(self, message, file=None):
# Sneak in our stderr/stdout.
file = {
sys.stdout: self.env.stdout,
sys.stderr: self.env.stderr,
None: self.env.stderr
}.get(file, file)
if not hasattr(file, 'buffer') and isinstance(message, str):
message = message.encode(self.env.stdout_encoding)
super()._print_message(message, file)
def _setup_standard_streams(self):
"""
Modify `env.stdout` and `env.stdout_isatty` based on args, if needed.
"""
self.args.output_file_specified = bool(self.args.output_file)
if self.args.download:
# FIXME: Come up with a cleaner solution.
if not self.args.output_file and not self.env.stdout_isatty:
# Use stdout as the download output file.
self.args.output_file = self.env.stdout
# With `--download`, we write everything that would normally go to
# `stdout` to `stderr` instead. Let's replace the stream so that
# we don't have to use many `if`s throughout the codebase.
# The response body will be treated separately.
self.env.stdout = self.env.stderr
self.env.stdout_isatty = self.env.stderr_isatty
elif self.args.output_file:
# When not `--download`ing, then `--output` simply replaces
# `stdout`. The file is opened for appending, which isn't what
# we want in this case.
self.args.output_file.seek(0)
try:
self.args.output_file.truncate()
except IOError as e:
if e.errno == errno.EINVAL:
# E.g. /dev/null on Linux.
pass
else:
raise
self.env.stdout = self.args.output_file
self.env.stdout_isatty = False
def _process_auth(self):
# TODO: refactor
self.args.auth_plugin = None
default_auth_plugin = plugin_manager.get_auth_plugins()[0]
auth_type_set = self.args.auth_type is not None
url = urlsplit(self.args.url)
if self.args.auth is None and not auth_type_set:
if url.username is not None:
# Handle http://username:password@hostname/
username = url.username
password = url.password or ''
self.args.auth = AuthCredentials(
key=username,
value=password,
sep=SEP_CREDENTIALS,
orig=SEP_CREDENTIALS.join([username, password])
)
if self.args.auth is not None or auth_type_set:
if not self.args.auth_type:
self.args.auth_type = default_auth_plugin.auth_type
plugin = plugin_manager.get_auth_plugin(self.args.auth_type)()
if plugin.auth_require and self.args.auth is None:
self.error('--auth required')
plugin.raw_auth = self.args.auth
self.args.auth_plugin = plugin
already_parsed = isinstance(self.args.auth, AuthCredentials)
if self.args.auth is None or not plugin.auth_parse:
self.args.auth = plugin.get_auth()
else:
if already_parsed:
# from the URL
credentials = self.args.auth
else:
credentials = parse_auth(self.args.auth)
if (not credentials.has_password()
and plugin.prompt_password):
if self.args.ignore_stdin:
# Non-tty stdin read by now
self.error(
'Unable to prompt for passwords because'
' --ignore-stdin is set.'
)
credentials.prompt_password(url.netloc)
self.args.auth = plugin.get_auth(
username=credentials.key,
password=credentials.value,
)
if not self.args.auth and self.args.ignore_netrc:
# Set a no-op auth to force requests to ignore .netrc
# <https://github.com/psf/requests/issues/2773#issuecomment-174312831>
self.args.auth = ExplicitNullAuth()
def _apply_no_options(self, no_options):
"""For every `--no-OPTION` in `no_options`, set `args.OPTION` to
its default value. This allows for un-setting of options, e.g.,
specified in config.
"""
invalid = []
for option in no_options:
if not option.startswith('--no-'):
invalid.append(option)
continue
# --no-option => --option
inverted = '--' + option[5:]
for action in self._actions:
if inverted in action.option_strings:
setattr(self.args, action.dest, action.default)
break
else:
invalid.append(option)
if invalid:
msg = 'unrecognized arguments: %s'
self.error(msg % ' '.join(invalid))
def _body_from_file(self, fd):
"""There can only be one source of request data.
Bytes are always read.
"""
if self.args.data:
self.error('Request body (from stdin or a file) and request '
'data (key=value) cannot be mixed. Pass '
'--ignore-stdin to let key/value take priority.')
self.args.data = getattr(fd, 'buffer', fd).read()
def _guess_method(self):
"""Set `args.method` if not specified to either POST or GET
based on whether the request has data or not.
"""
if self.args.method is None:
# Invoked as `http URL'.
assert not self.args.items
if self.has_stdin_data:
self.args.method = HTTP_POST
else:
self.args.method = HTTP_GET
# FIXME: False positive, e.g., "localhost" matches but is a valid URL.
elif not re.match('^[a-zA-Z]+$', self.args.method):
# Invoked as `http URL item+'. The URL is now in `args.method`
# and the first ITEM is now incorrectly in `args.url`.
try:
# Parse the URL as an ITEM and store it as the first ITEM arg.
self.args.items.insert(0, KeyValueArgType(
*SEP_GROUP_ALL_ITEMS).__call__(self.args.url))
except argparse.ArgumentTypeError as e:
if self.args.traceback:
raise
self.error(e.args[0])
else:
# Set the URL correctly
self.args.url = self.args.method
# Infer the method
has_data = (
self.has_stdin_data
or any(
item.sep in SEP_GROUP_DATA_ITEMS
for item in self.args.items
)
)
self.args.method = HTTP_POST if has_data else HTTP_GET
def _parse_items(self):
"""Parse `args.items` into `args.headers`, `args.data`, `args.params`,
and `args.files`.
"""
try:
items = parse_items(
items=self.args.items,
data_class=ParamsDict if self.args.form else OrderedDict
)
except ParseError as e:
if self.args.traceback:
raise
self.error(e.args[0])
else:
self.args.headers = items.headers
self.args.data = items.data
self.args.files = items.files
self.args.params = items.params
if self.args.files and not self.args.form:
# `http url @/path/to/file`
file_fields = list(self.args.files.keys())
if file_fields != ['']:
self.error(
'Invalid file fields (perhaps you meant --form?): %s'
% ','.join(file_fields))
fn, fd, ct = self.args.files['']
self.args.files = {}
self._body_from_file(fd)
if 'Content-Type' not in self.args.headers:
content_type = get_content_type(fn)
if content_type:
self.args.headers['Content-Type'] = content_type
def _process_output_options(self):
"""Apply defaults to output options, or validate the provided ones.
The default output options are stdout-type-sensitive.
"""
def check_options(value, option):
unknown = set(value) - OUTPUT_OPTIONS
if unknown:
self.error('Unknown output options: {0}={1}'.format(
option,
','.join(unknown)
))
if self.args.verbose:
self.args.all = True
if self.args.output_options is None:
if self.args.verbose:
self.args.output_options = ''.join(OUTPUT_OPTIONS)
else:
self.args.output_options = (
OUTPUT_OPTIONS_DEFAULT
if self.env.stdout_isatty
else OUTPUT_OPTIONS_DEFAULT_STDOUT_REDIRECTED
)
if self.args.output_options_history is None:
self.args.output_options_history = self.args.output_options
check_options(self.args.output_options, '--print')
check_options(self.args.output_options_history, '--history-print')
if self.args.download and OUT_RESP_BODY in self.args.output_options:
# Response body is always downloaded with --download and it goes
# through a different routine, so we remove it.
self.args.output_options = str(
set(self.args.output_options) - set(OUT_RESP_BODY))
def _process_pretty_options(self):
if self.args.prettify == PRETTY_STDOUT_TTY_ONLY:
self.args.prettify = PRETTY_MAP[
'all' if self.env.stdout_isatty else 'none']
elif (self.args.prettify and self.env.is_windows
and self.args.output_file):
self.error('Only terminal output can be colorized on Windows.')
else:
# noinspection PyTypeChecker
self.args.prettify = PRETTY_MAP[self.args.prettify]
def _validate_download_options(self):
if not self.args.download:
if self.args.download_resume:
self.error('--continue only works with --download')
if self.args.download_resume and not (
self.args.download and self.args.output_file):
self.error('--continue requires --output to be specified')
class ParseError(Exception):
pass
class KeyValue:
"""Base key-value pair parsed from CLI."""
def __init__(self, key, value, sep, orig):
self.key = key
self.value = value
self.sep = sep
self.orig = orig
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __repr__(self):
return repr(self.__dict__)
class SessionNameValidator:
def __init__(self, error_message):
self.error_message = error_message
def __call__(self, value):
# Session name can be a path or just a name.
if (os.path.sep not in value
and not VALID_SESSION_NAME_PATTERN.search(value)):
raise argparse.ArgumentError(None, self.error_message)
return value
class KeyValueArgType:
"""A key-value pair argument type used with `argparse`.
Parses a key-value arg and constructs a `KeyValue` instance.
Used for headers, form data, and other key-value pair types.
"""
key_value_class = KeyValue
def __init__(self, *separators):
self.separators = separators
self.special_characters = set('\\')
for separator in separators:
self.special_characters.update(separator)
def __call__(self, string):
"""Parse `string` and return `self.key_value_class()` instance.
The best of `self.separators` is determined (first found, longest).
Back slash escaped characters aren't considered as separators
(or parts thereof). Literal back slash characters have to be escaped
as well (r'\\').
"""
class Escaped(str):
"""Represents an escaped character."""
def tokenize(string):
r"""Tokenize `string`. There are only two token types - strings
and escaped characters:
tokenize(r'foo\=bar\\baz')
=> ['foo', Escaped('='), 'bar', Escaped('\\'), 'baz']
"""
tokens = ['']
characters = iter(string)
for char in characters:
if char == '\\':
char = next(characters, '')
if char not in self.special_characters:
tokens[-1] += '\\' + char
else:
tokens.extend([Escaped(char), ''])
else:
tokens[-1] += char
return tokens
tokens = tokenize(string)
# Sorting by length ensures that the longest one will be
# chosen as it will overwrite any shorter ones starting
# at the same position in the `found` dictionary.
separators = sorted(self.separators, key=len)
for i, token in enumerate(tokens):
if isinstance(token, Escaped):
continue
found = {}
for sep in separators:
pos = token.find(sep)
if pos != -1:
found[pos] = sep
if found:
# Starting first, longest separator found.
sep = found[min(found.keys())]
key, value = token.split(sep, 1)
# Any preceding tokens are part of the key.
key = ''.join(tokens[:i]) + key
# Any following tokens are part of the value.
value += ''.join(tokens[i + 1:])
break
else:
raise argparse.ArgumentTypeError(
u'"%s" is not a valid value' % string)
return self.key_value_class(
key=key, value=value, sep=sep, orig=string)
class AuthCredentials(KeyValue):
"""Represents parsed credentials."""
def _getpass(self, prompt):
# To allow mocking.
return getpass.getpass(str(prompt))
def has_password(self):
return self.value is not None
def prompt_password(self, host):
try:
self.value = self._getpass(
'http: password for %s@%s: ' % (self.key, host))
except (EOFError, KeyboardInterrupt):
sys.stderr.write('\n')
sys.exit(0)
class AuthCredentialsArgType(KeyValueArgType):
"""A key-value arg type that parses credentials."""
key_value_class = AuthCredentials
def __call__(self, string):
"""Parse credentials from `string`.
("username" or "username:password").
"""
try:
return super().__call__(string)
except argparse.ArgumentTypeError:
# No password provided, will prompt for it later.
return self.key_value_class(
key=string,
value=None,
sep=SEP_CREDENTIALS,
orig=string
)
parse_auth = AuthCredentialsArgType(SEP_CREDENTIALS)
class RequestItemsDict(OrderedDict):
"""Multi-value dict for URL parameters and form data."""
# noinspection PyMethodOverriding
def __setitem__(self, key, value):
""" If `key` is assigned more than once, `self[key]` holds a
`list` of all the values.
This allows having multiple fields with the same name in form
data and URL params.
"""
assert not isinstance(value, list)
if key not in self:
super().__setitem__(key, value)
else:
if not isinstance(self[key], list):
super().__setitem__(key, [self[key]])
self[key].append(value)
class ParamsDict(RequestItemsDict):
pass
class DataDict(RequestItemsDict):
def items(self):
for key, values in super().items():
if not isinstance(values, list):
values = [values]
for value in values:
yield key, value
RequestItems = namedtuple('RequestItems',
['headers', 'data', 'files', 'params'])
def get_content_type(filename):
"""
Return the content type for ``filename`` in format appropriate
for Content-Type headers, or ``None`` if the file type is unknown
to ``mimetypes``.
"""
mime, encoding = mimetypes.guess_type(filename, strict=False)
if mime:
content_type = mime
if encoding:
content_type = '%s; charset=%s' % (mime, encoding)
return content_type
def parse_items(items,
headers_class=CaseInsensitiveDict,
data_class=OrderedDict,
files_class=DataDict,
params_class=ParamsDict):
"""Parse `KeyValue` `items` into `data`, `headers`, `files`,
and `params`.
"""
headers = []
data = []
files = []
params = []
for item in items:
value = item.value
if item.sep == SEP_HEADERS:
if value == '':
# No value => unset the header
value = None
target = headers
elif item.sep == SEP_HEADERS_EMPTY:
if item.value:
raise ParseError(
'Invalid item "%s" '
'(to specify an empty header use `Header;`)'
% item.orig
)
target = headers
elif item.sep == SEP_QUERY:
target = params
elif item.sep == SEP_FILES:
try:
with open(os.path.expanduser(value), 'rb') as f:
value = (os.path.basename(value),
BytesIO(f.read()),
get_content_type(value))
except IOError as e:
raise ParseError('"%s": %s' % (item.orig, e))
target = files
elif item.sep in SEP_GROUP_DATA_ITEMS:
if item.sep in SEP_GROUP_DATA_EMBED_ITEMS:
try:
with open(os.path.expanduser(value), 'rb') as f:
value = f.read().decode('utf8')
except IOError as e:
raise ParseError('"%s": %s' % (item.orig, e))
except UnicodeDecodeError:
raise ParseError(
'"%s": cannot embed the content of "%s",'
' not a UTF8 or ASCII-encoded text file'
% (item.orig, item.value)
)
if item.sep in SEP_GROUP_RAW_JSON_ITEMS:
try:
value = load_json_preserve_order(value)
except ValueError as e:
raise ParseError('"%s": %s' % (item.orig, e))
target = data
else:
raise TypeError(item)
target.append((item.key, value))
return RequestItems(headers_class(headers),
data_class(data),
files_class(files),
params_class(params))
def readable_file_arg(filename):
try:
with open(filename, 'rb'):
return filename
except IOError as ex:
raise argparse.ArgumentTypeError('%s: %s' % (filename, ex.args[1]))

View File

@ -61,11 +61,7 @@ class HTTPResponse(HTTPMessage):
20: '2',
}[original.version]
status_line = 'HTTP/{version} {status} {reason}'.format(
version=version,
status=original.status,
reason=original.reason
)
status_line = f'HTTP/{version} {original.status} {original.reason}'
headers = [status_line]
try:
# `original.msg` is a `http.client.HTTPMessage` on Python 3

View File

@ -3,8 +3,8 @@ from functools import partial
from httpie.context import Environment
from httpie.models import HTTPRequest, HTTPResponse
from httpie.input import (OUT_REQ_BODY, OUT_REQ_HEAD,
OUT_RESP_HEAD, OUT_RESP_BODY)
from httpie.cli.constants import (
OUT_REQ_BODY, OUT_REQ_HEAD, OUT_RESP_HEAD, OUT_RESP_BODY)
from httpie.output.processing import Formatting, Conversion

View File

@ -1,6 +1,7 @@
"""Persistent, JSON-serialized sessions.
"""
import argparse
import re
import os
from pathlib import Path
@ -28,7 +29,7 @@ def get_response(
requests_session: requests.Session,
session_name: str,
config_dir: Path,
args,
args: argparse.Namespace,
read_only=False,
) -> requests.Response:
"""Like `client.get_responses`, but applies permanent
@ -167,7 +168,7 @@ class Session(BaseConfigDict):
}
else:
if plugin.auth_parse:
from httpie.input import parse_auth
from httpie.cli.argtypes import parse_auth
parsed = parse_auth(plugin.raw_auth)
credentials = {
'username': parsed.key,

View File

@ -1,5 +1,6 @@
from __future__ import division
import json
import mimetypes
from collections import OrderedDict
import requests.auth
@ -78,3 +79,18 @@ class ExplicitNullAuth(requests.auth.AuthBase):
def __call__(self, r):
return r
def get_content_type(filename):
"""
Return the content type for ``filename`` in format appropriate
for Content-Type headers, or ``None`` if the file type is unknown
to ``mimetypes``.
"""
mime, encoding = mimetypes.guess_type(filename, strict=False)
if mime:
content_type = mime
if encoding:
content_type = '%s; charset=%s' % (mime, encoding)
return content_type

View File

@ -5,8 +5,8 @@ import pytest
from httpie.plugins.builtin import HTTPBasicAuth
from httpie.utils import ExplicitNullAuth
from utils import http, add_auth, HTTP_OK, MockEnvironment
import httpie.input
import httpie.cli
import httpie.cli.constants
import httpie.cli.definition
def test_basic_auth(httpbin_both):
@ -24,7 +24,7 @@ def test_digest_auth(httpbin_both, argument_name):
assert r.json == {'authenticated': True, 'user': 'user'}
@mock.patch('httpie.input.AuthCredentials._getpass',
@mock.patch('httpie.cli.argtypes.AuthCredentials._getpass',
new=lambda self, prompt: 'password')
def test_password_prompt(httpbin):
r = http('--auth', 'user',
@ -60,7 +60,7 @@ def test_only_username_in_url(url):
https://github.com/jakubroztocil/httpie/issues/242
"""
args = httpie.cli.parser.parse_args(args=[url], env=MockEnvironment())
args = httpie.cli.definition.parser.parse_args(args=[url], env=MockEnvironment())
assert args.auth
assert args.auth.username == 'username'
assert args.auth.password == ''
@ -94,7 +94,7 @@ def test_ignore_netrc(httpbin_both):
def test_ignore_netrc_null_auth():
args = httpie.cli.parser.parse_args(
args = httpie.cli.definition.parser.parse_args(
args=['--ignore-netrc', 'example.org'],
env=MockEnvironment(),
)
@ -102,7 +102,7 @@ def test_ignore_netrc_null_auth():
def test_ignore_netrc_together_with_auth():
args = httpie.cli.parser.parse_args(
args = httpie.cli.definition.parser.parse_args(
args=['--ignore-netrc', '--auth=username:password', 'example.org'],
env=MockEnvironment(),
)

View File

@ -1,6 +1,6 @@
from mock import mock
from httpie.input import SEP_CREDENTIALS
from httpie.cli.constants import SEPARATOR_CREDENTIALS
from httpie.plugins import AuthPlugin, plugin_manager
from utils import http, HTTP_OK
@ -83,7 +83,7 @@ def test_auth_plugin_require_auth_false_and_auth_provided(httpbin):
auth_require = False
def get_auth(self, username=None, password=None):
assert self.raw_auth == USERNAME + SEP_CREDENTIALS + PASSWORD
assert self.raw_auth == USERNAME + SEPARATOR_CREDENTIALS + PASSWORD
assert username == USERNAME
assert password == PASSWORD
return basic_auth()
@ -95,7 +95,7 @@ def test_auth_plugin_require_auth_false_and_auth_provided(httpbin):
'--auth-type',
Plugin.auth_type,
'--auth',
USERNAME + SEP_CREDENTIALS + PASSWORD,
USERNAME + SEPARATOR_CREDENTIALS + PASSWORD,
)
assert HTTP_OK in r
assert r.json == AUTH_OK
@ -103,7 +103,7 @@ def test_auth_plugin_require_auth_false_and_auth_provided(httpbin):
plugin_manager.unregister(Plugin)
@mock.patch('httpie.input.AuthCredentials._getpass',
@mock.patch('httpie.cli.argtypes.AuthCredentials._getpass',
new=lambda self, prompt: 'UNEXPECTED_PROMPT_RESPONSE')
def test_auth_plugin_prompt_password_false(httpbin):

View File

@ -1,42 +1,42 @@
"""CLI argument parsing related tests."""
import json
# noinspection PyCompatibility
import argparse
import json
import pytest
from requests.exceptions import InvalidSchema
from httpie import input
from httpie.input import KeyValue, KeyValueArgType, DataDict
from httpie import ExitStatus
from httpie.cli import parser
from utils import MockEnvironment, http, HTTP_OK
import httpie.cli.argparser
from fixtures import (
FILE_PATH_ARG, JSON_FILE_PATH_ARG,
JSON_FILE_CONTENT, FILE_CONTENT, FILE_PATH
FILE_CONTENT, FILE_PATH, FILE_PATH_ARG, JSON_FILE_CONTENT,
JSON_FILE_PATH_ARG,
)
from httpie import ExitStatus
from httpie.cli import constants
from httpie.cli.definition import parser
from httpie.cli.argtypes import KeyValueArg, KeyValueArgType
from httpie.cli.requestitems import RequestItems
from utils import HTTP_OK, MockEnvironment, http
class TestItemParsing:
key_value = KeyValueArgType(*input.SEP_GROUP_ALL_ITEMS)
key_value_arg = KeyValueArgType(*constants.SEPARATOR_GROUP_ALL_ITEMS)
def test_invalid_items(self):
items = ['no-separator']
for item in items:
pytest.raises(argparse.ArgumentTypeError, self.key_value, item)
pytest.raises(argparse.ArgumentTypeError, self.key_value_arg, item)
def test_escape_separator(self):
items = input.parse_items([
items = RequestItems.from_args([
# headers
self.key_value(r'foo\:bar:baz'),
self.key_value(r'jack\@jill:hill'),
self.key_value_arg(r'foo\:bar:baz'),
self.key_value_arg(r'jack\@jill:hill'),
# data
self.key_value(r'baz\=bar=foo'),
self.key_value_arg(r'baz\=bar=foo'),
# files
self.key_value(r'bar\@baz@%s' % FILE_PATH_ARG),
self.key_value_arg(r'bar\@baz@%s' % FILE_PATH_ARG),
])
# `requests.structures.CaseInsensitiveDict` => `dict`
headers = dict(items.headers._store.values())
@ -45,7 +45,9 @@ class TestItemParsing:
'foo:bar': 'baz',
'jack@jill': 'hill',
}
assert items.data == {'baz=bar': 'foo'}
assert items.data == {
'baz=bar': 'foo'
}
assert 'bar@baz' in items.files
@pytest.mark.parametrize(('string', 'key', 'sep', 'value'), [
@ -54,31 +56,34 @@ class TestItemParsing:
('path\\==c:\\windows', 'path=', '=', 'c:\\windows'),
])
def test_backslash_before_non_special_character_does_not_escape(
self, string, key, sep, value):
expected = KeyValue(orig=string, key=key, sep=sep, value=value)
actual = self.key_value(string)
self, string, key, sep, value
):
expected = KeyValueArg(orig=string, key=key, sep=sep, value=value)
actual = self.key_value_arg(string)
assert actual == expected
def test_escape_longsep(self):
items = input.parse_items([
self.key_value(r'bob\:==foo'),
items = RequestItems.from_args([
self.key_value_arg(r'bob\:==foo'),
])
assert items.params == {'bob:': 'foo'}
assert items.params == {
'bob:': 'foo'
}
def test_valid_items(self):
items = input.parse_items([
self.key_value('string=value'),
self.key_value('Header:value'),
self.key_value('Unset-Header:'),
self.key_value('Empty-Header;'),
self.key_value('list:=["a", 1, {}, false]'),
self.key_value('obj:={"a": "b"}'),
self.key_value('ed='),
self.key_value('bool:=true'),
self.key_value('file@' + FILE_PATH_ARG),
self.key_value('query==value'),
self.key_value('string-embed=@' + FILE_PATH_ARG),
self.key_value('raw-json-embed:=@' + JSON_FILE_PATH_ARG),
items = RequestItems.from_args([
self.key_value_arg('string=value'),
self.key_value_arg('Header:value'),
self.key_value_arg('Unset-Header:'),
self.key_value_arg('Empty-Header;'),
self.key_value_arg('list:=["a", 1, {}, false]'),
self.key_value_arg('obj:={"a": "b"}'),
self.key_value_arg('ed='),
self.key_value_arg('bool:=true'),
self.key_value_arg('file@' + FILE_PATH_ARG),
self.key_value_arg('query==value'),
self.key_value_arg('string-embed=@' + FILE_PATH_ARG),
self.key_value_arg('raw-json-embed:=@' + JSON_FILE_PATH_ARG),
])
# Parsed headers
@ -99,12 +104,16 @@ class TestItemParsing:
"string": "value",
"bool": True,
"list": ["a", 1, {}, False],
"obj": {"a": "b"},
"obj": {
"a": "b"
},
"string-embed": FILE_CONTENT,
}
# Parsed query string parameters
assert items.params == {'query': 'value'}
assert items.params == {
'query': 'value'
}
# Parsed file fields
assert 'file' in items.files
@ -112,17 +121,19 @@ class TestItemParsing:
decode('utf8') == FILE_CONTENT)
def test_multiple_file_fields_with_same_field_name(self):
items = input.parse_items([
self.key_value('file_field@' + FILE_PATH_ARG),
self.key_value('file_field@' + FILE_PATH_ARG),
items = RequestItems.from_args([
self.key_value_arg('file_field@' + FILE_PATH_ARG),
self.key_value_arg('file_field@' + FILE_PATH_ARG),
])
assert len(items.files['file_field']) == 2
def test_multiple_text_fields_with_same_field_name(self):
items = input.parse_items(
[self.key_value('text_field=a'),
self.key_value('text_field=b')],
data_class=DataDict
items = RequestItems.from_args(
request_item_args=[
self.key_value_arg('text_field=a'),
self.key_value_arg('text_field=b')
],
as_form=True,
)
assert items.data['text_field'] == ['a', 'b']
assert list(items.data.items()) == [
@ -206,92 +217,80 @@ class TestLocalhostShorthand:
class TestArgumentParser:
def setup_method(self, method):
self.parser = input.HTTPieArgumentParser()
self.parser = httpie.cli.argparser.HTTPieArgumentParser()
def test_guess_when_method_set_and_valid(self):
self.parser.args = argparse.Namespace()
self.parser.args.method = 'GET'
self.parser.args.url = 'http://example.com/'
self.parser.args.items = []
self.parser.args.request_items = []
self.parser.args.ignore_stdin = False
self.parser.env = MockEnvironment()
self.parser._guess_method()
assert self.parser.args.method == 'GET'
assert self.parser.args.url == 'http://example.com/'
assert self.parser.args.items == []
assert self.parser.args.request_items == []
def test_guess_when_method_not_set(self):
self.parser.args = argparse.Namespace()
self.parser.args.method = None
self.parser.args.url = 'http://example.com/'
self.parser.args.items = []
self.parser.args.request_items = []
self.parser.args.ignore_stdin = False
self.parser.env = MockEnvironment()
self.parser._guess_method()
assert self.parser.args.method == 'GET'
assert self.parser.args.url == 'http://example.com/'
assert self.parser.args.items == []
assert self.parser.args.request_items == []
def test_guess_when_method_set_but_invalid_and_data_field(self):
self.parser.args = argparse.Namespace()
self.parser.args.method = 'http://example.com/'
self.parser.args.url = 'data=field'
self.parser.args.items = []
self.parser.args.request_items = []
self.parser.args.ignore_stdin = False
self.parser.env = MockEnvironment()
self.parser._guess_method()
assert self.parser.args.method == 'POST'
assert self.parser.args.url == 'http://example.com/'
assert self.parser.args.items == [
KeyValue(key='data',
value='field',
sep='=',
orig='data=field')
assert self.parser.args.request_items == [
KeyValueArg(key='data',
value='field',
sep='=',
orig='data=field')
]
def test_guess_when_method_set_but_invalid_and_header_field(self):
self.parser.args = argparse.Namespace()
self.parser.args.method = 'http://example.com/'
self.parser.args.url = 'test:header'
self.parser.args.items = []
self.parser.args.request_items = []
self.parser.args.ignore_stdin = False
self.parser.env = MockEnvironment()
self.parser._guess_method()
assert self.parser.args.method == 'GET'
assert self.parser.args.url == 'http://example.com/'
assert self.parser.args.items, [
KeyValue(key='test',
value='header',
sep=':',
orig='test:header')
assert self.parser.args.request_items, [
KeyValueArg(key='test',
value='header',
sep=':',
orig='test:header')
]
def test_guess_when_method_set_but_invalid_and_item_exists(self):
self.parser.args = argparse.Namespace()
self.parser.args.method = 'http://example.com/'
self.parser.args.url = 'new_item=a'
self.parser.args.items = [
KeyValue(
self.parser.args.request_items = [
KeyValueArg(
key='old_item', value='b', sep='=', orig='old_item=b')
]
self.parser.args.ignore_stdin = False
self.parser.env = MockEnvironment()
self.parser._guess_method()
assert self.parser.args.items, [
KeyValue(key='new_item', value='a', sep='=', orig='new_item=a'),
KeyValue(
assert self.parser.args.request_items, [
KeyValueArg(key='new_item', value='a', sep='=', orig='new_item=a'),
KeyValueArg(
key='old_item', value='b', sep='=', orig='old_item=b'),
]

View File

@ -5,7 +5,7 @@ from utils import MockEnvironment, http, HTTP_OK
def test_keyboard_interrupt_during_arg_parsing_exit_status(httpbin):
with mock.patch('httpie.cli.parser.parse_args',
with mock.patch('httpie.cli.definition.parser.parse_args',
side_effect=KeyboardInterrupt()):
r = http('GET', httpbin.url + '/get', error_exit_ok=True)
assert r.exit_status == ExitStatus.ERROR_CTRL_C

View File

@ -1,7 +1,7 @@
"""High-level tests."""
import pytest
from httpie.input import ParseError
from httpie.cli.exceptions import ParseError
from utils import MockEnvironment, http, HTTP_OK
from fixtures import FILE_PATH, FILE_CONTENT

View File

@ -140,10 +140,6 @@ class TestSession(SessionTestBase):
assert HTTP_OK in r2
assert r2.json['headers']['Foo'] == 'Bar'
@pytest.mark.skipif(
sys.version_info >= (3,),
reason="This test fails intermittently on Python 3 - "
"see https://github.com/jakubroztocil/httpie/issues/282")
def test_session_unicode(self, httpbin):
self.start_session(httpbin)

View File

@ -5,7 +5,7 @@ import pytest_httpbin.certs
import requests.exceptions
from httpie import ExitStatus
from httpie.input import SSL_VERSION_ARG_MAPPING
from httpie.cli.constants import SSL_VERSION_ARG_MAPPING
from utils import HTTP_OK, TESTS_ROOT, http

View File

@ -2,7 +2,7 @@ import os
import pytest
from httpie.input import ParseError
from httpie.cli.exceptions import ParseError
from utils import MockEnvironment, http, HTTP_OK
from fixtures import FILE_PATH_ARG, FILE_PATH, FILE_CONTENT