Code Coverage for nltk.corpus.reader.xmldocs
Untested Functions
|
Partially Tested Functions
|
"""
Corpus reader for corpora whose documents are xml files.
(note -- not named 'xml' to avoid conflicting w/ standard xml package)
"""
from nltk.corpus.reader.api import CorpusReader
from nltk.corpus.reader.util import *
from nltk.data import SeekableUnicodeStreamReader
from nltk.internals import deprecated, ElementWrapper
import codecs
try: from xml.etree import cElementTree as ElementTree
except ImportError: from nltk.etree import ElementTree
class XMLCorpusReader(CorpusReader):
    """
    Corpus reader for corpora whose documents are xml files.

    Note that the C{XMLCorpusReader} constructor does not take an
    C{encoding} argument, because the unicode encoding is specified by
    the XML files themselves.  See the XML specs for more info.
    """
    def __init__(self, root, files, wrap_etree=False):
        """
        @param root: Passed through unchanged to C{CorpusReader.__init__}.
        @param files: Passed through unchanged to
            C{CorpusReader.__init__}.
        @param wrap_etree: If true, then L{xml()} wraps the root
            element it returns in an L{ElementWrapper}.
        """
        self._wrap_etree = wrap_etree
        CorpusReader.__init__(self, root, files)

    def xml(self, fileid=None):
        """
        Parse a single corpus file and return its root element.

        @param fileid: The identifier of the file to parse.  It may be
            omitted only when this reader has exactly one file.
        @return: The root element of the parsed document, wrapped in
            an L{ElementWrapper} if C{wrap_etree} was set.
        @raise TypeError: If C{fileid} is not a single string (this
            includes omitting it when the reader has several files).
        """
        # Make sure we have exactly one file -- no concatenating XML.
        if fileid is None and len(self._files) == 1:
            fileid = self._files[0]
        if not isinstance(fileid, basestring):
            raise TypeError('Expected a single file identifier string')
        # Read the XML in using ElementTree.  The stream is closed
        # explicitly (try/finally rather than 'with', so that the code
        # keeps working on old Python 2 releases).
        stream = self.abspath(fileid).open()
        try:
            elt = ElementTree.parse(stream).getroot()
        finally:
            stream.close()
        # If requested, wrap it.
        if self._wrap_etree:
            elt = ElementWrapper(elt)
        # Return the ElementTree element.
        return elt

    def raw(self, files=None):
        """
        Return the unparsed contents of the given file(s).

        @param files: A single file identifier, a list of file
            identifiers, or C{None} to use every file of this reader.
        @return: The result of applying C{concat} to the contents of
            the selected files.
        """
        if files is None: files = self._files
        elif isinstance(files, basestring): files = [files]
        contents = []
        for f in files:
            stream = self.open(f)
            # Close each stream as soon as it has been read, instead of
            # relying on garbage collection.
            try:
                contents.append(stream.read())
            finally:
                stream.close()
        return concat(contents)

    #{ Deprecated since 0.8
    @deprecated("Use .raw() or .xml() instead.")
    def read(self, items=None, format='xml'):
        """
        Deprecated dispatcher: delegates to L{raw()} or L{xml()}
        depending on C{format}.

        @raise ValueError: If C{format} is neither C{'raw'} nor
            C{'xml'}.
        """
        if format == 'raw': return self.raw(items)
        if format == 'xml': return self.xml(items)
        raise ValueError('bad format %r' % format)
    #}
class XMLCorpusView(StreamBackedCorpusView):
    """
    A corpus view that selects out specified elements from an XML
    file, and provides a flat list-like interface for accessing them.
    (Note: C{XMLCorpusView} is not used by L{XMLCorpusReader} itself,
    but may be used by subclasses of L{XMLCorpusReader}.)

    Every XML corpus view has a X{tag specification}, indicating what
    XML elements should be included in the view; and each (non-nested)
    element that matches this specification corresponds to one item in
    the view.  Tag specifications are regular expressions over tag
    paths, where a tag path is a list of element tag names, separated
    by '/', indicating the ancestry of the element.  Some examples:

      - C{'foo'}: A top-level element whose tag is C{foo}.
      - C{'foo/bar'}: An element whose tag is C{bar} and whose parent
        is a top-level element whose tag is C{foo}.
      - C{'.*/foo'}: An element whose tag is C{foo}, appearing anywhere
        in the xml tree.
      - C{'.*/(foo|bar)'}: An element whose tag is C{foo} or C{bar},
        appearing anywhere in the xml tree.

    The end-of-string anchor C{\\Z} is simply appended to the tag
    specification, so an alternation should be parenthesized (as in
    the last example) for the anchor to apply to every alternative.

    The view items are generated from the selected XML elements via
    the method L{handle_elt()}.  By default, this method returns the
    element as-is (i.e., as an ElementTree object); but it can be
    overridden, either via subclassing or via the C{elt_handler}
    constructor parameter.
    """

    #: If true, then print a trace of every XML piece that
    #: L{read_block()} processes.
    _DEBUG = False

    #: The size argument passed to each C{stream.read()} call made by
    #: L{_read_xml_fragment()}.
    _BLOCK_SIZE = 1024

    def __init__(self, filename, tagspec, elt_handler=None):
        """
        Create a new corpus view based on a specified XML file.

        Note that the C{XMLCorpusView} constructor does not take an
        C{encoding} argument, because the unicode encoding is
        specified by the XML files themselves.

        @param filename: The file to read; either a C{PathPointer} or
            a path that can be given to the builtin C{open()}.
        @type tagspec: C{str}
        @param tagspec: A tag specification, indicating what XML
            elements should be included in the view.  Each non-nested
            element that matches this specification corresponds to one
            item in the view.
        @param elt_handler: A function used to transform each element
            to a value for the view.  If no handler is specified, then
            L{self.handle_elt()} is called, which returns the element
            as an ElementTree object.  The signature of elt_handler is::

                elt_handler(elt, context) -> value

            where C{context} is the '/'-separated tag path of C{elt}.
        """
        if elt_handler: self.handle_elt = elt_handler

        #: The tag specification for this corpus view (compiled, and
        #: anchored at the end of the tag path).
        self._tagspec = re.compile(tagspec+r'\Z')

        #: A dictionary mapping from file positions (as returned by
        #: C{stream.tell()}) to XML contexts.  An XML context is a
        #: tuple of XML tag names, indicating which tags have not yet
        #: been closed.
        self._tag_context = {0: ()}

        encoding = self._detect_encoding(filename)
        StreamBackedCorpusView.__init__(self, filename, encoding=encoding)

    def _detect_encoding(self, filename):
        """
        Guess the encoding of an XML file from its first line.

        A byte-order mark takes precedence; otherwise the C{encoding}
        pseudo-attribute of the XML declaration is used; otherwise
        C{'utf-8'} is returned.

        @param filename: A C{PathPointer}, or a path that can be given
            to the builtin C{open()}.
        @return: The name of the detected encoding.
        """
        if isinstance(filename, PathPointer):
            infile = filename.open()
        else:
            infile = open(filename, 'rb')
        # Only the first line is needed; always release the handle.
        try:
            s = infile.readline()
        finally:
            infile.close()

        # The UTF-32 marks must be tested before the UTF-16 ones: the
        # UTF-32-LE mark starts with the two bytes of the UTF-16-LE
        # mark, so the opposite order misreports UTF-32-LE files.
        if s.startswith(codecs.BOM_UTF32_BE):
            return 'utf-32-be'
        if s.startswith(codecs.BOM_UTF32_LE):
            return 'utf-32-le'
        if s.startswith(codecs.BOM_UTF16_BE):
            return 'utf-16-be'
        if s.startswith(codecs.BOM_UTF16_LE):
            return 'utf-16-le'
        if s.startswith(codecs.BOM_UTF8):
            return 'utf-8'

        # Look for <?xml ... encoding="..."?>.  The '?' has to be
        # escaped: an unescaped '<?' means "an optional '<'" and can
        # never match a real XML declaration.
        m = re.match(r'\s*<\?xml\b.*\bencoding="([^"]+)"', s)
        if m: return m.group(1)
        m = re.match(r"\s*<\?xml\b.*\bencoding='([^']+)'", s)
        if m: return m.group(1)
        # No encoding found -- fall back to utf-8.
        return 'utf-8'

    def handle_elt(self, elt, context):
        """
        Convert an element into an appropriate value for inclusion in
        the view.  Unless overridden by a subclass or by the
        C{elt_handler} constructor argument, this method simply
        returns C{elt}.

        @return: The view value corresponding to C{elt}.

        @type elt: C{ElementTree}
        @param elt: The element that should be converted.

        @type context: C{str}
        @param context: A string composed of element tags separated by
            forward slashes, indicating the XML context of the given
            element.  For example, the string C{'foo/bar/baz'}
            indicates that the element is a C{baz} element whose
            parent is a C{bar} element and whose grandparent is a
            top-level C{foo} element.
        """
        return elt

    #: A regular expression that matches XML fragments that do not
    #: contain any un-closed tags.  The literal brackets of the CDATA
    #: delimiters are escaped (unescaped, they form a character class),
    #: and the generic tag branch excludes '<!', so that a truncated
    #: comment, CDATA section or DOCTYPE that happens to contain '>'
    #: is not mistaken for a complete tag.
    _VALID_XML_RE = re.compile(r"""
        [^<]*
        (
          ((<!--.*?-->)                          |  # comment
           (<!\[CDATA\[.*?\]\]>)                 |  # raw character data
           (<!DOCTYPE\s+[^\[>]*(\[[^\]]*])?\s*>) |  # doctype decl
           (<[^!>][^>]*>))                          # tag or PI
          [^<]*)*
        \Z""",
        re.DOTALL|re.VERBOSE)

    #: A regular expression used to extract the tag name from a start
    #: tag, end tag, or empty-elt tag string.
    _XML_TAG_NAME = re.compile(r'<\s*/?\s*([^\s>]+)')

    #: A regular expression used to find all start-tags, end-tags, and
    #: empty-elt tags in an XML fragment.  Comments, CDATA sections,
    #: processing instructions and DOCTYPE declarations are matched
    #: too, so that tag-like text inside them is skipped.  The DOCTYPE
    #: name part excludes '>' so that the match stops at the end of
    #: the declaration instead of swallowing the tags that follow it.
    _XML_PIECE = re.compile(r"""
        # Include these so we can skip them:
        (?P<COMMENT>        <!--.*?-->                           )|
        (?P<CDATA>          <!\[CDATA\[.*?\]\]>                  )|
        (?P<PI>             <\?.*?\?>                            )|
        (?P<DOCTYPE>        <!DOCTYPE\s+[^\[>]*(\[[^\]]*])?\s*>  )|
        # These are the ones we actually care about:
        (?P<EMPTY_ELT_TAG>  <\s*[^>/\?!\s][^>]*/\s*>             )|
        (?P<START_TAG>      <\s*[^>/\?!\s][^>]*>                 )|
        (?P<END_TAG>        <\s*/[^>/\?!\s][^>]*>                )""",
        re.DOTALL|re.VERBOSE)

    def _read_xml_fragment(self, stream):
        """
        Read a string from the given stream that does not contain any
        un-closed tags.  In particular, this function first reads a
        block from the stream of size L{self._BLOCK_SIZE}.  It then
        checks if that block contains an un-closed tag.  If it does,
        then this function either backtracks to the last '<', or reads
        another block.

        @return: The fragment that was read (the empty string at end
            of file).
        @raise ValueError: If a '>' is seen before any '<' in a
            fragment that is not complete, or if the end of the file
            is reached while a tag is still open.
        """
        fragment = ''

        # Remember where the *whole* fragment starts.  The offsets
        # computed below index into the accumulated fragment, so the
        # position must not be re-read after each additional block.
        seekable = isinstance(stream, SeekableUnicodeStreamReader)
        if seekable:
            startpos = stream.tell()

        while True:
            # Read a block and add it to the fragment.
            xml_block = stream.read(self._BLOCK_SIZE)
            fragment += xml_block

            # Do we have a well-formed xml fragment?
            if self._VALID_XML_RE.match(fragment):
                return fragment

            # Do we have a fragment that will never be well-formed?
            # (A fragment that fails the check above always contains
            # a '<', so this search cannot come back empty.)
            # NOTE(review): a literal '>' in character data is legal
            # XML, so this heuristic can reject valid input -- confirm
            # whether that is intended.
            first_bracket = re.search('[<>]', fragment)
            if first_bracket.group(0) == '>':
                pos = stream.tell() - (
                    len(fragment)-first_bracket.end())
                raise ValueError('Unexpected ">" near char %s' % pos)

            # End of file?
            if not xml_block:
                raise ValueError('Unexpected end of file: tag not closed')

            # If not, then we must be in the middle of a <..tag..>.
            # If appropriate, backtrack to the most recent '<'
            # character.
            last_open_bracket = fragment.rfind('<')
            if last_open_bracket > 0:
                if self._VALID_XML_RE.match(fragment[:last_open_bracket]):
                    if seekable:
                        stream.seek(startpos)
                        stream.char_seek_forward(last_open_bracket)
                    else:
                        stream.seek(-(len(fragment)-last_open_bracket), 1)
                    return fragment[:last_open_bracket]

            # Otherwise, read another block. (i.e., return to the
            # top of the loop.)

    def read_block(self, stream, tagspec=None, elt_handler=None):
        """
        Read from C{stream} until we find at least one element that
        matches C{tagspec}, and return the result of applying
        C{elt_handler} to each element found.

        @param stream: The stream to read from.  Its current position
            must be one for which an XML context has been recorded
            (position 0, or a position where an earlier call stopped).
        @param tagspec: The tag specification to match tag paths
            against; defaults to the one given to the constructor.
        @param elt_handler: The function applied to each matching
            element and its tag path; defaults to L{handle_elt()}.
        @return: A list of handler results (empty at end of file).
        @raise ValueError: If the tags are not properly nested, or if
            the file ends in the middle of a matching element.
        """
        if tagspec is None: tagspec = self._tagspec
        if elt_handler is None: elt_handler = self.handle_elt

        # Use a stack of strings to keep track of our context.  The
        # lookup result is checked *before* it is copied: list(None)
        # would raise a TypeError and the check would never run.
        context = self._tag_context.get(stream.tell())
        assert context is not None, 'no XML context for this position'
        context = list(context)

        elts = []

        elt_start = None # where does the elt start
        elt_depth = None # what context depth
        elt_text = ''

        while elts==[] or elt_start is not None:
            if isinstance(stream, SeekableUnicodeStreamReader):
                startpos = stream.tell()
            xml_fragment = self._read_xml_fragment(stream)

            # End of file.
            if not xml_fragment:
                if elt_start is None: break
                else: raise ValueError('Unexpected end of file')

            # Process each <tag> in the xml fragment.
            for piece in self._XML_PIECE.finditer(xml_fragment):
                if self._DEBUG:
                    print('%25s %s' % ('/'.join(context)[-20:],
                                       piece.group()))

                if piece.group('START_TAG'):
                    name = self._XML_TAG_NAME.match(piece.group()).group(1)
                    # Keep context up-to-date.
                    context.append(name)
                    # Is this one of the elts we're looking for?
                    if elt_start is None:
                        if re.match(tagspec, '/'.join(context)):
                            elt_start = piece.start()
                            elt_depth = len(context)

                elif piece.group('END_TAG'):
                    name = self._XML_TAG_NAME.match(piece.group()).group(1)
                    # sanity checks:
                    if not context:
                        raise ValueError('Unmatched tag </%s>' % name)
                    if name != context[-1]:
                        raise ValueError('Unmatched tag <%s>...</%s>' %
                                         (context[-1], name))
                    # Is this the end of an element?
                    if elt_start is not None and elt_depth == len(context):
                        elt_text += xml_fragment[elt_start:piece.end()]
                        elts.append( (elt_text, '/'.join(context)) )
                        elt_start = elt_depth = None
                        elt_text = ''
                    # Keep context up-to-date
                    context.pop()

                elif piece.group('EMPTY_ELT_TAG'):
                    name = self._XML_TAG_NAME.match(piece.group()).group(1)
                    if elt_start is None:
                        if re.match(tagspec, '/'.join(context)+'/'+name):
                            elts.append((piece.group(),
                                         '/'.join(context)+'/'+name))

            if elt_start is not None:
                # If we haven't found any elements yet, then keep
                # looping until we do.
                if elts == []:
                    elt_text += xml_fragment[elt_start:]
                    elt_start = 0

                # If we've found at least one element, then try
                # backtracking to the start of the element that we're
                # inside of.
                else:
                    # take back the last start-tag, and return what
                    # we've gotten so far (elts is non-empty).
                    if self._DEBUG:
                        print(' '*36+'(backtrack)')
                    if isinstance(stream, SeekableUnicodeStreamReader):
                        stream.seek(startpos)
                        stream.char_seek_forward(elt_start)
                    else:
                        stream.seek(-(len(xml_fragment)-elt_start), 1)
                    context = context[:elt_depth-1]
                    elt_start = elt_depth = None
                    elt_text = ''

        # Update the _tag_context dict, so that a later call starting
        # at this position knows which tags are still open.
        pos = stream.tell()
        if pos in self._tag_context:
            assert tuple(context) == self._tag_context[pos]
        else:
            self._tag_context[pos] = tuple(context)

        # Non-ASCII characters are turned into character references
        # before each element's text is handed to the parser.
        return [elt_handler(ElementTree.fromstring(
                                found_text.encode('ascii',
                                                  'xmlcharrefreplace')),
                            found_path)
                for (found_text, found_path) in elts]