Building a Recommendation Engine with Spark ML on Amazon EMR using Zeppelin

Guy Ernest is a Solutions Architect with AWS

Many developers want to implement the famous Amazon model that was used to power the "People who bought this also bought these items" feature on Amazon.com. This model is based on a method called Collaborative Filtering. It takes items, such as movies and books, that were rated highly by a set of users and recommends them to other users who gave similarly high ratings. This method works well in domains where explicit ratings or implicit user actions can be gathered and analyzed.

There have been many advancements in these collaborative filtering algorithms. In 2006, Netflix introduced the Netflix Prize, shared a huge data set (more than 2GB of compressed data about millions of users and thousands of movies), and offered 1 million dollars to whoever could design an algorithm that could best improve their existing recommendation engine. The winning team was also asked to publish the algorithm and share it with the community. For several years, many teams across the world competed for the prestigious prize and many algorithms were developed and published. You can read the official paper from the winning team or the summary of it.

In the previous blog posts about Amazon ML, we built various ML models, such as numeric regression, binary classification, and multi-class classification. Such models can be used for features like recommendation engines. In this post, we are not going to implement the complete set of algorithms that were used in the Amazon solution. Instead, we show you how to use a simpler algorithm that is included out-of-the-box in Spark MLlib for collaborative filtering, called Alternating Least Squares (ALS).
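As a rough illustration of the idea behind ALS (not the MLlib implementation), the algorithm learns a short vector of latent factors for every user and every item, and the predicted rating is the dot product of the two vectors. The following plain Scala sketch assumes made-up factor values and a hypothetical helper named predictRating; it runs in any Scala REPL or Zeppelin paragraph without Spark.

```scala
// Hypothetical latent-factor vectors of rank 3; the values are made up for illustration.
val userFactors  = Array(0.9, 0.1, 1.2)
val movieFactors = Array(1.5, 0.3, 2.0)

// Predicted rating = dot product of the user vector and the item vector.
def predictRating(user: Array[Double], item: Array[Double]): Double = {
  require(user.length == item.length, "both vectors must have the same rank")
  user.zip(item).map { case (u, m) => u * m }.sum
}

val predicted = predictRating(userFactors, movieFactors)
println(f"Predicted rating: $predicted%.2f")
```

The length of these vectors is the rank parameter that is tuned later in this post.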

Why Spark?

Apache Spark is an open source project that has gained attention from analytics experts. In contrast to Hadoop MapReduce’s two-stage, disk-based paradigm, Spark's multi-stage in-memory primitives provide performance up to 100 times faster for certain applications. Since it is possible to load data into a cluster's memory and query it repeatedly, Spark is commonly used for iterative machine learning algorithms at scale. Furthermore, Spark includes a library with common machine learning algorithms, MLlib, which can be easily leveraged in a Spark application. For an example, see the "Large-Scale Machine Learning with Spark on Amazon EMR" post on the AWS Big Data Blog.

Spark succeeds where the traditional MapReduce approach falls short, making it easy to develop and execute iterative algorithms. Many ML algorithms are based on iterative optimization, which makes Spark a great platform for implementing them.

Other open-source alternatives for building ML models are either relatively slow, such as Mahout using Hadoop MapReduce, or limited in their scale, such as Weka or R. Commercial managed-service alternatives, which take most of the complexity out of the process, are also available, such as Dato or Amazon Machine Learning, a service that makes it easy for developers of all skill levels to use machine learning technology. In this post, we explore Spark’s MLlib and show why it is a popular tool for data scientists using AWS who are looking for a DIY solution.

Why Apache Zeppelin?

Data scientists and analysts love to interact with their data, share their findings, and collaborate with others. Visualization tools are a popular way to build and share business intelligence insights, and notebooks can run in an interactive mode and allow others to see the steps which arrive at the final analysis. 

There are a few popular notebooks that can be used with Spark, mainly Apache Zeppelin and IPython. IPython is focused on Python, which is supported in Spark. In addition to Python, Zeppelin supports a few more languages (most importantly, Scala), and also integrates well with the Spark framework (Spark SQL, MLlib, and HiveQL using the HiveContext). Another alternative to explore is Jupyter, which extended IPython for multiple programming languages.

You can also easily add Zeppelin to your Amazon EMR cluster when you launch it by selecting Zeppelin-Sandbox from the Applications to install list.

Why Amazon EMR?

Amazon EMR makes it easy to launch a Hadoop cluster and install many of the Hadoop ecosystem applications, such as Pig, Hive, HBase, and Spark. EMR makes it easy for data scientists to create clusters quickly and process vast amounts of data in a few clicks. EMR also integrates with other AWS big data services such as Amazon S3 for low-cost durable storage, Amazon Kinesis for streaming data, and Amazon DynamoDB as a NoSQL datastore.

EMR gives you the flexibility to choose the optimal instance types for different applications. For example, Spark caches data in memory for faster processing, so it is best to use instances with more memory (such as the R3 instance family). Also, EMR’s ability to use Amazon EC2 Spot capacity can dramatically reduce the cost of training and retraining ML models. Most of the time, the Spot market price for larger instances such as the r3.4xlarge is around 10%-20% of the On-Demand price.

Together, these components form a powerful tool for data scientists who have a lot of data to process and are building predictive models for their business: EMR creates a Hadoop cluster and installs Spark and Zeppelin on it, Spark provides a rich language for data manipulation, Zeppelin provides a notebook interface with data visualization, and SparkML provides implementations of some popular ML algorithms.

Launch Spark on EMR

To launch an EMR cluster with Spark and Zeppelin, use the AWS Management Console.

  1. On the EMR console, choose Create cluster.
  2. Choose Go to advanced options.
  3. In the Software configuration section, in the Application to be installed table, add both Spark and Zeppelin-Sandbox. You can remove Pig and Hue, as you don’t need them on this cluster and removing them makes your cluster start faster.
  4. When I showed this example to a team of AWS support engineers, they immediately checked the utilization of the Spark cluster and reviewed the cluster logs and metrics. Based on their experience, they advised me to configure Spark to use dynamic allocation of executors. When I followed their advice, I got a more than fivefold improvement in building the recommender model. To do that, add the following JSON to the software settings:
    [{"classification":"spark-defaults", "properties":{"spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.dynamicAllocation.enabled":"true"}, "configurations":[]}]
  5. In the Hardware configuration section, change the instance type to r3.xlarge to give your cluster more memory than the general purpose m3.xlarge instance type provides. In future clusters, you can use larger instances (r3.4xlarge, for example), but for the amount of data in this example, the smaller instance type is sufficient.
  6. Keep the default number of instances as 3. A quick look at the EMR pricing page shows that running this cluster has an hourly cost of less than $1.50/hr (EC2 + EMR price).
  7. Give the cluster a name, such as "SparkML". Keep the other default values of the wizard.
  8. In the Security and Access section, choose an EC2 key pair that is already available in the region in which you are running the cluster (for example, "oregon-key"), and whose PEM file you can access. If you don't have any key in this region, follow the help link to the right of the EC2 key pair field.
  9. After you select the EC2 key pair, complete the wizard and choose Create cluster.

Connect to your EMR cluster

After a few minutes, you can connect to the cluster.

  1. In the cluster list, select the cluster to view the details.
  2. Choose SSH beside the DNS address in the Master Public DNS field.

You may need to correct the location of the PEM file, and verify that you changed the permissions on the file:

chmod 400 ~/<YOUR-KEY-FILE>.pem

  3. Connect to the cluster:

ssh -i ~/<YOUR-KEY-FILE>.pem hadoop@ec2-<MASTER-PUBLIC-IP>.<REGION>

The SSH page also has instructions for connecting to the cluster from a Windows machine. For detailed instructions, see Connect to the Master Node Using SSH.

(Optional) Connect using Mosh

SSH is a secure way to connect to an EC2 instance of an EMR cluster, but it is also sensitive to connectivity issues. Often you may need to reconnect. To solve this issue, you can also install and use a tool called Mosh (Mobile Shell).

After you have connected to the cluster with the standard SSH connection, install Mosh on the server:

sudo yum install mosh

Proceed with the installation process on the master node. Mosh uses a UDP connection to keep the secure connection stable; therefore, you need to open the relevant UDP ports in the security group.

  1. In the cluster list, select the cluster name to view the details.
  2. On the cluster details page, select the security group of the master node.
  3. On the Inbound tab, choose Edit and Add rule to allow UDP in ports 60000 - 60010, and choose Save.

You can restrict the access to your IP only, and only to the specific UDP ports that you set in your connection command. For the sake of simplicity for this public example, these settings should suffice.

Now install the Mosh client on your side of the connection from the Mosh website, and connect to your Spark cluster:

mosh --ssh="/usr/bin/ssh -i ~/<YOUR-KEY-FILE>.pem" hadoop@ec2-<MASTER-PUBLIC-IP>.<REGION> --server="/usr/bin/mosh-server"

Connect to the Zeppelin notebook

There are several ways to connect to the UI on the master node. One method is to use a proxy extension for the browser, as explained in the pop-up from the management console or in the EMR documentation.

In this example, use an SSH tunnel between a local port (8157, for example) and the port that Zeppelin listens on (8890 by default), as follows:

ssh -i <YOUR-KEY-FILE.pem> -N -L 8157:ec2-<MASTER-PUBLIC-IP>.<REGION> hadoop@ec2-<MASTER-PUBLIC-IP>.<REGION>

Or you can use the Mosh (Mobile Shell) connection, which I prefer:

mosh --ssh="/usr/bin/ssh -i <YOUR-KEY-FILE>.pem -N -L 8157:ec2-<MASTER-PUBLIC-IP>.<REGION>" --server="/usr/bin/mosh-server"

Now, open a browser locally on your machine and point it to http://localhost:8157/, and you should see the home page of Zeppelin on your Spark cluster.

The green light on the top right side of the page indicates that the notebook is already connected to the Spark cluster. You can now open the preinstalled Tutorial notebook from the Notebook menu and choose Run all the notes. The tutorial demonstrates the interactivity of the notebook and the nice visualization and integration with SparkSQL.

Build the Recommender with SparkML

Now it is time to build the recommender model. Use the Spark MLlib tutorial for the dataset and the steps to build the model. The tutorial is based on an interesting and large dataset that was released by GroupLens from the MovieLens website. There are a few datasets for 100K, 1M, 10M, and 20M ratings. The datasets can be downloaded with wget from the Spark MLlib training site or from the GroupLens site.

After you have downloaded the datasets, upload them to S3 in the same region as your Spark cluster.

Note: The code example is part of the Databricks Spark training material and AMPCamp.

First, import the libraries used to build the ML model:


import org.apache.log4j.Logger
import org.apache.log4j.Level

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

Choose Run this paragraph and you should see the echo of your imports.

Next, load the dataset into the Spark cluster. You need two types of data: movie information (movies.dat file) and ratings information (ratings.dat folder). Define the location of the files, and load them by specifying the delimiter (::) and the format of each record: (movieId, movieName) and (timestamp % 10, Rating(userId, movieId, rating)), respectively.

val movieLensHomeDir = "s3://emr.examples/movieLens/"

val movies = sc.textFile(movieLensHomeDir + "movies.dat").map { line =>
  val fields = line.split("::")
  // format: (movieId, movieName)
  (fields(0).toInt, fields(1))
}.collect().toMap  // kept on the driver as a lookup table from movieId to movieName

val ratings = sc.textFile(movieLensHomeDir + "ratings.dat").map { line =>
  val fields = line.split("::")
  // format: (timestamp % 10, Rating(userId, movieId, rating))
  (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}
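To see what the parsing step produces, here is a minimal sketch in plain Scala that runs without Spark. It assumes sample lines in the "::"-delimited format described above, and it uses a hypothetical ParsedRating case class as a stand-in for MLlib's Rating so that the snippet is self-contained.

```scala
// Hypothetical stand-in for MLlib's Rating, so the sketch runs without Spark.
case class ParsedRating(user: Int, product: Int, rating: Double)

// Parse one "userId::movieId::rating::timestamp" line into (timestamp % 10, rating).
def parseRating(line: String): (Long, ParsedRating) = {
  val fields = line.split("::")
  require(fields.length == 4, s"expected 4 fields but got ${fields.length}: $line")
  (fields(3).toLong % 10, ParsedRating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
}

// Parse one "movieId::movieName::genres" line into (movieId, movieName).
def parseMovie(line: String): (Int, String) = {
  val fields = line.split("::")
  require(fields.length >= 2, s"expected at least 2 fields: $line")
  (fields(0).toInt, fields(1))
}

// Sample lines in the assumed format.
val (splitKey, parsedRating) = parseRating("1::1193::5::978300760")
val (movieId, movieName) = parseMovie("1::Toy Story (1995)::Animation|Children's|Comedy")

println(s"split key = $splitKey, rating = $parsedRating")
println(s"movie $movieId = $movieName")
```

The first element of the rating tuple is the last digit of the timestamp, which is used later to split the data.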

To verify that the files were loaded correctly, you can count the number of ratings using the following:

val numRatings = ratings.count
val numUsers = ratings.map(_._2.user).distinct.count
val numMovies = ratings.map(_._2.product).distinct.count

println("Got " + numRatings + " ratings from "
  + numUsers + " users on " + numMovies + " movies.")

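Before building the ML model, it is important to split the dataset into a few parts: one for training (60%), one for validation (20%), and one for testing (20%). The split uses the last digit of each rating's timestamp, as follows:

// keep only the Rating values and cache each split, because it is reused several times
val training = ratings.filter(x => x._1 < 6).values.cache()
val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.cache()
val test = ratings.filter(x => x._1 >= 8).values.cache()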

val numTraining = training.count()
val numValidation = validation.count()
val numTest = test.count()

println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)

You should get an output similar to the following:

training: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[2615] at repartition at :63
validation: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[2621] at repartition at :63
test: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[2623] at values at :59
numTraining: Long = 3810355
numValidation: Long = 1269312
numTest: Long = 1268409
Training: 3810355, validation: 1269312, test: 1268409
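The 60/20/20 proportions come from the last digit of the timestamp: digits 0 to 5 go to training, 6 and 7 to validation, and 8 and 9 to test. The following plain Scala sketch illustrates this with a hypothetical helper named splitFor and a made-up range of timestamps whose last digits are evenly spread; real data only approximates these proportions.

```scala
// Map the last digit of a timestamp to the split it falls into.
def splitFor(timestamp: Long): String = (timestamp % 10) match {
  case d if d < 6 => "training"    // digits 0-5: six out of ten
  case d if d < 8 => "validation"  // digits 6-7: two out of ten
  case _          => "test"        // digits 8-9: two out of ten
}

// Hypothetical timestamps: 100 consecutive values, so each last digit appears 10 times.
val sampleTimestamps = 978300000L until 978300100L

// Count how many timestamps land in each split.
val splitCounts = sampleTimestamps
  .groupBy(splitFor)
  .map { case (name, members) => name -> members.size }

println(splitCounts)
```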

Next, define the function to be used to evaluate the performance of the model. A common choice is Root Mean Squared Error (RMSE), and this is a Scala version of it:

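/** Compute RMSE (Root Mean Squared Error). */
def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
  // predict a rating for every (user, product) pair in the data
  val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
  // join predicted and actual ratings on the (user, product) key
  val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating)))
    .values
  math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
}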
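To make the metric concrete, here is a small worked example in plain Scala, without Spark. It assumes three made-up (predicted, actual) rating pairs and a hypothetical helper named rmse; it is a sketch of the formula only, not of the distributed version used in this post.

```scala
// Root Mean Squared Error over (predicted, actual) pairs.
def rmse(pairs: Seq[(Double, Double)]): Double = {
  require(pairs.nonEmpty, "need at least one (predicted, actual) pair")
  val squaredErrors = pairs.map { case (predicted, actual) =>
    val diff = predicted - actual
    diff * diff
  }
  math.sqrt(squaredErrors.sum / pairs.size)
}

// Made-up pairs: the errors are 1, 0, and 2, so the squared errors are 1, 0, and 4.
val samplePairs = Seq((4.0, 5.0), (3.0, 3.0), (2.0, 4.0))

// sqrt((1 + 0 + 4) / 3), which is roughly 1.29
println(rmse(samplePairs))
```

Because the errors are squared before averaging, a single large miss weighs more than several small ones.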

Now you can use the error function to choose the best parameters for the training algorithm. The ALS algorithm requires three parameters: the rank of the factor matrices, the number of iterations, and lambda (the regularization parameter). You can select different values for these parameters and measure the RMSE for each combination to select the best combination:

val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)
var bestModel: Option[MatrixFactorizationModel] = None
var bestValidationRmse = Double.MaxValue
var bestRank = 0
var bestLambda = -1.0
var bestNumIter = -1
for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
  val model = ALS.train(training, rank, numIter, lambda)
  val validationRmse = computeRmse(model, validation, numValidation)
  println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
    + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
  // keep the model with the lowest validation error seen so far
  if (validationRmse < bestValidationRmse) {
    bestModel = Some(model)
    bestValidationRmse = validationRmse
    bestRank = rank
    bestLambda = lambda
    bestNumIter = numIter
  }
}

This stage can take a few minutes to complete.
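The search takes a while because one model is trained for every combination of the parameter lists: 2 ranks x 2 lambdas x 2 iteration counts = 8 models. The plain Scala sketch below enumerates the same grid and picks the minimum with minBy instead of mutable variables. The scoring function fakeValidationRmse is a hypothetical placeholder that only makes the sketch runnable; it is not a real error measure and says nothing about which parameters are actually best.

```scala
val ranks = List(8, 12)
val lambdas = List(0.1, 10.0)
val numIters = List(10, 20)

// Every (rank, lambda, numIter) combination, in the same order as the nested loop.
val grid = for (rank <- ranks; lambda <- lambdas; numIter <- numIters)
  yield (rank, lambda, numIter)

// HYPOTHETICAL placeholder for "train a model and compute its validation RMSE".
// The formula is arbitrary and exists only so that the sketch runs without Spark.
def fakeValidationRmse(rank: Int, lambda: Double, numIter: Int): Double =
  1.0 / rank + lambda / 100.0 + 1.0 / numIter

// Select the combination with the lowest (placeholder) score.
val best = grid.minBy { case (rank, lambda, numIter) =>
  fakeValidationRmse(rank, lambda, numIter)
}

println(s"${grid.size} combinations evaluated, best = $best")
```

In the notebook, the placeholder would be replaced by a call that trains with ALS.train and scores with computeRmse.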

The best combination seems to be the larger rank (12) and the smaller lambda (0.1). Now, run it on the test data:

// evaluate the best model on the test set
val testRmse = computeRmse(bestModel.get, test, numTest)

println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
  + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

The output should be similar to the following:

testRmse: Double = 0.8157748896668603
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 20, and its RMSE on the test set is 0.8157748896668603.

How well does this model compare to a more naive prediction, based on the average rating?

// create a naive baseline and compare it with the best model
val meanRating = training.union(validation).map(_.rating).mean
val baselineRmse =
  math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
val improvement = (baselineRmse - testRmse) / baselineRmse * 100
println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

You should see the following output:

meanRating: Double = 3.507898942981895
baselineRmse: Double = 1.059715654158681
improvement: Double = 23.038148392339163
The best model improves the baseline by 23.04%.
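The baseline predicts the same value, the mean rating, for every user and movie, and the improvement is the relative reduction in RMSE. The following plain Scala sketch walks through that arithmetic on made-up numbers; the ratings and the modelRmse value are assumptions chosen for illustration, not results from the dataset.

```scala
// Made-up ratings, standing in for the training+validation and test sets.
val knownRatings = Seq(4.0, 2.0, 3.0)
val heldOutRatings = Seq(5.0, 3.0, 1.0)

// Naive predictor: always predict the mean of the known ratings.
val naiveMean = knownRatings.sum / knownRatings.size

// RMSE of the naive predictor on the held-out ratings.
val naiveRmse = math.sqrt(
  heldOutRatings.map(r => (naiveMean - r) * (naiveMean - r)).sum / heldOutRatings.size)

// HYPOTHETICAL RMSE of a trained model on the same held-out ratings.
val modelRmse = 1.2

// Relative improvement over the baseline, as a percentage.
val relativeImprovement = (naiveRmse - modelRmse) / naiveRmse * 100

println(f"baseline RMSE = $naiveRmse%.3f, improvement = $relativeImprovement%.2f%%")
```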

Use the model to make personal recommendations

After you have built the model, you can start using it to make recommendations for various users. To get the top 10 movie recommendations for one of your users (e.g. user ID=100), run the following:

val candidates = sc.parallelize(movies.keys.toSeq)
val recommendations = bestModel.get
  .predict(candidates.map((100, _)))  // score every movie for user 100
  .collect()
  .sortBy(- _.rating)                 // highest predicted rating first
  .take(10)

var i = 1
println("Movies recommended for you:")
recommendations.foreach { r =>
  println("%2d".format(i) + ": " + movies(r.product))
  i += 1
}
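The ranking step itself is ordinary collection code once the predictions are on the driver. Here is a minimal sketch in plain Scala, assuming made-up (movieId, predictedRating) pairs, placeholder titles, and a hypothetical helper named topN; it uses zipWithIndex in place of a mutable counter.

```scala
// Made-up (movieId, predictedRating) pairs standing in for the model's predictions.
val scoredMovies = Seq((1, 4.5), (2, 3.0), (3, 4.9), (4, 4.1))

// Placeholder titles keyed by movieId.
val titles = Map(1 -> "Movie A", 2 -> "Movie B", 3 -> "Movie C", 4 -> "Movie D")

// Sort by predicted rating, highest first, and keep the first n entries.
def topN(predictions: Seq[(Int, Double)], n: Int): Seq[(Int, Double)] =
  predictions.sortBy { case (_, rating) => -rating }.take(n)

val top2 = topN(scoredMovies, 2)

println("Movies recommended for you:")
top2.zipWithIndex.foreach { case ((movieId, _), idx) =>
  println("%2d".format(idx + 1) + ": " + titles(movieId))
}
```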

To get the recommendations for "Comedy" movies, you can limit the candidates only to this genre. First, load the genre information that is part of the movies.dat file as the third column:

val moviesWithGenres = sc.textFile(movieLensHomeDir + "movies.dat").map { line =>
  val fields = line.split("::")
  // format: (movieId, genre information)
  (fields(0).toInt, fields(2))
}.collect().toMap  // kept on the driver, like the movies lookup table

Next, filter the movies to include only the ones with "Comedy":

val comedyMovies = moviesWithGenres.filter(_._2.matches(".*Comedy.*")).keys
val candidates = sc.parallelize(comedyMovies.toSeq)
val recommendations = bestModel.get
  .predict(candidates.map((100, _)))  // score only the comedy movies for user 100
  .collect()
  .sortBy(- _.rating)                 // highest predicted rating first
  .take(10)

var i = 1
println("Comedy Movies recommended for you:")
recommendations.foreach { r =>
  println("%2d".format(i) + ": " + movies(r.product))
  i += 1
}
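The genre column in movies.dat is a pipe-separated list, so an alternative to the regular expression is to split it into a set and test for membership. The plain Scala sketch below assumes two sample lines in that format and a hypothetical helper named parseGenres. Note that it splits on the character '|' rather than the string "|", because a string argument to split is treated as a regular expression in which the pipe means alternation.

```scala
// Parse one "movieId::movieName::genres" line into (movieId, set of genres).
def parseGenres(line: String): (Int, Set[String]) = {
  val fields = line.split("::")
  require(fields.length == 3, s"expected 3 fields but got ${fields.length}: $line")
  (fields(0).toInt, fields(2).split('|').toSet)
}

// Sample lines in the assumed format.
val sampleLines = Seq(
  "1::Toy Story (1995)::Animation|Children's|Comedy",
  "2::Jumanji (1995)::Adventure|Children's|Fantasy"
)

// Keep only the IDs of movies whose genre set contains "Comedy".
val comedyIds = sampleLines
  .map(parseGenres)
  .collect { case (id, genres) if genres.contains("Comedy") => id }

println(comedyIds)
```

Set membership matches the genre name exactly, whereas the regular expression matches any genre string that contains "Comedy" as a substring.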

You can save the model to S3 so that you can load it again later and reuse it in your application.

// Save and load model
bestModel.get.save(sc, "s3://emr.examples/movieLens/model/recommendation")
val sameModel = MatrixFactorizationModel.load(sc, "s3://emr.examples/movieLens/model/recommendation")

After you are done with the Spark cluster, terminate your EMR cluster to stop paying for the resources.


In this example, you launched a Spark cluster with a Zeppelin notebook using Amazon EMR. You connected to the notebook from your local browser and built a recommendation engine using Spark MLlib and movie rating information from MovieLens. Finally, you used a machine learning model and made personal recommendations for some of the users in the dataset.

Using the concepts in this example, you can use your own data to build a recommender, evaluate its quality and performance, and apply the recommender to your use case. With a collaborative Zeppelin notebook, you can quickly visualize your data, explore the algorithm, and share the results with others in your organization.

If you have questions or suggestions, please leave a comment below.


