Source code for wex.http

""" Functions for getting responses for HTTP urls. """

from __future__ import unicode_literals, print_function
import wex.py2compat ; assert wex.py2compat
import io
import requests
from requests.sessions import merge_setting
from gzip import GzipFile
from .readable import ChainedReadable


GZIP_MAGIC = b'\x1f\x8b'
CRLF = '\r\n'
timeout = 30.0


[docs]def request(url, method, session=None, **kw): """ Makes an HTTP request following redirects. """ if session is None: session = requests.Session() session.stream = True decode_content = kw.get('decode_content', True) proxies = kw.get('proxies', None) headers = merge_setting(method.args.get('headers'), kw.get('headers')) context = kw.get('context', {}) response = session.request( method.name, url, allow_redirects=False, cookies=method.args.get('cookies', None), data=method.args.get('data', None), headers=headers, params=method.args.get('params', None), proxies=proxies, timeout=timeout, ) yield readable_from_response(response, url, decode_content, context) redirects = session.resolve_redirects(response, response.request, proxies=proxies, stream=True, timeout=timeout) for redirect in redirects: yield readable_from_response(redirect, url, decode_content, context)
[docs]def readable_from_response(response, url, decode_content, context): """ Make an object that is readable by `Response`.from_file. """ headers = io.TextIOWrapper(io.BytesIO(), encoding='utf-8', newline='\n') protocol = 'HTTP' version = '{:.1f}'.format(response.raw.version / 10.0) code = response.status_code reason = response.reason status_line = format_status_line(protocol, version, code, reason) response.raw.decode_content = decode_content magic_bytes = response.raw.read(2) headers.write(status_line) for name, value in response.headers.items(): headers.write(format_header(name.capitalize(), value)) headers.write(format_header('X-wex-request-url', url)) if response.url != url: # the URL for the response is not the same as the requested URL headers.write(format_header('X-wex-url', response.url)) if magic_bytes == GZIP_MAGIC: headers.write(format_header('X-wex-has-gzip-magic', '1')) for name, value in context.items(): headers.write(format_header('X-wex-context-{}'.format(name), value)) headers.write(CRLF) headers.seek(0) return ChainedReadable(headers.detach(), io.BytesIO(magic_bytes), response.raw)
def decode(src): content_encoding = src.headers.get('content-encoding', '') has_gzip_magic = (src.headers.get('X-wex-has-gzip-magic', '0') == '1') gzip = ( src.headers.get_content_subtype() == 'x-gzip' or content_encoding == 'x-gzip' or (content_encoding == 'gzip' and has_gzip_magic) ) if gzip: src.seek(0) return GzipFile(fileobj=src.fp, mode='rb') return src format_status_line = '{}/{} {} {}\r\n'.format format_header = '{}: {}\r\n'.format