httpie-cli/tests/utils/matching/parsing.py

97 lines
2.4 KiB
Python
Raw Normal View History

import re
from typing import Iterable
from httpie.output.writer import MESSAGE_SEPARATOR
from .tokens import Expect
from ...utils import CRLF
SEPARATOR_RE = re.compile(f'^{MESSAGE_SEPARATOR}')
def make_headers_re(message_type: Expect):
assert message_type in {Expect.REQUEST_HEADERS, Expect.RESPONSE_HEADERS}
# language=RegExp
crlf = r'[\r][\n]'
non_crlf = rf'[^{CRLF}]'
# language=RegExp
http_version = r'HTTP/\d+\.\d+'
if message_type is Expect.REQUEST_HEADERS:
# POST /post HTTP/1.1
start_line_re = fr'{non_crlf}*{http_version}{crlf}'
else:
# HTTP/1.1 200 OK
start_line_re = fr'{http_version}{non_crlf}*{crlf}'
return re.compile(
fr'''
^
{start_line_re}
({non_crlf}+:{non_crlf}+{crlf})+
{crlf}
''',
flags=re.VERBOSE
)
BODY_ENDINGS = [
MESSAGE_SEPARATOR,
CRLF, # Not really but useful for testing (just remember not to include it in a body).
]
TOKEN_REGEX_MAP = {
Expect.REQUEST_HEADERS: make_headers_re(Expect.REQUEST_HEADERS),
Expect.RESPONSE_HEADERS: make_headers_re(Expect.RESPONSE_HEADERS),
Expect.SEPARATOR: SEPARATOR_RE,
}
class OutputMatchingError(ValueError):
pass
def expect_tokens(tokens: Iterable[Expect], s: str):
for token in tokens:
s = expect_token(token, s)
if s:
raise OutputMatchingError(f'Unmatched remaining output for {tokens} in {s!r}')
def expect_token(token: Expect, s: str) -> str:
if token is Expect.BODY:
s = expect_body(s)
else:
s = expect_regex(token, s)
return s
def expect_regex(token: Expect, s: str) -> str:
match = TOKEN_REGEX_MAP[token].match(s)
if not match:
raise OutputMatchingError(f'No match for {token} in {s!r}')
return s[match.end():]
def expect_body(s: str) -> str:
"""
We require some text, and continue to read until we find an ending or until the end of the string.
"""
if 'content-disposition:' in s.lower():
# Multipart body heuristic.
final_boundary_re = re.compile('\r\n--[^-]+?--\r\n')
match = final_boundary_re.search(s)
if match:
return s[match.end():]
endings = [s.index(sep) for sep in BODY_ENDINGS if sep in s]
if not endings:
s = '' # Only body
else:
end = min(endings)
if end == 0:
raise OutputMatchingError(f'Empty body: {s!r}')
s = s[end:]
return s