forked from extern/httpie-cli
139 lines
4.0 KiB
Python
139 lines
4.0 KiB
Python
import zlib
|
|
from typing import Callable, IO, Iterable, Tuple, Union
|
|
from urllib.parse import urlencode
|
|
|
|
import requests
|
|
from requests.utils import super_len
|
|
from requests_toolbelt import MultipartEncoder
|
|
|
|
from httpie.cli.dicts import MultipartRequestDataDict, RequestDataDict
|
|
|
|
|
|
class ChunkedUploadStream:
    """Iterable wrapper that reports each chunk to a callback.

    Iterating over an instance walks the wrapped ``stream``; every item is
    first handed to ``callback`` and then yielded unchanged.
    """

    def __init__(self, stream: Iterable, callback: Callable):
        self.stream = stream
        self.callback = callback

    def __iter__(self) -> Iterable[Union[str, bytes]]:
        """Yield the items of ``self.stream``, notifying the callback first."""
        for piece in self.stream:
            # The callback sees the chunk before the consumer does.
            self.callback(piece)
            yield piece
|
|
|
|
|
|
class ChunkedMultipartUploadStream:
    """Iterable that drains a multipart encoder through repeated reads."""

    # Size argument passed to every `encoder.read()` call (100 * 1024).
    chunk_size = 100 * 1024

    def __init__(self, encoder: MultipartEncoder):
        self.encoder = encoder

    def __iter__(self) -> Iterable[Union[str, bytes]]:
        """Yield successive reads from the encoder until one is falsy."""
        piece = self.encoder.read(self.chunk_size)
        while piece:
            yield piece
            piece = self.encoder.read(self.chunk_size)
|
|
|
|
|
|
def prepare_request_body(
    body: Union[str, bytes, IO, MultipartEncoder, RequestDataDict],
    body_read_callback: Callable[[bytes], bytes],
    content_length_header_value: int = None,
    chunked=False,
    offline=False,
) -> Union[str, bytes, IO, MultipartEncoder, ChunkedUploadStream]:
    """Convert request data into the object used as the request body.

    Args:
        body: The request data. A ``RequestDataDict`` is URL-encoded into
            a string; an object with a ``read`` attribute is treated as
            file-like; anything else is used as-is.
        body_read_callback: Called with each chunk that is read from a
            non-empty file-like body or yielded by a chunked stream.
        content_length_header_value: When this is ``None`` and ``chunked``
            is false, a zero-length file-like body is read fully up front.
            (Presumably the user-supplied ``Content-Length`` -- confirm
            against the caller.)
        chunked: Wrap the body in an iterable stream object instead of
            returning it directly.
        offline: Return the body content right away (reading a file-like
            body completely) without installing callbacks or wrappers.

    Returns:
        The prepared body: the original or URL-encoded data, the content
        read from a file-like object, the file-like object itself with
        ``read`` wrapped to report chunks, or a chunked stream wrapper.
    """
    # Decided before URL-encoding so that it describes the original input.
    is_file_like = hasattr(body, 'read')

    if isinstance(body, RequestDataDict):
        body = urlencode(body, doseq=True)

    if offline:
        if is_file_like:
            return body.read()
        return body

    if not is_file_like:
        if chunked:
            body = ChunkedUploadStream(
                # Pass the entire body as one chunk. Only `str` is encoded:
                # `bytes` has no `.encode()` and is forwarded unchanged.
                stream=(
                    chunk.encode() if isinstance(chunk, str) else chunk
                    for chunk in [body]
                ),
                callback=body_read_callback,
            )
    else:
        # File-like object.

        if not super_len(body):
            # Zero-length -> assume stdin.
            if content_length_header_value is None and not chunked:
                #
                # Read the whole stdin to determine `Content-Length`.
                #
                # TODO: Instead of opt-in --chunked, consider making
                #   `Transfer-Encoding: chunked` for STDIN opt-out via
                #   something like --no-chunked.
                #   This would be backwards-incompatible so wait until v3.0.0.
                #
                body = body.read()
        else:
            orig_read = body.read

            def new_read(*args):
                # Report every chunk read from the file to the callback.
                chunk = orig_read(*args)
                body_read_callback(chunk)
                return chunk

            body.read = new_read

        if chunked:
            if isinstance(body, MultipartEncoder):
                body = ChunkedMultipartUploadStream(
                    encoder=body,
                )
            else:
                body = ChunkedUploadStream(
                    stream=body,
                    callback=body_read_callback,
                )

    return body
|
|
|
|
|
|
def get_multipart_data_and_content_type(
    data: MultipartRequestDataDict,
    boundary: str = None,
    content_type: str = None,
) -> Tuple[MultipartEncoder, str]:
    """Build a multipart encoder for ``data`` plus its ``Content-Type``.

    Args:
        data: Mapping whose ``items()`` become the encoder's fields.
        boundary: Boundary passed through to ``MultipartEncoder``.
        content_type: Optional caller-provided content type. It is stripped
            of surrounding whitespace, and the encoder's boundary is
            appended when the value has no ``boundary=`` part.

    Returns:
        ``(encoder, content_type)``. Without a caller-provided content
        type, the encoder's own ``content_type`` is used.
    """
    encoder = MultipartEncoder(fields=data.items(), boundary=boundary)

    if not content_type:
        return encoder, encoder.content_type

    header_value = content_type.strip()
    if 'boundary=' in header_value:
        # The caller already specified a boundary; keep the value as given.
        return encoder, header_value

    return encoder, f'{header_value}; boundary={encoder.boundary_value}'
|
|
|
|
|
|
def compress_request(
    request: requests.PreparedRequest,
    always: bool,
):
    """Deflate the request body in place when that is worthwhile.

    The body is converted to bytes (a ``str`` is encoded, an object with
    ``read`` is read, anything else is used directly) and compressed with
    zlib. The compressed data replaces the body, and the
    ``Content-Encoding`` and ``Content-Length`` headers are updated, only
    if the result is shorter than the input or ``always`` is true.
    """
    deflater = zlib.compressobj()

    payload = request.body
    if isinstance(payload, str):
        body_bytes = payload.encode()
    elif hasattr(payload, 'read'):
        body_bytes = payload.read()
    else:
        body_bytes = payload

    deflated_data = deflater.compress(body_bytes) + deflater.flush()

    saves_space = len(deflated_data) < len(body_bytes)
    if not (saves_space or always):
        # Compression did not help and was not forced: leave the request as is.
        return

    request.body = deflated_data
    request.headers['Content-Encoding'] = 'deflate'
    request.headers['Content-Length'] = str(len(deflated_data))
|