httpie-cli/httpie/output/streams.py

272 lines
9.5 KiB
Python

from abc import ABCMeta, abstractmethod
from itertools import chain
from typing import Callable, Iterable, Optional, Union
from .processing import Conversion, Formatting
from ..context import Environment
from ..encoding import smart_decode, smart_encode, UTF8
from ..models import HTTPMessage, OutputOptions, RequestsMessageKind
from ..utils import parse_content_type_header
BINARY_SUPPRESSED_NOTICE = (
b'\n'
b'+-----------------------------------------+\n'
b'| NOTE: binary data not shown in terminal |\n'
b'+-----------------------------------------+'
)
class DataSuppressedError(Exception):
message = None
class BinarySuppressedError(DataSuppressedError):
"""An error indicating that the body is binary and won't be written,
e.g., for terminal output)."""
message = BINARY_SUPPRESSED_NOTICE
class BaseStream(metaclass=ABCMeta):
"""Base HTTP message output stream class."""
def __init__(
self,
msg: HTTPMessage,
output_options: OutputOptions,
on_body_chunk_downloaded: Callable[[bytes], None] = None,
**kwargs
):
"""
:param msg: a :class:`models.HTTPMessage` subclass
:param output_options: a :class:`OutputOptions` instance to represent
which parts of the message is printed.
"""
assert output_options.any()
self.msg = msg
self.output_options = output_options
self.on_body_chunk_downloaded = on_body_chunk_downloaded
self.extra_options = kwargs
def get_headers(self) -> bytes:
"""Return the headers' bytes."""
return self.msg.headers.encode()
def get_metadata(self) -> bytes:
"""Return the message metadata."""
return self.msg.metadata.encode()
@abstractmethod
def iter_body(self) -> Iterable[bytes]:
"""Return an iterator over the message body."""
def __iter__(self) -> Iterable[bytes]:
"""Return an iterator over `self.msg`."""
if self.output_options.meta and self.output_options.kind is RequestsMessageKind.REQUEST:
yield self.get_metadata()
yield b'\n\n'
if self.output_options.headers:
yield self.get_headers()
yield b'\r\n\r\n'
if self.output_options.body:
try:
for chunk in self.iter_body():
yield chunk
if self.on_body_chunk_downloaded:
# Niquests 3.7+ have a way to determine the "real" amt of raw data collected
# Useful when the remote compress the body. We use the "untouched" amt of data to determine
# the download speed.
if hasattr(self.msg, "_orig") and hasattr(self.msg._orig, "download_progress") and self.msg._orig.download_progress:
# this is plan A: using public interfaces!
self.on_body_chunk_downloaded(self.msg._orig.download_progress.total)
elif hasattr(self.msg, "_orig") and hasattr(self.msg._orig, "raw") and hasattr(self.msg._orig.raw, "_fp_bytes_read"):
# plan B, falling back on a private property that may disapear from urllib3-future...
# this case is mandatory due to how the mocking library works. it does not use any "socket" but
# rather a simple io.BytesIO.
self.on_body_chunk_downloaded(self.msg._orig.raw._fp_bytes_read)
else:
# well. this case will certainly cause issues if the body is compressed.
self.on_body_chunk_downloaded(chunk)
except DataSuppressedError as e:
if self.output_options.headers:
yield b'\n'
yield e.message
if self.output_options.meta and self.output_options.kind is RequestsMessageKind.RESPONSE:
if self.output_options.body:
yield b'\n\n'
yield self.get_metadata()
yield b'\n\n'
class RawStream(BaseStream):
"""The message is streamed in chunks with no processing."""
CHUNK_SIZE = -1 # '-1' means that we want to receive chunks exactly as they arrive.
CHUNK_SIZE_BY_LINE = 1
def __init__(self, chunk_size=CHUNK_SIZE, **kwargs):
super().__init__(**kwargs)
self.chunk_size = chunk_size
def iter_body(self) -> Iterable[bytes]:
return self.msg.iter_body(self.chunk_size)
ENCODING_GUESS_THRESHOLD = 3
class EncodedStream(BaseStream):
"""Encoded HTTP message stream.
The message bytes are converted to an encoding suitable for
`self.env.stdout`. Unicode errors are replaced and binary data
is suppressed. The body is always streamed by line.
"""
CHUNK_SIZE = 1
def __init__(
self,
env=Environment(),
mime_overwrite: str = None,
encoding_overwrite: str = None,
**kwargs
):
super().__init__(**kwargs)
if mime_overwrite:
self.mime = mime_overwrite
else:
self.mime, _ = parse_content_type_header(self.msg.content_type)
self._encoding = encoding_overwrite or self.msg.encoding
self._encoding_guesses = []
if env.stdout_isatty:
# Use the encoding supported by the terminal.
output_encoding = env.stdout_encoding
else:
# Preserve the message encoding.
output_encoding = self.msg.encoding
# Default to UTF-8 when unsure.
self.output_encoding = output_encoding or UTF8
def iter_body(self) -> Iterable[bytes]:
for line, lf in self.msg.iter_lines(self.CHUNK_SIZE):
if b'\0' in line:
raise BinarySuppressedError()
line = self.decode_chunk(line)
yield smart_encode(line, self.output_encoding) + lf
def decode_chunk(self, raw_chunk: str) -> str:
chunk, guessed_encoding = smart_decode(raw_chunk, self.encoding)
self._encoding_guesses.append(guessed_encoding)
return chunk
@property
def encoding(self) -> Optional[str]:
if self._encoding:
return self._encoding
# If we find a reliable (used consecutively) encoding, than
# use it for the next iterations.
if len(self._encoding_guesses) < ENCODING_GUESS_THRESHOLD:
return None
guess_1, guess_2 = self._encoding_guesses[-2:]
if guess_1 == guess_2:
self._encoding = guess_1
return guess_1
@encoding.setter
def encoding(self, value) -> None:
self._encoding = value
class PrettyStream(EncodedStream):
"""In addition to :class:`EncodedStream` behaviour, this stream applies
content processing.
Useful for long-lived HTTP responses that stream by lines
such as the Twitter streaming API.
"""
CHUNK_SIZE = 1
def __init__(
self, conversion: Conversion,
formatting: Formatting,
**kwargs,
):
super().__init__(**kwargs)
self.formatting = formatting
self.conversion = conversion
def get_headers(self) -> bytes:
return self.formatting.format_headers(
self.msg.headers).encode(self.output_encoding)
def get_metadata(self) -> bytes:
return self.formatting.format_metadata(
self.msg.metadata).encode(self.output_encoding)
def iter_body(self) -> Iterable[bytes]:
first_chunk = True
iter_lines = self.msg.iter_lines(self.CHUNK_SIZE)
for line, lf in iter_lines:
if b'\0' in line:
if first_chunk:
converter = self.conversion.get_converter(self.mime)
if converter:
body = bytearray()
# noinspection PyAssignmentToLoopOrWithParameter
for line, lf in chain([(line, lf)], iter_lines):
body.extend(line)
body.extend(lf)
self.mime, body = converter.convert(body)
assert isinstance(body, str)
yield self.process_body(body)
return
raise BinarySuppressedError()
yield self.process_body(line) + lf
first_chunk = False
def process_body(self, chunk: Union[str, bytes]) -> bytes:
if not isinstance(chunk, str):
# Text when a converter has been used,
# otherwise it will always be bytes.
chunk = self.decode_chunk(chunk)
chunk = self.formatting.format_body(content=chunk, mime=self.mime)
return smart_encode(chunk, self.output_encoding)
class BufferedPrettyStream(PrettyStream):
"""The same as :class:`PrettyStream` except that the body is fully
fetched before it's processed.
Suitable regular HTTP responses.
"""
CHUNK_SIZE = 1024 * 10
def iter_body(self) -> Iterable[bytes]:
# Read the whole body before prettifying it,
# but bail out immediately if the body is binary.
converter = None
body = bytearray()
for chunk in self.msg.iter_body(self.CHUNK_SIZE):
if not converter and b'\0' in chunk:
converter = self.conversion.get_converter(self.mime)
if not converter:
raise BinarySuppressedError()
body.extend(chunk)
if converter:
self.mime, body = converter.convert(body)
yield self.process_body(body)