Francisco Oliveira is a consultant with AWS Professional Services
Customers starting their big data journey often ask for guidelines on how to submit user applications to Spark running on Amazon EMR. For example, customers ask for guidelines on how to size memory and compute resources available to their applications and the best resource allocation model for their use case.
In this post, I show how to set spark-submit flags to control the memory and compute resources available to your application submitted to Spark running on EMR. I discuss when to use the maximizeResourceAllocation configuration option and dynamic allocation of executors.
Spark execution model
At a high level, each application has a driver program that distributes work in the form of tasks among executors running on several nodes of the cluster.
The driver is the application code that defines the transformations and actions applied to the data set. At its core, the driver has instantiated an object of the SparkContext class. This object allows the driver to acquire a connection to the cluster, request resources, split the application actions into tasks, and schedule and launch tasks in the executors.
The executors not only perform tasks sent by the driver but also store data locally. As the executors are created and destroyed (see the “Enabling dynamic allocation of executors” section later), they register and deregister with the driver. The driver and the executors communicate directly.
To execute your application, the driver organizes the work to be accomplished in jobs. Each job is split into stages and each stage consists of a set of independent tasks that run in parallel. A task is the smallest unit of work in Spark and executes the same code, each on a different partition.
Spark programming model
An important abstraction in Spark is the resilient distributed dataset (RDD). This abstraction is key to perform in-memory computations. An RDD is a collection of read-only and immutable partitions of data that are distributed across the nodes of the cluster. Partitions in Spark allow the parallel execution of subsets of the data. Spark applications create RDDs and apply operations to RDDs. Although Spark partitions RDDs automatically, you can also set the number of partitions.
RDDs support two types of operations: transformation and actions. Transformations are operations that generate a new RDD, and actions are operations that write data to external storage or return a value to the driver after running a transformation on the dataset. Common transformations include operations that filter, sort and group by key. Common actions include operations that collect the results of tasks and ship them to the driver, save an RDD, or count the number of elements in a RDD.
A common way to launch applications on your cluster is by using the spark-submit script. This script offers several flags that allow you to control the resources used by your application.
Setting the spark-submit flags is one of the ways to dynamically supply configurations to the SparkContext object that is instantiated in the driver. spark-submit can also read configuration values set in the conf/spark-defaults.conf file which you can set using EMR configuration options when creating your cluster and, although not recommended, hardcoded in the application. An alternative to change conf/spark-defaults.conf is to use the --conf prop=value flag. I present both the spark-submit flag and the property name to use in the spark-defaults.conf file and --conf flag.
Spark applications running on EMR
Any application submitted to Spark running on EMR runs on YARN, and each Spark executor runs as a YARN container. When running on YARN, the driver can run in one YARN container in the cluster (cluster mode) or locally within the spark-submit process (client mode).
When running in cluster mode, the driver runs on ApplicationMaster, the component that submits YARN container requests to the YARN ResourceManager according to the resources needed by the application. A simplified and high-level diagram of the application submission process is shown below.
When running in client mode, the driver runs outside ApplicationMaster, in the spark-submit script process from the machine used to submit the application.
Setting the location of the driver
With spark-submit, the flag --deploy-mode can be used to select the location of the driver.
Submitting applications in client mode is advantageous when you are debugging and wish to quickly see the output of your application. For applications in production, the best practice is to run the application in cluster mode. This mode offers you a guarantee that the driver is always available during application execution. However, if you do use client mode and you submit applications from outside your EMR cluster (such as locally, on a laptop), keep in mind that the driver is running outside your EMR cluster and there will be higher latency for driver-executor communication.
Setting the driver resources
The size of the driver depends on the calculations the driver performs and on the amount of data it collects from the executors. When running the driver in cluster mode, spark-submit provides you with the option to control the number of cores (--driver-cores) and the memory (--driver-memory) used by the driver. In client mode, the default value for the driver memory is 1024 MB and one core.
Setting the number of cores and the number of executors
The number of executor cores (--executor-cores or spark.executor.cores) selected defines the number of tasks that each executor can execute in parallel. The best practice is to leave one core for the OS and about 4-5 cores per executor. The number of cores requested is constrained by the configuration property yarn.nodemanager.resource.cpu-vcores, which controls the number of cores available to all YARN containers running in one node and is set in the yarn-site.xml file.
The number of executors per node can be calculated using the following formula:
number of executors per node = number of cores on node - 1 for OS/number of task per executor
The total number of executors (--num-executors or spark.executor.instances) for a Spark job is:
total number of executors = number of executors per node * number of instances -1.
Setting the memory of each executor
The memory space of each executor container is subdivided on two major areas: the Spark executor memory and the memory overhead.
Note that the maximum memory that can be allocated to an executor container is dependent on the yarn.nodemanager.resource.memory-mb property available at yarn-site.xml. The executor memory (--executor-memory or spark.executor.memory) defines the amount of memory each executor process can use. The memory overhead (spark.yarn.executor.memoryOverHead) is off-heap memory and is automatically added to the executor memory. Its default value is executorMemory * 0.10.
Executor memory unifies sections of the heap for storage and execution purposes. These two subareas can now borrow space from one another if usage is exceeded. The relevant properties are spark.memory.fraction and spark.memory.storageFraction. For more information, see the Unified Memory Management in Spark 1.6 whitepaper.
The memory of each executor can be calculated using the following formula:
memory of each executor = max container size on node / number of executors per node
A quick example
To show how you can set the flags I have covered so far, I submit the wordcount example application and then use the Spark history server for a graphical view of the execution.
First, I submit a modified word count sample application as an EMR step to my existing cluster. The code can be seen below:
from __future__ import print_function from pyspark import SparkContext import sys if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: wordcount ", file=sys.stderr) exit(-1) sc = SparkContext(appName="WordCount") text_file = sc.textFile(sys.argv) counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) counts.saveAsTextFile(sys.argv) sc.stop()
The cluster has six m3.2xlarge instances plus one instance for the master, each with 8 vCPU and 30 GB of memory. The default value of yarn.nodemanager.resource.memory-mb for this instance type is 23 GB.
According to the formulas above, the spark-submit command would be as follows:
spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g –conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/
I submit the application as an EMR step with the following command:
aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE
Note that I am also setting the property spark.yarn.submit.waitAppCompletion with the step definitions. When this property is set to false, the client submits the application and exits, not waiting for the application to complete. This setting allows you to submit multiple applications to be executed simultaneously by the cluster and is only available in cluster mode.
I use the default values for --driver-memory and --driver-cores, as the sample application is writing directly to Amazon S3 and the driver is not receiving any data from the executors.
Enabling dynamic allocation of executors
Spark on YARN has the ability to dynamically scale up and down the number of executors. This feature can be valuable when you have multiple applications being processed simultaneously as idle executors are released and an application can request additional executors on demand.
To enable this feature, please see the steps in the EMR documentation.
Spark provides granular control to the dynamic allocation mechanism by providing the following properties:
- Initial number of executors (spark.dynamicAllocation.initalExecutors)
- Minimum number of executors to be used by the application (spark.dynamicAllocation.minExecutors)
- Maximum executors that can be requested (spark.dynamicAllocation.maxExecutors)
- When to remove an idle executor (sparkdynamicAllocation.executorIdleTime)
- When to request new executors to process waiting tasks (spark.dynamicAllocation.schedulerBacklogTimeout and spark.dynamicAllocation.sustainedSchedulerBacklogTimeout)
Automatically configure executors with maximum resource allocation
EMR provides an option to automatically configure the properties above in order to maximize the resource usage of the entire cluster. This configuration option can be valuable when you have only a single application being processed by your cluster at a time. Its usage should be avoided when you expect to run multiple applications simultaneously.
To enable this configuration option, please see the steps in the EMR documentation.
By setting this configuration option during cluster creation, EMR automatically updates the spark-defaults.conf file with the properties that control the compute and memory resources of an executor, as follows:
- spark.executor.memory = (yarn.scheduler.maximum-allocation-mb - 1g) -spark.yarn.executor.memoryOverhead
- spark.yarn.executor.memoryOverhead = (yarn.scheduler.maximum-allocation-mb - 1g) * 0.10
- spark.executor.instances = [this is set to the initial number of core nodes plus the number of task nodes in the cluster]
- spark.executor.cores = yarn.nodemanager.resource.cpu-vcores
- spark.default.parallelism = spark.executor.instances * spark.executor.cores
A graphical view of the parallelism
The Spark history server UI is accessible from the EMR console. It provides useful information about your application’s performance and behavior. You can see the list of scheduled stages and tasks, retrieve information about the executors, obtain a summary of memory usage, and retrieve the configurations submitted to the SparkContext object. For the purposes of this post, I show how the flags set in the spark-submit script used in the example above translate to the graphical tool.
To access the Spark history server, enable your SOCKS proxy and choose Spark History Server under Connections.
For Completed applications, choose the only entry available and expand the event timeline as below. Spark added 5 executors as requested in the definition of the --num-executors flag.
Next, by navigating to the stage details, you can see the number of tasks running in parallel per executor. This value is the same as the value of the --executor-cores flag
In this post, you learned how to use spark-submit flags to submit an application to a cluster. Specifically, you learned how to control where the driver runs, set the resources allocated to the driver and executors, and the number of executors. You also learned when to use the maximizeResourceAllocation configuration option and dynamic allocation of executors.
If you have questions or suggestions, please leave a comment below.