httpie-cli/httpie/core.py

278 lines
9.7 KiB
Python
Raw Normal View History

import argparse
import os
2016-03-04 18:42:13 +01:00
import platform
import sys
import socket
from typing import List, Optional, Union, Callable
2012-07-26 00:26:23 +02:00
import requests
2012-08-07 14:50:51 +02:00
from pygments import __version__ as pygments_version
from requests import __version__ as requests_version
2012-07-26 00:26:23 +02:00
from . import __version__ as httpie_version
from .cli.constants import OUT_REQ_BODY
from .client import collect_messages
from .context import Environment
from .downloads import Downloader
from .models import (
RequestsMessageKind,
OutputOptions,
)
from .output.writer import write_message, write_stream, MESSAGE_SEPARATOR_BYTES
from .plugins.registry import plugin_manager
from .status import ExitStatus, http_status_to_exit_status
from .utils import unwrap_context
# noinspection PyDefaultArgument
def raw_main(
parser: argparse.ArgumentParser,
main_program: Callable[[argparse.Namespace, Environment], ExitStatus],
args: List[Union[str, bytes]] = sys.argv,
env: Environment = Environment()
) -> ExitStatus:
program_name, *args = args
env.program_name = os.path.basename(program_name)
args = decode_raw_args(args, env.stdin_encoding)
plugin_manager.load_installed_plugins(env.config.plugins_dir)
if env.config.default_options:
args = env.config.default_options + args
include_debug_info = '--debug' in args
include_traceback = include_debug_info or '--traceback' in args
def handle_generic_error(e, annotation=None):
msg = str(e)
if hasattr(e, 'request'):
request = e.request
if hasattr(request, 'url'):
msg = (
f'{msg} while doing a {request.method}'
f' request to URL: {request.url}'
)
if annotation:
msg += annotation
env.log_error(f'{type(e).__name__}: {msg}')
if include_traceback:
raise
if include_debug_info:
print_debug_info(env)
if args == ['--debug']:
return ExitStatus.SUCCESS
exit_status = ExitStatus.SUCCESS
try:
parsed_args = parser.parse_args(
args=args,
env=env,
)
except KeyboardInterrupt:
env.stderr.write('\n')
if include_traceback:
raise
exit_status = ExitStatus.ERROR_CTRL_C
except SystemExit as e:
if e.code != ExitStatus.SUCCESS:
env.stderr.write('\n')
if include_traceback:
raise
exit_status = ExitStatus.ERROR
else:
try:
exit_status = main_program(
args=parsed_args,
env=env,
)
except KeyboardInterrupt:
env.stderr.write('\n')
if include_traceback:
raise
exit_status = ExitStatus.ERROR_CTRL_C
except SystemExit as e:
if e.code != ExitStatus.SUCCESS:
env.stderr.write('\n')
if include_traceback:
raise
exit_status = ExitStatus.ERROR
except requests.Timeout:
exit_status = ExitStatus.ERROR_TIMEOUT
env.log_error(f'Request timed out ({parsed_args.timeout}s).')
except requests.TooManyRedirects:
exit_status = ExitStatus.ERROR_TOO_MANY_REDIRECTS
env.log_error(
f'Too many redirects'
f' (--max-redirects={parsed_args.max_redirects}).'
)
except requests.exceptions.ConnectionError as exc:
annotation = None
original_exc = unwrap_context(exc)
if isinstance(original_exc, socket.gaierror):
if original_exc.errno == socket.EAI_AGAIN:
annotation = '\nCouldnt connect to a DNS server. Please check your connection and try again.'
elif original_exc.errno == socket.EAI_NONAME:
annotation = '\nCouldnt resolve the given hostname. Please check the URL and try again.'
propagated_exc = original_exc
else:
propagated_exc = exc
handle_generic_error(propagated_exc, annotation=annotation)
exit_status = ExitStatus.ERROR
except Exception as e:
# TODO: Further distinction between expected and unexpected errors.
handle_generic_error(e)
exit_status = ExitStatus.ERROR
2012-12-11 12:54:34 +01:00
return exit_status
def main(
args: List[Union[str, bytes]] = sys.argv,
env: Environment = Environment()
) -> ExitStatus:
"""
The main function.
Pre-process args, handle some special types of invocations,
and run the main program with error handling.
Return exit status code.
"""
from .cli.definition import parser
return raw_main(
parser=parser,
main_program=program,
args=args,
env=env
)
def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
"""
The main program without error handling.
"""
# TODO: Refactor and drastically simplify, especially so that the separator logic is elsewhere.
exit_status = ExitStatus.SUCCESS
downloader = None
initial_request: Optional[requests.PreparedRequest] = None
final_response: Optional[requests.Response] = None
def separate():
getattr(env.stdout, 'buffer', env.stdout).write(MESSAGE_SEPARATOR_BYTES)
def request_body_read_callback(chunk: bytes):
should_pipe_to_stdout = bool(
# Request body output desired
OUT_REQ_BODY in args.output_options
# & not `.read()` already pre-request (e.g., for compression)
and initial_request
# & non-EOF chunk
and chunk
)
if should_pipe_to_stdout:
msg = requests.PreparedRequest()
msg.is_body_upload_chunk = True
msg.body = chunk
msg.headers = initial_request.headers
msg_output_options = OutputOptions.from_message(msg, body=True, headers=False)
write_message(requests_message=msg, env=env, args=args, output_options=msg_output_options)
try:
if args.download:
args.follow = True # --download implies --follow.
downloader = Downloader(output_file=args.output_file, progress_file=env.stderr, resume=args.download_resume)
downloader.pre_request(args.headers)
messages = collect_messages(args=args, config_dir=env.config.directory,
request_body_read_callback=request_body_read_callback)
force_separator = False
prev_with_body = False
# Process messages as theyre generated
2020-09-25 13:44:28 +02:00
for message in messages:
output_options = OutputOptions.from_message(message, args.output_options)
do_write_body = output_options.body
if prev_with_body and output_options.any() and (force_separator or not env.stdout_isatty):
# Separate after a previous message with body, if needed. See test_tokens.py.
separate()
force_separator = False
if output_options.kind is RequestsMessageKind.REQUEST:
if not initial_request:
initial_request = message
if output_options.body:
is_streamed_upload = not isinstance(message.body, (str, bytes))
do_write_body = not is_streamed_upload
force_separator = is_streamed_upload and env.stdout_isatty
else:
final_response = message
if args.check_status or downloader:
exit_status = http_status_to_exit_status(http_status=message.status_code, follow=args.follow)
if exit_status != ExitStatus.SUCCESS and (not env.stdout_isatty or args.quiet == 1):
env.log_error(f'HTTP {message.raw.status} {message.raw.reason}', level='warning')
write_message(requests_message=message, env=env, args=args, output_options=output_options._replace(
body=do_write_body
))
prev_with_body = output_options.body
# Cleanup
if force_separator:
separate()
if downloader and exit_status == ExitStatus.SUCCESS:
# Last response body download.
download_stream, download_to = downloader.start(
initial_url=initial_request.url,
final_response=final_response,
)
write_stream(stream=download_stream, outfile=download_to, flush=False)
downloader.finish()
if downloader.interrupted:
exit_status = ExitStatus.ERROR
env.log_error(
f'Incomplete download: size={downloader.status.total_size};'
f' downloaded={downloader.status.downloaded}'
)
return exit_status
finally:
if downloader and not downloader.finished:
downloader.failed()
2021-09-02 16:47:01 +02:00
if args.output_file and args.output_file_specified:
args.output_file.close()
def print_debug_info(env: Environment):
env.stderr.writelines([
f'HTTPie {httpie_version}\n',
f'Requests {requests_version}\n',
f'Pygments {pygments_version}\n',
f'Python {sys.version}\n{sys.executable}\n',
f'{platform.system()} {platform.release()}',
])
env.stderr.write('\n\n')
env.stderr.write(repr(env))
env.stderr.write('\n\n')
env.stderr.write(repr(plugin_manager))
env.stderr.write('\n')
def decode_raw_args(
args: List[Union[str, bytes]],
stdin_encoding: str
) -> List[str]:
"""
Convert all bytes args to str
by decoding them using stdin encoding.
"""
return [
arg.decode(stdin_encoding)
2020-12-21 12:03:25 +01:00
if type(arg) is bytes else arg
for arg in args
]