AWS Big Data Blog

Crunching Statistics at Scale with SparkR on Amazon EMR

Christopher Crosbie is a Healthcare and Life Science Solutions Architect with Amazon Web Services.

This post is co-authored by Gopal Wunnava, a Senior Consultant with AWS Professional Services.

SparkR is an R package that allows you to integrate complex statistical analysis with large datasets. In this blog post, we introduce you to running R with the Apache SparkR project on Amazon EMR. The SparkR diagram below is provided as a reference, and this video provides an overview of what is depicted.

Choosing Amazon EMR as your platform automates much of the work associated with setting up and configuring a Spark cluster. It integrates with many AWS services, allowing you to lower costs with Spot Instances, easily resize your cluster, and apply security controls using IAM roles and the AWS Key Management Service. We will mention several of the features that are useful for SparkR throughout this post, but readers less familiar with EMR’s unique feature set may also want to review the features available in Amazon EMR.

The architecture below shows how you can have multiple types of clusters for different purposes all pointing to the same source of truth residing in S3.

A common strategy that many AWS customers choose is to run multiple clusters geared towards different workloads. This can provide an advantage over a single multi-purpose, but possibly sub-optimal, cluster. Using Amazon S3 as an extension of HDFS allows you to scale multiple clusters independently of each other: you can have one cluster optimized for machine learning algorithms and another optimized for nightly ETL, with both pointing to the same S3 data source. You do not need to pay to store data multiple times, or to keep idle compute servers running just to store data.

Using S3 with SparkR has another benefit: you can take advantage of storage tiers such as Amazon S3 Reduced Redundancy Storage, or set up lifecycle policies for automatic archiving.

Additional configuration optimization for SparkR and EMR

The table below shows which instance type you may want to consider for a SparkR workload. However, finding the optimal performance of your SparkR cluster requires a combination of the right instance type and careful selection of configuration parameters. It is important to right size your Spark environment.

While you can choose the number of executors to use in your Spark architecture explicitly, you can also just specify dynamic allocation in situations where you are not certain. This allows YARN to choose the number of executors required for your application dynamically. Starting with EMR 4.4, dynamic allocation is the default setting and can provide much better performance.
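If you do want to set these values explicitly, one option is to pass the standard Spark configuration properties through the sparkEnvir argument when you initialize a Spark context from R (the full initialization steps, including loading the SparkR library, are shown later in this post). The snippet below is a minimal sketch, assuming a YARN-backed EMR cluster; the property names come from the standard Spark configuration and the values are illustrative.

#A minimal sketch, assuming a YARN-backed EMR cluster: pass standard
#Spark configuration properties through sparkEnvir. On EMR 4.4+ dynamic
#allocation is already the default, so this is only needed when you want
#to set the values explicitly.
sc <- sparkR.init(master = "yarn-client",
                  sparkEnvir = list(spark.dynamicAllocation.enabled = "true",
                                    spark.shuffle.service.enabled = "true"))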

Installing and configuring RStudio for SparkR on EMR

Some R fans may find the SparkR console a familiar home, but many others gravitate towards the powerful IDE that RStudio provides.  A common approach is to install RStudio server, which runs a web interface, on the master node. You can do this with an automated bootstrap action or by logging into the master node and executing the commands found on the Download RStudio Server page.

These commands download, install, and start RStudio Server. In addition to the server installation, you need to create a user; RStudio logins are based on the Amazon Linux user accounts that you create.

To properly interact with SparkR from within RStudio, the account needs access to all the RStudio files as well as specific SparkR files. During early testing and experimentation, you can simply give your analyst account permission to everything on the cluster by editing the file /etc/sudoers and adding the following line:

analyst ALL = NOPASSWD: ALL  

IMPORTANT:  This approach is an insecure method that should only be used on throwaway or test clusters that do not contain sensitive data.

Connecting to RStudio on the master node is done over a SOCKS proxy. By default, RStudio listens on port 8787, so connecting to RStudio simply involves a proxy connection to the following URL:

http://YOUR_MASTER_NODE_DNS_NAME:8787

After you connect to the web interface and log in as your analyst user, you can start to run your R code and interact with RStudio as you normally would. However, you need a way to connect the R code you write within RStudio to an initialized Spark context. When you use the SparkR shell from your master node, this connection is made automatically when the shell starts.  In RStudio, you need to declare your own connections by running the following code snippet:

#Set the path for the R libraries you would like to use. 
#You may need to modify this if you have custom R libraries. 
.libPaths(c(.libPaths(), '/usr/lib/spark/R/lib'))  

#Set the SPARK_HOME environment variable to the location on EMR
Sys.setenv(SPARK_HOME = '/usr/lib/spark') 

#Load the SparkR library into R
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

#Initiate a Spark context and identify where the master node is located.
#local is used here because the RStudio server 
#was installed on the master node

sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))

sqlContext <- sparkRSQL.init(sc) 

There are many more sparkEnvir configuration properties that can be passed to the sparkR.init() function than those shown here. For more information about those options, see Starting Up from RStudio in the latest Spark documentation.

After this code has been run, you can now run SparkR on EMR from your familiar RStudio IDE.

Because RStudio runs within your client web browser, you may find that the default level of messages returned to the console overwhelms the memory capabilities of your browser and you may experience slowdowns in the user interface.  To improve this experience, turn off the informational messages within the Log4j properties.

SparkR DataFrames for basic aggregation and machine learning

The SparkR DataFrame is a data structure that is similar to R’s native data.frame. The key difference is that the underlying data is stored in a distributed environment.
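To make the similarity concrete, here is a minimal sketch that turns a small local R data.frame (the built-in faithful dataset, used purely for illustration) into a distributed SparkR DataFrame, assuming the sqlContext created as shown earlier:

#A minimal sketch: create a SparkR DataFrame from a local R data.frame.
#faithful is a small dataset that ships with R; sqlContext was created
#earlier in this post.
df <- createDataFrame(sqlContext, faithful)

#Inspection looks much like native R, but the data is distributed.
printSchema(df)
head(df)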

Let’s take a quick peek into what large scale aggregation and filtering actually looks like in SparkR. First, let’s see what it takes to move data from Amazon S3 into a SparkR dataframe.

#Set a link to the location of your data in S3
s3url <- "s3://my-s3-url/*"
lines <- SparkR:::textFile(sc, s3url)
#Convert the RDD to a SparkR DataFrame
lines_DF <- SparkR:::toDF(lines)

Next, you can get a feel for the data by taking a look at the first few rows of the DataFrame:

head(lines_DF)

With a SparkR DataFrame, you can call aggregates and filters as you would with a native R dataframe, except now on massive sets of data.

#very basic aggregation. 
count(lines_DF)
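Beyond a simple count, grouping and filtering also mirror their native R counterparts. The sketch below runs against a hypothetical customer DataFrame, custsdf, with a code column (the same hypothetical DataFrame used later in this post); the column names and values are illustrative.

#A sketch against a hypothetical DataFrame, custsdf, with a "code" column.
#Filtering looks much like subsetting a native R data frame.
five_custs <- filter(custsdf, custsdf$code == 5)

#Grouping and aggregation run across the full distributed dataset.
counts_by_code <- agg(groupBy(custsdf, custsdf$code),
                      count = n(custsdf$code))
head(counts_by_code)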

You can also register the SparkR dataframe as a table and execute SQL commands against it:

#register this DataFrame as a table so we can explore with SQL 
registerTempTable(lines_DF, "tbl_lines")
my_data <- sql(sqlContext, "select * from tbl_lines where _1 like 'text%'")

If you want the table to persist after the Spark program restarts, you can create a “managed table” by using the saveAsTable command, which saves it to the Hive metastore. See Saving to Persistent Tables in the Spark programming guide for more on how this can be done.
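As a minimal sketch of that pattern, the call below persists the lines_DF DataFrame created above; the table name is illustrative and Parquet is just one possible source format.

#A minimal sketch: persist lines_DF to the Hive metastore as a managed
#table. The table name and Parquet format are illustrative choices.
saveAsTable(lines_DF, "tbl_lines_persisted", source = "parquet", mode = "overwrite")

#After a restart, the table can be read back into a DataFrame:
#persisted <- table(sqlContext, "tbl_lines_persisted")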

There is interoperability between native R and SparkR, so you can use SparkR DataFrames to perform filter and aggregate operations across huge datasets and then collect the results you would like to analyze into a smaller dataset that can fit on one machine.

#Assume custsdf is a SparkR dataframe created from a large dataset 
#You need to filter specific customer codes and retrieve all columns from source

localdf <- collect(select(filter(custsdf,custsdf$code > 001 & custsdf$code < 010),"*"))

On the machine learning side, SparkR allows the fitting of generalized linear models over DataFrames using the same glm() syntax that most R users are familiar with. This functionality can be very useful for understanding lots of data quickly.
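As a quick sketch of what that looks like, the snippet below fits a linear (Gaussian) model on the small built-in faithful dataset, purely for illustration; on a real cluster you would point this at a large SparkR DataFrame instead.

#A minimal sketch of SparkR's glm() on a DataFrame. The faithful dataset
#is used purely for illustration.
df <- createDataFrame(sqlContext, faithful)

#Familiar R formula syntax, but the model is fit by Spark.
model <- glm(waiting ~ eruptions, data = df, family = "gaussian")

#Coefficients and predictions work much as they do in native R.
summary(model)
predictions <- predict(model, df)
head(predictions)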

Scaling native R functions

Even though machine learning in the SparkR DataFrame API is limited to glm(), there are still ways to scale your native R functions to speed up the processing of algorithms that require multiple iterations or versions of models to be compared. We discuss these techniques in this section, but be aware that they rely on moving back to the private API and may not be supported later.

This can be done because SparkR exposes the RDD API of Spark as a distributed list.  There are functions associated with distributed lists that allow us to apply our native R functions to each item in the distributed list.  These functions should be familiar to R users:

lapply
lapplyWithPartition
Reduce
reduceByKey
groupByKey
collect

To understand this more completely, walk through a simple example of how you can call a native R function that is written to parse fields.

#First, open a spark context as you normally would.
#See above for code

#next, set a link to the location of your data in S3 
s3url <- "s3://example"

#get an RDD by calling textFile.
rdd <- SparkR:::textFile(sc, s3url)

#now, create a native R function to parse the field. 
parseFields <- function(f) {
  #Split each line on the tab character (fields assumed tab-delimited)
  strsplit(f, "\t")
}

#lapply acts like a map function against an RDD. 
#This is the key to how you scale R functions across nodes.

new_rdd <- SparkR:::lapply(rdd, parseFields)

Of course, a valuable aspect of R comes in the abundance of useful packages available on CRAN and other sources. To scale these, you need to load your packages across all the local R instances running on each node of your cluster. To scale packages using lapply as in the above sample, first load the packages into the SparkContext for cluster execution. There are two ways to do this (a short usage sketch follows this list):

  • When initiating your Spark context from R, add the package in the constructor:

sc <- sparkR.init(sparkPackages = "package_here")

  • Alternatively, after you have already established a Spark context, you can call the includePackages function with the package you would like to add to the Spark context:

SparkR:::includePackages(sc, "package_here")
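A short sketch of how this fits together with the lapply pattern from above, using the stringr package purely as an example (the package still has to be installed on every node in the cluster):

#A sketch: once a package has been made available to the worker R
#processes as shown above (stringr is used here purely as an example,
#and must be installed on every node), you can call it from a native
#R function applied with the private RDD API.
trimFields <- function(f) {
  stringr::str_trim(f)
}

#rdd is the RDD created earlier with SparkR:::textFile
trimmed_rdd <- SparkR:::lapply(rdd, trimFields)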

The API discussed in this section was previously used for working with RDDs, and was deprecated in favor of the DataFrame API in Spark 1.5. In the latest versions of SparkR (1.5 and 1.6 at the time of this writing), you can still access these functions via the private API; you simply need to use the syntax SparkR:::[function name]. However, use these methods with caution because they may be removed in later versions of SparkR. In the yet-to-be-released Spark 2.0, there are plans to remove this API and include a newer DataFrame UDF API that offers better functionality and performance. For more information about this new API and the corresponding discussion, see SPARK-6817.

Comparing SparkR and R to Hadoop

Now that we’ve provided a quick overview of the functionality of SparkR, you may be wondering how SparkR compares to prior attempts to move R processing into Hadoop.  This table provides a quick comparison.

Conclusion

We hope that this introduction to SparkR on Amazon EMR has given you the information you need to jump into exploring SparkR.  We’ve provided a lot of information here, but after you are familiar with the steps you can deploy your own SparkR clusters in a matter of minutes.

For more information about running Spark on EMR, see the following:

We would like to thank our friends at AMPlab for creating SparkR and providing great resources for learning it.

If you have any questions or suggestions, please leave a comment below.
