Code Coverage for nltk.corpus.reader.conll
Untested Functions
- ConllCorpusReader: _get_iob_words(), _get_parsed_sent(), _get_srl_instances(), _get_srl_spans(), chunked_words(), iob_sents(), iob_words(), parsed_sents(), raw(), srl_instances(), srl_spans(), tagged(), tagged_sents()
- ConllSRLInstance: __init__(), __repr__(), pprint()
- ConllSRLInstanceList: __init__(), __str__(), _tree2conll(), pprint()
|
Partially Tested Functions
|
"""
Read CoNLL-style chunk files.
"""
from nltk.corpus.reader.util import *
from nltk.corpus.reader.api import *
from nltk import chunk, tree, Tree
import os, codecs
from nltk.internals import deprecated
from nltk import Tree, LazyMap, LazyConcatenation
import textwrap
class ConllCorpusReader(CorpusReader):
"""
A corpus reader for CoNLL-style files. These files consist of a
series of sentences, seperated by blank lines. Each sentence is
encoded using a table (or I{grid}) of values, where each line
corresponds to a single word, and each column corresponds to an
annotation type. The set of columns used by CoNLL-style files can
vary from corpus to corpus; the C{ConllCorpusReader} constructor
therefore takes an argument, C{columntypes}, which is used to
specify the columns that are used by a given corpus.
@todo: Add support for reading from corpora where different
parallel files contain different columns.
@todo: Possibly add caching of the grid corpus view? This would
allow the same grid view to be used by different data access
methods (eg words() and parsed_sents() could both share the
same grid corpus view object).
@todo: Better support for -DOCSTART-. Currently, we just ignore
it, but it could be used to define methods that retrieve a
document at a time (eg parsed_documents()).
"""
WORDS = 'words'
POS = 'pos'
TREE = 'tree'
CHUNK = 'chunk'
NE = 'ne'
SRL = 'srl'
IGNORE = 'ignore'
COLUMN_TYPES = (WORDS, POS, TREE, CHUNK, NE, SRL, IGNORE)
def __init__(self, root, files, columntypes,
chunk_types=None, top_node='S', pos_in_tree=False,
srl_includes_roleset=True, encoding=None,
tree_class=Tree):
for columntype in columntypes:
if columntype not in self.COLUMN_TYPES:
raise ValueError('Bad colum type %r' % columntyp)
if chunk_types is not None: chunk_types = tuple(chunk_types)
self._chunk_types = chunk_types
self._colmap = dict((c,i) for (i,c) in enumerate(columntypes))
self._pos_in_tree = pos_in_tree
self._top_node = top_node
self._srl_includes_roleset = srl_includes_roleset
self._tree_class = tree_class
CorpusReader.__init__(self, root, files, encoding)
def raw(self, files=None):
if files is None: files = self._files
elif isinstance(files, basestring): files = [files]
return concat([self.open(f).read() for f in files])
def words(self, files=None):
self._require(self.WORDS)
return LazyConcatenation(LazyMap(self._get_words, self._grids(files)))
def sents(self, files=None):
self._require(self.WORDS)
return LazyMap(self._get_words, self._grids(files))
def tagged_words(self, files=None):
self._require(self.WORDS, self.POS)
return LazyConcatenation(LazyMap(self._get_tagged_words,
self._grids(files)))
def tagged_sents(self, files=None):
self._require(self.WORDS, self.POS)
return LazyMap(self._get_tagged_words, self._grids(files))
def chunked_words(self, files=None, chunk_types=None):
self._require(self.WORDS, self.POS, self.CHUNK)
if chunk_types is None: chunk_types = self._chunk_types
def get_chunked_words(grid):
return self._get_chunked_words(grid, chunk_types)
return LazyConcatenation(LazyMap(get_chunked_words,
self._grids(files)))
def chunked_sents(self, files=None, chunk_types=None):
self._require(self.WORDS, self.POS, self.CHUNK)
if chunk_types is None: chunk_types = self._chunk_types
def get_chunked_words(grid):
return self._get_chunked_words(grid, chunk_types)
return LazyMap(get_chunked_words, self._grids(files))
def parsed_sents(self, files=None, pos_in_tree=None):
self._require(self.WORDS, self.POS, self.TREE)
if pos_in_tree is None: pos_in_tree = self._pos_in_tree
def get_parsed_sent(grid):
return self._get_parsed_sent(grid, pos_in_tree)
return LazyMap(get_parsed_sent, self._grids(files))
def srl_spans(self, files=None):
self._require(self.SRL)
return LazyMap(self._get_srl_spans, self._grids(files))
def srl_instances(self, files=None, pos_in_tree=None, flatten=True):
self._require(self.WORDS, self.POS, self.TREE, self.SRL)
if pos_in_tree is None: pos_in_tree = self._pos_in_tree
def get_srl_instances(grid):
return self._get_srl_instances(grid, pos_in_tree)
result = LazyMap(get_srl_instances, self._grids(files))
if flatten: result = LazyConcatenation(result)
return result
def iob_words(self, files=None):
"""
@return: a list of word/tag/IOB tuples
@rtype: C{list} of C{tuple}
@param files: the list of files that make up this corpus
@type files: C{None} or C{str} or C{list}
"""
self._require(self.WORDS, self.POS, self.CHUNK)
return LazyConcatenation(LazyMap(self._get_iob_words,
self._grids(files)))
def iob_sents(self, files=None):
"""
@return: a list of lists of word/tag/IOB tuples
@rtype: C{list} of C{list}
@param files: the list of files that make up this corpus
@type files: C{None} or C{str} or C{list}
"""
self._require(self.WORDS, self.POS, self.CHUNK)
return LazyMap(self._get_iob_words, self._grids(files))
def _grids(self, files=None):
return concat([StreamBackedCorpusView(filename, self._read_grid_block,
encoding=enc)
for (filename, enc) in self.abspaths(files, True)])
def _read_grid_block(self, stream):
grids = []
for block in read_blankline_block(stream):
block = block.strip()
if not block: continue
grid = [line.split() for line in block.split('\n')]
if grid[0][self._colmap.get('words', 0)] == '-DOCSTART-':
del grid[0]
for row in grid:
if len(row) != len(grid[0]):
raise ValueError('Inconsistent number of columns:\n%s'
% block)
grids.append(grid)
return grids
def _get_words(self, grid):
return self._get_column(grid, self._colmap['words'])
def _get_tagged_words(self, grid):
return zip(self._get_column(grid, self._colmap['words']),
self._get_column(grid, self._colmap['pos']))
def _get_iob_words(self, grid):
return zip(self._get_column(grid, self._colmap['words']),
self._get_column(grid, self._colmap['pos']),
self._get_column(grid, self._colmap['chunk']))
def _get_chunked_words(self, grid, chunk_types):
words = self._get_column(grid, self._colmap['words'])
pos_tags = self._get_column(grid, self._colmap['pos'])
chunk_tags = self._get_column(grid, self._colmap['chunk'])
stack = [Tree(self._top_node, [])]
for (word, pos_tag, chunk_tag) in zip(words, pos_tags, chunk_tags):
if chunk_tag == 'O':
state, chunk_type = 'O', ''
else:
(state, chunk_type) = chunk_tag.split('-')
if chunk_types is not None and chunk_type not in chunk_types:
state = 'O'
if state == 'I' and chunk_type != stack[-1].node:
state = 'B'
if state in 'BO' and len(stack) == 2:
stack.pop()
if state == 'B':
new_chunk = Tree(chunk_type, [])
stack[-1].append(new_chunk)
stack.append(new_chunk)
stack[-1].append((word, pos_tag))
return stack[0]
def _get_parsed_sent(self, grid, pos_in_tree):
words = self._get_column(grid, self._colmap['words'])
pos_tags = self._get_column(grid, self._colmap['pos'])
parse_tags = self._get_column(grid, self._colmap['tree'])
treestr = ''
for (word, pos_tag, parse_tag) in zip(words, pos_tags, parse_tags):
if word == '(': word = '-LRB-'
if word == ')': word = '-RRB-'
if pos_tag == '(': pos_tag = '-LRB-'
if pos_tag == ')': pos_tag = '-RRB-'
(left, right) = parse_tag.split('*')
right = right.count(')')*')'
treestr += '%s (%s %s) %s' % (left, pos_tag, word, right)
try:
tree = self._tree_class.parse(treestr)
except (ValueError, IndexError):
tree = self._tree_class.parse('(%s %s)' %
(self._top_node, treestr))
if not pos_in_tree:
for subtree in tree.subtrees():
for i, child in enumerate(subtree):
if (isinstance(child, nltk.Tree) and len(child)==1 and
isinstance(child[0], basestring)):
subtree[i] = (child[0], child.node)
return tree
def _get_srl_spans(self, grid):
"""
list of list of (start, end), tag) tuples
"""
if self._srl_includes_roleset:
predicates = self._get_column(grid, self._colmap['srl']+1)
start_col = self._colmap['srl']+2
else:
predicates = self._get_column(grid, self._colmap['srl'])
start_col = self._colmap['srl']+1
num_preds = len([p for p in predicates if p != '-'])
spanlists = []
for i in range(num_preds):
col = self._get_column(grid, start_col+i)
spanlist = []
stack = []
for wordnum, srl_tag in enumerate(col):
(left, right) = srl_tag.split('*')
for tag in left.split('('):
if tag:
stack.append((tag, wordnum))
for i in range(right.count(')')):
(tag, start) = stack.pop()
spanlist.append( ((start, wordnum+1), tag) )
spanlists.append(spanlist)
return spanlists
def _get_srl_instances(self, grid, pos_in_tree):
tree = self._get_parsed_sent(grid, pos_in_tree)
spanlists = self._get_srl_spans(grid)
if self._srl_includes_roleset:
predicates = self._get_column(grid, self._colmap['srl']+1)
rolesets = self._get_column(grid, self._colmap['srl'])
else:
predicates = self._get_column(grid, self._colmap['srl'])
rolesets = [None] * len(predicates)
instances = ConllSRLInstanceList(tree)
for wordnum, predicate in enumerate(predicates):
if predicate == '-': continue
for spanlist in spanlists:
for (start, end), tag in spanlist:
if wordnum in range(start,end) and tag in ('V', 'C-V'):
break
else: continue
break
else:
raise ValueError('No srl column found for %r' % predicate)
instances.append(ConllSRLInstance(tree, wordnum, predicate,
rolesets[wordnum], spanlist))
return instances
def _require(self, *columntypes):
for columntype in columntypes:
if columntype not in self._colmap:
raise ValueError('This corpus does not contain a %s '
'column.' % columntype)
@staticmethod
def _get_column(grid, column_index):
return [grid[i][column_index] for i in range(len(grid))]
@deprecated("Use .raw() or .words() or .tagged_words() or "
".chunked_sents() instead.")
def read(self, items, format='chunked', chunk_types=None):
if format == 'chunked': return self.chunked_sents(items, chunk_types)
if format == 'raw': return self.raw(items)
if format == 'tokenized': return self.words(items)
if format == 'tagged': return self.tagged_words(items)
raise ValueError('bad format %r' % format)
@deprecated("Use .chunked_sents() instead.")
def chunked(self, items, chunk_types=None):
return self.chunked_sents(items, chunk_types)
@deprecated("Use .words() instead.")
def tokenized(self, items):
return self.words(items)
@deprecated("Use .tagged_words() instead.")
def tagged(self, items):
return self.tagged_words(items)
class ConllSRLInstance(object):
"""
An SRL instance from a CoNLL corpus, which identifies and
providing labels for the arguments of a single verb.
"""
def __init__(self, tree, verb_head, verb_stem, roleset, tagged_spans):
self.verb = []
"""A list of the word indices of the words that compose the
verb whose arguments are identified by this instance.
This will contain multiple word indices when multi-word
verbs are used (e.g. 'turn on')."""
self.verb_head = verb_head
"""The word index of the head word of the verb whose arguments
are identified by this instance. E.g., for a sentence that
uses the verb 'turn on,' C{verb_head} will be the word index
of the word 'turn'."""
self.verb_stem = verb_stem
self.roleset = roleset
self.arguments = []
"""A list of C{(argspan, argid)} tuples, specifying the location
and type for each of the arguments identified by this
instance. C{argspan} is a tuple C{start, end}, indicating
that the argument consists of the C{words[start:end]}."""
self.tagged_spans = tagged_spans
"""A list of C{(span, id)} tuples, specifying the location and
type for each of the arguments, as well as the verb pieces,
that make up this instance."""
self.tree = tree
"""The parse tree for the sentence containing this instance."""
self.words = tree.leaves()
"""A list of the words in the sentence containing this
instance."""
for (start, end), tag in tagged_spans:
if tag in ('V', 'C-V'):
self.verb += range(start, end)
else:
self.arguments.append( ((start, end), tag) )
def __repr__(self):
plural = len(self.arguments)!=1 and 's' or ''
return '<ConllSRLInstance for %r with %d argument%s>' % (
(self.verb_stem, len(self.arguments), plural))
def pprint(self):
verbstr = ' '.join(self.words[i][0] for i in self.verb)
hdr = 'SRL for %r (stem=%r):\n' % (verbstr, self.verb_stem)
s = ''
for i, word in enumerate(self.words):
if isinstance(word, tuple): word = word[0]
for (start, end), argid in self.arguments:
if i == start: s += '[%s ' % argid
if i == end: s += '] '
if i in self.verb: word = '<<%s>>' % word
s += word + ' '
return hdr + textwrap.fill(s.replace(' ]', ']'),
initial_indent=' ',
subsequent_indent=' ')
class ConllSRLInstanceList(list):
"""
Set of instances for a single sentence
"""
def __init__(self, tree, instances=()):
self.tree = tree
list.__init__(self, instances)
def __str__(self):
return self.pprint()
def pprint(self, include_tree=False):
for inst in self:
if inst.tree != self.tree:
raise ValueError('Tree mismatch!')
if include_tree:
words = self.tree.leaves()
pos = [None] * len(words)
synt = ['*'] * len(words)
self._tree2conll(self.tree, 0, words, pos, synt)
s = ''
for i in range(len(words)):
if include_tree:
s += '%-20s ' % words[i]
s += '%-8s ' % pos[i]
s += '%15s*%-8s ' % tuple(synt[i].split('*'))
for inst in self:
if i == inst.verb_head:
s += '%-20s ' % inst.verb_stem
break
else:
s += '%-20s ' % '-'
for inst in self:
argstr = '*'
for (start, end), argid in inst.tagged_spans:
if i==start: argstr = '(%s%s' % (argid, argstr)
if i==(end-1): argstr += ')'
s += '%-12s ' % argstr
s += '\n'
return s
def _tree2conll(self, tree, wordnum, words, pos, synt):
assert isinstance(tree, Tree)
if len(tree) == 1 and isinstance(tree[0], basestring):
pos[wordnum] = tree.node
assert words[wordnum] == tree[0]
return wordnum+1
elif len(tree) == 1 and isinstance(tree[0], tuple):
assert len(tree[0]) == 2
pos[wordnum], pos[wordnum] = tree[0]
return wordnum+1
else:
synt[wordnum] = '(%s%s' % (tree.node, synt[wordnum])
for child in tree:
wordnum = self._tree2conll(child, wordnum, words,
pos, synt)
synt[wordnum-1] += ')'
return wordnum
class ConllChunkCorpusReader(ConllCorpusReader):
"""
A ConllCorpusReader whose data file contains three columns: words,
pos, and chunk.
"""
def __init__(self, root, files, chunk_types, encoding=None):
ConllCorpusReader.__init__(
self, root, files, ('words', 'pos', 'chunk'),
chunk_types=chunk_types, encoding=encoding)