import zlib from typing import Callable, IO, Iterable, Tuple, Union from urllib.parse import urlencode import requests from requests.utils import super_len from requests_toolbelt import MultipartEncoder from .cli.dicts import MultipartRequestDataDict, RequestDataDict class ChunkedUploadStream: def __init__(self, stream: Iterable, callback: Callable): self.callback = callback self.stream = stream def __iter__(self) -> Iterable[Union[str, bytes]]: for chunk in self.stream: self.callback(chunk) yield chunk class ChunkedMultipartUploadStream: chunk_size = 100 * 1024 def __init__(self, encoder: MultipartEncoder): self.encoder = encoder def __iter__(self) -> Iterable[Union[str, bytes]]: while True: chunk = self.encoder.read(self.chunk_size) if not chunk: break yield chunk def prepare_request_body( body: Union[str, bytes, IO, MultipartEncoder, RequestDataDict], body_read_callback: Callable[[bytes], bytes], content_length_header_value: int = None, chunked=False, offline=False, ) -> Union[str, bytes, IO, MultipartEncoder, ChunkedUploadStream]: is_file_like = hasattr(body, 'read') if isinstance(body, RequestDataDict): body = urlencode(body, doseq=True) if offline: if is_file_like: return body.read() return body if not is_file_like: if chunked: body = ChunkedUploadStream( # Pass the entire body as one chunk. stream=(chunk.encode() for chunk in [body]), callback=body_read_callback, ) else: # File-like object. if not super_len(body): # Zero-length -> assume stdin. if content_length_header_value is None and not chunked: # # Read the whole stdin to determine `Content-Length`. # # TODO: Instead of opt-in --chunked, consider making # `Transfer-Encoding: chunked` for STDIN opt-out via # something like --no-chunked. # This would be backwards-incompatible so wait until v3.0.0. # body = body.read() else: orig_read = body.read def new_read(*args): chunk = orig_read(*args) body_read_callback(chunk) return chunk body.read = new_read if chunked: if isinstance(body, MultipartEncoder): body = ChunkedMultipartUploadStream( encoder=body, ) else: body = ChunkedUploadStream( stream=body, callback=body_read_callback, ) return body def get_multipart_data_and_content_type( data: MultipartRequestDataDict, boundary: str = None, content_type: str = None, ) -> Tuple[MultipartEncoder, str]: encoder = MultipartEncoder( fields=data.items(), boundary=boundary, ) if content_type: content_type = content_type.strip() if 'boundary=' not in content_type: content_type = f'{content_type}; boundary={encoder.boundary_value}' else: content_type = encoder.content_type data = encoder return data, content_type def compress_request( request: requests.PreparedRequest, always: bool, ): deflater = zlib.compressobj() if isinstance(request.body, str): body_bytes = request.body.encode() elif hasattr(request.body, 'read'): body_bytes = request.body.read() else: body_bytes = request.body deflated_data = deflater.compress(body_bytes) deflated_data += deflater.flush() is_economical = len(deflated_data) < len(body_bytes) if is_economical or always: request.body = deflated_data request.headers['Content-Encoding'] = 'deflate' request.headers['Content-Length'] = str(len(deflated_data))