Source code for nltk.corpus.reader.xmldocs

# Natural Language Toolkit: XML Corpus Reader
#
# Copyright (C) 2001-2019 NLTK Project
# Author: Steven Bird <[email protected]>
# URL: <http://nltk.org/>
# For license information, see LICENSE.TXT

"""
Corpus reader for corpora whose documents are xml files.

(note -- not named 'xml' to avoid conflicting w/ standard xml package)
"""
from __future__ import print_function, unicode_literals

import codecs

# Use the c version of ElementTree, which is faster, if possible:
try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    from xml.etree import ElementTree

from six import string_types

from nltk.data import SeekableUnicodeStreamReader
from nltk.tokenize import WordPunctTokenizer
from nltk.internals import ElementWrapper

from nltk.corpus.reader.api import CorpusReader
from nltk.corpus.reader.util import *


[docs]class XMLCorpusReader(CorpusReader): """ Corpus reader for corpora whose documents are xml files. Note that the ``XMLCorpusReader`` constructor does not take an ``encoding`` argument, because the unicode encoding is specified by the XML files themselves. See the XML specs for more info. """ def __init__(self, root, fileids, wrap_etree=False): self._wrap_etree = wrap_etree CorpusReader.__init__(self, root, fileids)
[docs] def xml(self, fileid=None): # Make sure we have exactly one file -- no concatenating XML. if fileid is None and len(self._fileids) == 1: fileid = self._fileids[0] if not isinstance(fileid, string_types): raise TypeError('Expected a single file identifier string') # Read the XML in using ElementTree. elt = ElementTree.parse(self.abspath(fileid).open()).getroot() # If requested, wrap it. if self._wrap_etree: elt = ElementWrapper(elt) # Return the ElementTree element. return elt
[docs] def words(self, fileid=None): """ Returns all of the words and punctuation symbols in the specified file that were in text nodes -- ie, tags are ignored. Like the xml() method, fileid can only specify one file. :return: the given file's text nodes as a list of words and punctuation symbols :rtype: list(str) """ elt = self.xml(fileid) encoding = self.encoding(fileid) word_tokenizer = WordPunctTokenizer() iterator = elt.getiterator() out = [] for node in iterator: text = node.text if text is not None: if isinstance(text, bytes): text = text.decode(encoding) toks = word_tokenizer.tokenize(text) out.extend(toks) return out
[docs] def raw(self, fileids=None): if fileids is None: fileids = self._fileids elif isinstance(fileids, string_types): fileids = [fileids] return concat([self.open(f).read() for f in fileids])
[docs]class XMLCorpusView(StreamBackedCorpusView): """ A corpus view that selects out specified elements from an XML file, and provides a flat list-like interface for accessing them. (Note: ``XMLCorpusView`` is not used by ``XMLCorpusReader`` itself, but may be used by subclasses of ``XMLCorpusReader``.) Every XML corpus view has a "tag specification", indicating what XML elements should be included in the view; and each (non-nested) element that matches this specification corresponds to one item in the view. Tag specifications are regular expressions over tag paths, where a tag path is a list of element tag names, separated by '/', indicating the ancestry of the element. Some examples: - ``'foo'``: A top-level element whose tag is ``foo``. - ``'foo/bar'``: An element whose tag is ``bar`` and whose parent is a top-level element whose tag is ``foo``. - ``'.*/foo'``: An element whose tag is ``foo``, appearing anywhere in the xml tree. - ``'.*/(foo|bar)'``: An wlement whose tag is ``foo`` or ``bar``, appearing anywhere in the xml tree. The view items are generated from the selected XML elements via the method ``handle_elt()``. By default, this method returns the element as-is (i.e., as an ElementTree object); but it can be overridden, either via subclassing or via the ``elt_handler`` constructor parameter. """ #: If true, then display debugging output to stdout when reading #: blocks. _DEBUG = False #: The number of characters read at a time by this corpus reader. _BLOCK_SIZE = 1024 def __init__(self, fileid, tagspec, elt_handler=None): """ Create a new corpus view based on a specified XML file. Note that the ``XMLCorpusView`` constructor does not take an ``encoding`` argument, because the unicode encoding is specified by the XML files themselves. :type tagspec: str :param tagspec: A tag specification, indicating what XML elements should be included in the view. Each non-nested element that matches this specification corresponds to one item in the view. :param elt_handler: A function used to transform each element to a value for the view. If no handler is specified, then ``self.handle_elt()`` is called, which returns the element as an ElementTree object. The signature of elt_handler is:: elt_handler(elt, tagspec) -> value """ if elt_handler: self.handle_elt = elt_handler self._tagspec = re.compile(tagspec + r'\Z') """The tag specification for this corpus view.""" self._tag_context = {0: ()} """A dictionary mapping from file positions (as returned by ``stream.seek()`` to XML contexts. An XML context is a tuple of XML tag names, indicating which tags have not yet been closed.""" encoding = self._detect_encoding(fileid) StreamBackedCorpusView.__init__(self, fileid, encoding=encoding) def _detect_encoding(self, fileid): if isinstance(fileid, PathPointer): try: infile = fileid.open() s = infile.readline() finally: infile.close() else: with open(fileid, 'rb') as infile: s = infile.readline() if s.startswith(codecs.BOM_UTF16_BE): return 'utf-16-be' if s.startswith(codecs.BOM_UTF16_LE): return 'utf-16-le' if s.startswith(codecs.BOM_UTF32_BE): return 'utf-32-be' if s.startswith(codecs.BOM_UTF32_LE): return 'utf-32-le' if s.startswith(codecs.BOM_UTF8): return 'utf-8' m = re.match(br'\s*<\?xml\b.*\bencoding="([^"]+)"', s) if m: return m.group(1).decode() m = re.match(br"\s*<\?xml\b.*\bencoding='([^']+)'", s) if m: return m.group(1).decode() # No encoding found -- what should the default be? return 'utf-8'
[docs] def handle_elt(self, elt, context): """ Convert an element into an appropriate value for inclusion in the view. Unless overridden by a subclass or by the ``elt_handler`` constructor argument, this method simply returns ``elt``. :return: The view value corresponding to ``elt``. :type elt: ElementTree :param elt: The element that should be converted. :type context: str :param context: A string composed of element tags separated by forward slashes, indicating the XML context of the given element. For example, the string ``'foo/bar/baz'`` indicates that the element is a ``baz`` element whose parent is a ``bar`` element and whose grandparent is a top-level ``foo`` element. """ return elt
#: A regular expression that matches XML fragments that do not #: contain any un-closed tags. _VALID_XML_RE = re.compile( r""" [^<]* ( ((<!--.*?-->) | # comment (<![CDATA[.*?]]) | # raw character data (<!DOCTYPE\s+[^\[]*(\[[^\]]*])?\s*>) | # doctype decl (<[^!>][^>]*>)) # tag or PI [^<]*)* \Z""", re.DOTALL | re.VERBOSE, ) #: A regular expression used to extract the tag name from a start tag, #: end tag, or empty-elt tag string. _XML_TAG_NAME = re.compile('<\s*/?\s*([^\s>]+)') #: A regular expression used to find all start-tags, end-tags, and #: emtpy-elt tags in an XML file. This regexp is more lenient than #: the XML spec -- e.g., it allows spaces in some places where the #: spec does not. _XML_PIECE = re.compile( r""" # Include these so we can skip them: (?P<COMMENT> <!--.*?--> )| (?P<CDATA> <![CDATA[.*?]]> )| (?P<PI> <\?.*?\?> )| (?P<DOCTYPE> <!DOCTYPE\s+[^\[^>]*(\[[^\]]*])?\s*>)| # These are the ones we actually care about: (?P<EMPTY_ELT_TAG> <\s*[^>/\?!\s][^>]*/\s*> )| (?P<START_TAG> <\s*[^>/\?!\s][^>]*> )| (?P<END_TAG> <\s*/[^>/\?!\s][^>]*> )""", re.DOTALL | re.VERBOSE, ) def _read_xml_fragment(self, stream): """ Read a string from the given stream that does not contain any un-closed tags. In particular, this function first reads a block from the stream of size ``self._BLOCK_SIZE``. It then checks if that block contains an un-closed tag. If it does, then this function either backtracks to the last '<', or reads another block. """ fragment = '' if isinstance(stream, SeekableUnicodeStreamReader): startpos = stream.tell() while True: # Read a block and add it to the fragment. xml_block = stream.read(self._BLOCK_SIZE) fragment += xml_block # Do we have a well-formed xml fragment? if self._VALID_XML_RE.match(fragment): return fragment # Do we have a fragment that will never be well-formed? if re.search('[<>]', fragment).group(0) == '>': pos = stream.tell() - ( len(fragment) - re.search('[<>]', fragment).end() ) raise ValueError('Unexpected ">" near char %s' % pos) # End of file? if not xml_block: raise ValueError('Unexpected end of file: tag not closed') # If not, then we must be in the middle of a <..tag..>. # If appropriate, backtrack to the most recent '<' # character. last_open_bracket = fragment.rfind('<') if last_open_bracket > 0: if self._VALID_XML_RE.match(fragment[:last_open_bracket]): if isinstance(stream, SeekableUnicodeStreamReader): stream.seek(startpos) stream.char_seek_forward(last_open_bracket) else: stream.seek(-(len(fragment) - last_open_bracket), 1) return fragment[:last_open_bracket] # Otherwise, read another block. (i.e., return to the # top of the loop.)
[docs] def read_block(self, stream, tagspec=None, elt_handler=None): """ Read from ``stream`` until we find at least one element that matches ``tagspec``, and return the result of applying ``elt_handler`` to each element found. """ if tagspec is None: tagspec = self._tagspec if elt_handler is None: elt_handler = self.handle_elt # Use a stack of strings to keep track of our context: context = list(self._tag_context.get(stream.tell())) assert context is not None # check this -- could it ever happen? elts = [] elt_start = None # where does the elt start elt_depth = None # what context depth elt_text = '' while elts == [] or elt_start is not None: if isinstance(stream, SeekableUnicodeStreamReader): startpos = stream.tell() xml_fragment = self._read_xml_fragment(stream) # End of file. if not xml_fragment: if elt_start is None: break else: raise ValueError('Unexpected end of file') # Process each <tag> in the xml fragment. for piece in self._XML_PIECE.finditer(xml_fragment): if self._DEBUG: print('%25s %s' % ('/'.join(context)[-20:], piece.group())) if piece.group('START_TAG'): name = self._XML_TAG_NAME.match(piece.group()).group(1) # Keep context up-to-date. context.append(name) # Is this one of the elts we're looking for? if elt_start is None: if re.match(tagspec, '/'.join(context)): elt_start = piece.start() elt_depth = len(context) elif piece.group('END_TAG'): name = self._XML_TAG_NAME.match(piece.group()).group(1) # sanity checks: if not context: raise ValueError('Unmatched tag </%s>' % name) if name != context[-1]: raise ValueError( 'Unmatched tag <%s>...</%s>' % (context[-1], name) ) # Is this the end of an element? if elt_start is not None and elt_depth == len(context): elt_text += xml_fragment[elt_start : piece.end()] elts.append((elt_text, '/'.join(context))) elt_start = elt_depth = None elt_text = '' # Keep context up-to-date context.pop() elif piece.group('EMPTY_ELT_TAG'): name = self._XML_TAG_NAME.match(piece.group()).group(1) if elt_start is None: if re.match(tagspec, '/'.join(context) + '/' + name): elts.append((piece.group(), '/'.join(context) + '/' + name)) if elt_start is not None: # If we haven't found any elements yet, then keep # looping until we do. if elts == []: elt_text += xml_fragment[elt_start:] elt_start = 0 # If we've found at least one element, then try # backtracking to the start of the element that we're # inside of. else: # take back the last start-tag, and return what # we've gotten so far (elts is non-empty). if self._DEBUG: print(' ' * 36 + '(backtrack)') if isinstance(stream, SeekableUnicodeStreamReader): stream.seek(startpos) stream.char_seek_forward(elt_start) else: stream.seek(-(len(xml_fragment) - elt_start), 1) context = context[: elt_depth - 1] elt_start = elt_depth = None elt_text = '' # Update the _tag_context dict. pos = stream.tell() if pos in self._tag_context: assert tuple(context) == self._tag_context[pos] else: self._tag_context[pos] = tuple(context) return [ elt_handler( ElementTree.fromstring(elt.encode('ascii', 'xmlcharrefreplace')), context, ) for (elt, context) in elts ]