- Posted by Intent Media 26 Nov
Vowpal Wabbit is a fast machine learning package from John Langford’s group at Microsoft and Yahoo. It can be run in parallel on a cluster, allowing for implementation of e.g. the algorithms outlined in Zinkevich et al.
- An overview of using VW on a cluster (not EMR-specific) is here
- A good tutorial introduction to VW in general is here
The following are some notes about setting up VW to run on an EMR cluster.
Install AWS Ruby Command Line Tools
The first step is to download and install the EMR ruby client, setting it up so that one can launch and monitor jobs from the command line.
Create Bootstrap Script
The next step is to create a bootstrap script that each of the machines in the EMR cluster will run upon launch. This script needs to download the VW source, install VW and any tools necessary to build it, and put the libraries in a place where the EMR EC2 instances will see them. By default, the Amazon Machine Image that is launched for EMR can view libraries in the /usr/local/cuda/lib path. So we compile the vw libraries and copy them there, though we are not using cuda.
VW includes an example mapper script for distributed runs but it may be out of date or we may want to modify it. So in addition to the VW repo, we also clone a personal repo called vw_scripts that contains custom mappers that we have created.
The bootstrap script is stored to s3 so that it can be referenced from the EC2 instances that make up our cluster. The script is as follows:
sudo apt-get update
sudo apt-get -y install libboost-all-dev
sudo apt-get -y install libtool
sudo apt-get -y install automake
cd ~
sudo git clone https://github.com/johnnywalleye/vw_scripts
sudo git clone https://github.com/JohnLangford/vowpal_wabbit
cd vowpal_wabbit
sudo ./autogen.sh
sudo ./configure
sudo make
sudo make install
sudo mkdir /usr/local/cuda
sudo mkdir /usr/local/cuda/lib
sudo cp /usr/local/lib/* /usr/local/cuda/lib
exit 0
Call it, say, build_vw and save it in an s3 bucket.
Launch EMR Cluster
The next step is to launch an EMR cluster. Assuming the AWS ruby command line script is in your path, run something like the following (of course, --num-instances and --instance-type are up to you):
elastic-mapreduce --create --alive --num-instances 100 --instance-type m1.xlarge --bootstrap-action s3://[your s3 bucket]/build_vw
We use the --alive option so that we can keep the cluster running and submit new jobs as we wish.
Start Spanning Tree
First, ssh into the master node of the cluster using the Master Public DNS Name. (The name can be found in the AWS console or by using the command line tools.)
ssh -i [your key folder]/key.pem hadoop@[master public DNS name]
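Start the spanning tree, and exit the master node (the path below assumes the build location used by the bootstrap script):
~/vowpal_wabbit/cluster/spanning_tree
exit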
Ideally, the spanning_tree could be started as part of the bootstrap process, by running something like the following rather than sshing into the master node. Unfortunately, the bootstrap process hangs when these lines are added, so we leave them out of the bootstrap script for now:
grep -Fq '"isMaster": true' /mnt/var/lib/info/instance.json
if [ $? -eq 0 ]; then
  ~/vowpal_wabbit/cluster/spanning_tree &
  disown %
fi
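The master check in the snippet above can be tried outside EMR against stand-in files. The sketch below also detaches the background process from stdin, stdout and stderr; whether inherited file descriptors are what makes the bootstrap hang is only a guess and has not been verified on EMR. The function names, the temporary JSON files and the `sleep 1` placeholder (standing in for `~/vowpal_wabbit/cluster/spanning_tree`) are all assumptions made for illustration.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Succeeds if the given instance.json marks this node as the master.
is_master() {
  grep -Fq '"isMaster": true' "$1"
}

# Stand-in files so the sketch runs outside EMR; on a real node the path
# would be /mnt/var/lib/info/instance.json.
demo_dir=$(mktemp -d)
printf '{"isMaster": true}\n'  > "$demo_dir/master.json"
printf '{"isMaster": false}\n' > "$demo_dir/worker.json"

# Runs the given command in the background only on the master node.
start_if_master() {
  local info_file=$1
  shift
  if is_master "$info_file"; then
    # Ignore SIGHUP and inherit none of the caller's stdin/stdout/stderr,
    # so the caller does not wait on the background process.
    nohup "$@" < /dev/null > /dev/null 2>&1 &
    echo "started"
  else
    echo "skipped"
  fi
}

# "sleep 1" is a placeholder for the real spanning_tree binary.
master_result=$(start_if_master "$demo_dir/master.json" sleep 1)
worker_result=$(start_if_master "$demo_dir/worker.json" sleep 1)
echo "master: $master_result, worker: $worker_result"
```

Because the placeholder's output is redirected, the command substitutions return immediately instead of waiting for the background process to finish.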
Submit EMR JobFlows
Then, using the command line tools, submit the job to the cluster (mapred.min.split.size is the minimum MapReduce split size in bytes, which is sometimes useful for controlling the number of mappers):
elastic-mapreduce -j j-[XXXXXXXXXXXX] --stream --jobconf mapred.min.split.size=1073741824 --input s3://[your s3 bucket]/training_data --mapper /home/hadoop/vw_scripts/runvw.sh --output s3://[your s3 bucket]/model_output --reducer NONE
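The value 1073741824 in the command above is 1 GiB. As a rough sketch of how such a value might be chosen, the following derives a minimum split size from a total input size and a target mapper count. Both numbers are hypothetical. Hadoop computes splits per input file, so the actual mapper count can come out higher than the target when the input is spread across many files.

```shell
#!/usr/bin/env bash
set -euo pipefail

total_bytes=$((250 * 1024 * 1024 * 1024))  # hypothetical input size: 250 GiB
desired_mappers=100                        # hypothetical target mapper count

# Round up so that desired_mappers splits are enough to cover the input.
split_size=$(( (total_bytes + desired_mappers - 1) / desired_mappers ))

echo "--jobconf mapred.min.split.size=$split_size"
```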
The runvw.sh script can be any of the mapper scripts from the vw_scripts repo.
Note: the runvw.sh script references a number of variables, which will be visible to it via the Hadoop streaming job:
- $mapred_map_tasks: The total number of mapper tasks in this MapReduce job, e.g. 100
- $mapper: The number corresponding to this map task, e.g. 000015
- $mapred_job_id: The ID of the MapReduce job, e.g. 2013102818240004
- $mapreduce_job_submithost: Resolves to the private dns of the master node within the cluster, e.g. ip-xxx-xxx-xxx-xxx.ec2.internal
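To make the role of these variables concrete, here is a minimal sketch of how a mapper script might turn them into a vw command line for one node. It is not the contents of runvw.sh, and the scripts in the vw_scripts repo may differ. The defaults are the example values from the list above, so the sketch runs outside Hadoop. The model file name `model.vw` is hypothetical, and the command is only printed, not executed.

```shell
#!/usr/bin/env bash
set -euo pipefail

# On a real cluster these would already be set; the defaults are the
# example values listed above, used here so the sketch runs locally.
mapred_map_tasks=${mapred_map_tasks:-100}
mapper=${mapper:-000015}
mapred_job_id=${mapred_job_id:-2013102818240004}
mapreduce_job_submithost=${mapreduce_job_submithost:-ip-xxx-xxx-xxx-xxx.ec2.internal}

# Assemble (but do not run) a vw invocation for this node:
#   --total        number of nodes taking part in allreduce
#   --node         index of this node
#   --unique_id    identifier shared by all nodes of one job
#   --span_server  host where spanning_tree is running
vw_cmd="vw --total $mapred_map_tasks --node $mapper --unique_id $mapred_job_id --span_server $mapreduce_job_submithost -d /dev/stdin -f model.vw"

echo "$vw_cmd"
```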
VW’s spanning_tree aggregates the results when the allreduce function is called. It blocks while listening for all results to come in from the mappers before it begins processing them.
Importantly, all of the map tasks also block after sending a message out to the spanning_tree. As a result, the job can deadlock if more map tasks are submitted than the cluster can run at the same time: the running tasks wait on the spanning_tree, which in turn waits on tasks that never get a slot.
To work around this issue, one can indirectly control the number of map tasks by setting mapred.min.split.size or by storing the input data in an unsplittable compression format (e.g. gzip), divided into $n$ files, where $n$ is the desired number of map tasks. In practice, setting $n$ equal to the number of tasktrackers in the cluster may work well. The VW cluster overview mentioned above has some brief notes on this.
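The gzip approach can be sketched as follows: deal the training lines round-robin into $n$ part files and compress each one, so that every file becomes a single unsplittable input. The value of `n`, the file names and the generated stand-in data are assumptions for illustration. On a real data set the resulting files would then be uploaded to the S3 input path. Round-robin dealing changes the order of the examples, which may or may not matter for a given training setup.

```shell
#!/usr/bin/env bash
set -euo pipefail

n=4                                  # desired number of map tasks (assumption)
workdir=$(mktemp -d)                 # scratch directory for the demo
input="$workdir/training_data.txt"   # hypothetical local copy of the training set

# Generate a small stand-in training file so the sketch runs on its own.
for i in $(seq 1 10); do
  echo "1 | feature_$i"
done > "$input"

# Deal lines round-robin into n part files, then gzip each one.
awk -v n="$n" -v dir="$workdir" '{
  out = sprintf("%s/part-%05d", dir, (NR - 1) % n)
  print > out
}' "$input"
gzip "$workdir"/part-*

# Count the compressed parts and check that no lines were lost.
parts=$(ls "$workdir"/part-*.gz | wc -l | tr -d ' ')
lines=$(gzip -dc "$workdir"/part-*.gz | wc -l | tr -d ' ')
echo "wrote $parts gzip files containing $lines lines in total"
```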