Code Coverage for nltk.data
Untested Functions
|
Partially Tested Functions
|
"""
Functions to find and load NLTK X{resource files}, such as corpora,
grammars, and saved processing objects. Resource files are identified
using URLs, such as "C{nltk:corpora/abc/rural.txt}" or
"C{http://nltk.org/sample/toy.cfg}". The following URL protocols are
supported:
- "C{file:I{path}}": Specifies the file whose path is C{I{path}}.
Both relative and absolute paths may be used.
- "C{http://I{host}/I{path}}": Specifies the file stored on the web
server C{I{host}} at path C{I{path}}.
- "C{nltk:I{path}}": Specifies the file stored in the NLTK data
package at C{I{path}}. NLTK will search for these files in the
directories specified by L{nltk.data.path}.
If no protocol is specified, then the default protocol "C{nltk:}" will
be used.
This module provides two functions that can be used to access a
resource file, given its URL: L{load()} loads a given resource, and
adds it to a resource cache; and L{retrieve()} copies a given resource
to a local file.
"""
import sys
import os, os.path
import textwrap
import weakref
import yaml
import re
import urllib2
import zipfile
import codecs
import gzip
try:
import cPickle as pickle
except:
import pickle
try:
from cStringIO import StringIO
except:
from StringIO import StringIO
from nltk import cfg, sem
path = []
"""The ordered list of locations that are searched for the NLTK data
package.  Earlier entries win, so users can shadow a packaged resource
with their own copy (e.g., one kept under ~/nltk_data in their home
directory)."""

# Locations named by environment variables come first.  Each variable
# holds an os.pathsep-separated list; empty components are dropped.
path.extend(filter(None,
                   os.environ.get('NLTK_CORPORA', '').split(os.pathsep)))
path.extend(filter(None,
                   os.environ.get('NLTK_DATA', '').split(os.pathsep)))

# Next, the user's home directory -- but only when '~' really expands.
if os.path.expanduser('~/') != '~/':
    path.append(os.path.expanduser('~/nltk_data'))

# Finally, the conventional system-wide locations for this platform.
if sys.platform.startswith('win'):
    path.extend([
        r'C:\nltk_data',
        r'D:\nltk_data',
        r'E:\nltk_data',
        os.path.join(sys.prefix, 'nltk_data'),
        os.path.join(sys.prefix, 'lib', 'nltk_data'),
    ])
else:
    path.extend([
        '/usr/share/nltk_data',
        '/usr/local/share/nltk_data',
        '/usr/lib/nltk_data',
        '/usr/local/lib/nltk_data',
    ])
class PathPointer(object):
    """
    Abstract interface for the 'path pointers' that NLTK's data package
    uses to name a specific location.  There are two kinds of concrete
    pointer: L{FileSystemPathPointer}, for a file reachable directly
    through an absolute path; and L{ZipFilePathPointer}, for a file
    stored inside a zipfile and reachable by reading that zipfile.

    Every method here raises C{NotImplementedError}; subclasses are
    expected to override all of them.
    """

    def open(self, encoding=None):
        """
        Open the file this pointer identifies and return a seekable,
        read-only stream over its contents.

        @param encoding: Optional encoding name (see the concrete
            subclasses for how it is used).
        @raise IOError: If this pointer's path does not contain a
            readable file.
        """
        raise NotImplementedError('abstract base class')

    def file_size(self):
        """
        Return the size, in bytes, of the file this pointer identifies.

        @raise IOError: If this pointer's path does not contain a
            readable file.
        """
        raise NotImplementedError('abstract base class')

    def join(self, fileid):
        """
        Return a new path pointer obtained by starting from this
        pointer's path and following the relative path C{fileid}.

        @param fileid: A relative path whose components are separated
            by forward slashes (C{/}), whatever separator the
            underlying file system uses.
        """
        raise NotImplementedError('abstract base class')
class FileSystemPathPointer(PathPointer, str):
    """
    A path pointer that identifies a file which can be accessed
    directly via a given absolute path.  C{FileSystemPathPointer} is a
    subclass of C{str} for backwards compatibility purposes --
    this allows old code that expected C{nltk.data.find()} to return a
    string to usually work (assuming the resource is not found in a
    zipfile).

    Note that C{__new__} is not overridden, so the C{str} value of an
    instance (what C{==}, hashing and slicing see) is the constructor
    argument exactly as given, while L{path}, C{str()} and C{repr()}
    report the absolute path.
    """

    def __init__(self, path):
        """
        Create a new path pointer for the given path.

        @param path: The path to point at; it is made absolute with
            C{os.path.abspath} before being stored.
        @raise IOError: If the given path does not exist.
        """
        path = os.path.abspath(path)
        if not os.path.exists(path):
            raise IOError('No such file or directory: %r' % path)
        self._path = path
        # str.__init__ is deliberately not called: the string value was
        # already fixed by str.__new__, and passing an argument on to
        # object.__init__ is a no-op on old Python 2, a
        # DeprecationWarning on 2.6+, and a TypeError on Python 3.

    path = property(
        lambda self: self._path,
        doc="\nThe absolute path identified by this path pointer.")

    def open(self, encoding=None):
        """
        Open the file in binary mode.  If C{encoding} is given, the raw
        stream is wrapped in a L{SeekableUnicodeStreamReader} that
        decodes it.
        """
        stream = open(self._path, 'rb')
        if encoding is not None:
            stream = SeekableUnicodeStreamReader(stream, encoding)
        return stream

    def file_size(self):
        """Return the size in bytes reported by C{os.stat}."""
        return os.stat(self._path).st_size

    def join(self, fileid):
        """
        Return a pointer for C{fileid} (a C{/}-separated relative path)
        resolved beneath this pointer's path.

        @raise IOError: If the resulting path does not exist.
        """
        path = os.path.join(self._path, *fileid.split('/'))
        return FileSystemPathPointer(path)

    def __repr__(self):
        return 'FileSystemPathPointer(%r)' % self._path

    def __str__(self):
        return self._path
class GzipFileSystemPathPointer(FileSystemPathPointer):
    """
    A subclass of C{FileSystemPathPointer} that identifies a gzip-compressed
    file located at a given absolute path.  C{GzipFileSystemPathPointer} is
    appropriate for loading large gzip-compressed pickle objects efficiently.
    """

    BLOCK_SIZE = 2 * 2**20
    # Number of bytes requested from the gzip file per read (2 MiB).

    def open(self, encoding=None):
        """
        Decompress the whole file into memory and return a stream
        positioned at the start of the decompressed data.

        @param encoding: If true, the in-memory stream is wrapped in a
            L{SeekableUnicodeStreamReader} using this encoding.
        @raise IOError: If the file cannot be read.
        """
        blocks = []
        gzfile = gzip.open(self._path, 'rb')
        try:
            # Read in BLOCK_SIZE pieces rather than with one giant read().
            while True:
                block = gzfile.read(self.BLOCK_SIZE)
                if not block:
                    break
                blocks.append(block)
        finally:
            # Always release the underlying file handle, even if
            # decompression fails part-way through.
            gzfile.close()
        stream = StringIO(''.join(blocks))
        if encoding:
            stream = SeekableUnicodeStreamReader(stream, encoding)
        return stream
class ZipFilePathPointer(PathPointer):
    """
    A path pointer that identifies a file contained within a zipfile,
    which can be accessed by reading that zipfile.
    """

    def __init__(self, zipfile, entry=''):
        """
        Create a new path pointer pointing at the specified entry
        in the given zipfile.

        @param zipfile: Either the filename of a zipfile (which is then
            opened as an L{OpenOnDemandZipFile}), or an already
            constructed zipfile object.
        @param entry: The C{/}-separated name of the entry inside the
            zipfile.  The empty string (the default) is accepted
            without being looked up.
        @raise IOError: If the given zipfile does not exist, or if it
            does not contain the specified entry.
        """
        if isinstance(zipfile, basestring):
            zipfile = OpenOnDemandZipFile(os.path.abspath(zipfile))

        # Normalize the entry string: drop a leading run of slashes and
        # collapse any repeated slashes into one.
        entry = re.sub('(^|/)/+', r'\1', entry)

        # Check that the entry exists.
        if entry:
            try:
                zipfile.getinfo(entry)
            except KeyError:
                # getinfo() signals a missing member with KeyError; only
                # that is translated, so unrelated errors (and
                # KeyboardInterrupt) are no longer swallowed.
                raise IOError('Zipfile %r does not contain %r' %
                              (zipfile.filename, entry))
        self._zipfile = zipfile
        self._entry = entry

    zipfile = property(lambda self: self._zipfile, doc=(
        "\nThe C{zipfile.ZipFile} object used to access the zip file\n"
        "containing the entry identified by this path pointer."))

    entry = property(lambda self: self._entry, doc=(
        "\nThe name of the file within C{zipfile} that this path\n"
        "pointer points to."))

    def open(self, encoding=None):
        """
        Read the whole entry into memory and return an in-memory stream
        over it.  If C{encoding} is given, the stream is wrapped in a
        L{SeekableUnicodeStreamReader} that decodes it.
        """
        data = self._zipfile.read(self._entry)
        stream = StringIO(data)
        if encoding is not None:
            stream = SeekableUnicodeStreamReader(stream, encoding)
        return stream

    def file_size(self):
        """Return the C{file_size} recorded in the zipfile for the entry."""
        return self._zipfile.getinfo(self._entry).file_size

    def join(self, fileid):
        """
        Return a pointer to the entry C{fileid} beneath this pointer's
        entry, in the same zipfile.

        @raise IOError: If the zipfile does not contain that entry.
        """
        entry = '%s/%s' % (self._entry, fileid)
        return ZipFilePathPointer(self._zipfile, entry)

    def __repr__(self):
        return 'ZipFilePathPointer(%r, %r)' % (
            self._zipfile.filename, self._entry)
# Keyed by resource URL.  Because this is a WeakValueDictionary, an
# entry disappears on its own once nothing else holds a reference to
# the cached value, and values that cannot be weakly referenced cannot
# be stored in it at all.
_resource_cache = weakref.WeakValueDictionary()
"""A weakref dictionary used to cache resources so that they won't
need to be loaded more than once."""
def find(resource_name):
    """
    Locate a resource in the NLTK data package and return a path
    pointer for it.  Each entry of L{nltk.data.path} is tried in order;
    an entry may be a directory or a C{.zip} file.  If the resource
    cannot be found anywhere, a C{LookupError} is raised whose message
    points at the installation instructions for the data package and
    lists the locations that were searched.

    @type resource_name: C{str}
    @param resource_name: The name of the resource to search for.
        Resource names are posix-style relative path names, such as
        C{'corpora/brown'}.  Directory names must always be separated
        by the C{'/'} character, which is converted to the
        platform-appropriate path separator automatically.  A name may
        pass through a zipfile, e.g. C{'corpora/abc.zip/abc/rural.txt'}.
    @rtype: L{PathPointer}
    @return: A L{FileSystemPathPointer} (which is also a C{str}), a
        L{GzipFileSystemPathPointer} for names ending in C{.gz}, or a
        L{ZipFilePathPointer}.
    @raise LookupError: If the resource is not found.
    """
    # Split the name into "<...>.zip" and the entry inside that zipfile;
    # both are None when the name does not go through a zipfile.
    zip_name, zip_entry = re.match('(.*\.zip)/?(.*)$|', resource_name).groups()

    for path_item in path:
        # Case 1: the search-path entry is itself a zipfile.
        if os.path.isfile(path_item) and path_item.endswith('.zip'):
            try:
                return ZipFilePathPointer(path_item, resource_name)
            except IOError:
                continue  # resource not in this zipfile

        # Case 2: the search-path entry is a directory.
        if not os.path.isdir(path_item):
            continue

        if zip_name is None:
            candidate = os.path.join(path_item, *resource_name.split('/'))
            if os.path.exists(candidate):
                if candidate.endswith('.gz'):
                    return GzipFileSystemPathPointer(candidate)
                return FileSystemPathPointer(candidate)
        else:
            candidate = os.path.join(path_item, *zip_name.split('/'))
            if os.path.exists(candidate):
                try:
                    return ZipFilePathPointer(candidate, zip_entry)
                except IOError:
                    continue  # entry not in this zipfile

    # Nothing matched: build a framed, human-readable error message.
    banner = '*'*70
    searched = ''.join('\n - %r' % d for d in path)
    msg = textwrap.fill(
        'Resource %r not found. For installation instructions, '
        'please see <http://nltk.org/index.php/Installation>.' %
        (resource_name,), initial_indent=' ', subsequent_indent=' ',
        width=66)
    msg += '\n Searched in:' + searched
    raise LookupError('\n%s\n%s\n%s' % (banner, msg, banner))
def retrieve(resource_url, filename=None, verbose=True):
    """
    Copy the given resource to a local file.  If no filename is
    specified, then use the URL's filename.  If there is already a
    file named C{filename}, then raise a C{ValueError}.

    @type resource_url: C{str}
    @param resource_url: A URL specifying where the resource should be
        loaded from.  The default protocol is C{"nltk:"}, which searches
        for the file in the NLTK data package.
    @type filename: C{str}
    @param filename: The name of the local file to create.  Defaults
        to the last path component of C{resource_url}.
    @type verbose: C{bool}
    @param verbose: If true, print a message describing the copy.
    @raise ValueError: If C{filename} already exists.
    """
    if filename is None:
        if resource_url.startswith('file:'):
            # Derive the name from the URL (the original code passed the
            # still-None filename to os.path.split and crashed).  The
            # protocol is stripped first so that 'file:abc.txt' yields
            # 'abc.txt' rather than 'file:abc.txt'.
            filename = os.path.split(resource_url[len('file:'):])[-1]
        else:
            filename = re.sub(r'(^\w+:)?.*/', '', resource_url)
    if os.path.exists(filename):
        filename = os.path.abspath(filename)
        raise ValueError("File %r already exists!" % filename)

    if verbose:
        print('Retrieving %r, saving to %r' % (resource_url, filename))

    # Open the input before creating the output, so that a missing
    # resource does not leave an empty file behind.
    infile = _open(resource_url)
    try:
        outfile = open(filename, 'wb')
        try:
            # Copy in 64 KiB chunks until the input is exhausted.
            while True:
                s = infile.read(1024*64)
                if not s:
                    break
                outfile.write(s)
        finally:
            outfile.close()
    finally:
        infile.close()
# Every format name accepted by load(), mapped to a human-readable
# description of that format.
FORMATS = {
    'pickle': "A serialized python object, stored using the pickle module.",
    'yaml': "A serialized python object, stored using the yaml module.",
    'cfg': "A context free grammar, parsed by nltk.cfg.parse_cfg().",
    'pcfg': "A probabilistic CFG, parsed by nltk.cfg.parse_pcfg().",
    'fcfg': "A feature CFG, parsed by nltk.cfg.parse_fcfg().",
    'fol': "A list of first order logic expressions, parsed by "
           "nltk.sem.parse_fol().",
    'val': "A semantic valuation, parsed by nltk.sem.parse_valuation().",
    'raw': "The raw (byte string) contents of a file.",
}

# File extension -> format name, used when load() is called with
# format='auto'.  There is no entry for 'raw': it has to be requested
# explicitly.
AUTO_FORMATS = {
    'pickle': 'pickle',
    'yaml': 'yaml',
    'cfg': 'cfg',
    'pcfg': 'pcfg',
    'fcfg': 'fcfg',
    'fol': 'fol',
    'val': 'val'}
def load(resource_url, format='auto', cache=True, verbose=False):
    """
    Load a given resource from the NLTK data package.  The following
    resource formats are currently supported:
      - C{'pickle'}
      - C{'yaml'}
      - C{'cfg'} (context free grammars)
      - C{'pcfg'} (probabilistic CFGs)
      - C{'fcfg'} (feature-based CFGs)
      - C{'fol'} (formulas of First Order Logic)
      - C{'val'} (valuation of First Order Logic model)
      - C{'raw'}

    If no format is specified, C{load()} will attempt to determine a
    format based on the resource name's file extension (looking
    through a trailing C{.gz}).  If that fails, C{load()} will raise a
    C{ValueError} exception.

    @type resource_url: C{str}
    @param resource_url: A URL specifying where the resource should be
        loaded from.  The default protocol is C{"nltk:"}, which searches
        for the file in the NLTK data package.
    @type format: C{str}
    @param format: One of the format names listed above, or C{'auto'}.
    @type cache: C{bool}
    @param cache: If true, add this resource to a cache.  If C{load}
        finds a resource in its cache, then it will return it from the
        cache rather than loading it.  The cache uses weak references,
        so a resource will automatically be expunged from the cache
        when no more objects are using it.
    @type verbose: C{bool}
    @param verbose: If true, print a message when loading a resource.
        Messages are not displayed when a resource is retrieved from
        the cache.
    @raise ValueError: If the format cannot be determined from the
        extension, or if C{format} is not a known format name.
    """
    # If we've cached the resource, then just return it.
    if cache:
        resource_val = _resource_cache.get(resource_url)
        if resource_val is not None:
            if verbose:
                print('<<Using cached copy of %s>>' % (resource_url,))
            return resource_val

    # Let the user know what's going on.
    if verbose:
        print('<<Loading %s>>' % (resource_url,))

    # Determine the format of the resource.
    if format == 'auto':
        resource_url_parts = resource_url.split('.')
        ext = resource_url_parts[-1]
        # Look through a '.gz' suffix -- but only when something
        # precedes it, so a URL that is just 'gz' gets the documented
        # ValueError below instead of an IndexError.
        if ext == 'gz' and len(resource_url_parts) > 1:
            ext = resource_url_parts[-2]
        format = AUTO_FORMATS.get(ext)
        if format is None:
            raise ValueError('Could not determine format for %s based '
                             'on its file\nextension; use the "format" '
                             'argument to specify the format explicitly.'
                             % resource_url)

    # Reject unknown formats before opening anything.
    if format not in FORMATS:
        raise ValueError('Unknown format type!')

    # Load the resource, and make sure the stream is closed afterwards.
    stream = _open(resource_url)
    try:
        if format == 'pickle':
            # SECURITY NOTE: unpickling can execute arbitrary code; only
            # load pickles from sources you trust.
            resource_val = pickle.load(stream)
        elif format == 'yaml':
            # SECURITY NOTE: yaml.load can construct arbitrary Python
            # objects; only load YAML from sources you trust.
            resource_val = yaml.load(stream)
        elif format == 'cfg':
            resource_val = cfg.parse_cfg(stream.read())
        elif format == 'pcfg':
            resource_val = cfg.parse_pcfg(stream.read())
        elif format == 'fcfg':
            resource_val = cfg.parse_fcfg(stream.read())
        elif format == 'fol':
            resource_val = sem.parse_fol(stream.read())
        elif format == 'val':
            resource_val = sem.parse_valuation(stream.read())
        else:
            # The only remaining member of FORMATS is 'raw'.
            resource_val = stream.read()
    finally:
        stream.close()

    # If requested, add it to the cache.
    if cache:
        try:
            _resource_cache[resource_url] = resource_val
        except TypeError:
            # The value's type does not support weak references, so it
            # cannot be cached; it is simply returned uncached.
            pass

    return resource_val
def show_cfg(resource_url, escape='##'):
    """
    Print the contents of a grammar file, leaving out empty lines and
    lines that have been escaped.

    @type resource_url: C{str}
    @param resource_url: A URL specifying where the resource should be
        loaded from.  The default protocol is C{"nltk:"}, which searches
        for the file in the NLTK data package.
    @type escape: C{str}
    @param escape: A line that starts with this string is skipped.
    """
    # Fetch the raw text, bypassing the resource cache.
    grammar_text = load(resource_url, format='raw', cache=False)
    for line in grammar_text.splitlines():
        skip = line.startswith(escape) or re.match('^$', line)
        if not skip:
            print(line)
def clear_cache():
    """
    Empty the resource cache, so that the next L{load()} of any
    resource reads it afresh instead of returning a cached copy.

    @see: L{load()}
    """
    _resource_cache.clear()
def _open(resource_url):
"""
Helper function that returns an open file object for a resource,
given its resource URL. If the given resource URL uses the 'ntlk'
protocol, or uses no protocol, then use L{nltk.data.find} to find
its path, and open it with the given mode; if the resource URL
uses the 'file' protocol, then open the file with the given mode;
otherwise, delegate to C{urllib2.urlopen}.
@type resource_url: C{str}
@param resource_url: A URL specifying where the resource should be
loaded from. The default protocol is C{"nltk:"}, which searches
for the file in the the NLTK data package.
"""
protocol, path = re.match('(?:(\w+):)?(.*)', resource_url).groups()
if protocol is None or protocol.lower() == 'nltk':
return find(path).open()
elif protocol.lower() == 'file':
return open(path, 'rb')
else:
return urllib2.urlopen(resource_url)
class LazyLoader(object):
    """
    A stand-in for a resource that is not loaded until it is first
    used.  The resource URL is remembered at construction time; the
    first attribute access that normal lookup cannot satisfy, or the
    first C{repr()}, calls L{load()} and then turns this very object
    into the loaded resource by adopting its C{__dict__} and
    C{__class__}.
    """
    def __init__(self, path):
        # Stored under the name-mangled attribute _LazyLoader__path.
        self.__path = path
    def __load(self):
        resource = load(self.__path)
        # From here on this object shares the resource's attribute
        # dictionary and has the resource's class, so LazyLoader's own
        # methods are no longer reachable through it.
        self.__dict__ = resource.__dict__
        self.__class__ = resource.__class__
    def __getattr__(self, attr):
        # __getattr__ is only called when normal lookup fails, i.e. for
        # anything that is not already defined on this placeholder.
        self.__load()
        # self now has the loaded resource's class and dict, so this
        # lookup is resolved against the resource.
        return getattr(self, attr)
    def __repr__(self):
        self.__load()
        # self's class has just been swapped, so '%r' formats it with
        # the loaded resource's class rather than this method.
        return '%r' % self
class OpenOnDemandZipFile(zipfile.ZipFile):
    """
    A subclass of C{zipfile.ZipFile} that closes its file pointer
    whenever it is not using it; and re-opens it when it needs to read
    data from the zipfile.  This is useful for reducing the number of
    open file handles when many zip files are being accessed at once.
    C{OpenOnDemandZipFile} must be constructed from a filename, not a
    file-like object (to allow re-opening).  C{OpenOnDemandZipFile} is
    read-only (i.e., C{write} and C{writestr} are disabled).
    """

    def __init__(self, filename):
        """
        Read the zipfile's directory, then close the file again.

        @param filename: The name of the zipfile; must be a string.
        @raise TypeError: If C{filename} is not a string.
        """
        if not isinstance(filename, basestring):
            raise TypeError('OpenOnDemandZipFile filename must be a string')
        zipfile.ZipFile.__init__(self, filename)
        assert self.filename == filename
        # The member list has been read by ZipFile.__init__; release
        # the file handle until a read() actually needs it.
        self.close()

    def read(self, name):
        """
        Return the contents of the member C{name}, re-opening the
        zipfile for the duration of the read only.
        """
        assert self.fp is None
        self.fp = open(self.filename, 'rb')
        try:
            return zipfile.ZipFile.read(self, name)
        finally:
            # Close even if the read fails; otherwise the handle would
            # leak and every later read() would trip the assert above.
            self.close()

    def write(self, *args, **kwargs):
        """@raise NotImplementedError: OpenOnDemandZipfile is read-only"""
        raise NotImplementedError('OpenOnDemandZipfile is read-only')

    def writestr(self, *args, **kwargs):
        """@raise NotImplementedError: OpenOnDemandZipfile is read-only"""
        raise NotImplementedError('OpenOnDemandZipfile is read-only')

    def __repr__(self):
        return 'OpenOnDemandZipFile(%r)' % self.filename
class SeekableUnicodeStreamReader(object):
    """
    A stream reader that automatically decodes the source byte stream
    into unicode (like C{codecs.StreamReader}); but still supports the
    C{seek()} and C{tell()} operations correctly.  This is in contrast
    to C{codecs.StreamReader}, which provide *broken* C{seek()} and
    C{tell()} methods.

    This class was motivated by L{StreamBackedCorpusView}, which
    makes extensive use of C{seek()} and C{tell()}, and needs to be
    able to handle unicode-encoded files.

    Positions passed to C{seek()} and returned by C{tell()} are byte
    offsets in the underlying stream, not character offsets.

    Note: this class requires stateless decoders.  To my knowledge,
    this shouldn't cause a problem with any of python's builtin
    unicode encodings.
    """
    # When true, tell() re-reads a little of the stream and asserts
    # that the position it computed lines up with the line buffer.
    DEBUG = True

    def __init__(self, stream, encoding, errors='strict'):
        """
        Wrap C{stream}, which is rewound to its start first.

        @param stream: A seekable byte stream.
        @param encoding: The name of the encoding used to decode it.
        @param errors: The error mode used when decoding ('strict',
            'ignore' or 'replace').
        """
        # Rewind the stream to its beginning.
        stream.seek(0)

        self.stream = stream
        """The underlying stream."""

        self.encoding = encoding
        """The name of the encoding that should be used to encode the
        underlying stream."""

        self.errors = errors
        """The error mode that should be used when decoding data from
        the underlying stream. Can be 'strict', 'ignore', or
        'replace'."""

        self.decode = codecs.getdecoder(encoding)
        """The function that is used to decode byte strings into
        unicode strings."""

        self.bytebuffer = ''
        """A buffer to use bytes that have been read but have not yet
        been decoded. This is only used when the final bytes from
        a read do not form a complete encoding for a character."""

        self.linebuffer = None
        """A buffer used by L{readline()} to hold characters that have
        been read, but have not yet been returned by L{read()} or
        L{readline()}. This buffer consists of a list of unicode
        strings, where each string corresponds to a single line.
        The final element of the list may or may not be a complete
        line. Note that the existence of a linebuffer makes the
        L{tell()} operation more complex, because it must backtrack
        to the beginning of the buffer to determine the correct
        file position in the underlying byte stream."""

        self._rewind_checkpoint = 0
        """The file position at which the most recent read on the
        underlying stream began. This is used, together with
        L{_rewind_numchars}, to backtrack to the beginning of
        L{linebuffer} (which is required by L{tell()})."""

        self._rewind_numchars = None
        """The number of characters that have been returned since the
        read that started at L{_rewind_checkpoint}. This is used,
        together with L{_rewind_checkpoint}, to backtrack to the
        beginning of L{linebuffer} (which is required by
        L{tell()})."""

        # NOTE(review): _check_bom() may replace self.encoding, but
        # self.decode was already bound above and is not refreshed --
        # confirm that this is intended.
        self._bom = self._check_bom()
        """The length of the byte order marker at the beginning of
        the stream (or C{None} for no byte order marker)."""

    #/////////////////////////////////////////////////////////////////
    # Read methods
    #/////////////////////////////////////////////////////////////////

    def read(self, size=None):
        """
        Read up to C{size} bytes, decode them using this reader's
        encoding, and return the resulting unicode string.  Any
        characters still held in the line buffer are returned in front
        of the newly read ones, and the line buffer is emptied.

        @param size: The maximum number of bytes to read.  If not
            specified, then read as many bytes as possible.
        @rtype: C{unicode}
        """
        chars = self._read(size)

        # If linebuffer is not empty, then include it in the result
        if self.linebuffer:
            chars = ''.join(self.linebuffer) + chars
            self.linebuffer = None
            self._rewind_numchars = None

        return chars

    def readline(self, size=None):
        """
        Read a line of text, decode it using this reader's encoding,
        and return the resulting unicode string.

        @param size: The maximum number of bytes to read.  If no
            newline is encountered before C{size} bytes have been
            read, then the returned value may not be a complete line
            of text.
        """
        # If the line buffer holds more than one element, its first
        # element is a complete line: hand it out.  (The last element
        # may be an incomplete line, so it is never returned from here.)
        if self.linebuffer and len(self.linebuffer) > 1:
            line = self.linebuffer.pop(0)
            self._rewind_numchars += len(line)
            return line

        # Start with a 72-byte read unless the caller gave a size.
        readsize = size or 72
        chars = ''

        # If a single (possibly incomplete) line is left in the buffer,
        # use it as the start of the text we are assembling.
        if self.linebuffer:
            chars += self.linebuffer.pop()
            self.linebuffer = None

        while True:
            # Byte position at which this read logically starts
            # (undecoded leftover bytes count as not yet consumed).
            startpos = self.stream.tell() - len(self.bytebuffer)
            new_chars = self._read(readsize)

            # If the block ends in '\r', read a little more, since the
            # next character might be the '\n' of a '\r\n' line ending.
            if new_chars and new_chars.endswith('\r'):
                new_chars += self._read(1)

            chars += new_chars
            lines = chars.splitlines(True)
            if len(lines) > 1:
                # We have at least one complete line: return the first
                # and keep the rest in the line buffer, remembering
                # where this read began so tell() can backtrack.
                line = lines[0]
                self.linebuffer = lines[1:]
                self._rewind_numchars = len(new_chars)-(len(chars)-len(line))
                self._rewind_checkpoint = startpos
                break
            elif len(lines) == 1:
                line0withend = lines[0]
                line0withoutend = lines[0].splitlines(False)[0]
                # The two differ exactly when the line has a line
                # ending, i.e. when it is complete.
                if line0withend != line0withoutend:
                    line = line0withend
                    break

            # Stop at end of file, or after one read if a size was given.
            if not new_chars or size is not None:
                line = chars
                break

            # Read successively larger blocks (doubling, up to 8000+).
            if readsize < 8000:
                readsize *= 2

        return line

    def readlines(self, sizehint=None, keepends=True):
        """
        Read this file's contents, decode them using this reader's
        encoding, and return it as a list of unicode lines.

        @rtype: C{list} of C{unicode}
        @param sizehint: Ignored.
        @param keepends: If false, then strip newlines.
        """
        return self.read().splitlines(keepends)

    def next(self):
        """Return the next decoded line from the underlying stream."""
        line = self.readline()
        if line: return line
        else: raise StopIteration

    def __iter__(self):
        """Return self"""
        return self

    def xreadlines(self):
        """Return self"""
        return self

    #/////////////////////////////////////////////////////////////////
    # Pass-through methods & properties
    #/////////////////////////////////////////////////////////////////

    closed = property(lambda self: self.stream.closed, doc="""
True if the underlying stream is closed.""")

    name = property(lambda self: self.stream.name, doc="""
The name of the underlying stream.""")

    mode = property(lambda self: self.stream.mode, doc="""
The mode of the underlying stream.""")

    def close(self):
        """
        Close the underlying stream.
        """
        self.stream.close()

    #/////////////////////////////////////////////////////////////////
    # Seek and tell
    #/////////////////////////////////////////////////////////////////

    def seek(self, offset, whence=0):
        """
        Move the stream to a new file position.  If the reader is
        maintaining any buffers, then they will be cleared.

        @param offset: A byte count offset.
        @param whence: If C{whence} is 0, then the offset is from the
            start of the file (offset should be positive).  If
            C{whence} is 2, then the offset is from the end of the
            file (offset should typically be negative).  A C{whence}
            of 1 (relative to the current position) is rejected.
        @raise ValueError: If C{whence} is 1.
        """
        if whence == 1:
            raise ValueError('Relative seek is not supported for '
                             'SeekableUnicodeStreamReader -- consider '
                             'using char_seek_forward() instead.')
        self.stream.seek(offset, whence)
        # Everything buffered belonged to the old position; drop it.
        self.linebuffer = None
        self.bytebuffer = ''
        self._rewind_numchars = None
        self._rewind_checkpoint = self.stream.tell()

    def char_seek_forward(self, offset):
        """
        Move the read pointer forward by C{offset} characters.

        @raise ValueError: If C{offset} is negative.
        """
        if offset < 0:
            raise ValueError('Negative offsets are not supported')
        # Clear all buffers by seeking to the current logical position.
        self.seek(self.tell())
        # And then move forward over the requested characters.
        self._char_seek_forward(offset)

    def _char_seek_forward(self, offset, est_bytes=None):
        """
        Move the file position forward by C{offset} characters,
        ignoring all buffers.

        @param est_bytes: A hint, giving an estimate of the number of
            bytes that will be needed to move forward by C{offset}
            chars.  Defaults to C{offset}.
        """
        if est_bytes is None: est_bytes = offset
        bytes = ''

        while True:
            # Read enough to bring the total up to the current estimate.
            newbytes = self.stream.read(est_bytes-len(bytes))
            bytes += newbytes

            # Decode everything read so far.
            chars, bytes_decoded = self._incr_decode(bytes)

            # Exactly the right number of characters: step back over
            # any trailing bytes that were read but not decoded.
            if len(chars) == offset:
                self.stream.seek(-len(bytes)+bytes_decoded, 1)
                return

            # Too many characters: shrink the estimate and re-decode a
            # prefix of the bytes we already have until it fits.
            if len(chars) > offset:
                while len(chars) > offset:
                    # Drop one byte per surplus character.
                    est_bytes += offset-len(chars)
                    chars, bytes_decoded = self._incr_decode(bytes[:est_bytes])
                self.stream.seek(-len(bytes)+bytes_decoded, 1)
                return

            # Too few characters: raise the estimate and read more.
            est_bytes += offset - len(chars)

    def tell(self):
        """
        Return the current file position on the underlying byte
        stream.  If this reader is maintaining any buffers, then the
        returned file position will be the position of the beginning
        of those buffers.
        """
        # If nothing is line-buffered, the position is simply the
        # stream position minus the bytes read but not yet decoded.
        if self.linebuffer is None:
            return self.stream.tell() - len(self.bytebuffer)

        # Otherwise, backtrack to the beginning of the line buffer.

        # Remember where the stream really is, so we can come back.
        orig_filepos = self.stream.tell()

        # Estimate how many bytes the already-returned characters took,
        # assuming bytes are spread evenly over returned and buffered
        # characters.
        bytes_read = ( (orig_filepos-len(self.bytebuffer)) -
                       self._rewind_checkpoint )
        buf_size = sum([len(line) for line in self.linebuffer])
        est_bytes = (bytes_read * self._rewind_numchars /
                     (self._rewind_numchars + buf_size))

        # Re-walk from the checkpoint over the returned characters.
        self.stream.seek(self._rewind_checkpoint)
        self._char_seek_forward(self._rewind_numchars, est_bytes)
        filepos = self.stream.tell()

        # Sanity check: what follows filepos must agree with the
        # contents of the line buffer.
        if self.DEBUG:
            self.stream.seek(filepos)
            check1 = self._incr_decode(self.stream.read(50))[0]
            check2 = ''.join(self.linebuffer)
            assert check1.startswith(check2) or check2.startswith(check1)

        # Restore the real stream position, so the buffers stay valid.
        self.stream.seek(orig_filepos)

        return filepos

    #/////////////////////////////////////////////////////////////////
    # Helper methods
    #/////////////////////////////////////////////////////////////////

    def _read(self, size=None):
        """
        Read up to C{size} bytes from the underlying stream, decode
        them using this reader's encoding, and return the resulting
        unicode string.  C{linebuffer} is *not* included in the
        result.
        """
        if size == 0: return u''

        # Skip past the byte order marker, if there is one and we are
        # at the very start of the stream.
        if self._bom and self.stream.tell() == 0:
            self.stream.read(self._bom)

        # Read the requested number of bytes.
        if size is None:
            new_bytes = self.stream.read()
        else:
            new_bytes = self.stream.read(size)
        # Prepend bytes left over from the previous read.
        bytes = self.bytebuffer + new_bytes

        # Decode as much as possible.
        chars, bytes_decoded = self._incr_decode(bytes)

        # If we read something but could not decode a single character,
        # keep reading one byte at a time until we can (or hit EOF).
        if (size is not None) and (not chars) and (len(new_bytes) > 0):
            while not chars:
                new_bytes = self.stream.read(1)
                if not new_bytes: break
                bytes += new_bytes
                chars, bytes_decoded = self._incr_decode(bytes)

        # Keep the bytes that were not consumed for the next read.
        self.bytebuffer = bytes[bytes_decoded:]

        return chars

    def _incr_decode(self, bytes):
        """
        Decode the given byte string into a unicode string, using this
        reader's encoding.  If an exception is encountered that
        appears to be caused by a truncation error, then just decode
        the byte string without the bytes that cause the truncation
        error.

        @return: A tuple C{(chars, num_consumed)}, where C{chars} is
            the decoded unicode string, and C{num_consumed} is the
            number of bytes that were consumed.
        """
        # Every path through the body returns or raises, so this loop
        # runs at most once.
        while True:
            try:
                return self.decode(bytes, 'strict')
            except UnicodeDecodeError, exc:
                # An error that reaches the end of the input is taken
                # to be a character cut off mid-way: decode only the
                # part before it.
                if exc.end == len(bytes):
                    return self.decode(bytes[:exc.start], self.errors)

                # A genuine error, and we are strict: propagate it.
                elif self.errors == 'strict':
                    raise

                # Otherwise decode again with our own error mode.
                else:
                    return self.decode(bytes, self.errors)

    # Normalized encoding name -> list of (byte order marker, encoding
    # name to switch to when that marker is found, or None).
    _BOM_TABLE = {
        'utf8': [(codecs.BOM_UTF8, None)],
        'utf16': [(codecs.BOM_UTF16_LE, 'utf16-le'),
                  (codecs.BOM_UTF16_BE, 'utf16-be')],
        'utf16le': [(codecs.BOM_UTF16_LE, None)],
        'utf16be': [(codecs.BOM_UTF16_BE, None)],
        'utf32': [(codecs.BOM_UTF32_LE, 'utf32-le'),
                  (codecs.BOM_UTF32_BE, 'utf32-be')],
        'utf32le': [(codecs.BOM_UTF32_LE, None)],
        'utf32be': [(codecs.BOM_UTF32_BE, None)],
        }

    def _check_bom(self):
        """
        Look for a byte order marker at the start of the stream.  The
        stream is left at position 0.

        @return: The length in bytes of the marker that was found, or
            C{None} if the encoding has no entry in L{_BOM_TABLE} or
            no marker is present.  When the matching table entry names
            a replacement encoding, C{self.encoding} is set to it.
        """
        # Normalize the encoding name: lower-case, no spaces or hyphens.
        enc = re.sub('[ -]', '', self.encoding.lower())

        # Look up the candidate markers for this encoding.
        bom_info = self._BOM_TABLE.get(enc)

        if bom_info:
            # Read a prefix to compare against, then rewind.
            bytes = self.stream.read(16)
            self.stream.seek(0)

            # Check each possible marker in turn.
            for (bom, new_encoding) in bom_info:
                if bytes.startswith(bom):
                    if new_encoding: self.encoding = new_encoding
                    return len(bom)

        return None