mirror of
https://github.com/httpie/cli.git
synced 2024-11-21 15:23:11 +01:00
310 lines
8.9 KiB
Python
310 lines
8.9 KiB
Python
import os
|
|
import base64
|
|
import json
|
|
import mimetypes
|
|
import re
|
|
import sys
|
|
import time
|
|
import tempfile
|
|
import sysconfig
|
|
|
|
from collections import OrderedDict
|
|
from contextlib import contextmanager
|
|
from http.cookiejar import parse_ns_headers
|
|
from pathlib import Path
|
|
from pprint import pformat
|
|
from urllib.parse import urlsplit
|
|
from typing import Any, List, Optional, Tuple, Generator, Callable, Iterable, IO, TypeVar
|
|
|
|
import requests.auth
|
|
|
|
RE_COOKIE_SPLIT = re.compile(r', (?=[^ ;]+=)')
|
|
Item = Tuple[str, Any]
|
|
Items = List[Item]
|
|
T = TypeVar("T")
|
|
|
|
|
|
class JsonDictPreservingDuplicateKeys(OrderedDict):
|
|
"""A specialized JSON dict preserving duplicate keys."""
|
|
|
|
# Python versions prior to 3.8 suffer from an issue with multiple keys with the same name.
|
|
# `json.dumps(obj, indent=N, sort_keys=True)` will output sorted keys when they are unique, and
|
|
# duplicate keys will be outputted as they were defined in the original data.
|
|
# See <https://bugs.python.org/issue23493#msg400929> for the behavior change between Python versions.
|
|
SUPPORTS_SORTING = sys.version_info >= (3, 8)
|
|
|
|
def __init__(self, items: Items):
|
|
self._items = items
|
|
self._ensure_items_used()
|
|
|
|
def _ensure_items_used(self) -> None:
|
|
"""HACK: Force `json.dumps()` to use `self.items()` instead of an empty dict.
|
|
|
|
Two JSON encoders are available on CPython: pure-Python (1) and C (2) implementations.
|
|
|
|
(1) The pure-python implementation will do a simple `if not dict: return '{}'`,
|
|
and we could fake that check by implementing the `__bool__()` method.
|
|
Source:
|
|
- <https://github.com/python/cpython/blob/9d318ad/Lib/json/encoder.py#L334-L336>
|
|
|
|
(2) On the other hand, the C implementation will do a check on the number of
|
|
items contained inside the dict, using a verification on `dict->ma_used`, which
|
|
is updated only when an item is added/removed from the dict. For that case,
|
|
there is no workaround but to add an item into the dict.
|
|
Sources:
|
|
- <https://github.com/python/cpython/blob/9d318ad/Modules/_json.c#L1581-L1582>
|
|
- <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L53>
|
|
- <https://github.com/python/cpython/blob/9d318ad/Include/cpython/dictobject.h#L17-L18>
|
|
|
|
To please both implementations, we simply add one item to the dict.
|
|
|
|
"""
|
|
if self._items:
|
|
self['__hack__'] = '__hack__'
|
|
|
|
def items(self) -> Items:
|
|
"""Return all items, duplicate ones included.
|
|
|
|
"""
|
|
return self._items
|
|
|
|
|
|
def load_json_preserve_order_and_dupe_keys(s):
|
|
return json.loads(s, object_pairs_hook=JsonDictPreservingDuplicateKeys)
|
|
|
|
|
|
def repr_dict(d: dict) -> str:
|
|
return pformat(d)
|
|
|
|
|
|
def humanize_bytes(n, precision=2):
|
|
# Author: Doug Latornell
|
|
# Licence: MIT
|
|
# URL: https://code.activestate.com/recipes/577081/
|
|
"""Return a humanized string representation of a number of bytes.
|
|
|
|
>>> humanize_bytes(1)
|
|
'1 B'
|
|
>>> humanize_bytes(1024, precision=1)
|
|
'1.0 kB'
|
|
>>> humanize_bytes(1024 * 123, precision=1)
|
|
'123.0 kB'
|
|
>>> humanize_bytes(1024 * 12342, precision=1)
|
|
'12.1 MB'
|
|
>>> humanize_bytes(1024 * 12342, precision=2)
|
|
'12.05 MB'
|
|
>>> humanize_bytes(1024 * 1234, precision=2)
|
|
'1.21 MB'
|
|
>>> humanize_bytes(1024 * 1234 * 1111, precision=2)
|
|
'1.31 GB'
|
|
>>> humanize_bytes(1024 * 1234 * 1111, precision=1)
|
|
'1.3 GB'
|
|
|
|
"""
|
|
abbrevs = [
|
|
(1 << 50, 'PB'),
|
|
(1 << 40, 'TB'),
|
|
(1 << 30, 'GB'),
|
|
(1 << 20, 'MB'),
|
|
(1 << 10, 'kB'),
|
|
(1, 'B')
|
|
]
|
|
|
|
if n == 1:
|
|
return '1 B'
|
|
|
|
for factor, suffix in abbrevs:
|
|
if n >= factor:
|
|
break
|
|
|
|
# noinspection PyUnboundLocalVariable
|
|
return f'{n / factor:.{precision}f} {suffix}'
|
|
|
|
|
|
class ExplicitNullAuth(requests.auth.AuthBase):
|
|
"""Forces requests to ignore the ``.netrc``.
|
|
<https://github.com/psf/requests/issues/2773#issuecomment-174312831>
|
|
"""
|
|
|
|
def __call__(self, r):
|
|
return r
|
|
|
|
|
|
def get_content_type(filename):
|
|
"""
|
|
Return the content type for ``filename`` in format appropriate
|
|
for Content-Type headers, or ``None`` if the file type is unknown
|
|
to ``mimetypes``.
|
|
|
|
"""
|
|
return mimetypes.guess_type(filename, strict=False)[0]
|
|
|
|
|
|
def split_cookies(cookies):
|
|
"""
|
|
When ``requests`` stores cookies in ``response.headers['Set-Cookie']``
|
|
it concatenates all of them through ``, ``.
|
|
|
|
This function splits cookies apart being careful to not to
|
|
split on ``, `` which may be part of cookie value.
|
|
"""
|
|
if not cookies:
|
|
return []
|
|
return RE_COOKIE_SPLIT.split(cookies)
|
|
|
|
|
|
def get_expired_cookies(
|
|
cookies: str,
|
|
now: float = None
|
|
) -> List[dict]:
|
|
|
|
now = now or time.time()
|
|
|
|
def is_expired(expires: Optional[float]) -> bool:
|
|
return expires is not None and expires <= now
|
|
|
|
attr_sets: List[Tuple[str, str]] = parse_ns_headers(
|
|
split_cookies(cookies)
|
|
)
|
|
|
|
cookies = [
|
|
# The first attr name is the cookie name.
|
|
dict(attrs[1:], name=attrs[0][0])
|
|
for attrs in attr_sets
|
|
]
|
|
|
|
_max_age_to_expires(cookies=cookies, now=now)
|
|
|
|
return [
|
|
{
|
|
'name': cookie['name'],
|
|
'path': cookie.get('path', '/')
|
|
}
|
|
for cookie in cookies
|
|
if is_expired(expires=cookie.get('expires'))
|
|
]
|
|
|
|
|
|
def _max_age_to_expires(cookies, now):
|
|
"""
|
|
Translate `max-age` into `expires` for Requests to take it into account.
|
|
|
|
HACK/FIXME: <https://github.com/psf/requests/issues/5743>
|
|
|
|
"""
|
|
for cookie in cookies:
|
|
if 'expires' in cookie:
|
|
continue
|
|
max_age = cookie.get('max-age')
|
|
if max_age and max_age.isdigit():
|
|
cookie['expires'] = now + float(max_age)
|
|
|
|
|
|
def parse_content_type_header(header):
|
|
"""Borrowed from requests."""
|
|
tokens = header.split(';')
|
|
content_type, params = tokens[0].strip(), tokens[1:]
|
|
params_dict = {}
|
|
items_to_strip = "\"' "
|
|
for param in params:
|
|
param = param.strip()
|
|
if param:
|
|
key, value = param, True
|
|
index_of_equals = param.find("=")
|
|
if index_of_equals != -1:
|
|
key = param[:index_of_equals].strip(items_to_strip)
|
|
value = param[index_of_equals + 1:].strip(items_to_strip)
|
|
params_dict[key.lower()] = value
|
|
return content_type, params_dict
|
|
|
|
|
|
def as_site(path: Path, **extra_vars) -> Path:
|
|
site_packages_path = sysconfig.get_path(
|
|
'purelib',
|
|
vars={'base': str(path), **extra_vars}
|
|
)
|
|
return Path(site_packages_path)
|
|
|
|
|
|
def get_site_paths(path: Path) -> Iterable[Path]:
|
|
from httpie.compat import (
|
|
MIN_SUPPORTED_PY_VERSION,
|
|
MAX_SUPPORTED_PY_VERSION,
|
|
is_frozen
|
|
)
|
|
|
|
if is_frozen:
|
|
[major, min_minor] = MIN_SUPPORTED_PY_VERSION
|
|
[major, max_minor] = MAX_SUPPORTED_PY_VERSION
|
|
for minor in range(min_minor, max_minor + 1):
|
|
yield as_site(
|
|
path,
|
|
py_version_short=f'{major}.{minor}'
|
|
)
|
|
else:
|
|
yield as_site(path)
|
|
|
|
|
|
def split_iterable(iterable: Iterable[T], key: Callable[[T], bool]) -> Tuple[List[T], List[T]]:
|
|
left, right = [], []
|
|
for item in iterable:
|
|
if key(item):
|
|
left.append(item)
|
|
else:
|
|
right.append(item)
|
|
return left, right
|
|
|
|
|
|
def unwrap_context(exc: Exception) -> Optional[Exception]:
|
|
context = exc.__context__
|
|
if isinstance(context, Exception):
|
|
return unwrap_context(context)
|
|
else:
|
|
return exc
|
|
|
|
|
|
def url_as_host(url: str) -> str:
|
|
return urlsplit(url).netloc.split('@')[-1]
|
|
|
|
|
|
class LockFileError(ValueError):
|
|
pass
|
|
|
|
|
|
@contextmanager
|
|
def open_with_lockfile(file: Path, *args, **kwargs) -> Generator[IO[Any], None, None]:
|
|
file_id = base64.b64encode(os.fsencode(file)).decode()
|
|
target_file = Path(tempfile.gettempdir()) / file_id
|
|
|
|
# Have an atomic-like touch here, so we'll tighten the possibility of
|
|
# a race occurring between multiple processes accessing the same file.
|
|
try:
|
|
target_file.touch(exist_ok=False)
|
|
except FileExistsError as exc:
|
|
raise LockFileError("Can't modify a locked file.") from exc
|
|
|
|
try:
|
|
with open(file, *args, **kwargs) as stream:
|
|
yield stream
|
|
finally:
|
|
target_file.unlink()
|
|
|
|
|
|
def is_version_greater(version_1: str, version_2: str) -> bool:
|
|
# In an ideal scenario, we would depend on `packaging` in order
|
|
# to offer PEP 440 compatible parsing. But since it might not be
|
|
# commonly available for outside packages, and since we are only
|
|
# going to parse HTTPie's own version it should be fine to compare
|
|
# this in a SemVer subset fashion.
|
|
|
|
def split_version(version: str) -> Tuple[int, ...]:
|
|
parts = []
|
|
for part in version.split('.')[:3]:
|
|
try:
|
|
parts.append(int(part))
|
|
except ValueError:
|
|
break
|
|
return tuple(parts)
|
|
|
|
return split_version(version_1) > split_version(version_2)
|