forked from extern/httpie-cli
Extend auth plugin API
This extends the `AuthPlugin` API by the following attributes: * `auth_require`: set to `False` to make `--auth, -a` optional * `auth_parse`: set to `False` to disable `username:password` parsing (access the raw value passed to `-a` via `self.raw_auth`). * `prompt_password`: set to`False` to disable password prompt when no password provided (only relevant when `auth_parse == True`) These changes should be 100% backwards-compatible. What needs more testing is auth support in sessions. Close #433 Close #431 Close #378 Ping teracyhq/httpie-jwt-auth#3
This commit is contained in:
parent
b879d38b07
commit
a49774d3ab
@ -9,6 +9,7 @@ This project adheres to `Semantic Versioning <http://semver.org/>`_.
|
|||||||
`1.0.0-dev`_ (Unreleased)
|
`1.0.0-dev`_ (Unreleased)
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
|
* Extended auth plugin API.
|
||||||
* Added support for ``curses``-less Python installations.
|
* Added support for ``curses``-less Python installations.
|
||||||
* Fixed ``REQUEST_ITEM`` arg incorrectly being reported as required.
|
* Fixed ``REQUEST_ITEM`` arg incorrectly being reported as required.
|
||||||
* Improved ``CTRL-C`` interrupt handling.
|
* Improved ``CTRL-C`` interrupt handling.
|
||||||
|
@ -5,22 +5,25 @@ NOTE: the CLI interface may change before reaching v1.0.
|
|||||||
"""
|
"""
|
||||||
from textwrap import dedent, wrap
|
from textwrap import dedent, wrap
|
||||||
# noinspection PyCompatibility
|
# noinspection PyCompatibility
|
||||||
from argparse import (RawDescriptionHelpFormatter, FileType,
|
from argparse import (
|
||||||
OPTIONAL, ZERO_OR_MORE, SUPPRESS)
|
RawDescriptionHelpFormatter, FileType,
|
||||||
|
OPTIONAL, ZERO_OR_MORE, SUPPRESS
|
||||||
|
)
|
||||||
|
|
||||||
from httpie import __doc__, __version__
|
from httpie import __doc__, __version__
|
||||||
from httpie.plugins.builtin import BuiltinAuthPlugin
|
from httpie.plugins.builtin import BuiltinAuthPlugin
|
||||||
from httpie.plugins import plugin_manager
|
from httpie.plugins import plugin_manager
|
||||||
from httpie.sessions import DEFAULT_SESSIONS_DIR
|
from httpie.sessions import DEFAULT_SESSIONS_DIR
|
||||||
from httpie.output.formatters.colors import AVAILABLE_STYLES, DEFAULT_STYLE
|
from httpie.output.formatters.colors import AVAILABLE_STYLES, DEFAULT_STYLE
|
||||||
from httpie.input import (HTTPieArgumentParser,
|
from httpie.input import (
|
||||||
AuthCredentialsArgType, KeyValueArgType,
|
HTTPieArgumentParser, KeyValueArgType,
|
||||||
SEP_PROXY, SEP_CREDENTIALS, SEP_GROUP_ALL_ITEMS,
|
SEP_PROXY, SEP_GROUP_ALL_ITEMS,
|
||||||
OUT_REQ_HEAD, OUT_REQ_BODY, OUT_RESP_HEAD,
|
OUT_REQ_HEAD, OUT_REQ_BODY, OUT_RESP_HEAD,
|
||||||
OUT_RESP_BODY, OUTPUT_OPTIONS,
|
OUT_RESP_BODY, OUTPUT_OPTIONS,
|
||||||
OUTPUT_OPTIONS_DEFAULT, PRETTY_MAP,
|
OUTPUT_OPTIONS_DEFAULT, PRETTY_MAP,
|
||||||
PRETTY_STDOUT_TTY_ONLY, SessionNameValidator,
|
PRETTY_STDOUT_TTY_ONLY, SessionNameValidator,
|
||||||
readable_file_arg, SSL_VERSION_ARG_MAPPING)
|
readable_file_arg, SSL_VERSION_ARG_MAPPING
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class HTTPieHelpFormatter(RawDescriptionHelpFormatter):
|
class HTTPieHelpFormatter(RawDescriptionHelpFormatter):
|
||||||
@ -414,8 +417,8 @@ sessions.add_argument(
|
|||||||
auth = parser.add_argument_group(title='Authentication')
|
auth = parser.add_argument_group(title='Authentication')
|
||||||
auth.add_argument(
|
auth.add_argument(
|
||||||
'--auth', '-a',
|
'--auth', '-a',
|
||||||
|
default=None,
|
||||||
metavar='USER[:PASS]',
|
metavar='USER[:PASS]',
|
||||||
type=AuthCredentialsArgType(SEP_CREDENTIALS),
|
|
||||||
help="""
|
help="""
|
||||||
If only the username is provided (-a username), HTTPie will prompt
|
If only the username is provided (-a username), HTTPie will prompt
|
||||||
for the password.
|
for the password.
|
||||||
@ -423,10 +426,21 @@ auth.add_argument(
|
|||||||
""",
|
""",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _AuthTypeLazyChoices(object):
|
||||||
|
# Needed for plugin testing
|
||||||
|
|
||||||
|
def __contains__(self, item):
|
||||||
|
return item in plugin_manager.get_auth_plugin_mapping()
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(plugin_manager.get_auth_plugins())
|
||||||
|
|
||||||
|
|
||||||
_auth_plugins = plugin_manager.get_auth_plugins()
|
_auth_plugins = plugin_manager.get_auth_plugins()
|
||||||
auth.add_argument(
|
auth.add_argument(
|
||||||
'--auth-type', '-A',
|
'--auth-type', '-A',
|
||||||
choices=[plugin.auth_type for plugin in _auth_plugins],
|
choices=_AuthTypeLazyChoices(),
|
||||||
default=_auth_plugins[0].auth_type,
|
default=_auth_plugins[0].auth_type,
|
||||||
help="""
|
help="""
|
||||||
The authentication mechanism to be used. Defaults to "{default}".
|
The authentication mechanism to be used. Defaults to "{default}".
|
||||||
|
@ -145,11 +145,6 @@ def get_requests_kwargs(args, base_headers=None):
|
|||||||
headers.update(args.headers)
|
headers.update(args.headers)
|
||||||
headers = finalize_headers(headers)
|
headers = finalize_headers(headers)
|
||||||
|
|
||||||
credentials = None
|
|
||||||
if args.auth:
|
|
||||||
auth_plugin = plugin_manager.get_auth_plugin(args.auth_type)()
|
|
||||||
credentials = auth_plugin.get_auth(args.auth.key, args.auth.value)
|
|
||||||
|
|
||||||
cert = None
|
cert = None
|
||||||
if args.cert:
|
if args.cert:
|
||||||
cert = args.cert
|
cert = args.cert
|
||||||
@ -168,7 +163,7 @@ def get_requests_kwargs(args, base_headers=None):
|
|||||||
}.get(args.verify, args.verify),
|
}.get(args.verify, args.verify),
|
||||||
'cert': cert,
|
'cert': cert,
|
||||||
'timeout': args.timeout,
|
'timeout': args.timeout,
|
||||||
'auth': credentials,
|
'auth': args.auth,
|
||||||
'proxies': dict((p.key, p.value) for p in args.proxy),
|
'proxies': dict((p.key, p.value) for p in args.proxy),
|
||||||
'files': args.files,
|
'files': args.files,
|
||||||
'allow_redirects': args.follow,
|
'allow_redirects': args.follow,
|
||||||
|
@ -15,6 +15,7 @@ from argparse import ArgumentParser, ArgumentTypeError, ArgumentError
|
|||||||
|
|
||||||
# TODO: Use MultiDict for headers once added to `requests`.
|
# TODO: Use MultiDict for headers once added to `requests`.
|
||||||
# https://github.com/jkbrzt/httpie/issues/130
|
# https://github.com/jkbrzt/httpie/issues/130
|
||||||
|
from httpie.plugins import plugin_manager
|
||||||
from requests.structures import CaseInsensitiveDict
|
from requests.structures import CaseInsensitiveDict
|
||||||
|
|
||||||
from httpie.compat import OrderedDict, urlsplit, str, is_pypy, is_py27
|
from httpie.compat import OrderedDict, urlsplit, str, is_pypy, is_py27
|
||||||
@ -214,31 +215,56 @@ class HTTPieArgumentParser(ArgumentParser):
|
|||||||
self.env.stdout_isatty = False
|
self.env.stdout_isatty = False
|
||||||
|
|
||||||
def _process_auth(self):
|
def _process_auth(self):
|
||||||
"""
|
# TODO: refactor
|
||||||
If only a username provided via --auth, then ask for a password.
|
self.args.auth_plugin = None
|
||||||
Or, take credentials from the URL, if provided.
|
default_auth_plugin = plugin_manager.get_auth_plugins()[0]
|
||||||
|
auth_type_set = self.args.auth_type != default_auth_plugin.auth_type
|
||||||
"""
|
|
||||||
url = urlsplit(self.args.url)
|
url = urlsplit(self.args.url)
|
||||||
|
|
||||||
if self.args.auth:
|
if self.args.auth is None and not auth_type_set:
|
||||||
if not self.args.auth.has_password():
|
if url.username is not None:
|
||||||
# Stdin already read (if not a tty) so it's save to prompt.
|
# Handle http://username:password@hostname/
|
||||||
if self.args.ignore_stdin:
|
username = url.username
|
||||||
self.error('Unable to prompt for passwords because'
|
password = url.password or ''
|
||||||
' --ignore-stdin is set.')
|
self.args.auth = AuthCredentials(
|
||||||
self.args.auth.prompt_password(url.netloc)
|
key=username,
|
||||||
|
value=password,
|
||||||
|
sep=SEP_CREDENTIALS,
|
||||||
|
orig=SEP_CREDENTIALS.join([username, password])
|
||||||
|
)
|
||||||
|
|
||||||
elif url.username is not None:
|
if self.args.auth is not None or auth_type_set:
|
||||||
# Handle http://username:password@hostname/
|
plugin = plugin_manager.get_auth_plugin(self.args.auth_type)()
|
||||||
username = url.username
|
|
||||||
password = url.password or ''
|
if plugin.auth_require and self.args.auth is None:
|
||||||
self.args.auth = AuthCredentials(
|
self.error('--auth required')
|
||||||
key=username,
|
|
||||||
value=password,
|
plugin.raw_auth = self.args.auth
|
||||||
sep=SEP_CREDENTIALS,
|
self.args.auth_plugin = plugin
|
||||||
orig=SEP_CREDENTIALS.join([username, password])
|
already_parsed = isinstance(self.args.auth, AuthCredentials)
|
||||||
)
|
|
||||||
|
if self.args.auth is None or not plugin.auth_parse:
|
||||||
|
self.args.auth = plugin.get_auth()
|
||||||
|
else:
|
||||||
|
if already_parsed:
|
||||||
|
# from the URL
|
||||||
|
credentials = self.args.auth
|
||||||
|
else:
|
||||||
|
credentials = parse_auth(self.args.auth)
|
||||||
|
|
||||||
|
if (not credentials.has_password()
|
||||||
|
and plugin.prompt_password):
|
||||||
|
if self.args.ignore_stdin:
|
||||||
|
# Non-tty stdin read by now
|
||||||
|
self.error(
|
||||||
|
'Unable to prompt for passwords because'
|
||||||
|
' --ignore-stdin is set.'
|
||||||
|
)
|
||||||
|
credentials.prompt_password(url.netloc)
|
||||||
|
self.args.auth = plugin.get_auth(
|
||||||
|
username=credentials.key,
|
||||||
|
password=credentials.value,
|
||||||
|
)
|
||||||
|
|
||||||
def _apply_no_options(self, no_options):
|
def _apply_no_options(self, no_options):
|
||||||
"""For every `--no-OPTION` in `no_options`, set `args.OPTION` to
|
"""For every `--no-OPTION` in `no_options`, set `args.OPTION` to
|
||||||
@ -578,6 +604,9 @@ class AuthCredentialsArgType(KeyValueArgType):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
parse_auth = AuthCredentialsArgType(SEP_CREDENTIALS)
|
||||||
|
|
||||||
|
|
||||||
class RequestItemsDict(OrderedDict):
|
class RequestItemsDict(OrderedDict):
|
||||||
"""Multi-value dict for URL parameters and form data."""
|
"""Multi-value dict for URL parameters and form data."""
|
||||||
|
|
||||||
|
@ -17,13 +17,36 @@ class AuthPlugin(BasePlugin):
|
|||||||
|
|
||||||
See <https://github.com/jkbrzt/httpie-ntlm> for an example auth plugin.
|
See <https://github.com/jkbrzt/httpie-ntlm> for an example auth plugin.
|
||||||
|
|
||||||
|
See also `test_auth_plugins.py`
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# The value that should be passed to --auth-type
|
# The value that should be passed to --auth-type
|
||||||
# to use this auth plugin. Eg. "my-auth"
|
# to use this auth plugin. Eg. "my-auth"
|
||||||
auth_type = None
|
auth_type = None
|
||||||
|
|
||||||
def get_auth(self, username, password):
|
# Set to `False` to make it possible to invoke this auth
|
||||||
|
# plugin without requiring the user to specify credentials
|
||||||
|
# through `--auth, -a`.
|
||||||
|
auth_require = True
|
||||||
|
|
||||||
|
# By default the `-a` argument is parsed for `username:password`.
|
||||||
|
# Set this to `False` to disable the parsing and error handling.
|
||||||
|
auth_parse = True
|
||||||
|
|
||||||
|
# If both `auth_parse` and `prompt_password` are set to `True`,
|
||||||
|
# and the value of `-a` lacks the password part,
|
||||||
|
# then the user will be prompted to type the password in.
|
||||||
|
prompt_password = True
|
||||||
|
|
||||||
|
# Will be set to the raw value of `-a` (if provided) before
|
||||||
|
# `get_auth()` gets called.
|
||||||
|
raw_auth = None
|
||||||
|
|
||||||
|
def get_auth(self, username=None, password=None):
|
||||||
"""
|
"""
|
||||||
|
If `auth_parse` is set to `True`, then `username`
|
||||||
|
and `password` contain the parsed credentials.
|
||||||
|
|
||||||
Return a ``requests.auth.AuthBase`` subclass instance.
|
Return a ``requests.auth.AuthBase`` subclass instance.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -36,6 +36,7 @@ class BasicAuthPlugin(BuiltinAuthPlugin):
|
|||||||
name = 'Basic HTTP auth'
|
name = 'Basic HTTP auth'
|
||||||
auth_type = 'basic'
|
auth_type = 'basic'
|
||||||
|
|
||||||
|
# noinspection PyMethodOverriding
|
||||||
def get_auth(self, username, password):
|
def get_auth(self, username, password):
|
||||||
return HTTPBasicAuth(username, password)
|
return HTTPBasicAuth(username, password)
|
||||||
|
|
||||||
@ -45,5 +46,6 @@ class DigestAuthPlugin(BuiltinAuthPlugin):
|
|||||||
name = 'Digest HTTP auth'
|
name = 'Digest HTTP auth'
|
||||||
auth_type = 'digest'
|
auth_type = 'digest'
|
||||||
|
|
||||||
|
# noinspection PyMethodOverriding
|
||||||
def get_auth(self, username, password):
|
def get_auth(self, username, password):
|
||||||
return requests.auth.HTTPDigestAuth(username, password)
|
return requests.auth.HTTPDigestAuth(username, password)
|
||||||
|
@ -24,6 +24,9 @@ class PluginManager(object):
|
|||||||
for plugin in plugins:
|
for plugin in plugins:
|
||||||
self._plugins.append(plugin)
|
self._plugins.append(plugin)
|
||||||
|
|
||||||
|
def unregister(self, plugin):
|
||||||
|
self._plugins.remove(plugin)
|
||||||
|
|
||||||
def load_installed_plugins(self):
|
def load_installed_plugins(self):
|
||||||
for entry_point_name in ENTRY_POINT_NAMES:
|
for entry_point_name in ENTRY_POINT_NAMES:
|
||||||
for entry_point in iter_entry_points(entry_point_name):
|
for entry_point in iter_entry_points(entry_point_name):
|
||||||
|
@ -51,11 +51,10 @@ def get_response(requests_session, session_name,
|
|||||||
dump_request(kwargs)
|
dump_request(kwargs)
|
||||||
session.update_headers(kwargs['headers'])
|
session.update_headers(kwargs['headers'])
|
||||||
|
|
||||||
if args.auth:
|
if args.auth_plugin:
|
||||||
session.auth = {
|
session.auth = {
|
||||||
'type': args.auth_type,
|
'type': args.auth_plugin.auth_type,
|
||||||
'username': args.auth.key,
|
'raw_auth': args.auth_plugin.raw_auth,
|
||||||
'password': args.auth.value,
|
|
||||||
}
|
}
|
||||||
elif session.auth:
|
elif session.auth:
|
||||||
kwargs['auth'] = session.auth
|
kwargs['auth'] = session.auth
|
||||||
@ -147,10 +146,31 @@ class Session(BaseConfigDict):
|
|||||||
auth = self.get('auth', None)
|
auth = self.get('auth', None)
|
||||||
if not auth or not auth['type']:
|
if not auth or not auth['type']:
|
||||||
return
|
return
|
||||||
auth_plugin = plugin_manager.get_auth_plugin(auth['type'])()
|
|
||||||
return auth_plugin.get_auth(auth['username'], auth['password'])
|
plugin = plugin_manager.get_auth_plugin(auth['type'])()
|
||||||
|
|
||||||
|
credentials = {'username': None, 'password': None}
|
||||||
|
try:
|
||||||
|
# New style
|
||||||
|
plugin.raw_auth = auth['raw_auth']
|
||||||
|
except KeyError:
|
||||||
|
# Old style
|
||||||
|
credentials = {
|
||||||
|
'username': auth['username'],
|
||||||
|
'password': auth['password'],
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
if plugin.auth_parse:
|
||||||
|
from httpie.input import parse_auth
|
||||||
|
parsed = parse_auth(plugin.raw_auth)
|
||||||
|
credentials = {
|
||||||
|
'username': parsed.key,
|
||||||
|
'password': parsed.value,
|
||||||
|
}
|
||||||
|
|
||||||
|
return plugin.get_auth(**credentials)
|
||||||
|
|
||||||
@auth.setter
|
@auth.setter
|
||||||
def auth(self, auth):
|
def auth(self, auth):
|
||||||
assert set(['type', 'username', 'password']) == set(auth.keys())
|
assert set(['type', 'raw_auth']) == set(auth.keys())
|
||||||
self['auth'] = auth
|
self['auth'] = auth
|
||||||
|
@ -60,5 +60,5 @@ def test_only_username_in_url(url):
|
|||||||
"""
|
"""
|
||||||
args = httpie.cli.parser.parse_args(args=[url], env=TestEnvironment())
|
args = httpie.cli.parser.parse_args(args=[url], env=TestEnvironment())
|
||||||
assert args.auth
|
assert args.auth
|
||||||
assert args.auth.key == 'username'
|
assert args.auth.username == 'username'
|
||||||
assert args.auth.value == ''
|
assert args.auth.password == ''
|
||||||
|
89
tests/test_auth_plugins.py
Normal file
89
tests/test_auth_plugins.py
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
from utils import http, HTTP_OK
|
||||||
|
from httpie.plugins import AuthPlugin, plugin_manager
|
||||||
|
|
||||||
|
# TODO: run all these tests in session mode as well
|
||||||
|
|
||||||
|
# Basic auth encoded 'username' and 'password'
|
||||||
|
BASIC_AUTH_HEADER_VALUE = 'Basic dXNlcm5hbWU6cGFzc3dvcmQ='
|
||||||
|
BASIC_AUTH_URL = '/basic-auth/username/password'
|
||||||
|
|
||||||
|
|
||||||
|
def dummy_auth(auth_header=BASIC_AUTH_HEADER_VALUE):
|
||||||
|
|
||||||
|
def inner(r):
|
||||||
|
r.headers['Authorization'] = auth_header
|
||||||
|
return r
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_plugin_parse_false(httpbin):
|
||||||
|
|
||||||
|
class ParseFalseAuthPlugin(AuthPlugin):
|
||||||
|
auth_type = 'parse-false'
|
||||||
|
auth_parse = False
|
||||||
|
|
||||||
|
def get_auth(self, username=None, password=None):
|
||||||
|
assert username is None
|
||||||
|
assert password is None
|
||||||
|
assert self.raw_auth == BASIC_AUTH_HEADER_VALUE
|
||||||
|
return dummy_auth(self.raw_auth)
|
||||||
|
|
||||||
|
plugin_manager.register(ParseFalseAuthPlugin)
|
||||||
|
try:
|
||||||
|
r = http(
|
||||||
|
httpbin + BASIC_AUTH_URL,
|
||||||
|
'--auth-type', 'parse-false',
|
||||||
|
'--auth', BASIC_AUTH_HEADER_VALUE
|
||||||
|
)
|
||||||
|
assert HTTP_OK in r
|
||||||
|
finally:
|
||||||
|
plugin_manager.unregister(ParseFalseAuthPlugin)
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_plugin_require_false(httpbin):
|
||||||
|
|
||||||
|
class RequireFalseAuthPlugin(AuthPlugin):
|
||||||
|
auth_type = 'require-false'
|
||||||
|
auth_require = False
|
||||||
|
|
||||||
|
def get_auth(self, username=None, password=None):
|
||||||
|
assert self.raw_auth is None
|
||||||
|
assert username is None
|
||||||
|
assert password is None
|
||||||
|
return dummy_auth()
|
||||||
|
|
||||||
|
plugin_manager.register(RequireFalseAuthPlugin)
|
||||||
|
try:
|
||||||
|
r = http(
|
||||||
|
httpbin + BASIC_AUTH_URL,
|
||||||
|
'--auth-type', 'require-false',
|
||||||
|
)
|
||||||
|
assert HTTP_OK in r
|
||||||
|
finally:
|
||||||
|
plugin_manager.unregister(RequireFalseAuthPlugin)
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_plugin_prompt_false(httpbin):
|
||||||
|
|
||||||
|
class PromptFalseAuthPlugin(AuthPlugin):
|
||||||
|
auth_type = 'prompt-false'
|
||||||
|
prompt_password = False
|
||||||
|
|
||||||
|
def get_auth(self, username=None, password=None):
|
||||||
|
assert self.raw_auth == 'username:'
|
||||||
|
assert username == 'username'
|
||||||
|
assert password == ''
|
||||||
|
return dummy_auth()
|
||||||
|
|
||||||
|
plugin_manager.register(PromptFalseAuthPlugin)
|
||||||
|
|
||||||
|
try:
|
||||||
|
r = http(
|
||||||
|
httpbin + BASIC_AUTH_URL,
|
||||||
|
'--auth-type', 'prompt-false',
|
||||||
|
'--auth', 'username:'
|
||||||
|
)
|
||||||
|
assert HTTP_OK in r
|
||||||
|
finally:
|
||||||
|
plugin_manager.unregister(PromptFalseAuthPlugin)
|
Loading…
Reference in New Issue
Block a user