import os
import base64
import json
import mimetypes
import re
import sys
import time
import tempfile
import sysconfig

from collections import OrderedDict
from contextlib import contextmanager
from http.cookiejar import parse_ns_headers
from pathlib import Path
from pprint import pformat
from urllib.parse import urlsplit
from typing import Any, List, Optional, Tuple, Generator, Callable, Iterable, IO, TypeVar

import requests.auth

# Matches the ``, `` separating two cookies: it must be followed by something
# that looks like ``name=`` (no spaces or semicolons before the ``=``).
RE_COOKIE_SPLIT = re.compile(r', (?=[^ ;]+=)')
Item = Tuple[str, Any]
Items = List[Item]
T = TypeVar("T")


class JsonDictPreservingDuplicateKeys(OrderedDict):
    """A specialized JSON dict preserving duplicate keys."""

    # Python versions prior to 3.8 suffer from an issue with multiple keys with the same name.
    # `json.dumps(obj, indent=N, sort_keys=True)` will output sorted keys when they are unique, and
    # duplicate keys will be outputted as they were defined in the original data.
    # The behavior changed between Python versions, hence this flag.
    SUPPORTS_SORTING = sys.version_info >= (3, 8)

    def __init__(self, items: Items):
        """Store the raw ``(key, value)`` pairs, duplicates included."""
        self._items = items
        self._ensure_items_used()

    def _ensure_items_used(self) -> None:
        """HACK: Force `json.dumps()` to use `self.items()` instead of an empty dict.

        Two JSON encoders are available on CPython: pure-Python (1) and C (2) implementations.

        (1) The pure-python implementation will do a simple `if not dict: return '{}'`,
        and we could fake that check by implementing the `__bool__()` method.

        (2) On the other hand, the C implementation will do a check on the number of
        items contained inside the dict, using a verification on `dict->ma_used`, which
        is updated only when an item is added/removed from the dict. For that case,
        there is no workaround but to add an item into the dict.

        To please both implementations, we simply add one item to the dict.

        """
        if self._items:
            self['__hack__'] = '__hack__'

    def items(self) -> Items:
        """Return all items, duplicate ones included.

        """
        return self._items


def load_json_preserve_order_and_dupe_keys(s):
    """Parse JSON text ``s``, building every object as a
    :class:`JsonDictPreservingDuplicateKeys` from its key/value pairs.

    """
    return json.loads(s, object_pairs_hook=JsonDictPreservingDuplicateKeys)


def repr_dict(d: dict) -> str:
    """Return the ``pprint.pformat()`` representation of ``d``."""
    return pformat(d)


def humanize_bytes(n, precision=2):
    # Author: Doug Latornell
    # Licence: MIT
    # URL: https://code.activestate.com/recipes/577081/
    """Return a humanized string representation of a number of bytes.

    >>> humanize_bytes(1)
    '1 B'
    >>> humanize_bytes(1024, precision=1)
    '1.0 kB'
    >>> humanize_bytes(1024 * 123, precision=1)
    '123.0 kB'
    >>> humanize_bytes(1024 * 12342, precision=1)
    '12.1 MB'
    >>> humanize_bytes(1024 * 12342, precision=2)
    '12.05 MB'
    >>> humanize_bytes(1024 * 1234, precision=2)
    '1.21 MB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=2)
    '1.31 GB'
    >>> humanize_bytes(1024 * 1234 * 1111, precision=1)
    '1.3 GB'

    """
    abbrevs = [
        (1 << 50, 'PB'),
        (1 << 40, 'TB'),
        (1 << 30, 'GB'),
        (1 << 20, 'MB'),
        (1 << 10, 'kB'),
        (1, 'B')
    ]

    if n == 1:
        return '1 B'

    # Pick the largest unit not exceeding `n`. When nothing matches (n < 1),
    # the loop simply ends on the last entry, i.e. plain bytes.
    for factor, suffix in abbrevs:
        if n >= factor:
            break

    # noinspection PyUnboundLocalVariable
    return f'{n / factor:.{precision}f} {suffix}'


class ExplicitNullAuth(requests.auth.AuthBase):
    """Forces requests to ignore the ``.netrc``.
    """

    def __call__(self, r):
        # Return the request untouched: no credentials are attached.
        return r


def get_content_type(filename):
    """
    Return the content type for ``filename`` in format appropriate
    for Content-Type headers, or ``None`` if the file type is unknown
    to ``mimetypes``.

    """
    return mimetypes.guess_type(filename, strict=False)[0]


def split_cookies(cookies):
    """
    When ``requests`` stores cookies in ``response.headers['Set-Cookie']``
    it concatenates all of them through ``, ``.

    This function splits cookies apart, being careful not to split on ``, ``
    which may be part of a cookie value.

    Returns an empty list when ``cookies`` is empty or ``None``.
    """
    if not cookies:
        return []
    return RE_COOKIE_SPLIT.split(cookies)


def get_expired_cookies(
    cookies: str,
    now: Optional[float] = None
) -> List[dict]:
    """Return the ``name``/``path`` of every cookie in ``cookies`` that has expired.

    :param cookies: a ``Set-Cookie`` header value, possibly holding several
        cookies joined with ``, `` (see :func:`split_cookies`).
    :param now: reference timestamp (seconds since the epoch) to compare
        expiry against; defaults to the current time.
    :return: a list of ``{'name': ..., 'path': ...}`` dicts, with ``path``
        defaulting to ``'/'``.

    """
    # Compare against `None` explicitly so that a legitimate `now=0`
    # is honoured instead of being replaced by the current time.
    if now is None:
        now = time.time()

    def is_expired(expires: Optional[float]) -> bool:
        return expires is not None and expires <= now

    attr_sets: List[List[Tuple[str, Any]]] = parse_ns_headers(
        split_cookies(cookies)
    )

    parsed_cookies = [
        # The first attr name is the cookie name.
        dict(attrs[1:], name=attrs[0][0])
        for attrs in attr_sets
    ]

    _max_age_to_expires(cookies=parsed_cookies, now=now)

    return [
        {
            'name': cookie['name'],
            'path': cookie.get('path', '/')
        }
        for cookie in parsed_cookies
        if is_expired(expires=cookie.get('expires'))
    ]


def _max_age_to_expires(cookies, now):
    """
    Translate `max-age` into `expires` for Requests to take it into account.

    Cookies that already carry an `expires` attribute are left untouched,
    and `max-age` values that are not plain digits are ignored.
    The cookie dicts are modified in place.

    HACK/FIXME: this is a workaround and should eventually go away.

    """
    for cookie in cookies:
        if 'expires' in cookie:
            continue
        max_age = cookie.get('max-age')
        if max_age and max_age.isdigit():
            cookie['expires'] = now + float(max_age)


def parse_content_type_header(header):
    """Borrowed from requests.

    Split a Content-Type ``header`` into ``(content_type, params_dict)``.
    Parameter names are lower-cased; a parameter without ``=`` maps to ``True``.

    """
    tokens = header.split(';')
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "
    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1:].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


def as_site(path: Path, **extra_vars) -> Path:
    """Return the ``purelib`` path that ``sysconfig`` computes when ``base``
    is set to ``path``; ``extra_vars`` override further ``sysconfig`` variables.

    """
    site_packages_path = sysconfig.get_path(
        'purelib',
        vars={'base': str(path), **extra_vars}
    )
    return Path(site_packages_path)


def get_site_paths(path: Path) -> Iterable[Path]:
    """Yield the site-packages location(s) under ``path``.

    When ``is_frozen`` is set, one path is yielded for every minor version
    between the minimum and maximum supported Python versions; otherwise
    only the path for the running interpreter is yielded.

    """
    from httpie.compat import (
        MIN_SUPPORTED_PY_VERSION,
        MAX_SUPPORTED_PY_VERSION,
        is_frozen
    )

    if is_frozen:
        [major, min_minor] = MIN_SUPPORTED_PY_VERSION
        [major, max_minor] = MAX_SUPPORTED_PY_VERSION
        for minor in range(min_minor, max_minor + 1):
            yield as_site(
                path,
                py_version_short=f'{major}.{minor}'
            )
    else:
        yield as_site(path)


def split(iterable: Iterable[T], key: Callable[[T], bool]) -> Tuple[List[T], List[T]]:
    """Partition ``iterable`` into ``(matching, non_matching)`` lists
    according to ``key``, preserving the original order within each list.

    """
    left, right = [], []
    for item in iterable:
        if key(item):
            left.append(item)
        else:
            right.append(item)
    return left, right


def unwrap_context(exc: Exception) -> Optional[Exception]:
    """Follow the ``__context__`` chain of ``exc`` and return the innermost
    exception; ``exc`` itself is returned when it has no ``Exception`` context.

    """
    context = exc.__context__
    if isinstance(context, Exception):
        return unwrap_context(context)
    else:
        return exc


def url_as_host(url: str) -> str:
    """Return the network location of ``url`` with any ``user:password@``
    prefix removed.

    """
    return urlsplit(url).netloc.split('@')[-1]


class LockFileError(ValueError):
    """Raised when the lock file for a target file already exists."""
    pass


@contextmanager
def open_with_lockfile(file: Path, *args, **kwargs) -> Generator[IO[Any], None, None]:
    """Open ``file`` while holding a lock file in the temporary directory.

    ``*args`` and ``**kwargs`` are passed through to :func:`open`.
    The lock file is removed when the context exits, even on error.

    :raises LockFileError: if the lock file already exists.

    """
    # The lock file lives directly inside the temp directory, so its name must
    # not contain a path separator. The standard base64 alphabet includes '/',
    # hence it is swapped for '_' ('+' is kept, so names that were already
    # valid stay exactly the same).
    file_id = base64.b64encode(os.fsencode(file), altchars=b'+_').decode()
    target_file = Path(tempfile.gettempdir()) / file_id

    # Have an atomic-like touch here, so we'll tighten the possibility of
    # a race occurring between multiple processes accessing the same file.
    try:
        target_file.touch(exist_ok=False)
    except FileExistsError as exc:
        raise LockFileError("Can't modify a locked file.") from exc

    try:
        with open(file, *args, **kwargs) as stream:
            yield stream
    finally:
        target_file.unlink()


def is_version_greater(version_1: str, version_2: str) -> bool:
    """Return ``True`` if ``version_1`` is strictly greater than ``version_2``.

    Only the first three dot-separated components are considered, and
    parsing stops at the first component that is not an integer.

    """
    # In an ideal scenario, we would depend on `packaging` in order
    # to offer PEP 440 compatible parsing. But since it might not be
    # commonly available for outside packages, and since we are only
    # going to parse HTTPie's own version it should be fine to compare
    # this in a SemVer subset fashion.

    def split_version(version: str) -> Tuple[int, ...]:
        parts = []
        for part in version.split('.')[:3]:
            try:
                parts.append(int(part))
            except ValueError:
                break
        return tuple(parts)

    return split_version(version_1) > split_version(version_2)