Source code for wex.url

import os
import errno
import json
import logging
from operator import attrgetter, methodcaller
from hashlib import md5
from contextlib import contextmanager
from six import text_type, string_types, next
from six.moves import filter
from six.moves.urllib_parse import (urlparse,
                                    urlunparse,
                                    parse_qs,
                                    parse_qsl,
                                    urlencode,
                                    unquote,
                                    quote)
from pkg_resources import iter_entry_points
from publicsuffix import PublicSuffixList

from .composed import composable
from .iterable import map_if_iter
from .value import encode_json

logger = logging.getLogger(__name__)



# Method name used when a URL's fragment does not name one.
DEFAULT_METHOD = 'get'

# Maximum length of a single file name component in the directory holding
# this module; platforms without ``os.pathconf`` get a fixed 255.
PC_NAME_MAX = (
    os.pathconf(os.path.dirname(__file__), 'PC_NAME_MAX')
    if hasattr(os, 'pathconf')
    else 255
)


@contextmanager
def eexist_is_ok():
    """
    Context manager that ignores "already exists" errors.

    An ``OSError`` raised inside the ``with`` body is swallowed when its
    ``errno`` is ``EEXIST``; any other ``OSError`` is re-raised unchanged.
    """
    try:
        yield
    except OSError as err:
        already_exists = err.errno == errno.EEXIST
        if not already_exists:
            raise


class Method(object):
    """
    Method objects 'get' responses from a url.

    The Method object looks-up the correct implementation based on its
    name and the scheme of the url.  The default method name is 'get'.
    Other method names can be specified in the fragment of the url.
    """

    def __init__(self, scheme, name, args=None):
        """
        Store the lookup keys for the implementation.

        :param scheme: URL scheme; selects the entry point group.
        :param name: method name; selects the entry point within the group.
        :param args: optional arguments for the method; a falsy value
            is replaced by a new empty dict.
        """
        self.scheme = scheme
        self.name = name
        self.args = args or {}

    @property
    def group(self):
        """ Name of the entry point group searched for implementations. """
        return 'wex.method.{}'.format(self.scheme)

    def get(self, url, **kw):
        """
        Get responses for 'url'.

        The first entry point called `self.name` in `self.group` is loaded
        and called as ``method(url, self, **kw)``; its result is returned.

        :raises ValueError: if no such entry point is registered.
        """
        entry_points = iter_entry_points(self.group, self.name)
        try:
            ep = next(entry_points)
        except StopIteration:
            raise ValueError("Missing method '%s' in '%s'" %
                             (self.name, self.group))
        method = ep.load()
        return method(url, self, **kw)
class URL(text_type):
    """
    URL objects.

    A text string that also carries the result of ``urlparse`` on itself
    in the ``parsed`` attribute.
    """

    def __new__(cls, urlstring):
        """
        Build a URL from `urlstring`.

        :raises ValueError: if `urlstring` is not a string.
        """
        if not isinstance(urlstring, string_types):
            raise ValueError("URL requires a string, got %r" % (urlstring,))
        url = super(URL, cls).__new__(cls, urlstring)
        url.parsed = urlparse(url)
        return url

    @property
    def fragment_dict(self):
        """
        Client side data dict represented as JSON in the fragment.

        Returns ``{}`` when there is no fragment, when the fragment does
        not start with ``{`` (literal or percent-encoded), when it is not
        valid JSON (an error is logged) or when the JSON is not an object.
        """
        fragment = self.parsed.fragment
        if not fragment:
            return {}
        # Hex digits in a percent-escape are case-insensitive, so both
        # '%7B' and '%7b' stand for '{'.
        if fragment[:3].upper() == '%7B':
            fragment = unquote(fragment)
        elif not fragment.startswith('{'):
            return {}
        try:
            data = json.loads(fragment)
        except ValueError as exc:
            logger.error("%s. Unable to parse %r", exc, fragment)
            return {}
        if not isinstance(data, dict):
            return {}
        return data

    def update_fragment_dict(self, **kw):
        """
        Return a new URL whose fragment is the JSON encoding of the
        current fragment dict updated with `kw`.
        """
        fragment_dict = dict(self.fragment_dict)
        fragment_dict.update(kw)
        fragment = encode_json(fragment_dict)
        replaced = self.parsed._replace(fragment=fragment)
        return self.__class__(urlunparse(replaced))

    @property
    def method(self):
        """
        The `Method` for this URL.

        The fragment dict's 'method' entry is either a method name or a
        single-item mapping of ``{name: args}``; without one the default
        method name is used.

        :raises ValueError: if the URL has no scheme, or the 'method'
            entry is neither a string nor a single-item mapping.
        """
        if not self.parsed.scheme:
            raise ValueError("URL has no scheme")
        method = self.fragment_dict.get('method', DEFAULT_METHOD)
        if isinstance(method, string_types):
            return Method(self.parsed.scheme, method, {})
        try:
            # Exactly one (name, args) pair is accepted.
            ((name, args),) = method.items()
        except (AttributeError, TypeError, ValueError):
            # Not a mapping at all, or a mapping without exactly one item.
            raise ValueError("invalid method %r" % (method,))
        return Method(self.parsed.scheme, name, args)

    def get(self, **kw):
        """ Get `url` using the appropriate `Method`. """
        return self.method.get(self, **kw)

    def mkdirs(self, top):
        """
        Create the nested directories named by `dirnames` beneath `top`.

        Directories that already exist are left alone.  Returns the path
        of the innermost directory.
        """
        dirpath = top
        for dirname in self.dirnames():
            dirpath = os.path.join(dirpath, dirname)
            with eexist_is_ok():
                os.mkdir(dirpath)
        return dirpath

    def dirnames(self):
        """
        Return the list of directory names derived from this URL.

        The names are, in order: the scheme, the netloc, every
        '/'-separated piece of the path, every '&'-separated piece of the
        query (if any) and finally the MD5 hex digest of the UTF-8 encoded
        URL.  Each name is percent-quoted with no safe characters and
        truncated to `PC_NAME_MAX` characters.
        """
        encoded = self.encode('utf-8')
        hexdigest = md5(encoded).hexdigest()
        names = [self.parsed.scheme, self.parsed.netloc]
        names.extend(self.parsed.path.split('/'))
        if self.parsed.query:
            names.extend(self.parsed.query.split('&'))
        names.append(hexdigest)

        dirnames = []
        for name in names:
            quoted = quote(name, safe='')
            # quote() never escapes '.', so '.' and '..' would survive as
            # relative path components and let mkdirs() walk out of its
            # top directory.  Escape the dots in exactly those two names.
            if quoted in ('.', '..'):
                quoted = quoted.replace('.', '%2E')
            dirnames.append(quoted[:PC_NAME_MAX])
        return dirnames


#
# URL related composable helpers
#============================================================

# Work-around for:
# https://bitbucket.org/pypa/wheel/issue/120
def open_publicsuffix_txt():
    """
    Open 'publicsuffix.txt' for reading as UTF-8.

    Two locations are tried in order: the file as a resource of the
    'publicsuffix' package, then the file directly under ``sys.prefix``.
    Returns the open file for the first location that exists, or ``None``
    when neither exists.  The caller is responsible for closing the file.
    """
    import sys
    import codecs
    from pkg_resources import resource_filename
    basename = 'publicsuffix.txt'
    paths = [resource_filename('publicsuffix', basename),
             # for some reason wheel installation puts the file here
             os.path.join(sys.prefix, basename)]
    for path in paths:
        if os.path.exists(path):
            return codecs.open(path, 'r', 'utf-8')
    return None


def _load_public_suffix_list():
    """ Build the `PublicSuffixList`, closing the source file afterwards. """
    suffix_file = open_publicsuffix_txt()
    try:
        # NOTE(review): assumes PublicSuffixList reads its input fully in
        # the constructor, and that it accepts None (presumably falling
        # back to its own bundled list) - confirm against publicsuffix.
        return PublicSuffixList(suffix_file)
    finally:
        if suffix_file is not None:
            suffix_file.close()


public_suffix_list = _load_public_suffix_list()


@composable
@map_if_iter
def url(obj):
    """ Return ``obj.url`` when `obj` has a 'url' attribute, else `obj`. """
    return getattr(obj, 'url', obj)


# Compositions built on `url`: each applies one more step to its result.
parse_url = url | map_if_iter(urlparse)
url_query = parse_url | map_if_iter(attrgetter('query'))
url_path = parse_url | map_if_iter(attrgetter('path'))
url_hostname = parse_url | map_if_iter(attrgetter('hostname'))
url_query_dict = url_query | map_if_iter(parse_qs)
url_query_list = url_query | map_if_iter(parse_qsl)


# Marks "no default given" so that a caller may still pass None explicitly.
_NO_DEFAULT = object()


def url_query_param(name, default=_NO_DEFAULT):
    """
    Return a composable that looks up `name` in a URL's query dict.

    :param name: the query parameter to look up.
    :param default: value returned when the parameter is absent.  When
        omitted, a new empty list is returned for each miss, so results
        never share one mutable list.
    """
    def get_param(query_dict):
        fallback = [] if default is _NO_DEFAULT else default
        return query_dict.get(name, fallback)

    return url_query_dict | map_if_iter(get_param)


def filter_url_query(*names, **kw):
    """
    Return a composable that rewrites a URL with a filtered query string.

    :param names: query parameter names to select.
    :param exclude: keyword only, default ``False``.  When false only the
        named parameters are kept; when true the named parameters are
        dropped and all others kept.

    The query is re-encoded from the surviving ``parse_qsl`` pairs.
    """
    names = set(names)
    exclude = kw.pop('exclude', False)

    if exclude:
        def keep(pair):
            return pair[0] not in names
    else:
        def keep(pair):
            return pair[0] in names

    @composable
    @map_if_iter
    def url_query_filter(obj):
        parsed = parse_url(obj)
        qsl = [pair for pair in parse_qsl(parsed.query) if keep(pair)]
        filtered_query = urlencode(qsl)
        return urlunparse(parsed._replace(query=filtered_query))

    return url_query_filter


# Selecting no names and excluding nothing removes the whole query.
strip_url_query = filter_url_query()


@composable
@map_if_iter
def public_suffix(src):
    """
    Return the public suffix for `src`.

    The hostname parsed from `src` is used when there is one; otherwise
    `src` itself is passed to the public suffix list.
    """
    return public_suffix_list.get_public_suffix(url_hostname(src) or src)