Posted by Intent Media 07 Aug

Machine Learning with Spark MLlib on Elastic MapReduce

At Intent Media we use Amazon Elastic MapReduce with Spark for some of our data processing and large scale machine learning tasks. Here we share some details and an example of how to set up Spark 1.0.0 MLlib on EMR for machine learning.

There are materials available that explain how to set up Spark 0.8.1 for data processing on EMR. To demonstrate running a machine learning job with MLlib, we created a project that adds a couple of additional elements to those examples:

  1. A Scala main class for a simple EMR-based machine learning job. This class follows Spark’s built-in BinaryClassification example closely, with small changes that let the user specify the Spark executor memory (something we have found useful on larger datasets; it defaults to 512m) and a modified SparkConf setup that works with EMR (see the sketch after this list).
  2. Spark 1.0.0 support. Spark’s MLlib module has seen a number of improvements in recent releases, including sparse feature vectors, decision trees, Naive Bayes classification, and distributed linear algebra routines for SVD and PCA. The 1.0+ jars available on the Spark website unfortunately do not work out of the box on EMR, but for previous releases AWS has provided EMR-compatible jars in the S3 bucket at s3://elasticmapreduce/samples/spark. As far as we’re aware these are not yet officially available for the 1.0+ releases of Spark, so we rolled our own. For more information on this (for instance, to create jars for running Spark 1.0.1+ on EMR), see our detailed instructions.
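To make item 1 a bit more concrete, here is a minimal sketch of how a SparkConf set up along those lines might look. The class name appears in the job invocation below, but the argument handling and comments here are illustrative rather than copied from the project:

import org.apache.spark.{SparkConf, SparkContext}

object BinaryClassificationJob {
  def main(args: Array[String]): Unit = {
    // Hypothetical argument handling: in the real job the executor memory comes
    // from a command-line option (--executorMemory) and defaults to 512m.
    val executorMemory = if (args.nonEmpty) args(0) else "512m"

    val conf = new SparkConf()
      .setAppName("BinaryClassificationJob")
      .set("spark.executor.memory", executorMemory)
    // Any EMR-specific SparkConf adjustments would go here; in this sketch we
    // leave the master URL to the cluster/launcher rather than hard-coding setMaster.
    val sc = new SparkContext(conf)

    // ... load data, train, and evaluate ...

    sc.stop()
  }
}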

Running an Example

This example follows a recent post from Snowplow Analytics closely. It assumes that you have SBT, the Amazon EMR Ruby client, and s3cmd installed and configured with your AWS access credentials.

From the command line, run the following, assuming {JAR_BUCKET} is the S3 bucket to which you would like to push the jar:

git clone git://github.com/johnnywalleye/spark-example-project.git
cd spark-example-project
sbt assembly
s3cmd put target/scala-2.10/spark-example-project-0.2.0.jar s3://{JAR_BUCKET}/spark-example-project-0.2.0.jar

Running the Job

To invoke the job from the command line, run the following, assuming {OUT_BUCKET} is the S3 bucket to which you would like to send your output:

elastic-mapreduce --create --name "BinaryClassificationJob" --instance-type m3.xlarge --instance-count 3 --bootstrap-action s3://intentmedia-spark/install-spark-shark.sh --jar s3://elasticmapreduce/libs/script-runner/script-runner.jar --step-name "binary classification run" --arg s3://snowplow-hosted-assets/common/spark/run-spark-job-0.1.0.sh --arg s3://{JAR_BUCKET}/spark-example-project-0.2.0.jar --arg com.intentmedia.spark.BinaryClassificationJob --arg s3://intentmedia-spark/sample_binary_classification_data.txt/ --arg s3://{OUT_BUCKET}/results/ --arg --algorithm --arg LR --arg --regType --arg L2 --arg --regParam --arg 0.1 --arg --executorMemory --arg 512m
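The --algorithm LR, --regType L2, and --regParam 0.1 arguments mirror the options in Spark’s BinaryClassification example. As a rough sketch of what they correspond to inside MLlib 1.0 (this is not the project’s actual option parsing, and the iteration count is an illustrative value):

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithSGD}
import org.apache.spark.mllib.optimization.SquaredL2Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// --algorithm LR selects logistic regression, --regType L2 an L2 penalty,
// and --regParam 0.1 its strength; training is an RDD of LabeledPoint.
def trainLR(training: RDD[LabeledPoint]): LogisticRegressionModel = {
  val algorithm = new LogisticRegressionWithSGD()
  algorithm.optimizer
    .setNumIterations(100)            // illustrative
    .setUpdater(new SquaredL2Updater) // --regType L2
    .setRegParam(0.1)                 // --regParam 0.1
  algorithm.run(training)
}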

You’re all set at this point and should be able to see the results of your job at s3://{OUT_BUCKET}/results/

Specifically, the predictions-and-labels output should look like the following, though yours may differ since the job randomly splits the dataset into training (80%) and test (20%) sets. Here we have a test set AUC of 1.0, great! It turns out this particular toy problem was not very difficult:

(0.0,0.0)
(0.0,0.0)
(1.0,1.0)
(0.0,0.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(1.0,1.0)
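For reference, the train/test split, the prediction-and-label pairs above, and the AUC computation follow the same pattern as Spark’s BinaryClassification example. A minimal, self-contained sketch (function and variable names are illustrative, and plain logistic regression stands in for the configurable algorithm):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

def evaluate(sc: SparkContext, inputPath: String, outputPath: String): Unit = {
  // Load LIBSVM-formatted data such as sample_binary_classification_data.txt
  val examples = MLUtils.loadLibSVMFile(sc, inputPath).cache()

  // Random 80/20 train/test split, as described above
  val splits = examples.randomSplit(Array(0.8, 0.2))
  val (training, test) = (splits(0), splits(1))

  val model = LogisticRegressionWithSGD.train(training, 100)

  // Pair each prediction with its true label and write the pairs out,
  // producing lines like (0.0,0.0) and (1.0,1.0) as shown above
  val predictionsAndLabels = test.map(p => (model.predict(p.features), p.label))
  predictionsAndLabels.saveAsTextFile(outputPath)

  // Area under the ROC curve on the held-out test set
  val metrics = new BinaryClassificationMetrics(predictionsAndLabels)
  println(s"Test AUC = ${metrics.areaUnderROC()}")
}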

This example runs on a small file, but the benefit of using Spark is that we can scale to much larger datasets, preprocess our data, and train a variety of statistical models, all within the same framework. Spark’s performance on ML tasks is of course best when the entire dataset fits in memory, and AWS’s r3.8xlarge instances offer 244GB of memory for a few dollars per hour on demand (or about $0.25 per hour on the spot market), allowing the user to quickly scale to a cluster with a few TB of memory.

We use this setup at Intent Media to power some of the models that we use for serving ads. Because MLlib is still changing quickly, we have modified it in certain cases, some changes simple (predicting class probabilities rather than class outcomes in classifiers) and some more involved (adding a new splitting criterion to MLlib’s DecisionTree). Modeling with Spark on a cluster has been useful in allowing us to rapidly iterate over preprocessing and modeling techniques while easily scaling the available CPU and memory.
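On the class-probability point: even in stock MLlib 1.0, logistic regression can return scores rather than hard 0/1 labels by clearing the model’s decision threshold, which is a simpler (if more limited) route than modifying the library. A minimal sketch, not our internal change:

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// With the threshold cleared, predict returns the model's score for the
// positive class (a probability-like value) instead of a thresholded 0.0/1.0 label.
def scoreExamples(model: LogisticRegressionModel, features: RDD[Vector]): RDD[Double] = {
  model.clearThreshold()
  model.predict(features)
}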

Jon Sondag

Data Scientist
