Merge pull request #11 from Group-18-DD2480/feat-http-file-cli-argument

Add --http-file CLI argument and read file
This commit is contained in:
Elias Floreteng 2025-03-02 14:49:44 +01:00 committed by GitHub
commit dd29414308
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 158 additions and 88 deletions

View File

@ -203,6 +203,10 @@ class HTTPieArgumentParser(BaseHTTPieArgumentParser):
}
def _process_url(self):
if self.args.http_file:
# do not add default scheme
# treat URL as a filename if --http-file is specified
return
if self.args.url.startswith('://'):
# Paste URL & add space shortcut: `http ://pie.dev` → `http://pie.dev`
self.args.url = self.args.url[3:]

View File

@ -218,6 +218,18 @@ content_types.add_argument(
""",
)
content_types.add_argument(
"--http-file",
action="store_true",
default=False,
short_help="Parse and send an HTTP request from a .http file",
help="""
Parse and send an HTTP request from a file in .http format.
The file should contain a valid HTTP request with headers and body.
If this is specified, URL will be treated as a file path.
""",
)
#######################################################################
# Content processing.
#######################################################################

View File

@ -15,6 +15,7 @@ from .cli.nested_json import NestedJSONSyntaxError
from .client import collect_messages
from .context import Environment, LogLevel
from .downloads import Downloader
from .http_parser import http_parser
from .models import (
RequestsMessageKind,
OutputOptions
@ -172,99 +173,118 @@ def program(args: argparse.Namespace, env: Environment) -> ExitStatus:
The main program without error handling.
"""
# TODO: Refactor and drastically simplify, especially so that the separator logic is elsewhere.
exit_status = ExitStatus.SUCCESS
downloader = None
initial_request: Optional[requests.PreparedRequest] = None
final_response: Optional[requests.Response] = None
processing_options = ProcessingOptions.from_raw_args(args)
def separate():
getattr(env.stdout, 'buffer', env.stdout).write(MESSAGE_SEPARATOR_BYTES)
def actual_program(args: argparse.Namespace, env: Environment) -> ExitStatus:
# TODO: Refactor and drastically simplify, especially so that the separator logic is elsewhere.
exit_status = ExitStatus.SUCCESS
downloader = None
initial_request: Optional[requests.PreparedRequest] = None
final_response: Optional[requests.Response] = None
processing_options = ProcessingOptions.from_raw_args(args)
def request_body_read_callback(chunk: bytes):
should_pipe_to_stdout = bool(
# Request body output desired
OUT_REQ_BODY in args.output_options
# & not `.read()` already pre-request (e.g., for compression)
and initial_request
# & non-EOF chunk
and chunk
)
if should_pipe_to_stdout:
return write_raw_data(
env,
chunk,
processing_options=processing_options,
headers=initial_request.headers
def separate():
getattr(env.stdout, 'buffer', env.stdout).write(MESSAGE_SEPARATOR_BYTES)
def request_body_read_callback(chunk: bytes):
should_pipe_to_stdout = bool(
# Request body output desired
OUT_REQ_BODY in args.output_options
# & not `.read()` already pre-request (e.g., for compression)
and initial_request
# & non-EOF chunk
and chunk
)
try:
if args.download:
args.follow = True # --download implies --follow.
downloader = Downloader(env, output_file=args.output_file, resume=args.download_resume)
downloader.pre_request(args.headers)
messages = collect_messages(env, args=args,
request_body_read_callback=request_body_read_callback)
force_separator = False
prev_with_body = False
# Process messages as theyre generated
for message in messages:
output_options = OutputOptions.from_message(message, args.output_options)
do_write_body = output_options.body
if prev_with_body and output_options.any() and (force_separator or not env.stdout_isatty):
# Separate after a previous message with body, if needed. See test_tokens.py.
separate()
force_separator = False
if output_options.kind is RequestsMessageKind.REQUEST:
if not initial_request:
initial_request = message
if output_options.body:
is_streamed_upload = not isinstance(message.body, (str, bytes))
do_write_body = not is_streamed_upload
force_separator = is_streamed_upload and env.stdout_isatty
else:
final_response = message
if args.check_status or downloader:
exit_status = http_status_to_exit_status(http_status=message.status_code, follow=args.follow)
if exit_status != ExitStatus.SUCCESS and (not env.stdout_isatty or args.quiet == 1):
env.log_error(f'HTTP {message.raw.status} {message.raw.reason}', level=LogLevel.WARNING)
write_message(
requests_message=message,
env=env,
output_options=output_options._replace(
body=do_write_body
),
processing_options=processing_options
)
prev_with_body = output_options.body
# Cleanup
if force_separator:
separate()
if downloader and exit_status == ExitStatus.SUCCESS:
# Last response body download.
download_stream, download_to = downloader.start(
initial_url=initial_request.url,
final_response=final_response,
)
write_stream(stream=download_stream, outfile=download_to, flush=False)
downloader.finish()
if downloader.interrupted:
exit_status = ExitStatus.ERROR
env.log_error(
f'Incomplete download: size={downloader.status.total_size};'
f' downloaded={downloader.status.downloaded}'
if should_pipe_to_stdout:
return write_raw_data(
env,
chunk,
processing_options=processing_options,
headers=initial_request.headers
)
return exit_status
finally:
if downloader and not downloader.finished:
downloader.failed()
if args.output_file and args.output_file_specified:
args.output_file.close()
try:
if args.download:
args.follow = True # --download implies --follow.
downloader = Downloader(env, output_file=args.output_file, resume=args.download_resume)
downloader.pre_request(args.headers)
messages = collect_messages(env, args=args,
request_body_read_callback=request_body_read_callback)
force_separator = False
prev_with_body = False
# Process messages as theyre generated
for message in messages:
output_options = OutputOptions.from_message(message, args.output_options)
do_write_body = output_options.body
if prev_with_body and output_options.any() and (force_separator or not env.stdout_isatty):
# Separate after a previous message with body, if needed. See test_tokens.py.
separate()
force_separator = False
if output_options.kind is RequestsMessageKind.REQUEST:
if not initial_request:
initial_request = message
if output_options.body:
is_streamed_upload = not isinstance(message.body, (str, bytes))
do_write_body = not is_streamed_upload
force_separator = is_streamed_upload and env.stdout_isatty
else:
final_response = message
if args.check_status or downloader:
exit_status = http_status_to_exit_status(http_status=message.status_code, follow=args.follow)
if exit_status != ExitStatus.SUCCESS and (not env.stdout_isatty or args.quiet == 1):
env.log_error(f'HTTP {message.raw.status} {message.raw.reason}', level=LogLevel.WARNING)
write_message(
requests_message=message,
env=env,
output_options=output_options._replace(
body=do_write_body
),
processing_options=processing_options
)
prev_with_body = output_options.body
# Cleanup
if force_separator:
separate()
if downloader and exit_status == ExitStatus.SUCCESS:
# Last response body download.
download_stream, download_to = downloader.start(
initial_url=initial_request.url,
final_response=final_response,
)
write_stream(stream=download_stream, outfile=download_to, flush=False)
downloader.finish()
if downloader.interrupted:
exit_status = ExitStatus.ERROR
env.log_error(
f'Incomplete download: size={downloader.status.total_size};'
f' downloaded={downloader.status.downloaded}'
)
return exit_status
finally:
if downloader and not downloader.finished:
downloader.failed()
if args.output_file and args.output_file_specified:
args.output_file.close()
if args.http_file:
# TODO: FILE PARSING TO REQUESTS ARRAY
requests_list = http_parser(args.url)
returns = []
for req in requests_list:
args.url = req.url
args.method = req.method
# args.headers = req.headers
# args.body = req.body
returns.append(actual_program(args, env))
return ExitStatus.SUCCESS if all(r is ExitStatus.SUCCESS for r in returns) else ExitStatus.ERROR
return actual_program(args, env)
def print_debug_info(env: Environment):

34
httpie/http_parser.py Normal file
View File

@ -0,0 +1,34 @@
from dataclasses import dataclass
from pathlib import Path
@dataclass
class HttpFileRequest:
method: str
url: str
headers: dict
body: bytes
def http_parser(filename: str) -> list[HttpFileRequest]:
http_file = Path(filename)
if not http_file.exists():
raise FileNotFoundError(f"File not found: {filename}")
if not http_file.is_file():
raise IsADirectoryError(f"Path is not a file: {filename}")
http_contents = http_file.read_text()
http_lines = [
line for line in http_contents.splitlines() if not line.startswith("#")
]
http_lines = [line for line in http_lines if line.strip()]
first_line = http_lines[0]
method, url = first_line.split(" ")
return [
HttpFileRequest(
method=method,
url=url,
headers={},
body=b"",
)
]