"""Output streaming, processing and formatting. """ import json from functools import partial from itertools import chain import pygments from pygments import token, lexer from pygments.styles import get_style_by_name, STYLE_MAP from pygments.lexers import get_lexer_for_mimetype, get_lexer_by_name from pygments.formatters.terminal import TerminalFormatter from pygments.formatters.terminal256 import Terminal256Formatter from pygments.util import ClassNotFound from requests.compat import is_windows from .solarized import Solarized256Style from .models import HTTPRequest, HTTPResponse, Environment from .input import (OUT_REQ_BODY, OUT_REQ_HEAD, OUT_RESP_HEAD, OUT_RESP_BODY) # Colors on Windows via colorama aren't that great and fruity # seems to give the best result there. DEFAULT_STYLE = 'solarized' if not is_windows else 'fruity' #noinspection PySetFunctionToLiteral AVAILABLE_STYLES = set([DEFAULT_STYLE]) | set(STYLE_MAP.keys()) BINARY_SUPPRESSED_NOTICE = ( b'\n' b'+-----------------------------------------+\n' b'| NOTE: binary data not shown in terminal |\n' b'+-----------------------------------------+' ) class BinarySuppressedError(Exception): """An error indicating that the body is binary and won't be written, e.g., for terminal output).""" message = BINARY_SUPPRESSED_NOTICE ############################################################################### # Output Streams ############################################################################### def write(stream, outfile, flush): """Write the output stream.""" try: # Writing bytes so we use the buffer interface (Python 3). buf = outfile.buffer except AttributeError: buf = outfile for chunk in stream: buf.write(chunk) if flush: outfile.flush() def output_stream(args, env, request, response): """Build and return a chain of iterators over the `request`-`response` exchange each of which yields `bytes` chunks. """ Stream = make_stream(env, args) req_h = OUT_REQ_HEAD in args.output_options req_b = OUT_REQ_BODY in args.output_options resp_h = OUT_RESP_HEAD in args.output_options resp_b = OUT_RESP_BODY in args.output_options req = req_h or req_b resp = resp_h or resp_b output = [] if req: output.append(Stream( msg=HTTPRequest(request), with_headers=req_h, with_body=req_b)) if req and resp: output.append([b'\n\n\n']) if resp: output.append(Stream( msg=HTTPResponse(response), with_headers=resp_h, with_body=resp_b)) if env.stdout_isatty: output.append([b'\n\n']) return chain(*output) def make_stream(env, args): """Pick the right stream type based on `env` and `args`. Wrap it in a partial with the type-specific args so that we don't need to think what stream we are dealing with. """ if not env.stdout_isatty and not args.prettify: Stream = partial( RawStream, chunk_size=RawStream.CHUNK_SIZE_BY_LINE if args.stream else RawStream.CHUNK_SIZE) elif args.prettify: Stream = partial( PrettyStream if args.stream else BufferedPrettyStream, processor=OutputProcessor(env, pygments_style=args.style), env=env) else: Stream = partial(EncodedStream, env=env) return Stream class BaseStream(object): """Base HTTP message stream class.""" def __init__(self, msg, with_headers=True, with_body=True): """ :param msg: a :class:`models.HTTPMessage` subclass :param with_headers: if `True`, headers will be included :param with_body: if `True`, body will be included """ self.msg = msg self.with_headers = with_headers self.with_body = with_body def _headers(self): """Return the headers' bytes.""" return self.msg.headers.encode('ascii') def _body(self): """Return an iterator over the message body.""" raise NotImplementedError() def __iter__(self): """Return an iterator over `self.msg`.""" if self.with_headers: yield self._headers() if self.with_body: it = self._body() try: if self.with_headers: # Yield the headers/body separator only if needed. chunk = next(it) if chunk: yield b'\n\n' yield chunk for chunk in it: yield chunk except BinarySuppressedError as e: if self.with_headers: yield b'\n' yield e.message class RawStream(BaseStream): """The message is streamed in chunks with no processing.""" CHUNK_SIZE = 1024 * 100 CHUNK_SIZE_BY_LINE = 1024 * 5 def __init__(self, chunk_size=CHUNK_SIZE, **kwargs): super(RawStream, self).__init__(**kwargs) self.chunk_size = chunk_size def _body(self): return self.msg.iter_body(self.chunk_size) class EncodedStream(BaseStream): """Encoded HTTP message stream. The message bytes are converted to an encoding suitable for `self.env.stdout`. Unicode errors are replaced and binary data is suppressed. The body is always streamed by line. """ CHUNK_SIZE = 1024 * 5 def __init__(self, env=Environment(), **kwargs): super(EncodedStream, self).__init__(**kwargs) if env.stdout_isatty: # Use the encoding supported by the terminal. output_encoding = getattr(env.stdout, 'encoding', None) else: # Preserve the message encoding. output_encoding = self.msg.encoding # Default to utf8 when unsure. self.output_encoding = output_encoding or 'utf8' def _body(self): for line, lf in self.msg.iter_lines(self.CHUNK_SIZE): if b'\0' in line: raise BinarySuppressedError() yield line.decode(self.msg.encoding)\ .encode(self.output_encoding, 'replace') + lf class PrettyStream(EncodedStream): """In addition to :class:`EncodedStream` behaviour, this stream applies content processing. Useful for long-lived HTTP responses that stream by lines such as the Twitter streaming API. """ CHUNK_SIZE = 1024 * 5 def __init__(self, processor, **kwargs): super(PrettyStream, self).__init__(**kwargs) self.processor = processor def _headers(self): return self.processor.process_headers( self.msg.headers).encode(self.output_encoding) def _body(self): for line, lf in self.msg.iter_lines(self.CHUNK_SIZE): if b'\0' in line: raise BinarySuppressedError() yield self._process_body(line) + lf def _process_body(self, chunk): return (self.processor .process_body( chunk.decode(self.msg.encoding, 'replace'), self.msg.content_type) .encode(self.output_encoding, 'replace')) class BufferedPrettyStream(PrettyStream): """The same as :class:`PrettyStream` except that the body is fully fetched before it's processed. Suitable regular HTTP responses. """ CHUNK_SIZE = 1024 * 10 def _body(self): #noinspection PyArgumentList # Read the whole body before prettifying it, # but bail out immediately if the body is binary. body = bytearray() for chunk in self.msg.iter_body(self.CHUNK_SIZE): if b'\0' in chunk: raise BinarySuppressedError() body.extend(chunk) yield self._process_body(body) ############################################################################### # Processing ############################################################################### class HTTPLexer(lexer.RegexLexer): """Simplified HTTP lexer for Pygments. It only operates on headers and provides a stronger contrast between their names and values than the original one bundled with Pygments (:class:`pygments.lexers.text import HttpLexer`), especially when Solarized color scheme is used. """ name = 'HTTP' aliases = ['http'] filenames = ['*.http'] tokens = { 'root': [ # Request-Line (r'([A-Z]+)( +)([^ ]+)( +)(HTTP)(/)(\d+\.\d+)', lexer.bygroups( token.Name.Function, token.Text, token.Name.Namespace, token.Text, token.Keyword.Reserved, token.Operator, token.Number )), # Response Status-Line (r'(HTTP)(/)(\d+\.\d+)( +)(\d{3})( +)(.+)', lexer.bygroups( token.Keyword.Reserved, # 'HTTP' token.Operator, # '/' token.Number, # Version token.Text, token.Number, # Status code token.Text, token.Name.Exception, # Reason )), # Header (r'(.*?)( *)(:)( *)(.+)', lexer.bygroups( token.Name.Attribute, # Name token.Text, token.Operator, # Colon token.Text, token.String # Value )) ]} class BaseProcessor(object): """Base, noop output processor class.""" enabled = True def __init__(self, env, **kwargs): """ :param env: an class:`Environment` instance :param kwargs: additional keyword argument that some processor might require. """ self.env = env self.kwargs = kwargs def process_headers(self, headers): """Return processed `headers` :param headers: The headers as text. """ return headers def process_body(self, content, content_type, subtype): """Return processed `content`. :param content: The body content as text :param content_type: Full content type, e.g., 'application/atom+xml'. :param subtype: E.g. 'xml'. """ return content class JSONProcessor(BaseProcessor): """JSON body processor.""" def process_body(self, content, content_type, subtype): if subtype == 'json': try: # Indent the JSON data, sort keys by name, and # avoid unicode escapes to improve readability. content = json.dumps(json.loads(content), sort_keys=True, ensure_ascii=False, indent=4) except ValueError: # Invalid JSON but we don't care. pass return content class PygmentsProcessor(BaseProcessor): """A processor that applies syntax-highlighting using Pygments to the headers, and to the body as well if its content type is recognized. """ def __init__(self, *args, **kwargs): super(PygmentsProcessor, self).__init__(*args, **kwargs) # Cache that speeds up when we process streamed body by line. self.lexers_by_type = {} if not self.env.colors: self.enabled = False return try: style = get_style_by_name(self.kwargs['pygments_style']) except ClassNotFound: style = Solarized256Style if self.env.is_windows or self.env.colors == 256: fmt_class = Terminal256Formatter else: fmt_class = TerminalFormatter self.formatter = fmt_class(style=style) def process_headers(self, headers): return pygments.highlight( headers, HTTPLexer(), self.formatter).strip() def process_body(self, content, content_type, subtype): try: lexer = self.lexers_by_type.get(content_type) if not lexer: try: lexer = get_lexer_for_mimetype(content_type) except ClassNotFound: lexer = get_lexer_by_name(subtype) self.lexers_by_type[content_type] = lexer except ClassNotFound: pass else: content = pygments.highlight(content, lexer, self.formatter) return content.strip() class HeadersProcessor(BaseProcessor): """Sorts headers by name retaining relative order of multiple headers with the same name. """ def process_headers(self, headers): lines = headers.splitlines() headers = sorted(lines[1:], key=lambda h: h.split(':')[0]) return '\n'.join(lines[:1] + headers) class OutputProcessor(object): """A delegate class that invokes the actual processors.""" installed_processors = [ JSONProcessor, HeadersProcessor, PygmentsProcessor ] def __init__(self, env, **kwargs): processors = [ cls(env, **kwargs) for cls in self.installed_processors ] self.processors = [p for p in processors if p.enabled] def process_headers(self, headers): for processor in self.processors: headers = processor.process_headers(headers) return headers def process_body(self, content, content_type): # e.g., 'application/atom+xml' content_type = content_type.split(';')[0] # e.g., 'xml' subtype = content_type.split('/')[-1].split('+')[-1] for processor in self.processors: content = processor.process_body(content, content_type, subtype) return content