""" Persistent, JSON-serialized sessions. """ import os import re from http.cookies import SimpleCookie from pathlib import Path from typing import Iterable, Optional, Union from urllib.parse import urlsplit from requests.auth import AuthBase from requests.cookies import RequestsCookieJar, create_cookie from .cli.dicts import RequestHeadersDict from .config import BaseConfigDict, DEFAULT_CONFIG_DIR from .plugins.registry import plugin_manager SESSIONS_DIR_NAME = 'sessions' DEFAULT_SESSIONS_DIR = DEFAULT_CONFIG_DIR / SESSIONS_DIR_NAME VALID_SESSION_NAME_PATTERN = re.compile('^[a-zA-Z0-9_.-]+$') # Request headers starting with these prefixes won't be stored in sessions. # They are specific to each request. # SESSION_IGNORED_HEADER_PREFIXES = ['Content-', 'If-'] def get_httpie_session( config_dir: Path, session_name: str, host: Optional[str], url: str, ) -> 'Session': if os.path.sep in session_name: path = os.path.expanduser(session_name) else: hostname = host or urlsplit(url).netloc.split('@')[-1] if not hostname: # HACK/FIXME: httpie-unixsocket's URLs have no hostname. hostname = 'localhost' # host:port => host_port hostname = hostname.replace(':', '_') path = ( config_dir / SESSIONS_DIR_NAME / hostname / f'{session_name}.json' ) session = Session(path) session.load() return session class Session(BaseConfigDict): helpurl = 'https://httpie.org/doc#sessions' about = 'HTTPie session file' def __init__(self, path: Union[str, Path]): super().__init__(path=Path(path)) self['headers'] = {} self['cookies'] = {} self['auth'] = { 'type': None, 'username': None, 'password': None } def update_headers(self, request_headers: RequestHeadersDict): """ Update the session headers with the request ones while ignoring certain name prefixes. """ headers = self.headers for name, value in request_headers.copy().items(): if value is None: continue # Ignore explicitly unset headers if type(value) is not str: value = value.decode() if name.lower() == 'user-agent' and value.startswith('HTTPie/'): continue if name.lower() == 'cookie': for cookie_name, morsel in SimpleCookie(value).items(): self['cookies'][cookie_name] = {'value': morsel.value} del request_headers[name] continue for prefix in SESSION_IGNORED_HEADER_PREFIXES: if name.lower().startswith(prefix.lower()): break else: headers[name] = value self['headers'] = dict(headers) @property def headers(self) -> RequestHeadersDict: return RequestHeadersDict(self['headers']) @property def cookies(self) -> RequestsCookieJar: jar = RequestsCookieJar() for name, cookie_dict in self['cookies'].items(): jar.set_cookie(create_cookie( name, cookie_dict.pop('value'), **cookie_dict)) jar.clear_expired_cookies() return jar @cookies.setter def cookies(self, jar: RequestsCookieJar): # stored_attrs = ['value', 'path', 'secure', 'expires'] self['cookies'] = {} for cookie in jar: self['cookies'][cookie.name] = { attname: getattr(cookie, attname) for attname in stored_attrs } @property def auth(self) -> Optional[AuthBase]: auth = self.get('auth', None) if not auth or not auth['type']: return plugin = plugin_manager.get_auth_plugin(auth['type'])() credentials = {'username': None, 'password': None} try: # New style plugin.raw_auth = auth['raw_auth'] except KeyError: # Old style credentials = { 'username': auth['username'], 'password': auth['password'], } else: if plugin.auth_parse: from .cli.argtypes import parse_auth parsed = parse_auth(plugin.raw_auth) credentials = { 'username': parsed.key, 'password': parsed.value, } return plugin.get_auth(**credentials) @auth.setter def auth(self, auth: dict): assert {'type', 'raw_auth'} == auth.keys() self['auth'] = auth def remove_cookies(self, names: Iterable[str]): for name in names: if name in self['cookies']: del self['cookies'][name]