This tutorial covers the basic concepts and code of Hadoop.
The Hadoop system makes it easy to parallelize a particular form of computation across a large data set, just like Google's MapReduce. It automates the process of splitting up and coordinating a computation and its data to run on thousands of machines at once. Although the implementation of Hadoop is complex, writing code that uses Hadoop is actually not very hard (hence, a successful abstraction).
This tutorial walks through two code examples to illustrate the key features. Both examples make use of literature text sources -- Shakespeare's complete works and Mark Twain's Huckleberry Finn. These and many other works are available at Project Gutenberg. However, we also encourage you to find and use alternate text sources that are interesting to you, even if they're not available at Project Gutenberg.
To use Hadoop, you write two classes -- a Mapper and a Reducer. The Mapper class contains a map function, which is called once for each input and outputs any number of intermediate <key, value> pairs. What code you put in the map function depends on the problem you are trying to solve. Let's start with a short example.
Suppose the goal is to create an "index" of a body of text -- we are given a text file, and we want to output a list of words annotated with the line-number at which each word appears.
For that problem, an appropriate Map strategy is: for each word in the input, output the pair <word, line-number>. For example, suppose we have this five-line high school football coach quote as our input data set:
Running the Map code, which for each word outputs a pair <word, line-number>, yields this set of pairs...
For now we can think of the <key, value> pairs as a nice linear list, but in reality, the Hadoop process runs in parallel on many machines. Each process has a little part of the overall Map input (called a map shard), and maintains its own local cache of the Map output. (For a description of how it really works, see HadoopMapReduce or the Google White Paper linked to at the end of this document.)
After the Map phase produces the intermediate <key, value> pairs they are efficiently and automatically grouped by key by the Hadoop system in preparation for the Reduce phase (this grouping is known as the Shuffle phase of a map-reduce). For the above example, that means all the "we" pairs are grouped together, all the "are" pairs are grouped together like this, showing each group as a line...
The Reducer class contains a reduce function, which is then called once for each key -- one reduce call for "we", one for "are", and so on. Each reduce looks at all the values for that key and outputs a "summary" value for that key in the final output. So in the above example, the reduce is called once for the "we" key, and passed the values the mapper output, 1, 4, 2, and 5 (the values going into reduce are not in any particular order). Suppose reduce computes a summary value string made of the line numbers sorted into increasing order, then the output of the Reduce phase on the above pairs will produce the pairs shown below. The Reduce phase also sorts the output <key,value> pairs into increasing order by key:
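As a concrete (non-Hadoop) sketch of that summary computation, the snippet below takes the unordered values 1, 4, 2, and 5 for the key "we" and builds the sorted summary string; the class name and output format are just illustrative.

import java.util.Arrays;

public class SortedSummaryDemo {
    public static void main(String[] args) {
        // The unordered values passed to reduce for the key "we":
        int[] lineNumbers = {1, 4, 2, 5};
        Arrays.sort(lineNumbers);
        StringBuilder summary = new StringBuilder();
        for (int n : lineNumbers) {
            if (summary.length() > 0) summary.append(' ');
            summary.append(n);
        }
        System.out.println("we -> " + summary);   // prints: we -> 1 2 4 5
    }
}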
Like Map, Reduce is also run in parallel on a group of machines. Each machine is assigned a subset of the keys to work on (known as a reduce shard), and outputs its results into a separate file. By default these are named "part-#####".
As a first Hadoop code example we will look at a simple "line indexer". All of the example code here is available to be checked out, built, and run at (TODO:specify this path). Given an input text, the line indexer uses Hadoop to produce an index of all the words in the text. For each word, the index has a list of all the locations where the word appears and a text excerpt of each line where the word appears. Running the line indexer on the complete works of Shakespeare yields the following entry for the word "cipher".
The Hadoop code below for the line indexer is actually pretty short. The Map code extracts one word at a time from the input, and the Reduce code combines all the data for one word.
A Java Mapper class is defined in terms of its input and intermediate <key, value> pairs. To declare one, simply subclass from MapReduceBase and implement the Mapper interface. The Mapper interface provides a single method: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter). Note: these inner classes probably need to be declared "static". If you get an error saying ClassName.<init>() is not defined, try declaring your class static. The map function takes four parameters which in this example correspond to:
The Hadoop system divides the (large) input data set into logical "records" and then calls map() once for each record. How much data constitutes a record depends on the input data type; for text files, a record is a single line of text. The main method is responsible for setting the output key and value types.
Since in this example we want to output <word, offset> pairs, the types will both be Text (a basic string wrapper, with UTF8 support). It is necessary to wrap the more basic types because all input and output types for Hadoop must implement WritableComparable, which handles the writing and reading from disk.
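As a tiny standalone sketch (not part of the indexer code), the snippet below shows a plain Java String being wrapped in a Text object, which can then be compared and sorted like any other Hadoop key.

import org.apache.hadoop.io.Text;

public class TextWrapDemo {
    public static void main(String[] args) {
        Text word = new Text();
        word.set("cipher");                        // wraps a UTF8 string
        Text other = new Text("cipher");
        System.out.println(word.equals(other));    // true: compared by content
        System.out.println(word.compareTo(other)); // 0: Text is WritableComparable, so it can be sorted as a key
    }
}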
For the line indexer problem, the map code takes in a line of text and for each word in the line outputs a string key/value pair <word, offset:line>.
The Map code below accomplishes that by...
public static class LineIndexerMapper extends MapReduceBase implements Mapper {
    private final static Text word = new Text();
    private final static Text summary = new Text();

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
        throws IOException {
        // The key is the byte offset of this line within the input file;
        // prepend it to the line itself to form the summary value.
        String line = val.toString();
        summary.set(key.toString() + ":" + line);
        // Emit one <word, offset:line> pair for each word on the line.
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, summary);
        }
    }
}
When run on many machines, each mapper gets part of the input -- so for example with 100 Gigabytes of data on 200 mappers, each mapper would get roughly its own 500 Megabytes of data to go through. On a single mapper, map() is called going through the data in its natural order, from start to finish.
The Map phase outputs <key, value> pairs, but what data makes up the key and value is totally up to the Mapper code. In this case, the Mapper uses each word as a key, so the reduction below ends up with pairs grouped by word. We could instead have chosen to use the line-length as the key, in which case the data in the reduce phase would have been grouped by line length. In fact, the map() code is not required to call output.collect() at all. It may have its own logic to prune out data simply by omitting collect. Pruning things in the Mapper is efficient, since it is highly parallel, and already has the data in memory. By shrinking its output, we shrink the expense of organizing and moving the data in preparation for the Reduce phase.
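As a hypothetical illustration (not part of the line indexer), the mapper below prunes its output by simply never calling collect() for words shorter than four characters. The class name and the length threshold are invented for this sketch, and it assumes the same imports as the mapper above.

public static class LongWordMapper extends MapReduceBase implements Mapper {
    private final static Text word = new Text();
    private final static Text summary = new Text();

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
        throws IOException {
        String line = val.toString();
        summary.set(key.toString() + ":" + line);
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            if (token.length() < 4)
                continue;   // prune: short words are dropped simply by not calling collect()
            word.set(token);
            output.collect(word, summary);
        }
    }
}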
Defining a Reducer is just as easy. Simply subclass MapReduceBase and implement the Reducer interface: public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter). The reduce() method is called once for each key; the values parameter contains all of the values for that key. The Reduce code looks at all the values and then outputs a single "summary" value.
Given all the values for the key, the Reduce code typically iterates over all the values and either concatenates them together in some way to make a large summary object, or combines and reduces the values in some way to yield a short summary value.
The reduce() method produces its final value in the same manner as map() did, by calling output.collect(key, summary). In this way, the Reduce specifies the final output value for the (possibly new) key. It is important to note that when running over text files, the input key is the byte-offset within the file. If the key is propagated to the output, even for an identity map/reduce, the file will be filled with the offset values. Not only does this use up a lot of space, but successive operations on this file will have to eliminate them. For text files, make sure you don't output the key unless you need it (be careful with the IdentityMapper and IdentityReducer).
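For example, a hypothetical replacement for the IdentityMapper might keep only the line of text and discard the byte-offset key. The class name is invented for this sketch, and depending on the output format an empty value may still leave a trailing separator in the output.

public static class DropOffsetMapper extends MapReduceBase implements Mapper {
    private final static Text empty = new Text("");

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter)
        throws IOException {
        // key is the byte offset of this line; deliberately do not emit it
        output.collect(new Text(val.toString()), empty);
    }
}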
The line indexer Reducer takes in all the <word, offset> key/value pairs output by the Mapper for a single word. For example, for the word "cipher", the pairs look like: <cipher, 38624:To cipher what is writ in learned books>, <cipher, 12046:To cipher me how fondly I did dote;>, <cipher, ... >.
Given all those <key, value> pairs, the reduce outputs a single value string. For the line indexer problem, the strategy is simply to concatenate all the values together to make a single large string, using "^" to separate the values. The choice of "^" is arbitrary -- later code can split on the "^" to recover the separate values. So for the key "cipher" the output value string will look like "38624:To cipher what is writ in learned books^12046:To cipher me how fondly I did dote;^34739:Mine were the very cipher of a function,^ ...".
To do this, the Reducer code simply iterates over the values to get all the value strings, and concatenates them into one output String.
public static class LineIndexerReducer extends MapReduceBase implements Reducer {
    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter)
        throws IOException {
        // Concatenate all the offset:line values for this word,
        // separated by '^' characters.
        boolean first = true;
        StringBuilder toReturn = new StringBuilder();
        while (values.hasNext()) {
            if (!first)
                toReturn.append('^');
            first = false;
            toReturn.append(values.next().toString());
        }
        output.collect(key, new Text(toReturn.toString()));
    }
}
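Later, client-side code can recover the individual entries by splitting the reduced value on "^", as mentioned above. A minimal standalone sketch (the sample string is taken from the "cipher" example; the class name is just illustrative):

public class IndexEntrySplitter {
    public static void main(String[] args) {
        String value = "38624:To cipher what is writ in learned books^"
                     + "12046:To cipher me how fondly I did dote;";
        // "^" is a regex metacharacter, so it must be escaped for split()
        for (String entry : value.split("\\^")) {
            int colon = entry.indexOf(':');
            String offset = entry.substring(0, colon);     // e.g. "38624"
            String excerpt = entry.substring(colon + 1);   // the line of text
            System.out.println(offset + "  " + excerpt);
        }
    }
}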
Given the Mapper and Reducer code, the short main() below starts the Map-Reduce job running. The Hadoop system picks up a bunch of values from the command line on its own, and then main() also specifies a few key parameters of the problem in the JobConf object, such as which Map and Reduce classes to use and the format of the input and output files. Other parameters, e.g. the number of machines to use, are optional and the system will determine good values for them if not specified.
public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(LineIndexer.class);
    conf.setJobName("LineIndexer");
    // The keys are words (strings):
    conf.setOutputKeyClass(Text.class);
    // The values are offsets+line (strings):
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(LineIndexer.LineIndexerMapper.class);
    conf.setReducerClass(LineIndexer.LineIndexerReducer.class);
    if (args.length < 2) {
        System.out.println("Usage: LineIndexer <input path> <output path>");
        System.exit(0);
    }
    conf.setInputPath(new Path(args[0]));
    conf.setOutputPath(new Path(args[1]));
    JobClient.runJob(conf);
}
To run a Hadoop job, simply ssh into any of the JobTracker nodes on the cluster. Before running the job, it is first necessary to copy the input data files onto the distributed file system. If the data files are in the localInput/ directory, this is accomplished by executing:
./bin/hadoop dfs -put localInput dfsInput
The files will then be copied onto the dfs into the directory dfsInput. It is important to copy files into a unique, well-named directory. These files can be viewed with
./bin/hadoop dfs -ls dir
where dir is the name of the directory to be viewed. You can also use
./bin/hadoop dfs -lsr dir
to recursively view the directories. Note that all "relative" paths given will be put in the /users/$USER/[dir] directory.
Make sure that the dfsOutput directory does not already exist; if it does, you will be presented with an error and your job will not run. (This prevents the accidental overwriting of data, but the check can be overridden.)
Now that the data is available to all of the worker machines, the job can be executed from a local jar file:
./bin/hadoop jar OffsetIndexer.jar dfsInput dfsOutput
The job will then run across the worker machines, copying input and intermediate data as needed. The output of the reduce stage will be left in the dfsOutput directory. To copy these files to your local machine in the directory localOutput, execute:
./bin/hadoop dfs -get dfsOutput localOutput
During testing, you may want to run your Map-Reduces locally so as not to adversely affect the compute clusters. This is easily accomplished by adding a line to the main method:
conf.set("mapred.job.tracker", "local");
When you submit your job to run, a line will be printed saying:
Running job: job_12345
where 'job_12345'
will correspond to whatever name your job has been given. Further status
information will be printed in that terminal as the job progresses. However, it
is also possible to monitor a job given its name from any node in the cluster.
This is done by the command:
./bin/hadoop job -status job_12345
A small amount of status information will be displayed, along with a link to a tracking URL (e.g., http://jobtrackermachinename:50050/). This page is a job-specific status page, and provides links to the main status pages for other jobs and for the Hadoop cluster itself.
After the Hadoop job finishes and the output is copied to a local machine, it can be analyzed. The line indexer client reads in the index files and presents a simple command line interface -- type a word to pull up that entry from the index, or leave the line blank to quit.
To run the client, execute the program line_indexer.par:
./line_indexer.par ./output/part-00000
The index file 'part-00000' will then be loaded and the following prompt displayed:
***********************
* Line Indexer Client *
***********************
Enter a word to query the index for.
Or 'exit' to quit.
>
Enter a word and the index listings generated by the map reduce will be printed:
***********************
* Line Indexer Client *
***********************
Enter a word to query the index for.
Or 'exit' to quit.
> ship
Index for: ship
119489 But we will ship him hence: and this vile deed
89045 Have lost my way for ever: I have a ship
89830 I will possess you of that ship and treasure.
34636 That their designment halts: a noble ship of Venice
34791 Third Gentleman The ship is here put in,
37636 The riches of the ship is come on shore!
23092 Ere he take ship for France, and in Southampton.
74453 For there I'll ship them all for Ireland.
131177 Like to a ship that, having 'scaped a tempest,
108668 And ship from thence to Flanders.
132817 Whiles, in his moan, the ship splits on the rock,
61495 ANTIGONUS Thou art perfect then, our ship hath touch'd upon
65730 ship boring the moon with her main-mast, and anon
66517 Clown I would you had been by the ship side, to have
114335 new ship to purge melancholy and air himself: for,
17361 Now am I like that proud insulting ship
112791 FERDINAND The ship is under sail, and here she comes amain.
48720 And in their ship I am sure Lorenzo is not.
48868 SALARINO He came too late, the ship was under sail:
55861 a ship of rich lading wrecked on the narrow seas;
56018 a tall ship lie buried, as they say, if my gossip
3094 Assure yourself, after our ship did split,
25148 Make such unquiet, that the ship
51644 So up and down the poor ship drives:
54363 the wind is loud, and will not lie till the ship be
92888 Lysimachus our Tyrian ship espies,
7115 PROTEUS Go, go, be gone, to save your ship from wreck,
616 SCENE A ship at Sea: an island.
676 SCENE I On a ship at sea: a tempestuous noise
2924 GONZALO I'll warrant him for drowning; though the ship were
4713 It should the good ship so have swallow'd and
14860 PROSPERO Of the king's ship
15389 Supposing that they saw the king's ship wreck'd
98063 I'll bring you to your ship and so to Naples,
5225 Our helpful ship was splitted in the midst;
5606 At length, another ship had seized on us;
45794 If any ship put out, then straight away.
50454 The ship is in her trim; the merry wind
50657 What ship of Epidamnum stays for me?
50696 DROMIO OF SYRACUSE A ship you sent me to, to hire waftage.
Found 42 occurrences.
After playing around with the client, enter 'exit' to shutdown the program.
A funny and more complex example of a Map-Reduction is the Ngram Mimic algorithm. But first, a little terminology. A "bigram" is just a pair of adjacent words in a body of text, so the text "We hold these truths" contains the bigrams "We hold", "hold these", and "these truths". An "ngram" just generalizes the idea to more than two words, so the text contains the 3-grams "We hold these" and "hold these truths". The Ngram Mimic algorithm reads in a body of text and reduces it down to an index of what ngrams appear in the text. The ngram index can then drive the random generation of "mimic" paragraphs that bizarrely resemble the style and content of the original text.
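As a quick (non-Hadoop) sketch of those definitions, the snippet below enumerates the bigrams and 3-grams in "We hold these truths"; the class and variable names are just illustrative.

import java.util.StringTokenizer;

public class NgramDemo {
    public static void main(String[] args) {
        String text = "We hold these truths";
        String prev1 = null, prev2 = null;   // the one and two words before the current word
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            if (prev1 != null)
                System.out.println("bigram: " + prev1 + " " + word);
            if (prev2 != null)
                System.out.println("3-gram: " + prev2 + " " + prev1 + " " + word);
            prev2 = prev1;
            prev1 = word;
        }
    }
}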
Here is an ngram mimic paragraph based on all the 3-grams in the complete works of Shakespeare...
The mimic paragraphs can appear to make sense if you don't look too closely. In reality, the algorithm is driven only by the statistics of how words tended to appear in the text, and it knows nothing about grammar or semantics (what the words actually mean).
The ngram mimic problem has two parts -- the reading of the original text to create the ngram index, and the generation of random paragraphs based on that index. This section outlines the creation of the ngram index, which is a perfect problem for the map-reduce programming model.
The first goal is to figure out a set of <prefix, follow-word> pairs, recording for each occurrence of a prefix in the text what word followed it. For now, suppose that the prefix is a single word, so we are in effect recording all the bigrams in the text. The Mapper reads through the words in the text, and for each pair of adjacent words, outputs a <prefix-word, follow-word> pair. So with the text
For the mimic algorithm, it works best to count the final period, ".", at the end of each sentence as a separate "word" of its own, as in the final <everything, .> pair. Also, the code will need to take care to get cross-line pairs like <everything, nice>, where part of the data is on one line and part is on the next line.
To generalize the scheme to 3-grams or beyond, just allow the prefix to be made of multiple words. For 3-grams, for each three word sequence "word2 word1 word0" in the text, record that the prefix "word2 word1" was followed by "word0" by outputting the pair <word2 word1, word0>. The 3-grams for the above text are...
The Hadoop system creates the Mapper object and then calls map() with line after line of the input text. To map out the ngram index, read through the words on each line from left to right. The StringTokenizer class provides a simple iterator interface for the parsing. For a sequence of three words "word2 word1 word0", output pairs <word1, word0> and <word2 word1, word0>, and then loop around to read the next word0. By declaring the word0/word1/word2 strings as instance variables, they persist across calls to map() so the words can be used from one line to the next. The Hadoop system calls map() to process the lines in their natural order (line-1, then line-2, then line-3).
public static class MimicMapper extends MapReduceBase implements Mapper {
    private String word1, word2, word3;

    public MimicMapper() {
        word1 = word2 = word3 = null;
    }

    public void map(WritableComparable key, Writable val,
                    OutputCollector output, Reporter reporter) throws IOException {
        StringTokenizer itr = new StringTokenizer(val.toString());
        while (itr.hasMoreTokens()) {
            word3 = itr.nextToken();
            if (word2 != null) {
                // emit the bigram <word2, word3>
                output.collect(new Text(word2), new Text(word3));
                if (word1 != null)
                    // emit the trigram <word1 word2, word3>
                    output.collect(new Text(word1 + " " + word2), new Text(word3));
            }
            // shift the words over
            word1 = word2;
            word2 = word3;
        }
    }
}
To be fair, there is one flaw with the above "word1 word0" approach to measuring bigrams: we do not correctly count the bigrams that cross shards. So if one mapper gets a shard of data that ends "they lived happily", and some other mapper gets the next shard, which begins "everafter, the end.", then since each mapper only sees its own shard of the data, we won't count that "happily" precedes the word "everafter". But really, what's 10 pairs of words out of the collected works of Shakespeare? If one really wants to be a stickler, the input class (which does the sharding) can be rewritten to take this into account, but doing so would involve either some redundancy or some knowledge of the internal workings of map().
To make the final mimic index, crunch the data down for each prefix to summarize all the "follow" words that might come after that prefix. In the example text above the prefix "and" is followed 3 times in total: the word "everything" follows 2 times, and the word "spice" follows 1 time. That information can be summarized in a single key/value pair <and, 3^everything::2^spice::1>. The prefix is the key, and the value contains the total count first, followed by each follow word and its count, with "^" separating the entries.
Each prefix is either a single word, like "He", or multiple words separated by spaces, like "He never". For example, here's an excerpt from the ngram index built from Mark Twain's Huckleberry Finn:
The first block shows the counts for each word that followed the unigram "He". The second block shows the counts for each word that followed the bigram "He had". It can be seen that "never" followed "He" once, while "says" followed "He" 7 times, and that "suspicions" followed "He had" once and "a" followed "He had" twice.
To crunch all the follow words down to a single summary string, the reducer first puts all the follow words in a hash map to count how many times each appears. Then the final summary string, e.g. "10^the::3^it's::2^until::4^any::1", is built by iterating over the keys in the hash map.
public static class MimicReducer extends MapReduceBase implements Reducer {
    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter) throws IOException {
        // First build a mapping from words to occurrence counts.
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        int totalCount = 0;
        while (values.hasNext()) {
            String word = ((Text) values.next()).toString();
            Integer toPut;
            if (counts.containsKey(word))
                toPut = counts.get(word).intValue() + 1;
            else
                toPut = new Integer(1);
            counts.put(word, toPut);
            totalCount++;
        }
        // Now emit the built summary: the total count first, then each
        // follow word and its count, with '^' separating the entries.
        StringBuilder result = new StringBuilder();
        result.append(totalCount);
        for (String word : counts.keySet()) {
            result.append('^');
            result.append(word);
            result.append("::");
            result.append(counts.get(word));
        }
        output.collect(key, new Text(result.toString()));
    }
}
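The generation side of the mimic program (not shown in this tutorial) needs to read these summary values back and choose follow words at random, weighted by their counts. Here is a hypothetical sketch, assuming the "totalCount^word::count^word::count" format produced by the reducer above; the class and method names are invented for illustration.

import java.util.Random;

public class FollowWordPicker {
    // Pick a follow word at random, weighted by how often it followed the prefix.
    public static String pickFollowWord(String summary, Random rand) {
        String[] parts = summary.split("\\^");
        int target = rand.nextInt(Integer.parseInt(parts[0]));   // 0 .. totalCount-1
        for (int i = 1; i < parts.length; i++) {
            String[] wordAndCount = parts[i].split("::");
            target -= Integer.parseInt(wordAndCount[1]);
            if (target < 0)
                return wordAndCount[0];
        }
        return null;   // unreachable if the counts sum to totalCount
    }

    public static void main(String[] args) {
        // "and" was followed by "everything" twice and by "spice" once.
        System.out.println(pickFollowWord("3^everything::2^spice::1", new Random()));
    }
}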
You can run the mimic client in the same manner as the line indexer client:
./mimic.par ./ngram-huckfinn/part-00000
and then play with the client, which lets you generate random paragraphs that read similarly to the original body of text. Just hit enter to let it seed itself, or you can give it a phrase to start with.
**********************
* Ngram Mimic Client *
**********************
Enter a seed word to generate a text block.
Or hit enter and a random seed word will be chosen.
Or 'exit' to quit.
> He
He was all behind a string. I says to part
of them castles with these goings on me; en en one loafer sing the bed. Then I
was going so started acrost, so it rotten cabbages, and most out, and him give a
me couple of a little hut; bench by and the minute two mile to be goodness, too
many of Huckleberry -- answer me to to do keep still. I thought would come from
St. Jacques' plantation, forty more; because I see warn't nothing about else. He
was just as it quick as as you a blanket and the back iron bolt it." "It ain't
frauds, and Sunday-like, hot breakfast on accounts of me; but so you as can tell
me, and Bilge. Thish yer poor Miss Hooker she would think of a steamboat
landing; but ef I wish had come of them castles with these ole King
Sollermun,
> river
river had tin plates, ain't no more bad dis
trip?" "No," I says, might pump at all; by him de ben', en told you." "Well," he
would say, 'It's a good start; then Tom poured down slow and work bother and to
liked. There was ready to catch anything 'at 'uz killed off, den I'd ruther not,
and the graveyard. She wuz a church." I warn't no more trouble and to liked.
There -- leastways I've got to a body rest! Go 'long wuz somebody spying around
the water now, and I judged could go up a sound, and and all right off under it,
nor have to them give good-day, and about on that died in himself there that was
I pretty soon some of others, and there see warn't surprised. Said the same way,
and they put in no attention to stay on the women that clown in the
> gold
gold watch at it all was they handsome and -- I " stopped. You
mark them fellows can't get into the texas-hall we was going to away the biggest
one summer. Such a sudden and death now, and says to itch; and search them up.
And I reckon was two mile back in her and eyes looked in; but doan' gimme no
further, but what he'd make sure. So then he got to a lot they'll get into this
trouble escape 'll go." Well, the king's was excited, and and she laughed at the
lonesomest sound in de year. And mind, Cairo I better and the other; and took
lined out of dirt on the tree. I won. took Boggs again, and a dry and spot. Then
we had to your friend; and says I, maybe frogs or twelve disciples. Well, they
heard the splitting of the raft on both of