The science that has been developed around the facts of language passed through three stages before finding its true and unique object. First something called "grammar" was studied. This study, initiated by the Greeks and continued mainly by the French, was based on logic. It lacked a scientific approach and was detached from language itself. Its only aim was to give rules for distinguishing between correct and incorrect forms; it was a normative discipline, far removed from actual observation, and its scope was limited.
-- Ferdinand de Saussure
Using NLTK with Hadoop
Given a large data set and domain-specific knowledge, we have a wealth of data and can iterate rapidly!
Hadoop is written in Java and NLTK is written in Python; how do we make them play together?
[Comic: "You're not going to the U.S.A. in that super-zeppelin, Dr. Stoddard?"]
Comic attribution: askrahul.com
Hadoop Streaming bridges the gap: any executable can act as a mapper or reducer, reading records from stdin and pushing results to stdout. A word-count mapper in plain Python:
#!/usr/bin/env python
# mapper.py: a word-count mapper for Hadoop Streaming.
# Reads raw text lines from stdin and emits one key<TAB>value record per word.

import sys

class Mapper(object):

    def __init__(self, infile=sys.stdin, separator='\t'):
        self.infile = infile
        self.sep = separator

    def emit(self, key, value):
        # Hadoop Streaming expects tab-separated key/value pairs, one per line.
        sys.stdout.write("%s%s%s\n" % (key, self.sep, value))

    def map(self):
        for line in self:
            for word in line.split():
                self.emit(word, 1)

    def __iter__(self):
        for line in self.infile:
            yield line

if __name__ == "__main__":
    mapper = Mapper()
    mapper.map()
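The mapper can be smoke-tested on its own before Hadoop is involved at all (a sketch with made-up input; the real output is tab-separated):

hduser@ubuntu:~$ echo "the fox and the dog" | python mapper.py
the     1
fox     1
and     1
the     1
dog     1

The matching reducer sums the counts for each word: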
#!/usr/bin/env python
# reducer.py: a word-count reducer for Hadoop Streaming.
# Hadoop sorts the mapper output by key, so identical words arrive in
# contiguous runs; groupby collects each run so the counts can be summed.

import sys

from itertools import groupby
from operator import itemgetter

class Reducer(object):

    def __init__(self, infile=sys.stdin, separator="\t"):
        self.infile = infile
        self.sep = separator

    def emit(self, key, value):
        sys.stdout.write("%s%s%s\n" % (key, self.sep, value))

    def reduce(self):
        for current, group in groupby(self, itemgetter(0)):
            try:
                total = sum(int(count) for _, count in group)
                self.emit(current, total)
            except ValueError:
                # Skip malformed records whose count is not an integer.
                pass

    def __iter__(self):
        for line in self.infile:
            # Split each record into (key, value) on the first separator only.
            yield line.rstrip().split(self.sep, 1)

if __name__ == "__main__":
    reducer = Reducer()
    reducer.reduce()
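Piping the mapper through sort and into the reducer imitates Hadoop's shuffle phase, so the whole job can be dry-run locally (input.txt stands in for any plain-text file):

hduser@ubuntu:~$ cat input.txt | python mapper.py | sort | python reducer.py

On the cluster, the same pair of scripts is submitted through the streaming jar: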
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file /home/hduser/mapper.py -mapper /home/hduser/mapper.py \
-file /home/hduser/reducer.py -reducer /home/hduser/reducer.py \
-input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
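Once the job completes, the counts can be read back out of HDFS (assuming a single reducer, whose output lands in part-00000):

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000

This works, but every new job repeats the same stdin/stdout plumbing. Dumbo wraps Hadoop Streaming so mappers and reducers can be written as plain Python callables, which makes it easy to bring NLTK into the mix: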
#!/usr/bin/env python
# token_count.py: word count again, this time written for Dumbo so the
# mapper can use NLTK for tokenization, lemmatization, and stopword removal.

import nltk

from nltk.stem import WordNetLemmatizer
from nltk.tokenize import wordpunct_tokenize

class Mapper(object):

    def __init__(self):
        # Dumbo exposes -param options through self.params; a stopword
        # file passed on the command line overrides NLTK's built-in list.
        if 'stopwords' in self.params:
            with open(self.params['stopwords'], 'r') as excludes:
                self._stopwords = set(line.strip() for line in excludes)
        else:
            self._stopwords = None
        self.lemmatizer = WordNetLemmatizer()

    def __call__(self, key, value):
        for word in self.tokenize(value):
            if word not in self.stopwords:
                yield word, 1

    def normalize(self, word):
        word = word.lower()
        return self.lemmatizer.lemmatize(word)

    def tokenize(self, sentence):
        for word in wordpunct_tokenize(sentence):
            yield self.normalize(word)

    @property
    def stopwords(self):
        # Lazily fall back to NLTK's English stopword list.
        if not self._stopwords:
            self._stopwords = set(nltk.corpus.stopwords.words('english'))
        return self._stopwords

def reducer(key, values):
    yield key, sum(values)

def runner(job):
    # The third argument registers the reducer as the combiner as well.
    job.additer(Mapper, reducer, reducer)

def starter(prog):
    # Turn a -stopwords command-line option into a -param the mapper can read.
    excludes = prog.delopt("stopwords")
    if excludes:
        prog.addopt("param", "stopwords=" + excludes)

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner, starter)
The script is launched with dumbo start, which submits it as a streaming job:

hduser@ubuntu:~$ dumbo start token_count.py \
    -input /user/hduser/gutenberg \
    -output /user/hduser/gutenberg-output \
    -hadoop $HADOOP_BIN \
    -hadooplib $HADOOP_CLASSPATH
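As a quick check of what the NLTK normalization in the mapper actually does, here is a minimal interactive sketch (it assumes the wordnet and stopwords corpora have already been fetched with nltk.download(); the sample sentence is illustrative):

from nltk.stem import WordNetLemmatizer
from nltk.tokenize import wordpunct_tokenize

lemmatizer = WordNetLemmatizer()

# wordpunct_tokenize splits on word/punctuation boundaries.
print(wordpunct_tokenize("The geese flew over Dr. Stoddard's zeppelin."))
# ['The', 'geese', 'flew', 'over', 'Dr', '.', 'Stoddard', "'", 's', 'zeppelin', '.']

# lemmatize() treats its argument as a noun by default.
print(lemmatizer.lemmatize("geese"))   # goose
print(lemmatizer.lemmatize("words"))   # word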