Posted by Intent Media, 26 Nov

Vowpal Wabbit on Elastic MapReduce

Overview

Vowpal Wabbit (VW) is a fast machine learning package from John Langford’s group at Yahoo! Research and, later, Microsoft Research. It can be run in parallel on a cluster, which allows one to implement, for example, the algorithms outlined in Zinkevich et al.

  • An overview of using VW on a cluster (not EMR-specific) is here
  • A good tutorial introduction to VW in general is here

The following are some notes about setting up VW to run on an EMR cluster.

Install AWS Ruby Command Line Tools

The first step is to download and install the EMR Ruby client and set it up so that one can launch and monitor jobs from the command line.
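For reference, a minimal sketch of that setup is shown below. The download URL and the credentials.json field names are assumptions based on the AWS documentation of the time; check the current docs before relying on them.

# Download and unpack the elastic-mapreduce Ruby CLI (URL is an assumption).
mkdir -p ~/elastic-mapreduce-cli && cd ~/elastic-mapreduce-cli
wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
unzip elastic-mapreduce-ruby.zip

# The client reads its configuration from a credentials.json in the same folder.
# Field names below are assumptions; fill in your own values.
cat > credentials.json <<'EOF'
{
  "access_id":     "[your AWS access key id]",
  "private_key":   "[your AWS secret key]",
  "keypair":       "[your EC2 key pair name]",
  "key-pair-file": "[your key folder]/key.pem",
  "log_uri":       "s3://[your s3 bucket]/emr-logs/",
  "region":        "us-east-1"
}
EOF

# Add the client to your PATH so elastic-mapreduce can be run directly.
export PATH=$PATH:~/elastic-mapreduce-cli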

Create Bootstrap Script

The next step is to create a bootstrap script that each machine in the EMR cluster will run at launch. This script needs to download the VW source, install VW along with the tools required to build it, and put the resulting libraries somewhere the EMR EC2 instances will find them. By default, the Amazon Machine Image launched for EMR includes /usr/local/cuda/lib in its library search path, so we compile the VW libraries and copy them there, even though we are not using CUDA.

VW includes an example mapper script for distributed runs but it may be out of date or we may want to modify it. So in addition to the VW repo, we also clone a personal repo called vw_scripts that contains custom mappers that we have created.

The bootstrap script is stored in S3 so that it can be referenced by the EC2 instances that make up our cluster. The script is as follows:

#!/bin/bash

# Install the packages needed to build VW from source.
sudo apt-get update
sudo apt-get -y install libboost-all-dev
sudo apt-get -y install libtool
sudo apt-get -y install automake

# Clone our custom mapper scripts and the VW source.
cd ~
sudo git clone https://github.com/johnnywalleye/vw_scripts
sudo git clone https://github.com/JohnLangford/vowpal_wabbit

# Build and install VW.
cd vowpal_wabbit
sudo ./autogen.sh
sudo ./configure
sudo make
sudo make install

# Copy the shared libraries into a path the EMR AMI already searches
# (/usr/local/cuda/lib), even though we are not using CUDA.
sudo mkdir /usr/local/cuda
sudo mkdir /usr/local/cuda/lib
sudo cp /usr/local/lib/* /usr/local/cuda/lib

exit 0

Call it, say, build_vw, and save it in an S3 bucket.
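For example, the script can be uploaded with the AWS CLI, assuming it is installed and configured (a sketch; the bucket name is a placeholder):

aws s3 cp build_vw s3://[your s3 bucket]/build_vw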

Launch EMR Cluster

The next step is to launch an EMR cluster. Assuming the AWS ruby command line script is in your path, run something like the following (of course, --num-instances and --instance-type are up to you):

elastic-mapreduce --create --alive --num-instances 100 --instance-type m1.xlarge --bootstrap-action s3://[your s3 bucket]/build_vw

We use the --alive option so that we can keep the cluster running and submit new jobs as we wish.
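The --create call prints the new cluster's JobFlow ID (of the form j-XXXXXXXXXXXX), which we will need when submitting jobs below. It can also be retrieved later from the command line (a sketch, assuming the CLI's standard --list option):

elastic-mapreduce --list --active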

Start Spanning Tree

First, ssh into the master node of the cluster using its Master Public DNS Name. (The name can be found in the AWS console or with the command line tools.)

ssh -i [your key folder]/key.pem hadoop@ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com
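The Master Public DNS Name used above can also be retrieved from the command line, e.g. with the --describe option (a sketch; the JSON output includes a MasterPublicDnsName field):

elastic-mapreduce --describe -j j-[XXXXXXXXXXXX]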

Start the spanning tree, and exit the master node:

~/vowpal_wabbit/cluster/spanning_tree
exit

Ideally, the spanning_tree could be started as part of the bootstrap process by running something like the following, rather than by sshing into the master node. Unfortunately, the bootstrap process hangs when these lines are added, so we leave them out of the bootstrap script for now:

# EMR writes instance metadata to instance.json; start the spanning_tree
# in the background on the master node only.
if grep -Fq '"isMaster": true' /mnt/var/lib/info/instance.json
then
	~/vowpal_wabbit/cluster/spanning_tree &
	disown %
fi

Submit EMR JobFlows

Then, using the command line tools, submit the job to the cluster. The mapred.min.split.size setting is the minimum MapReduce split size in bytes (1073741824 below is 1 GiB), which is sometimes useful for controlling the number of mappers:

elastic-mapreduce -j j-[XXXXXXXXXXXX] --stream --jobconf mapred.min.split.size=1073741824 --input s3://[your s3 bucket]/training_data --mapper /home/hadoop/vw_scripts/runvw.sh --output s3://[your s3 bucket]/model_output --reducer NONE

The runvw.sh script can be any of the mapper scripts from the vw_scripts repo.

Note: the runvw.sh script references a number of environment variables that the Hadoop streaming job makes visible to it (a sketch of such a mapper follows the list below):

  • $mapred_map_tasks: The total number of mapper tasks in this MapReduce job, e.g. 100
  • $mapper: The number corresponding to this map task, e.g. 000015
  • $mapred_job_id: The ID of the MapReduce job, e.g. 2013102818240004
  • $mapreduce_job_submithost: Resolves to the private dns of the master node within the cluster, e.g. ip-xxx-xxx-xxx-xxx.ec2.internal
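For concreteness, here is a minimal sketch of what such a mapper can look like. It is not the actual runvw.sh from vw_scripts: the --total, --node, --unique_id, and --span_server flags are VW's standard cluster options, but the remaining VW options and the $mapred_output_dir variable are assumptions for illustration.

#!/bin/bash

# Train VW on this mapper's share of the data, read from stdin.  The --total,
# --node, --unique_id, and --span_server options connect this VW instance to
# the spanning_tree on the master node, so that weights are combined across
# all mappers via allreduce.
vw --total $mapred_map_tasks \
   --node $mapper \
   --unique_id $mapred_job_id \
   --span_server $mapreduce_job_submithost \
   --cache_file temp.cache \
   --passes 1 \
   -d /dev/stdin \
   -f model.vw

# After allreduce every node holds the same model, so only mapper 000000 saves
# it.  $mapred_output_dir (the job's output directory) is assumed here.
if [ "$mapper" == "000000" ]
then
    hadoop fs -put model.vw "$mapred_output_dir/model.vw"
fi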

VW’s spanning_tree aggregates the results when the allreduce function is called. It blocks while listening for all results to come in from the mappers before it begins processing them.

Importantly, all of the map tasks also block after sending their message to the spanning_tree. As a result, the job can become deadlocked if more map tasks are submitted than the cluster can run at one time.

To work around this issue, one can indirectly control the number of map tasks by setting mapred.min.split.size, or by storing the input data in an unsplittable compression format (e.g. gzip), divided into n files, where n is the desired number of map tasks. In practice, setting n equal to the number of task trackers in the cluster may work well. The VW cluster overview mentioned above has some brief notes on this.
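For illustration, here is a minimal sketch of pre-splitting a training file into n gzip parts before uploading it to S3 (the file names and the value of n are placeholders):

# Split training_data into roughly n equal pieces and gzip each one, so that
# Hadoop creates exactly n unsplittable map inputs.
n=100
total_lines=$(wc -l < training_data)
lines_per_file=$(( (total_lines + n - 1) / n ))
split -d -a 3 -l "$lines_per_file" training_data part-
gzip part-*
# Then upload the resulting part-*.gz files to s3://[your s3 bucket]/training_data/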

Jon Sondag
Data Scientist

Comments (1)

Pramod Kumar, Jun 23 2015:
What AMI version did you use?