AWS Big Data Blog

Analyze Your Data on Amazon DynamoDB with Apache Spark

Manjeet Chayel is a Solutions Architect with AWS

Every day, tons of customer data is generated, such as website logs, gaming data, advertising data, and streaming videos. Many companies capture this information as it’s generated and process it in real time to understand their customers.

Amazon DynamoDB is a fast and flexible NoSQL database service for applications that need consistent, single-digit-millisecond latency at any scale. It is a fully managed database, supporting both key-value and key-sorted set schemas.

Its flexible data model and reliable performance make it a great fit for mobile, web, gaming, ad tech, the Internet of Things, and many other applications, including the type of real-time data processing I’m talking about. In this blog post, I’ll show you how to use Apache Spark to process customer data in DynamoDB.

With the Amazon EMR 4.3.0 release, you can run Apache Spark 1.6.0 for your big data processing. When you launch an EMR cluster, it comes with the emr-hadoop-ddb.jar library required to let Spark interact with DynamoDB. Spark also natively supports applications written in Scala, Python, and Java and includes several tightly integrated libraries for SQL (Spark SQL), machine learning (MLlib), stream processing (Spark Streaming), and graph processing (GraphX). These tools make it easier to leverage the Spark framework for a variety of use cases.

How Spark talks to DynamoDB

Spark provides HadoopRDD for reading stored data (for example, to work with files in HDFS, tables in HBase, or objects in Amazon S3), which you can access by using the Hadoop FileSystem interface. Spark also uses familiar approaches like Hadoop InputFormat, OutputFormat, and InputSplits to map the underlying dataset to a resilient distributed dataset (RDD). I recommend you brush up on these subjects. When you call the HadoopRDD API, it uses the MapReduce API (org.apache.hadoop.mapred) to read and write.

The HadoopRDD class definition is as follows:

HadoopRDD(sc: SparkContext, conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int)

When a job starts and you use HadoopRDD to read from a data source, on the back end using MapReduce API operations, each input file is broken into splits and each task processes a single split. Each split is further divided into records of key-value pairs that the Spark task processes one record at a time.

The simplest way for Spark to interact with DynamoDB is to build a connector that talks to DynamoDB by implementing the simple Hadoop interfaces.

Amazon EMR provides an implementation of this connector as part of emr-hadoop-ddb.jar, which contains the DynamoDBItemWriteable class. Using this class, you can implement your own DynamoDBInputFormat as shown below.

public class DynamoDbInputFormat implements InputFormat, Serializable {

    @Override
    public InputSplit[] getSplits(@NonNull final JobConf job, final int numSplits) throws IOException {
        final int splits = Integer.parseInt(requireNonNull(job.get(NUMBER_OF_SPLITS), NUMBER_OF_SPLITS
            + " must be non-null"));

        return IntStream.
            range(0, splits).
            mapToObj(segmentNumber -> new DynamoDbSplit(segmentNumber, splits)).
            toArray(InputSplit[]::new);
}

Because the application does not know the number of splits on DynamoDB, you have to pass them manually by setting them up in a job configuration.

public class DynamoDbSplit implements InputSplit, Serializable {
    private int segmentNumber;
    private int splits;
 
// have setter and getter methods to read and set the variables
}

Finally, in the job configuration you pass the details about the splits and also the table name.

private static JobConf createDynamoDbJobConf() {
        final JobConf conf = new JobConf();
        conf.set("dynamodb.numberOfSplits", "10000");
        conf.set("dynamodb.tableName", "myDynamoDBTable");
	return conf;
}

We will write a simple Scala program to do a word count operation on a DynamoDB table using the preceding implementation. To get started, you need to complete the following steps:

1.Launch an EMR cluster with Spark

You can launch an EMR cluster with Spark and Hive. To learn how, see the Apache Spark topic.

2.Load data into DynamoDB

On the EMR cluster you just launched, load sample data into DynamoDB from a file present on S3. To learn how, see the Using Amazon Elastic MapReduce with DynamoDB post.

3.Read the DynamoDB table from the Spark program

Log in to your EMR cluster using any Secure Shell (SSH) client, as shown below.

ssh -o ServerAliveInterval=10 -i <YOUR-KEY-PAIR> hadoop@<EMR-MASTER-DNS>

We will run this example using a Spark interactive shell. You can invoke the Spark shell easily by entering the Spark shell and passing emr-ddb-hadoop.jar as an external jar. The example below counts the number of records in the DynamoDB table.

$ spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar
   
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
 
var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "myDynamoDBTable")   // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "us-east-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.version", "2011-12-05")
 
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
 
var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
 
// Doing a count of items on Orders table
orders.count()

You can use Spark Streaming to process data coming from a live data stream, like one from Amazon Kinesis or Kafka. Spark Streaming is an extension of the core Spark framework. When you’ve processed the data, you can then store the results in DynamoDB, which can easily be used to back up applications to show metrics in real time on many items. Spark Streaming provides a high-level abstraction called a Discretized Stream or DStream, which represents a continuous sequence of RDDs. Spark Streaming uses the Amazon Kinesis Client Library (KCL) to consume data from an Amazon Kinesis stream.

The code snippet below from a Spark Streaming example creates as many DStreams (and in turn, KCL workers) as there are shards.

// Create the Amazon Kinesis DStreams
val kinesisStreams = (0 until numStreams).map { i =>
   KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}

To learn how to optimize Spark Streaming to efficiently process Amazon Kinesis streams, I recommend you read the Optimize Spark-Streaming to Efficiently Process Amazon Kinesis Streams post.

Conclusion

I’ve shown you how to use Spark to process data in DynamoDB. You can implement the Hadoop interfaces to do some complex logic and persist the information in a NoSQL system to power a variety of applications.

Happy Sparking!

If you have questions or suggestions, please leave a comment below.

————————

Related

Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming

Looking to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.