December 1, 2011
My last post was a brief description of the integration between Riak and Hadoop MapReduce. This follow-up post is a bit more hands-on and walks you through an example riak-hadoop MapReduce job. If you want to have a go, you need to follow these steps. I won’t cover all the details of installing everything, but I’ll point you at resources to help.
Getting Set Up
First you need both a Riak and a Hadoop install. I went with a local devrel Riak cluster and a local Hadoop install in pseudo-distributed mode. (A singleton node of both Hadoop and Riak installed from a package will do, if you’re pushed for time.)
The example project makes use of Riak’s Secondary Indexes (“2i”), so you will need to switch Riak’s storage backend to LevelDB. (NOTE: this is not a requirement for riak-hadoop, but this demo uses 2i, which does require it.) So, for each Riak node, change the storage backend from bitcask to eleveldb by editing the node’s app.config:
{riak_kv, [
    %% Storage_backend specifies the Erlang module defining the storage
    %% mechanism that will be used on this node.
    {storage_backend, riak_kv_eleveldb_backend},
    %% ...and the rest
]},
Now start up your Riak cluster, and start up Hadoop:
for d in dev/dev*; do $d/bin/riak start; done
cd $HADOOP_HOME
bin/start-dfs.sh
bin/start-mapred.sh
Next you need to pull the riak-hadoop-wordcount sample project from GitHub. Check out the project and build it:
git clone https://github.com/russelldb/riak-hadoop-wordcount
cd riak-hadoop-wordcount
mvn clean install
A Bit About Dependencies…
Warning: Hadoop has some class loading issues; there is no class namespace isolation. The Riak Java Client depends on Jackson for JSON handling, and so does Hadoop, but on different versions (naturally). When the Riak-Hadoop driver is finished it will come with a custom classloader. Until then, however, you’ll need to replace your Hadoop lib/jackson*.jar libraries with the ones in the lib folder of this repo, on your JobTracker/Namenode only. On your tasknodes, you need only remove the Jackson jars from your hadoop/lib directory, since the classes bundled in the job jar are loaded there. (There is an open bug about this in Hadoop’s JIRA. It has been open for 18 months, so I doubt it will be fixed anytime soon. I’m very sorry about this. I will address it in the next version of the driver.)
Loading Data
The repo includes a copy of Mark Twain’s Adventures Of Huckleberry Finn from Project Gutenberg, and a class that chunks the book into chapters and loads the chapters into Riak.
To load the data, run the Bootstrap class. The easiest way is to have Maven do it for you:
mvn exec:java -Dexec.mainClass="com.basho.riak.hadoop.Bootstrap" -Dexec.classpathScope=runtime
This will load the data (with an index on the Author field). Have a look at the Chapter class if you’d like to see how easy it is to store a model instance in Riak.
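To give a feel for what that looks like, here is a minimal sketch of an annotated model in the style of the 1.x Riak Java client. The field names are illustrative and the annotation details are from memory, so treat the actual Chapter class in the repo as the authoritative version:
import com.basho.riak.client.convert.RiakIndex;
import com.basho.riak.client.convert.RiakKey;

// Illustrative sketch only; see the Chapter class in riak-hadoop-wordcount for the real thing.
public class ChapterSketch {
    @RiakKey
    private String chapterNumber;   // becomes the key of the object in the wordcount bucket

    @RiakIndex(name = "author")     // stored as the secondary index author_bin
    private String author;

    private String text;            // the chapter text; plain fields are serialised as JSON

    // no-arg constructor, getters and setters elided
}
Storing an instance is then roughly client.fetchBucket("wordcount").execute().store(chapter).execute(); with the 1.x client (again, from memory), which is essentially what Bootstrap does once per chapter.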
Bootstrap assumes that you are running a local devrel cluster. If your Riak install isn’t listening on the PB interface on 127.0.0.1, port 8081, then you can specify the transport and address like this:
mvn exec:java -Dexec.mainClass="com.basho.riak.hadoop.Bootstrap" -Dexec.classpathScope=runtime -Dexec.args="[pb|http PB_HOST:PB_PORT|HTTP_URL]"
That should load one item per chapter into a bucket called wordcount. You can check it succeeded by running (adjusting the URL to match your configuration):
curl http://127.0.0.1:8091/riak/wordcount?keys=stream
Package The Job Jar
If you’re running the devrel cluster locally, you can just package up the jar now with:
mvn clean package
Otherwise, first edit the RiakLocations in the RiakWordCount class to point at your Riak cluster/node, e.g.
conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8081));
conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8082));
Then simply package the jar as before.
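For orientation, those location lines sit inside the job setup in RiakWordCount. Below is a rough sketch of how such a configuration might hang together; apart from RiakConfig and RiakPBLocation (shown above) and the standard Hadoop Job API, the Riak-specific names here (the key lister, the input/output formats and the setter methods) are assumptions on my part, so check the actual class in the repo for the real wiring:
// Sketch only: the riak-hadoop class and method names below are assumptions;
// the real configuration lives in the RiakWordCount class in the repo.
Configuration conf = new Configuration();
conf = RiakConfig.setKeyLister(conf, new BucketKeyLister("wordcount"));     // which keys to map over (assumed API)
conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8081));
conf = RiakConfig.addLocation(conf, new RiakPBLocation("127.0.0.1", 8082));
conf = RiakConfig.setOutputBucket(conf, "wordcount_out");                   // where results are written (assumed API)

Job job = new Job(conf, "Riak wordcount");
job.setJarByClass(RiakWordCount.class);
job.setInputFormatClass(RiakInputFormat.class);    // reads splits of Riak keys (assumed class name)
job.setOutputFormatClass(RiakOutputFormat.class);  // writes results back to Riak (assumed class name)
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);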
Run The Job
Now we’re finally ready to run the MapReduce job. Copy the jar from the previous step to your hadoop install directory and kick off the job.
cp target/riak-hadoop-wordcount-1.0-SNAPSHOT-job.jar $HADOOP_HOME
cd $HADOOP_HOME
bin/hadoop jar riak-hadoop-wordcount-1.0-SNAPSHOT-job.jar
And wait… Hadoop helpfully provides status messages as it distributes the code and orchestrates the MapReduce execution.
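While you wait, it may help to picture what the map and reduce steps are doing. The sketch below is just the conventional Hadoop word count pair; the input key and value types are placeholders, since in the real job they are whatever the Riak input format hands to the mapper, so look at the classes in the repo for the genuine article:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// A plain Hadoop word count, shown for orientation only. In riak-hadoop-wordcount the
// mapper is fed chapter data fetched from Riak rather than Text key/value pairs.
public class WordCountSketch {

    public static class WordCountMapper extends Mapper<Text, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // emit (word, 1) for every token in the chapter text
            for (String token : value.toString().toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            // sum the per-chapter counts for each word
            int total = 0;
            for (IntWritable count : counts) {
                total += count.get();
            }
            context.write(word, new IntWritable(total));
        }
    }
}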
Inspect The Results
If all went well there will be a bucket in your Riak cluster named wordcount_out. You can confirm it is populated by listing its keys:
curl http://127.0.0.1:8091/riak/wordcount_out?keys=stream
Since the WordCountResult output class has RiakIndex annotations on both the count and word fields, you can perform 2i queries on your data. For example, this should give you an idea of the most common words in Huckleberry Finn:
curl 127.0.0.1:8091/buckets/wordcount_out/index/count_int/1000/3000
Or, if you wanted to know which "f" words Twain was partial to, run the following:
curl 127.0.0.1:8091/buckets/wordcount_out/index/word_bin/f/g
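Those index names follow Riak’s 2i naming convention: string-valued indexes get a _bin suffix and integer-valued ones get _int, which is what the RiakIndex annotations on the output class produce. Roughly, the shape is something like this sketch (field names and annotation details from memory; see WordCountResult in the repo for the actual class):
import com.basho.riak.client.convert.RiakIndex;

// Sketch of the output model; the real class is WordCountResult in the repo.
public class WordCountResultSketch {
    @RiakIndex(name = "word")   // becomes the binary index word_bin
    private String word;

    @RiakIndex(name = "count")  // becomes the integer index count_int
    private int count;

    // key handling, constructors, getters and setters elided
}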
Summary
We just performed a full roundtrip MapReduce, starting with data stored in Riak, feeding it to Hadoop for the actual MapReduce processing, and then storing the results back into Riak. It was a trivial task with a small amount of data, but it illustrates the principle and the potential. Have a look at the RiakWordCount class: only a few lines of configuration and code are needed to perform a Hadoop MapReduce job with Riak data. Hopefully the riak-hadoop-wordcount repo can act as a template for some further exploration. If you have any trouble running this example, please let me know by raising a GitHub issue against the project or by jumping on the Riak Mailing List to ask some questions.
Enjoy.