MapReduce
Doug Cutting has done it again. The creator of Lucene and Nutch has implemented (with Mike Cafarella and others) a distributed platform for high volume data processing called MapReduce.
MapReduce is the brainchild of Google and is very well documented by Jeffrey Dean and Sanjay Ghemawat in their paper MapReduce: Simplified Data Processing on Large Clusters. In essence, it allows massive data sets to be processed in a distributed fashion by breaking the processing into many small computations of two types: a map operation that transforms the input into an intermediate representation, and a reduce function that recombines the intermediate representation into the final output. This processing model is ideal for the operations a search engine indexer like Nutch or Google needs to perform - like computing inlinks for URLs, or building inverted indexes - and it will transform Nutch into a scalable, distributed search engine.
Nutch MapReduce takes advantage of the Nutch Distributed File System (NDFS) - itself inspired by another Google Labs project, the Google File System. NDFS provides a fault-tolerant environment for working with very large files using cheap commodity hardware.
Currently MapReduce is a part of Nutch, but it has been proposed that it and NDFS be moved into a separate project. However, it is perfectly possible to use the MapReduce functionality in Nutch for your own data processing. In this blog post, I'll briefly describe how to get started.
Building MapReduce in Nutch
The MapReduce implementation in Nutch is still in active development, so it is best to get the latest sources from Subversion and build them yourself. It's very easy. At the moment the latest MapReduce work isn't in trunk (though it may be soon), so I checked out the mapred branch from Subversion (the repository URL is http://svn.apache.org/repos/asf/lucene/nutch/). Then I typed
ant tar
in the base directory, and a few minutes later out popped nutch-0.8-dev.tar.gz, which I unpacked on my system.
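For completeness, the checkout itself is a single Subversion command. Something along these lines should work, although the branch path shown here may differ, so check the repository listing if it doesn't exist:
svn checkout http://svn.apache.org/repos/asf/lucene/nutch/branches/mapred nutch-mapred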
A MapReduce example
The demo program that comes with Nutch MapReduce is a distributed grep. Actually, it's a bit more than grep: it behaves more like the following UNIX command line,
grep -Eh <regex> <inDir>/* | sort | uniq -c | sort -nr
which counts the number of occurrences of each distinct matching line across all files in <inDir>, for lines that match <regex>, and displays the lines with their counts in descending order of count.
An example of where this kind of query might be useful is analysing web server access logs to find the top requested pages that match a given pattern. But imagine trying to grep gigabytes of data - the standard UNIX tools don't scale, so a way of breaking the work into pieces and parallelizing it across a number of machines is the only way to get an answer in a reasonable time.
Let's look at a trivial example to understand MapReduce and the grep demo. There are two files, a and b, in a directory called in, with the following contents:
C
B
B
C
and
C
A
respectively.
Running the UNIX command line
grep -Eh 'A|C' in/* | sort | uniq -c | sort -nr
gives
3 C
1 A
Similarly, the MapReduce version
bin/nutch org.apache.nutch.mapred.demo.Grep in out 'A|C'
creates a file called part-00000 in the output directory out containing the same result:
3 C
1 A
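Assuming you are running with the default (local) filesystem, you can inspect that file directly:
cat out/part-00000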
The MapReduce algorithm breaks the operation into a user-defined map operation and a user-defined reduce operation. The map operation takes as input a key-value pair and outputs a list of intermediate key-value pairs. For grep the input is (file offset, line) (although we don't use the file offset), and the output is either an empty list [] if the line does not match the regex, or a single-element list of a key-value pair [(line, 1)] if it matches:
(0, C) -> [(C, 1)]
(2, B) -> []
(4, B) -> []
(6, C) -> [(C, 1)]
(0, C) -> [(C, 1)]
(2, A) -> [(A, 1)]
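To make that concrete, here is the map step in plain Java. This is just a sketch of the logic, not the actual Nutch classes; the Collector interface is a made-up stand-in for whatever output collector the real framework passes in.
import java.util.regex.Pattern;

// Sketch of grep's map operation: emit an intermediate (line, 1) pair for every
// input line that matches the regular expression.
public class GrepMapSketch {

    // Hypothetical stand-in for the framework's output collector.
    public interface Collector {
        void collect(String key, int value);
    }

    private final Pattern pattern;

    public GrepMapSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    public void map(long fileOffset, String line, Collector output) {
        // The file offset key is ignored; only the line itself matters.
        if (pattern.matcher(line).find()) {
            output.collect(line, 1);   // one intermediate pair per matching line
        }
    }
}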
The MapReduce implementation then gathers up all the intermediate key-value pairs with the same key, and passes the key together with the list of its values to the reduce operation. For grep the input is (line, [1, 1, ...]) and the output is (line, n), where n is the number of 1s in the list.
(A, [1]) -> (A, 1)
(C, [1, 1, 1]) -> (C, 3)
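Again in sketch form (reusing the made-up Collector from the map sketch rather than the real Nutch interfaces), the reduce step just adds up the 1s for each distinct line:
import java.util.Iterator;

// Sketch of grep's reduce operation: sum the values gathered for one distinct line.
public class GrepReduceSketch {

    public void reduce(String line, Iterator<Integer> values,
                       GrepMapSketch.Collector output) {
        int count = 0;
        while (values.hasNext()) {
            count += values.next();    // each value is a 1 emitted by the map step
        }
        output.collect(line, count);   // final pair, e.g. (C, 3)
    }
}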
The sorting part of the program is actually implemented as another MapReduce job, using the output of the first as input. Details are in the source code, and also in sections 4.1 and 4.2 of the Google paper.
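I haven't picked the second job apart line by line, but the usual trick, and what the demo appears to do, is to have its map step swap key and value, so that the framework's sort on keys becomes a sort on counts. A sketch (with another made-up collector, keyed by count this time):
// Sketch of the second job's map step: invert each (line, count) pair into a
// (count, line) pair so that sorting by key orders lines by count; reversing
// the comparison (or negating the count) gives the descending order.
public class InverseMapSketch {

    public interface Collector {
        void collect(int key, String value);
    }

    public void map(String line, int count, Collector output) {
        output.collect(count, line);
    }
}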
Distributed Nutch MapReduce
So far Nutch MapReduce has been running in its default, single JVM mode. To scale up to do serious data processing, MapReduce needs to be run on multiple nodes. In a nutshell, a single JobTracker runs on a master node and hands out map tasks and reduce tasks to TaskTrackers running on slave nodes. These processes are launched by running a script on the master node that starts the JobTracker, then starts the TaskTracker slave processes over SSH. Note that dynamic classloading (using RMI, or Jini's JERI, for example) is not used to distribute the job's map and reduce classes to each slave - they must be installed beforehand on each slave node. (This is not a big deal for the grep example - or indeed the MapReduce infrastructure for Nutch in general - since the classes are a part of the Nutch MapReduce package.)
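In outline, then, a distributed run needs something like the following on the master node; the file and script names here are illustrative guesses rather than documented, so check the bin and conf directories of your build for the real ones:
conf/slaves        # a file listing one slave hostname per line
bin/start-all.sh   # starts the JobTracker locally, then a TaskTracker on each slave over SSH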
It is still early days for Nutch MapReduce, and there is very little documentation yet. However, using a combination of Doug Cutting's OSCON 2005 presentation, his cheat sheet (found on the excellent Nutch mailing lists), and digging around in the source code, I managed to get MapReduce running in a distributed setup. One neat feature is the way you can track a job's progress by connecting to the embedded HTTP server that the JobTracker runs.
Nutch MapReduce may not be finished, but most of the major pieces seem to be in place, so it is only a matter of time before this exciting and powerful tool sees wider adoption.