AWS Big Data Blog

Process Large DynamoDB Streams Using Multiple Amazon Kinesis Client Library (KCL) Workers

Asmita Barve-Karandikar is an SDE with DynamoDB

Introduction

Imagine you own a popular mobile health app, with millions of users worldwide, that continuously records new information. It sends over one million updates per second to its master data store and needs the updates to be relayed to various replicas across different regions in real time. Amazon DynamoDB and DynamoDB Streams are designed to operate at this scale, and they can handle the data storage and the capture of updates for you. Developing a stream consumer application to replicate the captured updates to different regions at this scale, however, may seem like a daunting task.

In a previous post, I described how you can use the Amazon Kinesis Client Library (KCL) and DynamoDB Streams Kinesis Adapter to efficiently process DynamoDB streams. In this post, I will focus on the KCL configurations that are likely to have an impact on the performance of your application when processing a large DynamoDB stream. By large, I mean a stream that requires your application to spin up two or more worker machines to consume and process it. Theoretically, the number of KCL workers you require depends on the following:

  1. The memory and CPU footprint of your application, which are directly proportional to the number of shards and the throughput per shard in your stream. My previous post provides more details about how to estimate this.
  2. The memory and CPU capacity of the machines you plan to use for your application.

More generally, a stream with a large number of shards (>100) or with a high throughput per shard (>200 writes per second) will likely require you to spin up multiple workers.

The following figure shows three KCL workers in action, with DynamoDB stream shards distributed among them:

Three KCL workers in action

KCL Worker properties

Let’s explore some configurations on the KCL worker that are bound to have an impact on the performance of your application in a multi-worker scenario.

MaxLeasesForWorker: Maximum number of shards a worker can process

This is the most important property to adjust when using multiple workers. If not explicitly set, the first worker that comes up will try to pick up all the shards in your stream and could run out of resources (CPU or memory). It might take some iterations to optimize this value, but I will provide some guidance for estimating this.

The following figure shows the mapping between DynamoDB table partitions and stream shards:

Mapping between DynamoDB table partitions and stream shards

  • At any given point in time, each partition in a DynamoDB table maps to a single shard (that is, all updates to that partition are captured by a single shard).
  • Periodically, a shard stops accepting updates and continues to be available only for reads. The updates on the mapped partition are then captured by a new shard, which becomes the child of the closed shard.
  • When a partition splits to accommodate increased throughput or storage, each of the split partitions starts writing to a new shard. These two new shards then become the children of the shard that the partition was writing to just before it split.
  • KCL guarantees processing in the order of this lineage (that is, parent shards are always processed before their children).
  • Typically, shards in DynamoDB Streams close for writes roughly four hours after they are created and become completely unavailable 24 hours after they are created.

At any given point in time, the minimum number of shards that your application must process (N) is roughly equal to the number of partitions in your table at that time. Because a new generation of shards is created roughly every four hours and closed shards remain readable for 24 hours, up to about six generations of shards per partition can exist at once, so the maximum number of shards to process is roughly 6*N. Thus, we have:

N  = Number of partitions at the time the application starts (This online document can help you estimate N)

M = Number of KCL workers

Then, the range of values for this parameter is: [N/M, 6*N/M]
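For example (with hypothetical numbers), if your table has N = 300 partitions when the application starts and you run M = 3 workers, this range works out to [300/3, 6*300/3] = [100, 600] leases per worker.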

Specifying a value of N/M poses the following risk. During rollover (that is, when shards stop accepting writes and make way for child shards), the number of shards in the stream shoots up to roughly 2*N. Your workers are still processing the first N shards, so they can't pick up any new ones.

Suppose this continues until the next rollover, so that there are 3*N shards in the stream. The workers are now done with the first N shards and start looking for new shards to pick up. If they all happen to pick up assignments from the latest (third) generation of N shards instead of the second one, all of them will end up simply waiting, because none of the parent shards (second generation) have completed processing. And the parent shards can never complete, because none of the workers can pick up any more shards to work on.

A value of 6*N/M eliminates the above risk, but might initially create a skewed distribution, which could result in some workers trying to do more work than their resources can handle. The ideal value is somewhere in between, depending mainly on your throughput and the speed at which your application can process records. In most cases, a value of N/M + b (where b is some constant) may be a good place to start.
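As a minimal sketch (with hypothetical values for N, M, and b, and placeholder application, stream, and worker names), you might set this property on the KinesisClientLibConfiguration like this:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

// Hypothetical estimates: substitute your own values for N, M, and b.
final int numPartitions = 300;  // N: partitions at the time the application starts
final int numWorkers = 3;       // M: KCL workers you plan to run
final int buffer = 20;          // b: headroom for shards created during rollover

KinesisClientLibConfiguration config =
        new KinesisClientLibConfiguration(
                "streams-consumer-app",                  // application (and leases table) name
                "my-dynamodb-stream-arn",                // stream identifier passed through the adapter
                new DefaultAWSCredentialsProviderChain(),
                "worker-1")
        // Start near N/M + b and tune based on observed CPU and memory usage.
        .withMaxLeasesForWorker(numPartitions / numWorkers + buffer);

A larger b moves you toward the 6*N/M end of the range; a smaller b keeps each worker closer to its steady-state load.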

MaxLeasesToStealAtOneTime: Maximum number of shards a worker can steal at a time

A worker will try to steal leases from other workers in order to achieve an even load distribution. The frequency with which it can steal leases depends on the value of FailoverTimeMillis (the higher the value, the less frequent the stealing). Increasing MaxLeasesToStealAtOneTime may be beneficial in a multiple-worker environment so that an even load distribution is reached quickly.
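Continuing the hypothetical config from the sketch above, raising the steal count might look like this:

// Hypothetical tuning: let a worker steal several leases per rebalancing pass
// (the default is 1), so an even load distribution is reached faster when a worker joins.
config.withMaxLeasesToStealAtOneTime(5);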

Other Properties

The following figure shows the impact of some of the configurations on shard consuming threads in a KCL application:

Impact of some of the configurations on shard consuming threads

  • The blue blocks indicate how long each thread sleeps, based on the value of the specified parameter.
  • There is one thread per shard for all the shards assigned to a worker. Depending on the state of the shard, the thread is either blocking on its parent, initializing, processing, or shutting down.

CallProcessRecordsEvenForEmptyRecordList

The default is false. Change this to true if you are buffering records, so that processRecords gets called after every poll on the stream, even when no new records are returned, and the buffer can be flushed.
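For example, continuing the hypothetical config above:

// Invoke processRecords() even when a poll returns no records,
// so a time-based buffer can still be flushed on schedule.
config.withCallProcessRecordsEvenForEmptyRecordList(true);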

IdleTimeBetweenReadsInMillis

This property determines the frequency with which getRecords calls are made to DynamoDB Streams to look for new records. As mentioned in this post, we recommend that you keep the MaxRecords parameter at its default of 1000. If you haven't changed it, you are fetching a maximum of 1,000 stream records with each getRecords call. Now, if the throughput on your table is very low and your application is not latency sensitive, you may tune up the value of IdleTimeBetweenReadsInMillis from the default of 1 second. Making fewer polls on the stream may give you some cost savings.

Lowering this value may be beneficial if your throughput per partition is very high (>500 writes per second). However, you will be increasing the number of getRecords calls placed.
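As a sketch with hypothetical values, a low-throughput, latency-tolerant consumer might poll less often, while a write-heavy table might warrant polling more often:

// Low-throughput table, latency-insensitive consumer: poll less often to save cost.
config.withIdleTimeBetweenReadsInMillis(5000);

// Write-heavy table (for example, >500 writes per second per partition): poll more often,
// accepting the additional getRecords calls.
// config.withIdleTimeBetweenReadsInMillis(500);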

ParentShardPollIntervalMillis

This property determines the frequency with which a blocked child shard thread checks for parent shard completion. Initially, all shard threads are put in the blocking state while they check whether their parent shards have completed. Increasing this value may unnecessarily delay the start of processing for shards whose parents have completed processing. Lowering this value increases the read throughput on the DynamoDB leases table. We recommend that you keep the default value for this parameter.

Amazon CloudWatch Metrics

By default, KCL publishes metrics at the Detail level in Amazon CloudWatch. It also publishes a number of metrics per shard, which may be difficult to track as the number of shards increases. You can turn these off as follows:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.google.common.collect.ImmutableSet;
import java.util.Set;

// Keep the always-enabled Operation dimension plus WorkerIdentifier; leave out ShardId.
final Set<String> metricsDimensionsForLargeTables = ImmutableSet.<String>builder()
        .addAll(KinesisClientLibConfiguration.METRICS_ALWAYS_ENABLED_DIMENSIONS)
        .add("WorkerIdentifier")
        .build();

KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(…)
        .withMetricsEnabledDimensions(metricsDimensionsForLargeTables);

The above code snippet adds dimensions for Operation and WorkerIdentifier, leaving out ShardId. Turning off per-shard metrics may reduce the overall memory footprint of your application. For more details about metrics, see Monitoring the Amazon Kinesis Client Library with Amazon CloudWatch in the Amazon Kinesis Streams documentation.

Summary of worker properties

Property | Default | If the value is too low…
MaxLeasesForWorker | MAX_INT | Workers may not be able to pick up and process shards at par with the throughput, increasing the risk of falling behind. In the worst case, all workers may end up with shards that are blocking on parents, resulting in the whole application coming to a halt.
MaxLeasesToStealAtOneTime | 1 | Slow convergence to an even load distribution might result in some workers doing more work than they can handle.
IdleTimeBetweenReadsInMillis | 1s | Frequency of getRecords calls is higher, resulting in more CPU usage and more cost for DynamoDB Streams usage.
ParentShardPollIntervalMillis | 10s | More frequent reads on the DynamoDB leases table.

Conclusion

Now that you are familiar with the different configurations on the KCL, you can play around with the DynamoDB cross-region replication library on GitHub to set up your own replication solution on one or more machines. Alternatively, clone the repos for the KCL and the Adapter on GitHub, and then start building your own stream consuming application from scratch!

In an upcoming post, I’ll talk about monitoring your application for failures using KCL logs and CloudWatch metrics.

If you have questions or suggestions, please leave a comment below.


Related

Processing Amazon DynamoDB Streams Using the Amazon Kinesis Client Library