"""
Composable functions for extracting data using
`lxml <http://lxml.de/>`_.
"""
from __future__ import absolute_import, unicode_literals, print_function
import wex.py2compat ; assert wex.py2compat
import logging
import codecs
from itertools import islice, chain
from copy import deepcopy
from operator import methodcaller, itemgetter
from six import string_types
from six.moves import map, reduce
from six.moves.urllib_parse import urljoin, quote, unquote
from lxml.etree import (XPath,
_ElementTree,
_Element,
Element,
FunctionNamespace)
from lxml.cssselect import CSSSelector
from lxml.html import XHTML_NAMESPACE, HTMLParser
from .composed import composable, Composable
from .cache import cached
from .iterable import _do_not_iter_append, filter_if_iter
from .ncr import replace_invalid_ncr
from .url import URL, public_suffix
NEWLINE = u'\n'
EMPTY = u''
SPACE = u' '
# we do not want to flatten etree elements
_do_not_iter_append(_Element)
UNPARSEABLE = Element('unparseable')
base_href = XPath('//base[@href]/@href | //x:base[@href]/@href',
namespaces={'x': XHTML_NAMESPACE})
default_namespaces = {'re': 'http://exslt.org/regular-expressions'}
# see http://lxml.de/extensions.html#the-functionnamespace
function_namespace = FunctionNamespace(None)
_html_text_nodes = XPath(
'descendant-or-self::node()' +
'[not(local-name()) or not(text())]' +
'[not(ancestor::script or ancestor::style)]'
)
def _wex_html_text(context, arg=None):
if arg is None:
arg = [context.context_node]
html_text = []
for node in chain.from_iterable(map(_html_text_nodes, arg)):
tag = getattr(node, 'tag', None)
if tag is None:
html_text.append(node)
elif tag == 'br':
html_text.append(NEWLINE)
else:
html_text.append(EMPTY)
return EMPTY.join(html_text)
function_namespace['wex-html-text'] = _wex_html_text
def create_html_parser(headers):
charset = headers.get_content_charset()
try:
if charset and codecs.lookup(charset).name == 'iso8859-1':
charset = 'windows-1252'
except LookupError:
pass
# if charset is not specified in the Content-Type, this will be
# None ; encoding=None produces default (ISO 8859-1) behavior.
return HTMLParser(encoding=charset)
@composable
@cached
def parse(src):
""" Returns an element tree create by `LXML <http://lxml.de/>`_.
:param src: A readable object such as a :class:`wex.response.Response`.
"""
if not hasattr(src, 'read'):
return src
parser = create_html_parser(src.headers)
etree = _ElementTree()
try:
# Sometimes we get URLs containing characters that aren't
# acceptable to lxml (e.g. "http:/foo.com/bar?this=array[]").
# When this happens lxml will quote the whole URL.
# We don't want to have to check for this so we just always
# quote it here and then unquote it in the `base_url` function.
quoted_base_url = quote(src.url) if src.url else src.url
fp = replace_invalid_ncr(src)
etree.parse(fp, parser=parser, base_url=quoted_base_url)
except IOError as exc:
logger = logging.getLogger(__name__)
logger.warning("IOError parsing %s (%s)", src.url, exc)
root = etree.getroot()
if root is None:
etree._setroot(UNPARSEABLE)
return etree
@cached
def get_base_url_from_root(root):
if root.base_url:
# see :func:`.parse` for why we need to unquote
base_url = unquote(root.base_url)
else:
base_url = root.base_url
return reduce(urljoin, base_href(root)[:1], base_url)
def get_base_url(elem_or_tree):
if hasattr(elem_or_tree, 'getroottree'):
tree = elem_or_tree.getroottree()
else:
# if it doesn't have getroottree() we presume it's a tree!
tree = elem_or_tree
return get_base_url_from_root(tree.getroot())
class map_if_list(Composable):
def __init__(self, func):
self.func = func
def __repr__(self):
return '%s(%r)' % (self.__class__, self.func)
def __compose__(self):
return (self,)
def __call__(self, *args, **kwargs):
if args and isinstance(args[0], list):
return [res for res in map(self.func, *args, **kwargs)]
return self.func(*args, **kwargs)
[docs]def css(expression):
""" Returns a :func:`composable <wex.composed.composable>` callable that
will select elements defined by a
`CSS selector <http://en.wikipedia.org/wiki/Cascading_Style_Sheets#Selector>`_
expression.
:param expression: The CSS selector expression.
The callable returned accepts a :class:`wex.response.Response`, a
list of elements or an individual element as an argument.
"""
return parse | map_if_list(CSSSelector(expression))
[docs]def xpath(expression, namespaces=default_namespaces):
""" Returns :func:`composable <wex.composed.composable>` callable that will
select elements defined by an
`XPath <http://en.wikipedia.org/wiki/XPath>`_ expression.
:param expression: The XPath expression.
:param namespaces: The namespaces.
The callable returned accepts a :class:`wex.response.Response`, a
list of elements or an individual element as an argument.
For example:
.. code-block:: pycon
>>> from lxml.html import fromstring
>>> tree = fromstring('<h1>Hello</h1>')
>>> selector = xpath('//h1')
"""
return parse | map_if_list(XPath(expression, namespaces=namespaces))
def attrib(name, default=None):
getter = methodcaller('get', name, default)
return map_if_list(getter)
[docs]def base_url_pair_getter(get_url):
""" Returns a function for gettting a tuple of `(base_url, url)` when
called with an etree `Element` or `ElementTree`.
In the returned pair `base_url` is the value returned from
`:func:get_base_url` on the etree `Element` or `ElementTree`.
There second value is the value returned by calling the `get_url`
on the same the same etree `Element` or `ElementTree`, joined to
the `base_url` using `urljoin`. This allows `get_url` to return
a relative URL.
"""
@composable
def get_base_url_pair(elem_or_tree):
base_url = get_base_url(elem_or_tree)
url = get_url(elem_or_tree)
if url:
url = URL(urljoin(base_url, url.strip()))
return (URL(base_url), url)
return get_base_url_pair
[docs]def same_domain(url_pair):
""" Return second url of pair if both are from same domain. """
if not all(url_pair):
return None
base_url, url = map(URL, islice(url_pair, 2))
if base_url.parsed[:2] == url.parsed[:2]:
return url
return None
[docs]def same_suffix(url_pair):
""" Return second url of pair if both have the same public suffix. """
if not all(url_pair):
return None
base_url, url = map(URL, islice(url_pair, 2))
if url.parsed.hostname is None:
return None
base_suffix = public_suffix(base_url)
dot_suffix = '.' + base_suffix
dot_hostname = '.' + url.parsed.hostname
if dot_hostname.endswith(dot_suffix):
return url
src_base_url_pair = base_url_pair_getter(methodcaller('get', 'src'))
href_base_url_pair = base_url_pair_getter(methodcaller('get', 'href'))
# helpers that operate on exactly one element
src_url_1 = src_base_url_pair | itemgetter(1)
href_url_1 = href_base_url_pair | same_domain
href_url_same_suffix_1 = href_base_url_pair | same_suffix
href_any_url_1 = href_base_url_pair | itemgetter(1)
#: A :class:`wex.composed.ComposedFunction` that returns the absolute
#: URL from an ``href`` attribute as long as it is from the same domain
#: as the base URl of the response.
href_url = map_if_list(href_url_1) | filter_if_iter(bool)
#: A :class:`wex.composed.ComposedFunction` that returns the absolute
#: URL from an ``href`` attribute as long as it is from the same
#: `public suffix <https://publicsuffix.org/>`_
#: as the base URl of the response.
href_url_same_suffix = (map_if_list(href_url_same_suffix_1) |
filter_if_iter(bool))
#: A :class:`wex.composed.ComposedFunction` that returns the absolute
#: URL from an ``href`` attribute.
href_any_url = map_if_list(href_any_url_1) | filter_if_iter(bool)
#: A :class:`wex.composed.ComposedFunction` that returns the absolute
#: URL from an ``src`` attribute.
src_url = map_if_list(src_url_1) | filter_if_iter(bool)
[docs]def itertext(*tags, **kw):
""" Return a function that will return an iterator for text. """
with_tail=kw.pop('with_tail', True)
if kw:
raise ValueError('unexpected keyword arguments %s' % kw.keys())
@composable
def _itertext(src):
if hasattr(src, 'itertext'):
return src.itertext(*tags, with_tail=with_tail)
elif hasattr(src, '__iter__') and not isinstance(src, string_types):
text_nodes = (_itertext(i) for i in src)
return chain.from_iterable(text_nodes)
raise ValueError("%r is not iterable" % src)
return _itertext
[docs]def drop_tree(*selectors):
""" Return a function that will remove trees selected by `selectors`. """
@map_if_list
def tree_dropper(src):
copied = None
for selector in selectors:
selected = selector(copied if copied is not None else src)
if selected and copied is None:
copied = deepcopy(src)
selected = selector(copied)
for selection in selected:
selection.drop_tree()
return copied if copied is not None else src
return tree_dropper
@map_if_list
def normalize_space(obj):
""" Normalize space according to standard Python rules.
The definition of what is space used in XPath's
`normalize-space <http://www.w3.org/TR/xpath/#function-normalize-space>`_
is a small subset of the characters defined as space in the
`unicode <https://en.wikipedia.org/wiki/Whitespace_character#Unicode>`_
rules that Python uses.
"""
if hasattr(obj, 'split'):
obj_as_text = obj
else:
obj_as_text = text_content(obj)
return SPACE.join(obj_as_text.split())
def list2set(obj):
if isinstance(obj, list):
return set(obj)
return obj
#: Return text content from an object (typically node-set) excluding from
#: content from within `<script>` or `<style>` elements.
text_content = xpath('wex-html-text(.)')
#: Alias for `normalize-space | list2set`
text = normalize_space | list2set