httpie-cli/httpie/utils.py
Batuhan Taskaya e0e03f3237
Better DNS error handling (#1249)
* Better DNS error handling

* Update httpie/core.py

Co-authored-by: Batuhan Taskaya <isidentical@gmail.com>

Co-authored-by: Jakub Roztocil <jakub@roztocil.co>
2021-12-23 11:35:30 -08:00

240 lines
6.8 KiB
Python

import json
import mimetypes
import re
import sys
import time
import sysconfig
from collections import OrderedDict
from http.cookiejar import parse_ns_headers
from pathlib import Path
from pprint import pformat
from typing import Any, List, Optional, Tuple, Callable, Iterable, TypeVar
import requests.auth
# Matches the ``, `` separator only when it is immediately followed by a
# ``name=`` token (one or more characters other than space/semicolon, then
# ``=``); the lookahead keeps that token out of the match itself.
RE_COOKIE_SPLIT = re.compile(r', (?=[^ ;]+=)')
# A single ``(key, value)`` pair, and a list of such pairs.
Item = Tuple[str, Any]
Items = List[Item]
# Generic element type for annotations in this module.
T = TypeVar("T")
class JsonDictPreservingDuplicateKeys(OrderedDict):
    """An ``OrderedDict`` subclass that keeps repeated JSON keys.

    The ``(key, value)`` pairs given to the constructor are stored untouched
    and returned as-is by ``items()``, so duplicate keys are not collapsed.
    """

    # Python versions prior to 3.8 suffer from an issue with multiple keys with the same name.
    # `json.dumps(obj, indent=N, sort_keys=True)` will output sorted keys when they are unique, and
    # duplicate keys will be outputted as they were defined in the original data.
    # See <https://bugs.python.org/issue23493#msg400929> for the behavior change between Python versions.
    SUPPORTS_SORTING = sys.version_info >= (3, 8)

    def __init__(self, items: Items):
        # The pairs live in a plain attribute; the underlying dict only ever
        # holds the placeholder entry added below.
        self._items = items
        self._ensure_items_used()

    def _ensure_items_used(self) -> None:
        """HACK: make `json.dumps()` call `self.items()` rather than emit `{}`.

        CPython ships two JSON encoders, and both short-circuit on an
        "empty" dict:

        (1) The pure-Python encoder does a simple `if not dict: return '{}'`,
            which could be faked by implementing `__bool__()`.

            Source:
                - <https://github.com/python/cpython/blob/9d318ad/Lib/json/encoder.py#L334-L336>

        (2) The C encoder checks the number of items held by the dict via
            `dict->ma_used`, which only changes when an item is really
            added to or removed from the dict. The only workaround there is
            to store an actual item.

            Sources:
                - <https://github.com/python/cpython/blob/9d318ad/Modules/_json.c#L1581-L1582>
                - <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L53>
                - <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L17-L18>

        One placeholder entry satisfies both implementations.
        """
        if not self._items:
            # Nothing to serialize: leaving the dict empty yields '{}'.
            return
        self['__hack__'] = '__hack__'

    def items(self) -> Items:
        """Return every stored pair, duplicates included."""
        return self._items
def load_json_preserve_order_and_dupe_keys(s):
    """Decode the JSON document ``s``.

    Every JSON object is built by ``JsonDictPreservingDuplicateKeys`` from
    its list of key/value pairs, passed as the ``object_pairs_hook``.
    """
    pairs_hook = JsonDictPreservingDuplicateKeys
    return json.loads(s, object_pairs_hook=pairs_hook)
def repr_dict(d: dict) -> str:
    """Return ``d`` formatted by ``pprint.pformat``."""
    formatted = pformat(d)
    return formatted
def humanize_bytes(n, precision=2):
    # Author: Doug Latornell
    # Licence: MIT
    # URL: https://code.activestate.com/recipes/577081/
    """Format a byte count ``n`` using the largest fitting binary unit.

    ``precision`` is the number of digits printed after the decimal point.

    >>> humanize_bytes(1)
    '1 B'
    >>> humanize_bytes(1024, precision=1)
    '1.0 kB'
    >>> humanize_bytes(1024 * 123, precision=1)
    '123.0 kB'
    >>> humanize_bytes(1024 * 12342, precision=1)
    '12.1 MB'
    >>> humanize_bytes(1024 * 12342, precision=2)
    '12.05 MB'
    >>> humanize_bytes(1024 * 1234, precision=2)
    '1.21 MB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=2)
    '1.31 GB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=1)
    '1.3 GB'

    """
    # Exactly one byte is printed without any decimals.
    if n == 1:
        return '1 B'

    # Units ordered from largest to smallest.
    units = (
        (1 << 50, 'PB'),
        (1 << 40, 'TB'),
        (1 << 30, 'GB'),
        (1 << 20, 'MB'),
        (1 << 10, 'kB'),
        (1, 'B'),
    )
    # Take the first unit that is not larger than n; values below one byte
    # match nothing and fall back to the plain byte unit.
    factor, suffix = next(
        (unit for unit in units if n >= unit[0]),
        units[-1],
    )
    return f'{n / factor:.{precision}f} {suffix}'
class ExplicitNullAuth(requests.auth.AuthBase):
    """Forces requests to ignore the ``.netrc``.

    The instance is a no-op auth handler: calling it hands back its
    argument without touching it.

    <https://github.com/psf/requests/issues/2773#issuecomment-174312831>
    """

    def __call__(self, r):
        # Return ``r`` exactly as received; nothing is added to it.
        return r
def get_content_type(filename):
    """
    Guess the MIME type of ``filename`` for use in a Content-Type header.

    The lookup is non-strict. ``None`` is returned when ``mimetypes``
    does not recognize the file type.

    """
    content_type, _encoding = mimetypes.guess_type(filename, strict=False)
    return content_type
def split_cookies(cookies):
    """
    Split a combined ``Set-Cookie`` header value into individual cookies.

    ``requests`` joins all cookies in ``response.headers['Set-Cookie']``
    with ``, ``. The split is done with ``RE_COOKIE_SPLIT`` so that a
    ``, `` inside a cookie value is not treated as a separator.
    A falsy ``cookies`` yields an empty list.

    """
    return RE_COOKIE_SPLIT.split(cookies) if cookies else []
def get_expired_cookies(
    cookies: str,
    now: Optional[float] = None
) -> List[dict]:
    """
    Return the name and path of every expired cookie in ``cookies``.

    :param cookies: combined ``Set-Cookie`` header value; it is split with
        ``split_cookies()`` and parsed with ``parse_ns_headers()``.
    :param now: reference timestamp to compare ``expires`` against;
        defaults to ``time.time()`` when ``None``.
    :return: one ``{'name': ..., 'path': ...}`` dict per cookie whose
        ``expires`` is set and not later than ``now``. ``path`` falls back
        to ``'/'`` when the cookie does not carry one.

    """
    # Only `None` means "not given": an explicit `now=0` is a valid timestamp
    # and must not be silently replaced by the current time.
    if now is None:
        now = time.time()

    def is_expired(expires: Optional[float]) -> bool:
        return expires is not None and expires <= now

    attr_sets: List[List[Tuple[str, Any]]] = parse_ns_headers(
        split_cookies(cookies)
    )
    parsed_cookies = [
        # The first attr name is the cookie name.
        dict(attrs[1:], name=attrs[0][0])
        for attrs in attr_sets
    ]

    # Cookies that only carry `max-age` get an equivalent `expires` (in place).
    _max_age_to_expires(cookies=parsed_cookies, now=now)

    return [
        {
            'name': cookie['name'],
            'path': cookie.get('path', '/')
        }
        for cookie in parsed_cookies
        if is_expired(expires=cookie.get('expires'))
    ]
def _max_age_to_expires(cookies, now):
"""
Translate `max-age` into `expires` for Requests to take it into account.
HACK/FIXME: <https://github.com/psf/requests/issues/5743>
"""
for cookie in cookies:
if 'expires' in cookie:
continue
max_age = cookie.get('max-age')
if max_age and max_age.isdigit():
cookie['expires'] = now + float(max_age)
def parse_content_type_header(header):
    """Split a Content-Type header into its type and parameters.

    Borrowed from requests.

    Returns a ``(content_type, params)`` tuple. ``params`` maps each
    lower-cased parameter name to its value with surrounding quotes and
    spaces removed, or to ``True`` for a parameter written without ``=``.
    """
    content_type, *raw_params = header.split(';')
    content_type = content_type.strip()

    items_to_strip = "\"' "
    params_dict = {}
    for raw_param in raw_params:
        param = raw_param.strip()
        if not param:
            # Empty segment, e.g. produced by a trailing ';'.
            continue
        name, equals, raw_value = param.partition("=")
        if equals:
            key = name.strip(items_to_strip)
            value = raw_value.strip(items_to_strip)
        else:
            # Bare flag: the name is kept as written and the value is True.
            key, value = param, True
        params_dict[key.lower()] = value
    return content_type, params_dict
def as_site(path: Path) -> Path:
    """Return the ``purelib`` path that ``sysconfig`` computes with ``base``
    set to ``path``."""
    overrides = {'base': str(path)}
    purelib = sysconfig.get_path('purelib', vars=overrides)
    return Path(purelib)
def split(iterable: Iterable[T], key: Callable[[T], bool]) -> Tuple[List[T], List[T]]:
    """Partition ``iterable`` into two lists according to ``key``.

    Items for which ``key(item)`` is truthy go to the first list, all
    others to the second; the original order is kept within each list.
    """
    matched, unmatched = [], []
    for element in iterable:
        bucket = matched if key(element) else unmatched
        bucket.append(element)
    return matched, unmatched
def unwrap_context(exc: Exception) -> Optional[Exception]:
    """Follow the ``__context__`` chain of ``exc`` and return its last link.

    The walk stops at the first exception whose ``__context__`` is not an
    ``Exception`` instance; that exception is returned.
    """
    inner = exc.__context__
    if not isinstance(inner, Exception):
        return exc
    return unwrap_context(inner)