httpie-cli/httpie/cli/nested_json/interpret.py

130 lines
4.2 KiB
Python
Raw Normal View History

from typing import Type, Union, Any, Iterable, Tuple
from .parse import parse, assert_cant_happen
from .errors import NestedJSONSyntaxError
from .tokens import EMPTY_STRING, TokenKind, Token, PathAction, Path, NestedJSONArray
__all__ = [
'interpret_nested_json',
'unwrap_top_level_list_if_needed',
]
JSONType = Type[Union[dict, list, int, float, str]]
JSON_TYPE_MAPPING = {
dict: 'object',
list: 'array',
int: 'number',
float: 'number',
str: 'string',
}
def interpret_nested_json(pairs: Iterable[Tuple[str, str]]) -> dict:
context = None
for key, value in pairs:
context = interpret(context, key, value)
return wrap_with_dict(context)
def interpret(context: Any, key: str, value: Any) -> Any:
cursor = context
paths = list(parse(key))
paths.append(Path(PathAction.SET, value))
# noinspection PyShadowingNames
def type_check(index: int, path: Path, expected_type: JSONType):
if not isinstance(cursor, expected_type):
if path.tokens:
pseudo_token = Token(
kind=TokenKind.PSEUDO,
value='',
start=path.tokens[0].start,
end=path.tokens[-1].end,
)
else:
pseudo_token = None
cursor_type = JSON_TYPE_MAPPING.get(type(cursor), type(cursor).__name__)
required_type = JSON_TYPE_MAPPING[expected_type]
message = f'Cannot perform {path.kind.to_string()!r} based access on '
message += repr(''.join(path.reconstruct() for path in paths[:index]))
message += f' which has a type of {cursor_type!r} but this operation'
message += f' requires a type of {required_type!r}.'
raise NestedJSONSyntaxError(
source=key,
token=pseudo_token,
message=message,
message_kind='Type',
)
def object_for(kind: PathAction) -> Any:
if kind is PathAction.KEY:
return {}
elif kind in {PathAction.INDEX, PathAction.APPEND}:
return []
else:
assert_cant_happen()
for index, (path, next_path) in enumerate(zip(paths, paths[1:])):
# If there is no context yet, set it.
if cursor is None:
context = cursor = object_for(path.kind)
if path.kind is PathAction.KEY:
type_check(index, path, dict)
if next_path.kind is PathAction.SET:
cursor[path.accessor] = next_path.accessor
break
cursor = cursor.setdefault(path.accessor, object_for(next_path.kind))
elif path.kind is PathAction.INDEX:
type_check(index, path, list)
if path.accessor < 0:
raise NestedJSONSyntaxError(
source=key,
token=path.tokens[1],
message='Negative indexes are not supported.',
message_kind='Value',
)
cursor.extend([None] * (path.accessor - len(cursor) + 1))
if next_path.kind is PathAction.SET:
cursor[path.accessor] = next_path.accessor
break
if cursor[path.accessor] is None:
cursor[path.accessor] = object_for(next_path.kind)
cursor = cursor[path.accessor]
elif path.kind is PathAction.APPEND:
type_check(index, path, list)
if next_path.kind is PathAction.SET:
cursor.append(next_path.accessor)
break
cursor.append(object_for(next_path.kind))
cursor = cursor[-1]
else:
assert_cant_happen()
return context
def wrap_with_dict(context):
if context is None:
return {}
elif isinstance(context, list):
return {
EMPTY_STRING: NestedJSONArray(context),
}
else:
assert isinstance(context, dict)
return context
def unwrap_top_level_list_if_needed(data: dict):
"""
Propagate the top-level list, if thats what we got.
"""
if len(data) == 1:
key, value = list(data.items())[0]
if isinstance(value, NestedJSONArray):
assert key == EMPTY_STRING
return value
return data