Merge pull request #16 from Group-18-DD2480/feat-parser

Extended implementation of the parser
This commit is contained in:
Jakub Rybak 2025-03-07 14:24:02 +01:00 committed by GitHub
commit 5cc4edc9bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1129 additions and 367 deletions

View File

@ -5,20 +5,39 @@ import textwrap
from argparse import FileType
from httpie import __doc__, __version__
from httpie.cli.argtypes import (KeyValueArgType, SessionNameValidator,
SSLCredentials, readable_file_arg,
response_charset_type, response_mime_type)
from httpie.cli.constants import (BASE_OUTPUT_OPTIONS, DEFAULT_FORMAT_OPTIONS,
OUT_REQ_BODY, OUT_REQ_HEAD, OUT_RESP_BODY,
OUT_RESP_HEAD, OUT_RESP_META, OUTPUT_OPTIONS,
OUTPUT_OPTIONS_DEFAULT, PRETTY_MAP,
PRETTY_STDOUT_TTY_ONLY,
SEPARATOR_GROUP_ALL_ITEMS, SEPARATOR_PROXY,
SORTED_FORMAT_OPTIONS_STRING,
UNSORTED_FORMAT_OPTIONS_STRING, RequestType)
from httpie.cli.argtypes import (
KeyValueArgType,
SessionNameValidator,
SSLCredentials,
readable_file_arg,
response_charset_type,
response_mime_type,
)
from httpie.cli.constants import (
BASE_OUTPUT_OPTIONS,
DEFAULT_FORMAT_OPTIONS,
OUT_REQ_BODY,
OUT_REQ_HEAD,
OUT_RESP_BODY,
OUT_RESP_HEAD,
OUT_RESP_META,
OUTPUT_OPTIONS,
OUTPUT_OPTIONS_DEFAULT,
PRETTY_MAP,
PRETTY_STDOUT_TTY_ONLY,
SEPARATOR_GROUP_ALL_ITEMS,
SEPARATOR_PROXY,
SORTED_FORMAT_OPTIONS_STRING,
UNSORTED_FORMAT_OPTIONS_STRING,
RequestType,
)
from httpie.cli.options import ParserSpec, Qualifiers, to_argparse
from httpie.output.formatters.colors import (AUTO_STYLE, DEFAULT_STYLE, BUNDLED_STYLES,
get_available_styles)
from httpie.output.formatters.colors import (
AUTO_STYLE,
DEFAULT_STYLE,
BUNDLED_STYLES,
get_available_styles,
)
from httpie.plugins.builtin import BuiltinAuthPlugin
from httpie.plugins.registry import plugin_manager
from httpie.ssl_ import AVAILABLE_SSL_VERSION_ARG_MAPPING, DEFAULT_SSL_CIPHERS_STRING
@ -26,12 +45,12 @@ from httpie.ssl_ import AVAILABLE_SSL_VERSION_ARG_MAPPING, DEFAULT_SSL_CIPHERS_S
# Man pages are static (built when making a release).
# We use this check to not include generated, system-specific information there (e.g., default --ciphers).
IS_MAN_PAGE = bool(os.environ.get('HTTPIE_BUILDING_MAN_PAGES'))
IS_MAN_PAGE = bool(os.environ.get("HTTPIE_BUILDING_MAN_PAGES"))
options = ParserSpec(
'http',
description=f'{__doc__.strip()} <https://httpie.io>',
"http",
description=f"{__doc__.strip()} <https://httpie.io>",
epilog="""
For every --OPTION there is also a --no-OPTION that reverts OPTION
to its default value.
@ -39,7 +58,7 @@ options = ParserSpec(
Suggestions and bug reports are greatly appreciated:
https://github.com/httpie/cli/issues
""",
source_file=__file__
source_file=__file__,
)
#######################################################################
@ -47,7 +66,7 @@ options = ParserSpec(
#######################################################################
positional_arguments = options.add_group(
'Positional arguments',
"Positional arguments",
description="""
These arguments come after any flags and in the order they are listed here.
Only URL is required.
@ -55,11 +74,11 @@ positional_arguments = options.add_group(
)
positional_arguments.add_argument(
dest='method',
metavar='METHOD',
dest="method",
metavar="METHOD",
nargs=Qualifiers.OPTIONAL,
default=None,
short_help='The HTTP method to be used for the request (GET, POST, PUT, DELETE, ...).',
short_help="The HTTP method to be used for the request (GET, POST, PUT, DELETE, ...).",
help="""
The HTTP method to be used for the request (GET, POST, PUT, DELETE, ...).
@ -72,9 +91,9 @@ positional_arguments.add_argument(
""",
)
positional_arguments.add_argument(
dest='url',
metavar='URL',
short_help='The request URL.',
dest="url",
metavar="URL",
short_help="The request URL.",
help="""
The request URL. Scheme defaults to 'http://' if the URL
does not include one. (You can override this with: --default-scheme=http/https)
@ -87,21 +106,29 @@ positional_arguments.add_argument(
""",
)
positional_arguments.add_argument(
dest='request_items',
metavar='REQUEST_ITEM',
dest="request_items",
metavar="REQUEST_ITEM",
nargs=Qualifiers.ZERO_OR_MORE,
default=None,
type=KeyValueArgType(*SEPARATOR_GROUP_ALL_ITEMS),
short_help=(
'HTTPies request items syntax for specifying HTTP headers, JSON/Form'
'data, files, and URL parameters.'
"HTTPies request items syntax for specifying HTTP headers, JSON/Form"
"data, files, and URL parameters."
),
nested_options=[
('HTTP Headers', 'Name:Value', 'Arbitrary HTTP header, e.g X-API-Token:123'),
('URL Parameters', 'name==value', 'Querystring parameter to the URL, e.g limit==50'),
('Data Fields', 'field=value', 'Data fields to be serialized as JSON (default) or Form Data (with --form)'),
('Raw JSON Fields', 'field:=json', 'Data field for real JSON types.'),
('File upload Fields', 'field@/dir/file', 'Path field for uploading a file.'),
("HTTP Headers", "Name:Value", "Arbitrary HTTP header, e.g X-API-Token:123"),
(
"URL Parameters",
"name==value",
"Querystring parameter to the URL, e.g limit==50",
),
(
"Data Fields",
"field=value",
"Data fields to be serialized as JSON (default) or Form Data (with --form)",
),
("Raw JSON Fields", "field:=json", "Data field for real JSON types."),
("File upload Fields", "field@/dir/file", "Path field for uploading a file."),
],
help=r"""
Optional key-value pairs to be included in the request. The separator used
@ -148,15 +175,15 @@ positional_arguments.add_argument(
# Content type.
#######################################################################
content_types = options.add_group('Predefined content types')
content_types = options.add_group("Predefined content types")
content_types.add_argument(
'--json',
'-j',
action='store_const',
"--json",
"-j",
action="store_const",
const=RequestType.JSON,
dest='request_type',
short_help='(default) Serialize data items from the command line as a JSON object.',
dest="request_type",
short_help="(default) Serialize data items from the command line as a JSON object.",
help="""
(default) Data items from the command line are serialized as a JSON object.
The Content-Type and Accept headers are set to application/json
@ -165,12 +192,12 @@ content_types.add_argument(
""",
)
content_types.add_argument(
'--form',
'-f',
action='store_const',
"--form",
"-f",
action="store_const",
const=RequestType.FORM,
dest='request_type',
short_help='Serialize data items from the command line as form field data.',
dest="request_type",
short_help="Serialize data items from the command line as form field data.",
help="""
Data items from the command line are serialized as form fields.
@ -181,25 +208,25 @@ content_types.add_argument(
""",
)
content_types.add_argument(
'--multipart',
action='store_const',
"--multipart",
action="store_const",
const=RequestType.MULTIPART,
dest='request_type',
dest="request_type",
short_help=(
'Similar to --form, but always sends a multipart/form-data '
'request (i.e., even without files).'
)
"Similar to --form, but always sends a multipart/form-data "
"request (i.e., even without files)."
),
)
content_types.add_argument(
'--boundary',
"--boundary",
short_help=(
'Specify a custom boundary string for multipart/form-data requests. '
'Only has effect only together with --form.'
)
"Specify a custom boundary string for multipart/form-data requests. "
"Only has effect only together with --form."
),
)
content_types.add_argument(
'--raw',
short_help='Pass raw request data without extra processing.',
"--raw",
short_help="Pass raw request data without extra processing.",
help="""
This option allows you to pass raw request data without extra processing
(as opposed to the structured request items syntax):
@ -224,7 +251,7 @@ content_types.add_argument(
default=False,
short_help="Parse and send an HTTP request from a .http file",
help="""
Parse and send an HTTP request from a file in .http format.
Parse and send an HTTP request from a file in .http format.
The file should contain a valid HTTP request with headers and body.
If this is specified, URL will be treated as a file path.
""",
@ -234,14 +261,14 @@ content_types.add_argument(
# Content processing.
#######################################################################
processing_options = options.add_group('Content processing options')
processing_options = options.add_group("Content processing options")
processing_options.add_argument(
'--compress',
'-x',
action='count',
"--compress",
"-x",
action="count",
default=0,
short_help='Compress the content with Deflate algorithm.',
short_help="Compress the content with Deflate algorithm.",
help="""
Content compressed (encoded) with Deflate algorithm.
The Content-Encoding header is set to deflate.
@ -265,9 +292,9 @@ def format_style_help(available_styles, *, isolation_mode: bool = False):
{available_styles}
"""
if isolation_mode:
text += '\n\n'
text += 'For finding out all available styles in your system, try:\n\n'
text += ' $ http --style\n'
text += "\n\n"
text += "For finding out all available styles in your system, try:\n\n"
text += " $ http --style\n"
text += textwrap.dedent("""
The "{auto_style}" style follows your terminal's ANSI color styles.
For non-{auto_style} styles to work properly, please make sure that the
@ -278,9 +305,8 @@ def format_style_help(available_styles, *, isolation_mode: bool = False):
if isolation_mode:
available_styles = sorted(BUNDLED_STYLES)
available_styles_text = '\n'.join(
f' {line.strip()}'
for line in textwrap.wrap(', '.join(available_styles), 60)
available_styles_text = "\n".join(
f" {line.strip()}" for line in textwrap.wrap(", ".join(available_styles), 60)
).strip()
return text.format(
default=DEFAULT_STYLE,
@ -290,24 +316,24 @@ def format_style_help(available_styles, *, isolation_mode: bool = False):
_sorted_kwargs = {
'action': 'append_const',
'const': SORTED_FORMAT_OPTIONS_STRING,
'dest': 'format_options',
"action": "append_const",
"const": SORTED_FORMAT_OPTIONS_STRING,
"dest": "format_options",
}
_unsorted_kwargs = {
'action': 'append_const',
'const': UNSORTED_FORMAT_OPTIONS_STRING,
'dest': 'format_options',
"action": "append_const",
"const": UNSORTED_FORMAT_OPTIONS_STRING,
"dest": "format_options",
}
output_processing = options.add_group('Output processing')
output_processing = options.add_group("Output processing")
output_processing.add_argument(
'--pretty',
dest='prettify',
"--pretty",
dest="prettify",
default=PRETTY_STDOUT_TTY_ONLY,
choices=sorted(PRETTY_MAP.keys()),
short_help='Control the processing of console outputs.',
short_help="Control the processing of console outputs.",
help="""
Controls output processing. The value can be "none" to not prettify
the output (default for redirected output), "all" to apply both colors
@ -316,12 +342,12 @@ output_processing.add_argument(
""",
)
output_processing.add_argument(
'--style',
'-s',
dest='style',
metavar='STYLE',
"--style",
"-s",
dest="style",
metavar="STYLE",
default=DEFAULT_STYLE,
action='lazy_choices',
action="lazy_choices",
getter=get_available_styles,
short_help=f'Output coloring style (default is "{DEFAULT_STYLE}").',
help_formatter=format_style_help,
@ -330,16 +356,16 @@ output_processing.add_argument(
# The closest approx. of the documented resetting to default via --no-<option>.
# We hide them from the doc because they act only as low-level aliases here.
output_processing.add_argument(
'--no-unsorted', **_sorted_kwargs, help=Qualifiers.SUPPRESS
"--no-unsorted", **_sorted_kwargs, help=Qualifiers.SUPPRESS
)
output_processing.add_argument(
'--no-sorted', **_unsorted_kwargs, help=Qualifiers.SUPPRESS
"--no-sorted", **_unsorted_kwargs, help=Qualifiers.SUPPRESS
)
output_processing.add_argument(
'--unsorted',
"--unsorted",
**_unsorted_kwargs,
short_help='Disables all sorting while formatting output.',
short_help="Disables all sorting while formatting output.",
help=f"""
Disables all sorting while formatting output. It is a shortcut for:
@ -348,9 +374,9 @@ output_processing.add_argument(
""",
)
output_processing.add_argument(
'--sorted',
"--sorted",
**_sorted_kwargs,
short_help='Re-enables all sorting options while formatting output.',
short_help="Re-enables all sorting options while formatting output.",
help=f"""
Re-enables all sorting options while formatting output. It is a shortcut for:
@ -359,10 +385,10 @@ output_processing.add_argument(
""",
)
output_processing.add_argument(
'--response-charset',
metavar='ENCODING',
"--response-charset",
metavar="ENCODING",
type=response_charset_type,
short_help='Override the response encoding for terminal display purposes.',
short_help="Override the response encoding for terminal display purposes.",
help="""
Override the response encoding for terminal display purposes, e.g.:
@ -372,10 +398,10 @@ output_processing.add_argument(
""",
)
output_processing.add_argument(
'--response-mime',
metavar='MIME_TYPE',
"--response-mime",
metavar="MIME_TYPE",
type=response_mime_type,
short_help='Override the response mime type for coloring and formatting for the terminal.',
short_help="Override the response mime type for coloring and formatting for the terminal.",
help="""
Override the response mime type for coloring and formatting for the terminal, e.g.:
@ -385,9 +411,9 @@ output_processing.add_argument(
""",
)
output_processing.add_argument(
'--format-options',
action='append',
short_help='Controls output formatting.',
"--format-options",
action="append",
short_help="Controls output formatting.",
help="""
Controls output formatting. Only relevant when formatting is enabled
through (explicit or implied) --pretty=all or --pretty=format.
@ -404,8 +430,8 @@ output_processing.add_argument(
This is something you will typically put into your config file.
""".format(
option_list='\n'.join(
f' {option}' for option in DEFAULT_FORMAT_OPTIONS
option_list="\n".join(
f" {option}" for option in DEFAULT_FORMAT_OPTIONS
).strip()
),
)
@ -414,14 +440,14 @@ output_processing.add_argument(
# Output options
#######################################################################
output_options = options.add_group('Output options')
output_options = options.add_group("Output options")
output_options.add_argument(
'--print',
'-p',
dest='output_options',
metavar='WHAT',
short_help='Options to specify what the console output should contain.',
"--print",
"-p",
dest="output_options",
metavar="WHAT",
short_help="Options to specify what the console output should contain.",
help=f"""
String specifying what the output should contain:
@ -439,36 +465,36 @@ output_options.add_argument(
""",
)
output_options.add_argument(
'--headers',
'-h',
dest='output_options',
action='store_const',
"--headers",
"-h",
dest="output_options",
action="store_const",
const=OUT_RESP_HEAD,
short_help='Print only the response headers.',
short_help="Print only the response headers.",
help=f"""
Print only the response headers. Shortcut for --print={OUT_RESP_HEAD}.
""",
)
output_options.add_argument(
'--meta',
'-m',
dest='output_options',
action='store_const',
"--meta",
"-m",
dest="output_options",
action="store_const",
const=OUT_RESP_META,
short_help='Print only the response metadata.',
short_help="Print only the response metadata.",
help=f"""
Print only the response metadata. Shortcut for --print={OUT_RESP_META}.
""",
)
output_options.add_argument(
'--body',
'-b',
dest='output_options',
action='store_const',
"--body",
"-b",
dest="output_options",
action="store_const",
const=OUT_RESP_BODY,
short_help='Print only the response body.',
short_help="Print only the response body.",
help=f"""
Print only the response body. Shortcut for --print={OUT_RESP_BODY}.
@ -476,27 +502,27 @@ output_options.add_argument(
)
output_options.add_argument(
'--verbose',
'-v',
dest='verbose',
action='count',
"--verbose",
"-v",
dest="verbose",
action="count",
default=0,
short_help='Make output more verbose.',
short_help="Make output more verbose.",
help=f"""
Verbose output. For the level one (with single `-v`/`--verbose`), print
the whole request as well as the response. Also print any intermediary
requests/responses (such as redirects). For the second level and higher,
print these as well as the response metadata.
Level one is a shortcut for: --all --print={''.join(sorted(BASE_OUTPUT_OPTIONS))}
Level two is a shortcut for: --all --print={''.join(sorted(OUTPUT_OPTIONS))}
Level one is a shortcut for: --all --print={"".join(sorted(BASE_OUTPUT_OPTIONS))}
Level two is a shortcut for: --all --print={"".join(sorted(OUTPUT_OPTIONS))}
""",
)
output_options.add_argument(
'--all',
"--all",
default=False,
action='store_true',
short_help='Show any intermediary requests/responses.',
action="store_true",
short_help="Show any intermediary requests/responses.",
help="""
By default, only the final request/response is shown. Use this flag to show
any intermediary requests/responses as well. Intermediary requests include
@ -506,18 +532,18 @@ output_options.add_argument(
""",
)
output_options.add_argument(
'--history-print',
'-P',
dest='output_options_history',
metavar='WHAT',
"--history-print",
"-P",
dest="output_options_history",
metavar="WHAT",
help=Qualifiers.SUPPRESS,
)
output_options.add_argument(
'--stream',
'-S',
action='store_true',
"--stream",
"-S",
action="store_true",
default=False,
short_help='Always stream the response body by line, i.e., behave like `tail -f`.',
short_help="Always stream the response body by line, i.e., behave like `tail -f`.",
help="""
Always stream the response body by line, i.e., behave like `tail -f'.
@ -533,12 +559,12 @@ output_options.add_argument(
""",
)
output_options.add_argument(
'--output',
'-o',
type=FileType('a+b'),
dest='output_file',
metavar='FILE',
short_help='Save output to FILE instead of stdout.',
"--output",
"-o",
type=FileType("a+b"),
dest="output_file",
metavar="FILE",
short_help="Save output to FILE instead of stdout.",
help="""
Save output to FILE instead of stdout. If --download is also set, then only
the response body is saved to FILE. Other parts of the HTTP exchange are
@ -548,11 +574,11 @@ output_options.add_argument(
)
output_options.add_argument(
'--download',
'-d',
action='store_true',
"--download",
"-d",
action="store_true",
default=False,
short_help='Download the body to a file instead of printing it to stdout.',
short_help="Download the body to a file instead of printing it to stdout.",
help="""
Do not print the response body to stdout. Rather, download it and store it
in a file. The filename is guessed unless specified with --output
@ -561,12 +587,12 @@ output_options.add_argument(
""",
)
output_options.add_argument(
'--continue',
'-c',
dest='download_resume',
action='store_true',
"--continue",
"-c",
dest="download_resume",
action="store_true",
default=False,
short_help='Resume an interrupted download (--output needs to be specified).',
short_help="Resume an interrupted download (--output needs to be specified).",
help="""
Resume an interrupted download. Note that the --output option needs to be
specified as well.
@ -574,11 +600,11 @@ output_options.add_argument(
""",
)
output_options.add_argument(
'--quiet',
'-q',
action='count',
"--quiet",
"-q",
action="count",
default=0,
short_help='Do not print to stdout or stderr, except for errors and warnings when provided once.',
short_help="Do not print to stdout or stderr, except for errors and warnings when provided once.",
help="""
Do not print to stdout or stderr, except for errors and warnings when provided once.
Provide twice to suppress warnings as well.
@ -593,16 +619,16 @@ output_options.add_argument(
#######################################################################
session_name_validator = SessionNameValidator(
'Session name contains invalid characters.'
"Session name contains invalid characters."
)
sessions = options.add_group('Sessions', is_mutually_exclusive=True)
sessions = options.add_group("Sessions", is_mutually_exclusive=True)
sessions.add_argument(
'--session',
metavar='SESSION_NAME_OR_PATH',
"--session",
metavar="SESSION_NAME_OR_PATH",
type=session_name_validator,
short_help='Create, or reuse and update a session.',
short_help="Create, or reuse and update a session.",
help="""
Create, or reuse and update a session. Within a session, custom headers,
auth credential, as well as any cookies sent by the server persist between
@ -618,10 +644,10 @@ sessions.add_argument(
""",
)
sessions.add_argument(
'--session-read-only',
metavar='SESSION_NAME_OR_PATH',
"--session-read-only",
metavar="SESSION_NAME_OR_PATH",
type=session_name_validator,
short_help='Create or read a session without updating it',
short_help="Create or read a session without updating it",
help="""
Create or read a session without updating it form the request/response
exchange.
@ -649,24 +675,23 @@ def format_auth_help(auth_plugins_mapping, *, isolation_mode: bool = False):
for auth_plugin in auth_plugins
if issubclass(auth_plugin, BuiltinAuthPlugin)
]
text += '\n'
text += 'To see all available auth types on your system, including ones installed via plugins, run:\n\n'
text += ' $ http --auth-type'
text += "\n"
text += "To see all available auth types on your system, including ones installed via plugins, run:\n\n"
text += " $ http --auth-type"
auth_types = '\n\n '.join(
auth_types = "\n\n ".join(
'"{type}": {name}{package}{description}'.format(
type=plugin.auth_type,
name=plugin.name,
package=(
''
""
if issubclass(plugin, BuiltinAuthPlugin)
else f' (provided by {plugin.package_name})'
else f" (provided by {plugin.package_name})"
),
description=(
''
""
if not plugin.description
else '\n '
+ ('\n '.join(textwrap.wrap(plugin.description)))
else "\n " + ("\n ".join(textwrap.wrap(plugin.description)))
),
)
for plugin in auth_plugins
@ -678,14 +703,14 @@ def format_auth_help(auth_plugins_mapping, *, isolation_mode: bool = False):
)
authentication = options.add_group('Authentication')
authentication = options.add_group("Authentication")
authentication.add_argument(
'--auth',
'-a',
"--auth",
"-a",
default=None,
metavar='USER[:PASS] | TOKEN',
short_help='Credentials for the selected (-A) authentication method.',
metavar="USER[:PASS] | TOKEN",
short_help="Credentials for the selected (-A) authentication method.",
help="""
For username/password based authentication mechanisms (e.g
basic auth or digest auth) if only the username is provided
@ -694,42 +719,42 @@ authentication.add_argument(
""",
)
authentication.add_argument(
'--auth-type',
'-A',
action='lazy_choices',
"--auth-type",
"-A",
action="lazy_choices",
default=None,
getter=plugin_manager.get_auth_plugin_mapping,
sort=True,
cache=False,
short_help='The authentication mechanism to be used.',
short_help="The authentication mechanism to be used.",
help_formatter=format_auth_help,
)
authentication.add_argument(
'--ignore-netrc',
"--ignore-netrc",
default=False,
action='store_true',
short_help='Ignore credentials from .netrc.'
action="store_true",
short_help="Ignore credentials from .netrc.",
)
#######################################################################
# Network
#######################################################################
network = options.add_group('Network')
network = options.add_group("Network")
network.add_argument(
'--offline',
"--offline",
default=False,
action='store_true',
short_help='Build the request and print it but dont actually send it.'
action="store_true",
short_help="Build the request and print it but dont actually send it.",
)
network.add_argument(
'--proxy',
"--proxy",
default=[],
action='append',
metavar='PROTOCOL:PROXY_URL',
action="append",
metavar="PROTOCOL:PROXY_URL",
type=KeyValueArgType(SEPARATOR_PROXY),
short_help='String mapping of protocol to the URL of the proxy.',
short_help="String mapping of protocol to the URL of the proxy.",
help="""
String mapping protocol to the URL of the proxy
(e.g. http:http://foo.bar:3128). You can specify multiple proxies with
@ -739,39 +764,39 @@ network.add_argument(
""",
)
network.add_argument(
'--follow',
'-F',
"--follow",
"-F",
default=False,
action='store_true',
short_help='Follow 30x Location redirects.'
action="store_true",
short_help="Follow 30x Location redirects.",
)
network.add_argument(
'--max-redirects',
"--max-redirects",
type=int,
default=30,
short_help='The maximum number of redirects that should be followed (with --follow).',
short_help="The maximum number of redirects that should be followed (with --follow).",
help="""
By default, requests have a limit of 30 redirects (works with --follow).
""",
)
network.add_argument(
'--max-headers',
"--max-headers",
type=int,
default=0,
short_help=(
'The maximum number of response headers to be read before '
'giving up (default 0, i.e., no limit).'
)
"The maximum number of response headers to be read before "
"giving up (default 0, i.e., no limit)."
),
)
network.add_argument(
'--timeout',
"--timeout",
type=float,
default=0,
metavar='SECONDS',
short_help='The connection timeout of the request in seconds.',
metavar="SECONDS",
short_help="The connection timeout of the request in seconds.",
help="""
The connection timeout of the request in seconds.
The default value is 0, i.e., there is no timeout limit.
@ -783,10 +808,10 @@ network.add_argument(
""",
)
network.add_argument(
'--check-status',
"--check-status",
default=False,
action='store_true',
short_help='Exit with an error status code if the server replies with an error.',
action="store_true",
short_help="Exit with an error status code if the server replies with an error.",
help="""
By default, HTTPie exits with 0 when no network or other fatal errors
occur. This flag instructs HTTPie to also check the HTTP status code and
@ -800,30 +825,30 @@ network.add_argument(
""",
)
network.add_argument(
'--path-as-is',
"--path-as-is",
default=False,
action='store_true',
short_help='Bypass dot segment (/../ or /./) URL squashing.'
action="store_true",
short_help="Bypass dot segment (/../ or /./) URL squashing.",
)
network.add_argument(
'--chunked',
"--chunked",
default=False,
action='store_true',
action="store_true",
short_help=(
'Enable streaming via chunked transfer encoding. '
'The Transfer-Encoding header is set to chunked.'
)
"Enable streaming via chunked transfer encoding. "
"The Transfer-Encoding header is set to chunked."
),
)
#######################################################################
# SSL
#######################################################################
ssl = options.add_group('SSL')
ssl = options.add_group("SSL")
ssl.add_argument(
'--verify',
default='yes',
"--verify",
default="yes",
short_help='If "no", skip SSL verification. If a file path, use it as a CA bundle.',
help="""
Set to "no" (or "false") to skip checking the host's SSL certificate.
@ -833,10 +858,10 @@ ssl.add_argument(
""",
)
ssl.add_argument(
'--ssl',
dest='ssl_version',
"--ssl",
dest="ssl_version",
choices=sorted(AVAILABLE_SSL_VERSION_ARG_MAPPING.keys()),
short_help='The desired protocol version to used.',
short_help="The desired protocol version to used.",
help="""
The desired protocol version to use. This will default to
SSL v2.3 which will negotiate the highest protocol that both
@ -852,8 +877,8 @@ CIPHERS_CURRENT_DEFAULTS = (
See `http --help` for the default ciphers list on you system.
"""
if IS_MAN_PAGE else
f"""
if IS_MAN_PAGE
else f"""
By default, the following ciphers are used on your system:
{DEFAULT_SSL_CIPHERS_STRING}
@ -861,21 +886,21 @@ CIPHERS_CURRENT_DEFAULTS = (
"""
)
ssl.add_argument(
'--ciphers',
short_help='A string in the OpenSSL cipher list format.',
"--ciphers",
short_help="A string in the OpenSSL cipher list format.",
help=f"""
A string in the OpenSSL cipher list format.
{CIPHERS_CURRENT_DEFAULTS}
"""
""",
)
ssl.add_argument(
'--cert',
"--cert",
default=None,
type=readable_file_arg,
short_help='Specifies a local cert to use as the client-side SSL certificate.',
short_help="Specifies a local cert to use as the client-side SSL certificate.",
help="""
You can specify a local cert to use as client side SSL certificate.
This file may either contain both private key and certificate or you may
@ -884,10 +909,10 @@ ssl.add_argument(
""",
)
ssl.add_argument(
'--cert-key',
"--cert-key",
default=None,
type=readable_file_arg,
short_help='The private key to use with SSL. Only needed if --cert is given.',
short_help="The private key to use with SSL. Only needed if --cert is given.",
help="""
The private key to use with SSL. Only needed if --cert is given and the
certificate file does not contain the private key.
@ -896,63 +921,63 @@ ssl.add_argument(
)
ssl.add_argument(
'--cert-key-pass',
"--cert-key-pass",
default=None,
type=SSLCredentials,
short_help='The passphrase to be used to with the given private key.',
short_help="The passphrase to be used to with the given private key.",
help="""
The passphrase to be used to with the given private key. Only needed if --cert-key
is given and the key file requires a passphrase.
If not provided, youll be prompted interactively.
"""
""",
)
#######################################################################
# Troubleshooting
#######################################################################
troubleshooting = options.add_group('Troubleshooting')
troubleshooting = options.add_group("Troubleshooting")
troubleshooting.add_argument(
'--ignore-stdin',
'-I',
action='store_true',
"--ignore-stdin",
"-I",
action="store_true",
default=False,
short_help='Do not attempt to read stdin'
short_help="Do not attempt to read stdin",
)
troubleshooting.add_argument(
'--help',
action='help',
"--help",
action="help",
default=Qualifiers.SUPPRESS,
short_help='Show this help message and exit.',
short_help="Show this help message and exit.",
)
troubleshooting.add_argument(
'--manual',
action='manual',
"--manual",
action="manual",
default=Qualifiers.SUPPRESS,
short_help='Show the full manual.',
short_help="Show the full manual.",
)
troubleshooting.add_argument(
'--version',
action='version',
"--version",
action="version",
version=__version__,
short_help='Show version and exit.',
short_help="Show version and exit.",
)
troubleshooting.add_argument(
'--traceback',
action='store_true',
"--traceback",
action="store_true",
default=False,
short_help='Prints the exception traceback should one occur.',
short_help="Prints the exception traceback should one occur.",
)
troubleshooting.add_argument(
'--default-scheme',
default='http',
short_help='The default scheme to use if not specified in the URL.'
"--default-scheme",
default="http",
short_help="The default scheme to use if not specified in the URL.",
)
troubleshooting.add_argument(
'--debug',
action='store_true',
"--debug",
action="store_true",
default=False,
short_help='Print useful diagnostic information for bug reports.',
short_help="Print useful diagnostic information for bug reports.",
help="""
Prints the exception traceback should one occur, as well as other
information useful for debugging HTTPie itself and for reporting bugs.

View File

@ -3,7 +3,7 @@ import os
import platform
import sys
import socket
from typing import List, Optional, Union, Callable
from typing import List, Optional, Union, Callable, Iterable, Dict, Tuple
import requests
from pygments import __version__ as pygments_version
@ -12,21 +12,29 @@ from requests import __version__ as requests_version
from . import __version__ as httpie_version
from .cli.constants import OUT_REQ_BODY
from .cli.nested_json import NestedJSONSyntaxError
from .client import collect_messages
from .client import collect_messages, RequestsMessage
from .context import Environment, LogLevel
from .downloads import Downloader
from .http_parser import http_parser
from .models import (
RequestsMessageKind,
OutputOptions
from .http_parser import (
parse_single_request,
replace_global,
split_requests,
replace_dependencies
)
from .models import RequestsMessageKind, OutputOptions
from .output.models import ProcessingOptions
from .output.writer import write_message, write_stream, write_raw_data, MESSAGE_SEPARATOR_BYTES
from .output.writer import (
write_message,
write_stream,
write_raw_data,
MESSAGE_SEPARATOR_BYTES,
)
from .plugins.registry import plugin_manager
from .status import ExitStatus, http_status_to_exit_status
from .utils import unwrap_context
from .internal.update_warnings import check_updates
from .internal.daemon_runner import is_daemon_mode, run_daemon_task
from pathlib import Path
# noinspection PyDefaultArgument
@ -49,27 +57,27 @@ def raw_main(
if use_default_options and env.config.default_options:
args = env.config.default_options + args
include_debug_info = '--debug' in args
include_traceback = include_debug_info or '--traceback' in args
include_debug_info = "--debug" in args
include_traceback = include_debug_info or "--traceback" in args
def handle_generic_error(e, annotation=None):
msg = str(e)
if hasattr(e, 'request'):
if hasattr(e, "request"):
request = e.request
if hasattr(request, 'url'):
if hasattr(request, "url"):
msg = (
f'{msg} while doing a {request.method}'
f' request to URL: {request.url}'
f"{msg} while doing a {request.method}"
f" request to URL: {request.url}"
)
if annotation:
msg += annotation
env.log_error(f'{type(e).__name__}: {msg}')
env.log_error(f"{type(e).__name__}: {msg}")
if include_traceback:
raise
if include_debug_info:
print_debug_info(env)
if args == ['--debug']:
if args == ["--debug"]:
return ExitStatus.SUCCESS
exit_status = ExitStatus.SUCCESS
@ -85,13 +93,13 @@ def raw_main(
raise
exit_status = ExitStatus.ERROR
except KeyboardInterrupt:
env.stderr.write('\n')
env.stderr.write("\n")
if include_traceback:
raise
exit_status = ExitStatus.ERROR_CTRL_C
except SystemExit as e:
if e.code != ExitStatus.SUCCESS:
env.stderr.write('\n')
env.stderr.write("\n")
if include_traceback:
raise
exit_status = ExitStatus.ERROR
@ -103,33 +111,32 @@ def raw_main(
env=env,
)
except KeyboardInterrupt:
env.stderr.write('\n')
env.stderr.write("\n")
if include_traceback:
raise
exit_status = ExitStatus.ERROR_CTRL_C
except SystemExit as e:
if e.code != ExitStatus.SUCCESS:
env.stderr.write('\n')
env.stderr.write("\n")
if include_traceback:
raise
exit_status = ExitStatus.ERROR
except requests.Timeout:
exit_status = ExitStatus.ERROR_TIMEOUT
env.log_error(f'Request timed out ({parsed_args.timeout}s).')
env.log_error(f"Request timed out ({parsed_args.timeout}s).")
except requests.TooManyRedirects:
exit_status = ExitStatus.ERROR_TOO_MANY_REDIRECTS
env.log_error(
f'Too many redirects'
f' (--max-redirects={parsed_args.max_redirects}).'
f"Too many redirects (--max-redirects={parsed_args.max_redirects})."
)
except requests.exceptions.ConnectionError as exc:
annotation = None
original_exc = unwrap_context(exc)
if isinstance(original_exc, socket.gaierror):
if original_exc.errno == socket.EAI_AGAIN:
annotation = '\nCouldn’t connect to a DNS server. Please check your connection and try again.'
annotation = "\nCouldn’t connect to a DNS server. Please check your connection and try again."
elif original_exc.errno == socket.EAI_NONAME:
annotation = '\nCouldn’t resolve the given hostname. Please check the URL and try again.'
annotation = "\nCouldn’t resolve the given hostname. Please check the URL and try again."
propagated_exc = original_exc
else:
propagated_exc = exc
@ -145,8 +152,7 @@ def raw_main(
def main(
args: List[Union[str, bytes]] = sys.argv,
env: Environment = Environment()
args: List[Union[str, bytes]] = sys.argv, env: Environment = Environment()
) -> ExitStatus:
"""
The main function.
@ -160,12 +166,7 @@ def main(
from .cli.definition import parser
return raw_main(
parser=parser,
main_program=program,
args=args,
env=env
)
return raw_main(parser=parser, main_program=program, args=args, env=env)
def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
@ -174,7 +175,7 @@ def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
"""
def actual_program(args: argparse.Namespace, env: Environment) -> ExitStatus:
def actual_program(args: argparse.Namespace, env: Environment) -> Tuple[ExitStatus, Iterable[RequestsMessage]]:
# TODO: Refactor and drastically simplify, especially so that the separator logic is elsewhere.
exit_status = ExitStatus.SUCCESS
downloader = None
@ -183,7 +184,7 @@ def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
processing_options = ProcessingOptions.from_raw_args(args)
def separate():
getattr(env.stdout, 'buffer', env.stdout).write(MESSAGE_SEPARATOR_BYTES)
getattr(env.stdout, "buffer", env.stdout).write(MESSAGE_SEPARATOR_BYTES)
def request_body_read_callback(chunk: bytes):
should_pipe_to_stdout = bool(
@ -199,27 +200,35 @@ def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
env,
chunk,
processing_options=processing_options,
headers=initial_request.headers
headers=initial_request.headers,
)
try:
if args.download:
args.follow = True # --download implies --follow.
downloader = Downloader(env, output_file=args.output_file, resume=args.download_resume)
downloader = Downloader(
env, output_file=args.output_file, resume=args.download_resume
)
downloader.pre_request(args.headers)
messages = collect_messages(env, args=args,
request_body_read_callback=request_body_read_callback)
messages = collect_messages(
env, args=args, request_body_read_callback=request_body_read_callback
)
force_separator = False
prev_with_body = False
# Process messages as they’re generated
for message in messages:
output_options = OutputOptions.from_message(message, args.output_options)
output_options = OutputOptions.from_message(
message, args.output_options
)
do_write_body = output_options.body
if prev_with_body and output_options.any() and (force_separator or not env.stdout_isatty):
if (
prev_with_body
and output_options.any()
and (force_separator or not env.stdout_isatty)
):
# Separate after a previous message with body, if needed. See test_tokens.py.
separate()
force_separator = False
@ -233,16 +242,21 @@ def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
else:
final_response = message
if args.check_status or downloader:
exit_status = http_status_to_exit_status(http_status=message.status_code, follow=args.follow)
if exit_status != ExitStatus.SUCCESS and (not env.stdout_isatty or args.quiet == 1):
env.log_error(f'HTTP {message.raw.status} {message.raw.reason}', level=LogLevel.WARNING)
exit_status = http_status_to_exit_status(
http_status=message.status_code, follow=args.follow
)
if exit_status != ExitStatus.SUCCESS and (
not env.stdout_isatty or args.quiet == 1
):
env.log_error(
f"HTTP {message.raw.status} {message.raw.reason}",
level=LogLevel.WARNING,
)
write_message(
requests_message=message,
env=env,
output_options=output_options._replace(
body=do_write_body
),
processing_options=processing_options
output_options=output_options._replace(body=do_write_body),
processing_options=processing_options,
)
prev_with_body = output_options.body
@ -260,59 +274,78 @@ def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
if downloader.interrupted:
exit_status = ExitStatus.ERROR
env.log_error(
f'Incomplete download: size={downloader.status.total_size};'
f' downloaded={downloader.status.downloaded}'
f"Incomplete download: size={downloader.status.total_size};"
f" downloaded={downloader.status.downloaded}"
)
return exit_status
return exit_status, messages
finally:
if downloader and not downloader.finished:
downloader.failed()
if args.output_file and args.output_file_specified:
args.output_file.close()
if args.http_file:
# TODO: FILE PARSING TO REQUESTS ARRAY
requests_list = http_parser(args.url)
returns = []
for req in requests_list:
args.url = req.url
args.method = req.method
# args.headers = req.headers
# args.body = req.body
returns.append(actual_program(args, env))
return ExitStatus.SUCCESS if all(r is ExitStatus.SUCCESS for r in returns) else ExitStatus.ERROR
return actual_program(args, env)
http_file = Path(args.url)
if not http_file.exists():
raise FileNotFoundError(f"File not found: {args.url}")
if not http_file.is_file():
raise IsADirectoryError(f"Path is not a file: {args.url}")
http_contents = http_file.read_text()
raw_requests = split_requests(replace_global(http_contents))
raw_requests = [req.strip() for req in raw_requests if req.strip()]
parsed_requests = []
req_names = []
responses: Dict[str, RequestsMessage] = {}
Exit_status = []
for raw_req in raw_requests:
dependency_free_req = replace_dependencies(raw_req, responses)
new_req = parse_single_request(dependency_free_req)
if new_req is None:
continue
if new_req.name is not None:
req_names.append(new_req.name)
parsed_requests.append(new_req)
args.url = new_req.url
args.method = new_req.method
args.headers = new_req.headers
args.data = new_req.body
status, response = actual_program(args, env)
Exit_status.append(status)
if new_req.name is not None:
responses[new_req.name] = response
all_success = all(r is ExitStatus.SUCCESS for r in Exit_status)
return ExitStatus.SUCCESS if all_success else ExitStatus.ERROR
return actual_program(args, env)[0]
def print_debug_info(env: Environment):
env.stderr.writelines([
f'HTTPie {httpie_version}\n',
f'Requests {requests_version}\n',
f'Pygments {pygments_version}\n',
f'Python {sys.version}\n{sys.executable}\n',
f'{platform.system()} {platform.release()}',
])
env.stderr.write('\n\n')
env.stderr.writelines(
[
f"HTTPie {httpie_version}\n",
f"Requests {requests_version}\n",
f"Pygments {pygments_version}\n",
f"Python {sys.version}\n{sys.executable}\n",
f"{platform.system()} {platform.release()}",
]
)
env.stderr.write("\n\n")
env.stderr.write(repr(env))
env.stderr.write('\n\n')
env.stderr.write("\n\n")
env.stderr.write(repr(plugin_manager))
env.stderr.write('\n')
env.stderr.write("\n")
def decode_raw_args(
args: List[Union[str, bytes]],
stdin_encoding: str
) -> List[str]:
def decode_raw_args(args: List[Union[str, bytes]], stdin_encoding: str) -> List[str]:
"""
Convert all bytes args to str
by decoding them using stdin encoding.
"""
return [
arg.decode(stdin_encoding)
if type(arg) is bytes else arg
for arg in args
]
return [arg.decode(stdin_encoding) if type(arg) is bytes else arg for arg in args]

View File

@ -1,34 +1,167 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import re
from re import Match
from .client import RequestsMessage
from typing import Iterable, Dict, List
import json
from jsonpath_ng import parse as jsonpath_parse
from lxml import etree
@dataclass
class HttpFileRequest:
method: str
url: str
headers: dict
body: bytes
headers: Dict | None
body: bytes | None
name: str | None
def http_parser(filename: str) -> list[HttpFileRequest]:
http_file = Path(filename)
if not http_file.exists():
raise FileNotFoundError(f"File not found: {filename}")
if not http_file.is_file():
raise IsADirectoryError(f"Path is not a file: {filename}")
http_contents = http_file.read_text()
http_lines = [
line for line in http_contents.splitlines() if not line.startswith("#")
]
http_lines = [line for line in http_lines if line.strip()]
first_line = http_lines[0]
method, url = first_line.split(" ")
def split_requests(http_file_contents: str) -> List[str]:
"""Splits an HTTP file into individual requests but keeps the '###' in each request."""
parts = re.split(r"(^###.*)", http_file_contents, flags=re.MULTILINE)
requests = []
return [
HttpFileRequest(
method=method,
url=url,
headers={},
body=b"",
for i in range(1, len(parts), 2):
header = parts[i].strip()
body = parts[i + 1].strip() if i + 1 < len(parts) else ""
requests.append(f"{header}\n{body}")
return requests
def replace_dependencies(raw_http_request: str, responses: Dict[str, Iterable[RequestsMessage]]) -> str | None:
    """Replace every ``{{name.type.section.extractor}}`` placeholder in
    *raw_http_request* with a value extracted from a previously executed
    request/response pair stored in *responses*.

    Args:
        raw_http_request: raw text of one request from a .http file.
        responses: maps a request name to its ``[request, response]`` messages.

    Returns:
        The request text with all placeholders substituted.

    Raises:
        ValueError: when a placeholder is malformed or references an
            unknown request name.
    """
    placeholder_re = re.compile(
        r"(?P<name>\w+)\.(?P<type>request|response)\.(?P<section>body|headers)\.(?P<extractor>.+)"
    )

    def replace(match: Match[str]) -> str:
        """Compute the replacement text for a single ``{{...}}`` match."""
        token = match.group(0)
        var = token.lstrip("{").rstrip("}")
        parsed = placeholder_re.match(var)
        if not parsed:
            raise ValueError(f"Difficulties replacing {token} in {raw_http_request}")
        parts = parsed.groupdict()
        req_name = parts["name"]
        req_type = parts["type"]
        section = parts["section"]
        extractor = parts["extractor"]
        if responses.get(req_name) is None:
            raise ValueError(f"{req_name} is not an existing request's name")
        # Index 0 holds the request message, index 1 the response message.
        if req_type == "request":
            msg = responses[req_name][0]
        elif req_type == "response":
            msg = responses[req_name][1]
        else:  # unreachable: the regex only admits request|response
            raise ValueError(f"Incoherent request {token}")
        if section == "body":
            if extractor == "*":
                return msg.body  # Return full body
            elif extractor.startswith("$."):  # JSONPath
                try:
                    json_data = msg.json()  # Convert response to JSON
                    jsonpath_expr = jsonpath_parse(extractor)
                    parsed_data = jsonpath_expr.find(json_data)
                    # NOTE(review): returns a list / None here, which re.sub
                    # will reject as a replacement — confirm intended types.
                    return [matched.value for matched in parsed_data] if parsed_data else None
                except json.JSONDecodeError:
                    return None  # Not a valid JSON
            elif extractor.startswith("/"):  # XPath
                try:
                    xml_tree = etree.fromstring(msg.content)  # Parse XML
                    return xml_tree.xpath(extractor)
                except etree.XMLSyntaxError:
                    return None  # Not a valid XML
        elif section == "headers":
            return msg.headers[extractor]
        raise ValueError(f"Incoherent request {token}")

    pattern = r"\{\{(.*?)\}\}"
    return re.sub(pattern, replace, raw_http_request)
def get_name(raw_http_request: str) -> str | None:
    """Return the request's declared name, or ``None`` when absent.

    A name is declared on its own line as a comment — ``// @name Foo`` or
    ``# @name Foo`` — optionally preceded by whitespace.  Exactly one
    declaration must be present; zero or several yield ``None``.
    """
    name_pattern = r"^\s*(?://|#)\s*@name\s+(.+)$"
    found = re.findall(name_pattern, raw_http_request, re.MULTILINE)
    # TODO: multiple @name declarations are an error; currently treated
    # the same as "no name" by returning None.
    if len(found) != 1:
        return None
    return found[0].strip()
def replace_global(http_file_contents_raw: str) -> str:
    """Substitute every global-variable placeholder with its defined value.

    Globals are declared one per line as ``@name=value``; every occurrence
    of ``{{name}}`` anywhere in the file is replaced by ``value``.

    Args:
        http_file_contents_raw: full text of the .http file.

    Returns:
        The file text with all ``{{name}}`` placeholders expanded.
    """
    # NOTE(review): a literal "@name=value" line inside a request body is
    # also picked up as a definition — confirm this is acceptable.
    definitions = re.findall(
        r"^@([A-Za-z0-9_]+)=(.+)$", http_file_contents_raw, flags=re.MULTILINE
    )
    contents = http_file_contents_raw
    for variable_name, value in definitions:
        # Substitute via a callable so backslashes and group references in
        # `value` are inserted literally instead of being parsed by re.sub.
        contents = re.sub(
            rf"\{{\{{{re.escape(variable_name)}\}}\}}",
            lambda _match, _value=value: _value,
            contents,
        )
    return contents
def extract_headers(raw_text: List[str]) -> Dict:
    """Parse header lines from a .http file into a dictionary.

    Lines that are blank or contain no ``:`` are skipped; names and values
    are stripped of surrounding whitespace, and only the first ``:`` splits
    a line.  A repeated header name keeps the last value seen.

    Args:
        raw_text: the header lines of one request.

    Returns:
        Dict: mapping of header name to header value.
    """
    parsed = {}
    for raw_line in raw_text:
        if not raw_line.strip() or ":" not in raw_line:
            continue
        name, _, value = raw_line.partition(":")
        parsed[name.strip()] = value.strip()
    return parsed
def parse_body(raw_text: str) -> bytes:
    """
    Parse the body section of a .http request.

    The previous stub discarded the body entirely (always returning
    ``b""``), so request bodies were never sent.

    Args:
        raw_text: the body text, already joined into a single string.

    Returns:
        The body encoded as UTF-8 bytes (empty input yields ``b""``).
    """
    return raw_text.encode("utf-8")
def parse_single_request(raw_text: str) -> HttpFileRequest | None:
    """Parse one request in .http file format into an HttpFileRequest.

    Comment lines (``#`` or ``//``) are dropped, the first remaining line is
    read as ``METHOD URL [HTTP-VERSION]``, subsequent lines up to the first
    blank line are headers, and everything after that blank line is the body.

    Args:
        raw_text: raw text of a single request.

    Returns:
        The parsed request, or ``None`` when *raw_text* contains no request
        line (callers already treat ``None`` as "skip this entry").
    """
    lines = [
        line.strip()
        for line in raw_text.strip().splitlines()
        if not line.strip().startswith(("#", "//"))
    ]
    if not lines:
        return None
    # split() (not split(" ")) tolerates extra spacing and an optional
    # trailing HTTP-version token such as "GET /x HTTP/1.1".
    request_line = lines[0].split()
    if len(request_line) < 2:
        return None
    method, url = request_line[0], request_line[1]

    raw_headers: List[str] = []
    raw_body: List[str] = []
    in_body = False
    for line in lines[1:]:
        if not line:
            # The first blank line separates headers from body.
            in_body = True
            continue
        (raw_body if in_body else raw_headers).append(line)

    return HttpFileRequest(
        method=method,
        url=url,
        headers=extract_headers(raw_headers),
        body=parse_body("\n".join(raw_body)),
        # get_name reads the untouched text because @name lives in a comment.
        name=get_name(raw_text),
    )

View File

@ -58,6 +58,8 @@ install_requires =
importlib-metadata>=1.4.0; python_version<"3.8"
rich>=9.10.0
colorama>=0.2.4; sys_platform=="win32"
jsonpath_ng
lxml
python_requires = >=3.7

569
tests/test_http_parser.py Normal file
View File

@ -0,0 +1,569 @@
import pytest
import requests
from httpie.http_parser import (
split_requests,
replace_dependencies,
get_name,
replace_global,
extract_headers,
parse_body,
parse_single_request,
)
def normalize_whitespace(text):
    """Strip per-line trailing whitespace and outer blank lines so text
    comparisons are stable across formatting differences."""
    cleaned_lines = [line.rstrip() for line in text.splitlines()]
    return "\n".join(cleaned_lines).strip()
# TESTS FOR split_requests -->> REQ_002
def test_split_requests():
    # Test case: multiple HTTP requests split on their "###" delimiter lines.
    http_file = """### Request 1
GET /users

### Request 2
POST /users
Content-Type: application/json

{"name": "John"}"""
    # Each element keeps its "###" header line; whitespace is normalized on
    # both sides so trailing spaces/newlines don't cause false failures.
    expected_output = [
        "### Request 1\nGET /users",
        "### Request 2\nPOST /users\nContent-Type: application/json\n\n{\"name\": \"John\"}"
    ]
    assert list(map(normalize_whitespace, split_requests(http_file))) == list(
        map(normalize_whitespace, expected_output)
    )
def test_split_single_request():
"""
This test ensures that a single HTTP request with a '###' header is correctly parsed
without any unexpected modifications.
"""
http_file = """### Only Request
GET /status"""
expected_output = ["### Only Request\nGET /status"]
assert list(map(normalize_whitespace, split_requests(http_file))) == list(
map(normalize_whitespace, expected_output)
)
def test_split_empty_file():
"""
This test checks if an empty input correctly returns an empty list,
ensuring there are no errors when handling empty strings.
"""
assert split_requests("") == []
def test_split_request_no_body():
"""
This test verifies that requests with no body (only headers and method)
are parsed correctly without adding unnecessary spaces or newlines.
"""
http_file = """### No Body Request
GET /ping"""
expected_output = ["### No Body Request\nGET /ping"]
assert list(map(normalize_whitespace, split_requests(http_file))) == list(
map(normalize_whitespace, expected_output)
)
def test_split_request_with_extra_newlines():
"""
This test ensures that the function correctly handles requests that
contain extra blank lines while preserving necessary formatting.
"""
http_file = """### Request 1
GET /data
### Request 2
POST /submit
{"key": "value"}
"""
expected_output = [
"### Request 1\nGET /data", # Normalized extra newline
"### Request 2\nPOST /submit\n\n{\"key\": \"value\"}" # Normalized newlines inside request
]
assert list(map(normalize_whitespace, split_requests(http_file))) == list(
map(normalize_whitespace, expected_output)
)
def test_split_request_without_header():
"""
This test ensures that requests without a '###' header are ignored and
do not cause the function to fail. The function should return an empty list
in such cases.
"""
http_file = """GET /withoutHeader"""
expected_output = [] # No '###' header means no valid requests should be returned
assert split_requests(http_file) == expected_output
# TESTS FOR get_dependencies -->> REQ_007
def test_replace_dependencies_no_placeholders():
"""
This test verifies that if a request does not contain any {{placeholders}},
the function correctly doesn't change anything.
"""
raw_request = """GET /users"""
assert replace_dependencies(raw_request, None) == """GET /users"""
def test_replace_dependencies_invalid_dependency():
"""
This test ensures that if the request references a dependency that is
not in the provided possible_names list, the function correctly raises an ValueError.
"""
raw_request = """DELETE /items/{{InvalidRequest}}"""
responses = {"Request1": None, "Request2": None}
with pytest.raises(ValueError):
replace_dependencies(raw_request, responses)
def test_replace_dependencies_Req_single():
"""
This test checks that a single valid dependency is correctly extracted
from a request and returned in a list.
"""
raw_request = """GET /update/{{Request1.request.headers.id}}"""
url = "https://api.example.com"
request = requests.Request('GET', url)
response = None
responses = {"Request1": [request, response]}
request.headers["id"] = str(1)
assert replace_dependencies(raw_request, responses) == """GET /update/1"""
def test_replace_dependencies_PreReq_single():
"""
This test checks that a single valid dependency is correctly extracted
from a PreparedRequest and returned in a list.
"""
raw_request = """GET /update/{{Request1.request.headers.id}}"""
url = "https://api.example.com"
session = requests.Session()
request = requests.Request('GET', url)
prepared_request = session.prepare_request(request)
response = None
responses = {"Request1": [prepared_request, response]}
prepared_request.headers["id"] = str(1)
assert replace_dependencies(raw_request, responses) == """GET /update/1"""
def test_replace_multiple_dependencies():
"""
This test verifies that multiple dependencies are correctly identified
and replaced in the request.
"""
raw_request = """GET /update/{{Request1.request.headers.id}}/{{Request1.request.headers.name}}"""
url = "https://api.example.com"
request = requests.Request('GET', url)
response = None
responses = {"Request1": [request, response]}
request.headers["id"] = str(1)
request.headers["name"] = "Jack"
assert replace_dependencies(raw_request, responses) == """GET /update/1/Jack"""
def test_replace_dependencies_empty_request():
"""
This test checks that an empty request string returns None
since there are no placeholders.
"""
raw_request = ""
assert replace_dependencies(raw_request, None) == ""
# TESTS FOR get_name --> REQ_003
def test_get_name_with_hash_comment():
"""
Ensures that get_name correctly extracts a request name
when defined with '#' as a comment.
"""
raw_request = """# @name Request1
GET /users"""
expected_output = "Request1"
assert get_name(raw_request) == expected_output
def test_get_name_with_double_slash_comment():
"""
Ensures that get_name correctly extracts a request name
when defined with '//' as a comment.
"""
raw_request = """// @name GetUser
GET /users/{id}"""
expected_output = "GetUser"
assert get_name(raw_request) == expected_output
def test_get_name_no_name():
"""
Ensures that if no '@name' is present, get_name returns None.
"""
raw_request = """GET /users"""
assert get_name(raw_request) is None
def test_get_name_multiple_names():
"""
Ensures that if multiple '@name' occurrences exist,
the function returns None to indicate an error.
"""
raw_request = """# @name FirstName
GET /users
# @name SecondName
POST /users"""
assert get_name(raw_request) is None # Multiple names should result in None
def test_get_name_with_extra_whitespace():
"""
Ensures that extra spaces around @name do not affect the extracted name.
"""
raw_request = """ # @name MyRequest
GET /data"""
expected_output = "MyRequest"
assert get_name(raw_request) == expected_output
def test_get_name_without_request():
"""
Ensures that a request with only an @name definition still correctly extracts the name.
"""
raw_request = """// @name LoneRequest"""
expected_output = "LoneRequest"
assert get_name(raw_request) == expected_output
def test_get_name_inline_invalid():
"""
Ensures that @name only works when it starts a line,
and does not extract names from inline comments.
"""
raw_request = """GET /users # @name InlineName"""
assert get_name(raw_request) is None # Inline @name should not be detected
def test_get_name_mixed_comment_styles():
"""
Ensures that if multiple valid @name comments exist,
the function returns None to indicate an error.
"""
raw_request = """# @name FirstRequest
// @name SecondRequest
GET /items"""
assert get_name(raw_request) is None
# TESTS FOR replace_global --> REQ_005
def test_replace_global_no_definitions():
"""
Ensures that if no global variable definitions are present,
the file contents remain unchanged.
"""
raw_contents = "GET /users/{{id}}"
expected_output = raw_contents # No replacement should occur
assert replace_global(raw_contents) == expected_output
def test_replace_global_single_variable():
"""
Ensures that a single global variable definition is correctly used to replace
all its corresponding placeholders in the file.
"""
raw_contents = """@host=example.com
GET http://{{host}}/users"""
expected_output = """@host=example.com
GET http://example.com/users"""
assert replace_global(raw_contents) == expected_output
def test_replace_global_multiple_variables():
"""
Ensures that multiple global variable definitions are correctly used to replace
their corresponding placeholders in the file.
"""
raw_contents = """@host=example.com
@port=8080
GET http://{{host}}:{{port}}/users"""
expected_output = """@host=example.com
@port=8080
GET http://example.com:8080/users"""
assert replace_global(raw_contents) == expected_output
def test_replace_global_multiple_occurrences():
"""
Ensures that if a variable appears multiple times in the file,
all occurrences are replaced.
"""
raw_contents = """@name=Test
GET /api?param={{name}}&other={{name}}"""
expected_output = """@name=Test
GET /api?param=Test&other=Test"""
assert replace_global(raw_contents) == expected_output
def test_replace_global_value_with_spaces():
"""
Ensures that global variable definitions with spaces in their values are handled correctly.
"""
raw_contents = """@greeting=Hello World
GET /message?text={{greeting}}"""
expected_output = """@greeting=Hello World
GET /message?text=Hello World"""
assert replace_global(raw_contents) == expected_output
def test_replace_global_definition_without_placeholder():
"""
Ensures that if a global variable is defined but its placeholder is not present,
the file remains unchanged.
"""
raw_contents = """@unused=Value
GET /info"""
expected_output = raw_contents # No replacement should occur
assert replace_global(raw_contents) == expected_output
# TESTS FOR extract_headers --> REQ_003
def test_extract_headers_empty():
"""
Test 1: Empty list should return an empty dictionary.
"""
raw_text = []
expected = {}
assert extract_headers(raw_text) == expected
def test_extract_headers_only_empty_lines():
"""
Test 2: Lines that are empty or only whitespace should be ignored.
"""
raw_text = ["", " ", "\t"]
expected = {}
assert extract_headers(raw_text) == expected
def test_extract_headers_single_header():
"""
Test 3: A single valid header line.
"""
raw_text = ["Content-Type: application/json"]
expected = {"Content-Type": "application/json"}
assert extract_headers(raw_text) == expected
def test_extract_headers_multiple_headers():
"""
Test 4: Multiple header lines should be parsed into a dictionary.
"""
raw_text = [
"Content-Type: application/json",
"Authorization: Bearer token123"
]
expected = {
"Content-Type": "application/json",
"Authorization": "Bearer token123"
}
assert extract_headers(raw_text) == expected
def test_extract_headers_line_without_colon():
"""
Test 5: Lines without a colon should be ignored.
"""
raw_text = [
"This is not a header",
"Content-Length: 123"
]
expected = {"Content-Length": "123"}
assert extract_headers(raw_text) == expected
def test_extract_headers_extra_spaces():
"""
Test 6: Extra whitespace around header names and values should be trimmed.
"""
raw_text = [
" Accept : text/html "
]
expected = {"Accept": "text/html"}
assert extract_headers(raw_text) == expected
def test_extract_headers_multiple_colons():
"""
Test 7: Only the first colon should be used to split the header name and value.
"""
raw_text = [
"Custom-Header: value:with:colons"
]
expected = {"Custom-Header": "value:with:colons"}
assert extract_headers(raw_text) == expected
def test_extract_headers_duplicate_headers():
"""
Test 8: If a header appears more than once, the last occurrence should overwrite previous ones.
"""
raw_text = [
"X-Header: one",
"X-Header: two"
]
expected = {"X-Header": "two"}
assert extract_headers(raw_text) == expected
# TESTS FOR parse_body -->> REQ_002
# TODO: create tests after function definition is done
# TESTS FOR parse_single_request -->> REQ_002
def test_parse_single_request_minimal():
"""
A minimal HTTP request that only contains the request line (method and URL).
Expected:
- method and URL are parsed correctly.
- headers is an empty dict.
- body is empty (after processing by parse_body).
- dependencies is an empty dict.
- name is None (since no @name comment exists).
"""
raw_text = "GET http://example.com"
result = parse_single_request(raw_text)
assert result.method == "GET"
assert result.url == "http://example.com"
assert result.headers == {}
expected_body = parse_body("")
assert result.body == expected_body
assert result.name is None
def test_parse_single_request_with_headers_and_body():
"""
Tests a request that includes a request line, headers, and a body.
Expected:
- Correctly parsed method and URL.
- Headers are extracted into a dictionary.
- The body is passed through parse_body and matches the expected output.
- No @name is defined, so name is None.
"""
raw_text = """POST http://example.com/api
Content-Type: application/json
Authorization: Bearer token
{
"key": "value"
}"""
result = parse_single_request(raw_text)
assert result.method == "POST"
assert result.url == "http://example.com/api"
assert result.headers == {
"Content-Type": "application/json",
"Authorization": "Bearer token"
}
expected_body = parse_body("{\n \"key\": \"value\"\n}")
assert result.body == expected_body
assert result.name is None
def test_parse_single_request_with_name():
"""
Tests a request that includes a @name comment.
The @name line is removed from the parsed lines (since lines starting with '#' are filtered out)
but get_name is still applied on the original raw text.
Expected:
- name is extracted as defined by get_name.
- Other fields (method, URL, headers, body) are parsed normally.
"""
raw_text = """# @name MyTestRequest
GET http://example.com
Content-Type: text/plain
Hello, world!
"""
result = parse_single_request(raw_text)
assert result.method == "GET"
assert result.url == "http://example.com"
assert result.headers == {"Content-Type": "text/plain"}
expected_body = parse_body("Hello, world!")
assert result.body == expected_body
assert result.name == "MyTestRequest"
def test_parse_single_request_extra_blank_lines():
"""
Tests that multiple blank lines (which trigger the switch from headers to body)
are handled properly.
Expected:
- The request line is parsed.
- Headers are extracted before the first blank line.
- Everything after the blank lines is treated as the body.
"""
raw_text = """PUT http://example.com/update
Accept: application/json
Line one of the body.
Line two of the body.
"""
result = parse_single_request(raw_text)
assert result.method == "PUT"
assert result.url == "http://example.com/update"
assert result.headers == {"Accept": "application/json"}
expected_body = parse_body("Line one of the body.\nLine two of the body.")
assert result.body == expected_body
assert result.name is None
def test_parse_single_request_ignore_comments():
"""
Tests that lines starting with '#' (comments) are removed from the parsed headers.
Note: Even if the @name line is a comment, get_name is called on the original raw text,
so it may still extract a name.
Expected:
- Headers only include valid header lines.
- The @name is still extracted if present in the raw text.
"""
raw_text = """# @name CommentedRequest
GET http://example.com/data
# This comment should be ignored
Content-Length: 123
"""
result = parse_single_request(raw_text)
assert result.method == "GET"
assert result.url == "http://example.com/data"
assert result.headers == {"Content-Length": "123"}
expected_body = parse_body("")
assert result.body == expected_body
assert result.name == "CommentedRequest"
if __name__ == "__main__":
pytest.main()