import os
import functools
from typing import Callable, Dict, IO, List, Optional, Tuple, Union

from .argtypes import KeyValueArg
from .constants import (
    SEPARATORS_GROUP_MULTIPART,
    SEPARATOR_DATA_EMBED_FILE_CONTENTS,
    SEPARATOR_DATA_EMBED_RAW_JSON_FILE,
    SEPARATOR_GROUP_NESTED_JSON_ITEMS,
    SEPARATOR_DATA_RAW_JSON,
    SEPARATOR_DATA_STRING,
    SEPARATOR_FILE_UPLOAD,
    SEPARATOR_FILE_UPLOAD_TYPE,
    SEPARATOR_HEADER,
    SEPARATOR_HEADER_EMPTY,
    SEPARATOR_HEADER_EMBED,
    SEPARATOR_QUERY_PARAM,
    SEPARATOR_QUERY_EMBED_FILE,
    RequestType
)
from .dicts import (
    BaseMultiDict,
    MultipartRequestDataDict,
    RequestDataDict,
    RequestFilesDict,
    HTTPHeadersDict,
    RequestJSONDataDict,
    RequestQueryParamsDict,
)
from .exceptions import ParseError
from .nested_json import interpret_nested_json
from ..utils import get_content_type, load_json_preserve_order_and_dupe_keys, split_iterable


class RequestItems:
    """
    Holds the parts of a request built from key/value request item
    arguments: headers, data, files, query params and multipart data.

    """

    def __init__(self, request_type: Optional[RequestType] = None):
        """
        Create empty containers for every kind of request item.

        JSON mode is assumed when `request_type` is `None` or
        `RequestType.JSON`; it decides which dict class backs `self.data`.

        """
        self.headers = HTTPHeadersDict()
        self.request_type = request_type
        self.is_json = request_type is None or request_type is RequestType.JSON
        self.data = RequestJSONDataDict() if self.is_json else RequestDataDict()
        self.files = RequestFilesDict()
        self.params = RequestQueryParamsDict()
        # To preserve the order of fields in file upload multipart requests.
        self.multipart_data = MultipartRequestDataDict()

    @classmethod
    def from_args(
        cls,
        request_item_args: List[KeyValueArg],
        request_type: Optional[RequestType] = None,
    ) -> 'RequestItems':
        """
        Build a `RequestItems` instance from parsed key/value arguments.

        Each argument is dispatched on its separator (`arg.sep`) to a
        processor function, and the processed value is stored in the
        container paired with that separator.

        Errors raised by the processors (`ParseError`) propagate to the
        caller.

        """
        instance = cls(request_type=request_type)
        # Separator -> (processor function, container receiving the value).
        rules: Dict[str, Tuple[Callable, dict]] = {
            SEPARATOR_HEADER: (
                process_header_arg,
                instance.headers,
            ),
            SEPARATOR_HEADER_EMPTY: (
                process_empty_header_arg,
                instance.headers,
            ),
            SEPARATOR_HEADER_EMBED: (
                process_embed_header_arg,
                instance.headers,
            ),
            SEPARATOR_QUERY_PARAM: (
                process_query_param_arg,
                instance.params,
            ),
            SEPARATOR_QUERY_EMBED_FILE: (
                process_embed_query_param_arg,
                instance.params,
            ),
            SEPARATOR_FILE_UPLOAD: (
                process_file_upload_arg,
                instance.files,
            ),
            SEPARATOR_DATA_STRING: (
                process_data_item_arg,
                instance.data,
            ),
            SEPARATOR_DATA_EMBED_FILE_CONTENTS: (
                process_data_embed_file_contents_arg,
                instance.data,
            ),
            # Keyed by the whole separator group; its processor receives a
            # list of (key, value) pairs rather than a single argument.
            SEPARATOR_GROUP_NESTED_JSON_ITEMS: (
                process_data_nested_json_embed_args,
                instance.data,
            ),
            SEPARATOR_DATA_RAW_JSON: (
                convert_json_value_to_form_if_needed(
                    in_json_mode=instance.is_json,
                    processor=process_data_raw_json_embed_arg
                ),
                instance.data,
            ),
            SEPARATOR_DATA_EMBED_RAW_JSON_FILE: (
                convert_json_value_to_form_if_needed(
                    in_json_mode=instance.is_json,
                    processor=process_data_embed_raw_json_file_arg,
                ),
                instance.data,
            ),
        }

        if instance.is_json:
            # In JSON mode the data items are pulled out and interpreted
            # together. NOTE(review): presumably `split_iterable` returns
            # (matching, non-matching) — confirm against `..utils`.
            json_item_args, request_item_args = split_iterable(
                iterable=request_item_args,
                key=lambda arg: arg.sep in SEPARATOR_GROUP_NESTED_JSON_ITEMS
            )
            if json_item_args:
                pairs = [(arg.key, rules[arg.sep][0](arg)) for arg in json_item_args]
                processor_func, target_dict = rules[SEPARATOR_GROUP_NESTED_JSON_ITEMS]
                value = processor_func(pairs)
                target_dict.update(value)

        # Then handle all other items.
        for arg in request_item_args:
            processor_func, target_dict = rules[arg.sep]
            value = processor_func(arg)

            if arg.sep in SEPARATORS_GROUP_MULTIPART:
                instance.multipart_data[arg.key] = value

            if isinstance(target_dict, BaseMultiDict):
                target_dict.add(arg.key, value)
            else:
                target_dict[arg.key] = value

        return instance


# `float` is included because the form-mode wrapper below explicitly
# accepts float values coming out of the JSON processors.
JSONType = Union[str, bool, int, float, list, dict]


def process_header_arg(arg: KeyValueArg) -> Optional[str]:
    """Return the header value, or `None` when the value is empty."""
    return arg.value or None


def process_embed_header_arg(arg: KeyValueArg) -> str:
    """Return the contents of the referenced file without trailing newlines."""
    return load_text_file(arg).rstrip('\n')


def process_empty_header_arg(arg: KeyValueArg) -> str:
    """
    Return the (empty) header value.

    Raises `ParseError` when a non-empty value was supplied.

    """
    if not arg.value:
        return arg.value
    raise ParseError(
        f'Invalid item {arg.orig!r} (to specify an empty header use `Header;`)'
    )


def process_query_param_arg(arg: KeyValueArg) -> str:
    """Return the query parameter value unchanged."""
    return arg.value


def process_embed_query_param_arg(arg: KeyValueArg) -> str:
    """Return the contents of the referenced file without trailing newlines."""
    return load_text_file(arg).rstrip('\n')


def process_file_upload_arg(arg: KeyValueArg) -> Tuple[str, IO, str]:
    """
    Open the file referenced by a file upload argument.

    The value is split on `SEPARATOR_FILE_UPLOAD_TYPE`: the first part is
    the path, the optional second part an explicit MIME type.

    Returns a `(basename, open binary file, content type)` tuple. The file
    is returned open; it is not closed here.

    Raises `ParseError` when the file cannot be opened.

    """
    parts = arg.value.split(SEPARATOR_FILE_UPLOAD_TYPE)
    filename = parts[0]
    mime_type = parts[1] if len(parts) > 1 else None
    try:
        f = open(os.path.expanduser(filename), 'rb')
    except OSError as e:
        raise ParseError(f'{arg.orig!r}: {e}') from e
    return (
        os.path.basename(filename),
        f,
        mime_type or get_content_type(filename),
    )


def convert_json_value_to_form_if_needed(in_json_mode: bool, processor: Callable[[KeyValueArg], JSONType]) -> Callable[[KeyValueArg], JSONType]:
    """
    We allow primitive values to be passed to forms via JSON key/value syntax.

    But complex values lead to an error because there’s no clear way to serialize them.

    In JSON mode `processor` is returned untouched. Otherwise a wrapper is
    returned that converts `str`, `int` and `float` results with `str()`
    and raises `ParseError` for anything else, including the case where
    `processor` itself failed with `ParseError`.

    """
    if in_json_mode:
        return processor

    @functools.wraps(processor)
    def wrapper(*args, **kwargs) -> str:
        error = None
        try:
            output = processor(*args, **kwargs)
        except ParseError as e:
            # Keep the underlying failure so it can be attached as the
            # cause of the error raised below instead of being lost.
            output = None
            error = e
        # NOTE: `bool` is a subclass of `int`, so boolean results pass this
        # check and are rendered as 'True' / 'False'.
        if isinstance(output, (str, int, float)):
            return str(output)
        raise ParseError('Cannot use complex JSON value types with --form/--multipart.') from error

    return wrapper


def process_data_item_arg(arg: KeyValueArg) -> str:
    """Return the data field value unchanged."""
    return arg.value


def process_data_embed_file_contents_arg(arg: KeyValueArg) -> str:
    """Return the full text contents of the referenced file."""
    return load_text_file(arg)


def process_data_embed_raw_json_file_arg(arg: KeyValueArg) -> JSONType:
    """Load the referenced file as text and parse it as JSON."""
    contents = load_text_file(arg)
    value = load_json(arg, contents)
    return value


def process_data_raw_json_embed_arg(arg: KeyValueArg) -> JSONType:
    """Parse the argument value as JSON."""
    value = load_json(arg, arg.value)
    return value


def process_data_nested_json_embed_args(pairs: List[Tuple[str, JSONType]]) -> Dict[str, JSONType]:
    """Delegate `(key, value)` pairs to `interpret_nested_json`."""
    return interpret_nested_json(pairs)


def load_text_file(item: KeyValueArg) -> str:
    """
    Read the file at `item.value` (with `~` expanded) and decode it as text.

    The bytes are decoded with `bytes.decode()`'s default codec (UTF-8).

    Raises `ParseError` when the file cannot be read or decoded.

    """
    path = item.value
    try:
        with open(os.path.expanduser(path), 'rb') as f:
            return f.read().decode()
    except OSError as e:
        raise ParseError(f'{item.orig!r}: {e}') from e
    except UnicodeDecodeError as e:
        raise ParseError(
            f'{item.orig!r}: cannot embed the content of {item.value!r},'
            ' not a UTF-8 or ASCII-encoded text file'
        ) from e


def load_json(arg: KeyValueArg, contents: str) -> JSONType:
    """
    Parse `contents` with `load_json_preserve_order_and_dupe_keys`.

    Raises `ParseError`, prefixed with the original argument, when the
    loader raises `ValueError`.

    """
    try:
        return load_json_preserve_order_and_dupe_keys(contents)
    except ValueError as e:
        raise ParseError(f'{arg.orig!r}: {e}') from e