httpie-cli/httpie/models.py
Batuhan Taskaya f3b500119c
Implement basic metrics layout & total elapsed time (#1250)
* Initial metadata processing

* Dynamic coloring and other stuff

* Use -vv / --meta

* More testing

* Cleanup

* Tweek message

Co-authored-by: Jakub Roztocil <jakub@roztocil.co>
2021-12-23 12:13:25 -08:00

209 lines
5.4 KiB
Python

import requests
from enum import Enum, auto
from typing import Iterable, Union, NamedTuple
from urllib.parse import urlsplit
from .cli.constants import (
OUT_REQ_BODY,
OUT_REQ_HEAD,
OUT_RESP_BODY,
OUT_RESP_HEAD,
OUT_RESP_META
)
from .compat import cached_property
from .utils import split_cookies, parse_content_type_header
class HTTPMessage:
"""Abstract class for HTTP messages."""
def __init__(self, orig):
self._orig = orig
def iter_body(self, chunk_size: int) -> Iterable[bytes]:
"""Return an iterator over the body."""
raise NotImplementedError
def iter_lines(self, chunk_size: int) -> Iterable[bytes]:
"""Return an iterator over the body yielding (`line`, `line_feed`)."""
raise NotImplementedError
@property
def headers(self) -> str:
"""Return a `str` with the message's headers."""
raise NotImplementedError
@property
def metadata(self) -> str:
"""Return metadata about the current message."""
raise NotImplementedError
@cached_property
def encoding(self) -> str:
ct, params = parse_content_type_header(self.content_type)
return params.get('charset', '')
@property
def content_type(self) -> str:
"""Return the message content type."""
ct = self._orig.headers.get('Content-Type', '')
if not isinstance(ct, str):
ct = ct.decode()
return ct
class HTTPResponse(HTTPMessage):
"""A :class:`requests.models.Response` wrapper."""
def iter_body(self, chunk_size=1):
return self._orig.iter_content(chunk_size=chunk_size)
def iter_lines(self, chunk_size):
return ((line, b'\n') for line in self._orig.iter_lines(chunk_size))
# noinspection PyProtectedMember
@property
def headers(self):
try:
raw_version = self._orig.raw._original_response.version
except AttributeError:
# Assume HTTP/1.1
raw_version = 11
version = {
9: '0.9',
10: '1.0',
11: '1.1',
20: '2',
}[raw_version]
original = self._orig
status_line = f'HTTP/{version} {original.status_code} {original.reason}'
headers = [status_line]
headers.extend(
': '.join(header)
for header in original.headers.items()
if header[0] != 'Set-Cookie'
)
headers.extend(
f'Set-Cookie: {cookie}'
for header, value in original.headers.items()
for cookie in split_cookies(value)
if header == 'Set-Cookie'
)
return '\r\n'.join(headers)
@property
def metadata(self) -> str:
data = {}
data['Elapsed time'] = str(self._orig.elapsed.total_seconds()) + 's'
return '\n'.join(
f'{key}: {value}'
for key, value in data.items()
)
class HTTPRequest(HTTPMessage):
"""A :class:`requests.models.Request` wrapper."""
def iter_body(self, chunk_size):
yield self.body
def iter_lines(self, chunk_size):
yield self.body, b''
@property
def headers(self):
url = urlsplit(self._orig.url)
request_line = '{method} {path}{query} HTTP/1.1'.format(
method=self._orig.method,
path=url.path or '/',
query=f'?{url.query}' if url.query else ''
)
headers = self._orig.headers.copy()
if 'Host' not in self._orig.headers:
headers['Host'] = url.netloc.split('@')[-1]
headers = [
f'{name}: {value if isinstance(value, str) else value.decode()}'
for name, value in headers.items()
]
headers.insert(0, request_line)
headers = '\r\n'.join(headers).strip()
return headers
@property
def body(self):
body = self._orig.body
if isinstance(body, str):
# Happens with JSON/form request data parsed from the command line.
body = body.encode()
return body or b''
RequestsMessage = Union[requests.PreparedRequest, requests.Response]
class RequestsMessageKind(Enum):
REQUEST = auto()
RESPONSE = auto()
def infer_requests_message_kind(message: RequestsMessage) -> RequestsMessageKind:
if isinstance(message, requests.PreparedRequest):
return RequestsMessageKind.REQUEST
elif isinstance(message, requests.Response):
return RequestsMessageKind.RESPONSE
else:
raise TypeError(f"Unexpected message type: {type(message).__name__}")
OPTION_TO_PARAM = {
RequestsMessageKind.REQUEST: {
'headers': OUT_REQ_HEAD,
'body': OUT_REQ_BODY,
},
RequestsMessageKind.RESPONSE: {
'headers': OUT_RESP_HEAD,
'body': OUT_RESP_BODY,
'meta': OUT_RESP_META
}
}
class OutputOptions(NamedTuple):
kind: RequestsMessageKind
headers: bool
body: bool
meta: bool = False
def any(self):
return (
self.headers
or self.body
or self.meta
)
@classmethod
def from_message(
cls,
message: RequestsMessage,
raw_args: str = '',
**kwargs
):
kind = infer_requests_message_kind(message)
options = {
option: param in raw_args
for option, param in OPTION_TO_PARAM[kind].items()
}
options.update(kwargs)
return cls(
kind=kind,
**options
)