AWS Big Data Blog

Real-time in-memory OLTP and Analytics with Apache Ignite on AWS

February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.

Babu Elumalai is a Solutions Architect with AWS.

Organizations are generating tremendous amounts of data, and they increasingly need tools and systems that help them use this data to make decisions. The data has both immediate value (for example, trying to understand how a new promotion is performing in real time) and historic value (trying to understand the month-over-month revenue of launched offers on a specific product).

The Lambda architecture (not AWS Lambda) helps you gain insight into immediate and historic data by having a speed layer and a batch layer. You can use the speed layer for real-time insights and the batch layer for historical analysis.

In this post, we’ll walk through how to:

  1. Build a Lambda architecture using Apache Ignite
  2. Use Apache Ignite to perform ANSI SQL on real-time data
  3. Use Apache Ignite as a cache for online transaction processing (OLTP) reads

To illustrate these approaches, we’ll discuss a simple order-processing application. We will extend the architecture to implement analytics pipelines and then look at how to use Apache Ignite for real-time analytics.

A classic online application

Let’s assume that you’ve built a system to handle the order-processing pipeline for your organization. You have an immutable stream of order documents that are persisted in the OLTP data store. You use Amazon DynamoDB to store the order documents coming from the application.

Below is an example order payload for this system:

{'BillAddress': '5719 Hence Falls New Jovannitown  NJ 31939', 'BillCity': 'NJ', 'ShipMethod': '1-day', 'UnitPrice': 14, 'BillPostalCode': 31939, 'OrderQty': 1, 'OrderDate': 20160314050030, 'ProductCategory': 'Healthcare'}

{'BillAddress': '89460 Johanna Cape Suite 704 New Fionamouth  NV 71586-3118', 'BillCity': 'NV', 'ShipMethod': '1-hour', 'UnitPrice': 3, 'BillPostalCode': 71586, 'OrderQty': 1, 'OrderDate': 20160314050030, 'ProductCategory': 'Electronics'}

Here is example code that I used to generate sample order data like the preceding and write the sample orders into DynamoDB.
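The linked sample isn’t reproduced here, but a minimal sketch of such a generator, using the AWS SDK for Java DynamoDB Document API against a hypothetical orders table, might look like this:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import java.util.UUID;

public class OrderGenerator {
    public static void main(String[] args) {
        // "orders" is a placeholder table name; point this at your own table and region.
        DynamoDB dynamoDB = new DynamoDB(new AmazonDynamoDBClient());
        Table table = dynamoDB.getTable("orders");

        // Write one sample order document shaped like the payloads shown above.
        Item order = new Item()
            .withPrimaryKey("orderid", UUID.randomUUID().toString())
            .withNumber("OrderDate", 20160314050030L)
            .withString("ShipMethod", "1-day")
            .withString("BillAddress", "5719 Hence Falls New Jovannitown NJ 31939")
            .withString("BillCity", "NJ")
            .withNumber("BillPostalCode", 31939)
            .withNumber("OrderQty", 1)
            .withNumber("UnitPrice", 14)
            .withString("ProductCategory", "Healthcare");
        table.putItem(order);
    }
}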

The following illustration shows the current architecture for this example.

Your first analytics pipeline

Next, suppose that business users in your organization want to analyze the data using SQL or business intelligence (BI) tools for insights into customer behavior, popular products, and so on. They are considering Amazon Redshift for this. Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using ANSI SQL or your existing business intelligence tools.

To use this approach, you have to build a pipeline that can extract your order documents from DynamoDB and store them in Amazon Redshift. Let’s look at the components we can use to build this pipeline:

  • DynamoDB Streams captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours. Applications can access this log and view the data items as they appeared before and after they were modified, in near real time.
  • AWS Lambda lets you run code without provisioning or managing servers.
  • Amazon Kinesis Firehose can capture and automatically load streaming data into Amazon S3 and Amazon Redshift, enabling near real-time analytics with existing business intelligence tools and dashboards.

You can connect DynamoDB Streams, Lambda, and Amazon Kinesis Firehose to build a pipeline that continuously streams data from DynamoDB to Amazon Redshift:

  1. Create your Amazon Redshift cluster in a VPC in the Amazon VPC service.
  2. Create your order tables in the Amazon Redshift cluster. For reference, use the following example CREATE TABLE statement.

create table orderdata(
  orderid varchar(100),
  orderdate bigint,
  ShipMethod varchar(10),
  BillAddress varchar(200),
  BillCity varchar(50),
  BillPostalCode int,
  OrderQty int,
  UnitPrice int,
  productcategory varchar(200))
distkey(orderid)
sortkey(orderdate, productcategory);

  3. Create a delivery stream in Amazon Kinesis Firehose that delivers incoming events to Amazon Redshift.
  4. Enable DynamoDB Streams on your DynamoDB table by using this approach. Once you’ve enabled DynamoDB Streams, every write to your DynamoDB table is available asynchronously in your streams.
  5. Create a Lambda function that reads the streams data and writes to the Amazon Kinesis Firehose delivery stream. You can follow the instructions in this blog post to create a Lambda function that will process the streams data. I have written example code in Python that processes the order stream data and writes to Firehose; a rough sketch of such a handler follows this list.
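The Python example isn’t reproduced here. Purely as an illustration (written in Java to match the other snippets in this post), a handler that forwards new item images from the DynamoDB stream to a Firehose delivery stream might look like the following sketch; the delivery stream name and the record-to-JSON conversion are placeholders.

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import java.nio.ByteBuffer;

public class OrderStreamToFirehose implements RequestHandler<DynamodbEvent, Void> {

    // Placeholder for your Firehose delivery stream name.
    private static final String DELIVERY_STREAM = "order-delivery-stream";

    private final AmazonKinesisFirehoseClient firehose = new AmazonKinesisFirehoseClient();

    @Override
    public Void handleRequest(DynamodbEvent event, Context context) {
        for (DynamodbStreamRecord record : event.getRecords()) {
            if (record.getDynamodb().getNewImage() == null) {
                continue; // deletes have no new image to forward
            }
            // Convert the new item image into the line format your Firehose-to-Redshift
            // COPY configuration expects (proper JSON/CSV conversion omitted here).
            String line = record.getDynamodb().getNewImage().toString() + "\n";
            firehose.putRecord(new PutRecordRequest()
                .withDeliveryStreamName(DELIVERY_STREAM)
                .withRecord(new Record().withData(ByteBuffer.wrap(line.getBytes()))));
        }
        return null;
    }
}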

Using the preceding steps, you will build an architecture like the one below.

You can use an open-source BI solution like Apache Zeppelin to perform analytics on Amazon Redshift as shown above. Apache Zeppelin is available as a sandbox application on Amazon EMR. In the image below, the visualization shows the shipping methods that customers chose for their orders. Apache Zeppelin creates this visualization from Amazon Redshift.

SQL on the data streams

Business users have been content to perform analytics on data collected in Amazon Redshift to spot trends. But recently, they have been asking AWS whether the latency can be reduced for real-time analysis. At the same time, they want to continue using the analytical tools they’re familiar with.

In this situation, we need a system that lets you capture the data stream and analyze it with SQL in real time.

In the earlier section, you learned how to build the pipeline to Amazon Redshift with Firehose and Lambda functions. The following illustration shows how to use Apache Spark Streaming on EMR to compute time window statistics from DynamoDB Streams. The computed data can be persisted to Amazon S3 and accessed with SparkSQL using Apache Zeppelin.

Note: For this to work, use the DynamoDBStreamsAdapterClient and integrate it with the Amazon Kinesis Client Library for Spark Streaming, provided under the Amazon Software License (ASL).

This is a great option for doing real-time analytics, but it requires that your analysts know how to use Apache Spark to compute results in real time. In the next section, we’ll introduce Apache Ignite and talk about how you can use it to implement real-time analytics while letting users interact with the data streams using SQL.

What is Apache Ignite?

As the following image shows, Apache Ignite is an in-memory data fabric built on top of a distributed in-memory computing platform. Apache Ignite is optimized for high performance and can process large-scale datasets in real time—orders of magnitude faster than is possible with traditional disk-based or flash-based technologies.

Connecting the pieces with Apache Ignite

The following illustration shows how you can use Apache Ignite to build the architecture we’ve described. In this architecture, you use the Amazon Kinesis Client Library (KCL) to read from DynamoDB Streams and stream the data into Apache Ignite. You can query the data directly in Ignite as it becomes available, using SQL through Zeppelin. And because the writes from DynamoDB are replicated asynchronously into Apache Ignite, the Ignite cluster can actually serve as an eventually consistent cache.

Deploying Apache Ignite on AWS

You can either use an AWS CloudFormation template or bootstrap actions with Amazon EMR to deploy an Apache Ignite cluster. We have provided a CloudFormation script that will help you deploy Apache Ignite on AWS. Because Apache Ignite is an in-memory technology, you might need to forecast how much data you want Apache Ignite to hold and provision the cluster based on that forecast so that you don’t run out of memory. The following illustration shows a CloudFormation deployment for Apache Ignite.

For production systems, we strongly recommend changing the “Internal CIDR IP range to whitelist” parameter to restrict access to only the IP range you want to allow. Use the format xxx.xxx.xxx.xxx/32 for a single IPv4 address, or a range such as xxx.xxx.xxx.0/24.

Note: Apache Ignite typically performs node discovery through multicast. Because AWS doesn’t support multicast at this point, you can use the S3-based discovery tool TcpDiscoveryS3IpFinder, which comes with the Ignite distribution.
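If you configure the nodes in Java rather than through Spring XML, a minimal sketch of S3-based discovery looks roughly like the following; the bucket name and credentials are placeholders.

import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.s3.TcpDiscoveryS3IpFinder;

// S3-based discovery: each node registers its address in the bucket and reads
// the addresses of its peers from it, so no multicast is required.
TcpDiscoveryS3IpFinder s3IpFinder = new TcpDiscoveryS3IpFinder();
s3IpFinder.setBucketName("my-ignite-discovery-bucket"); // placeholder bucket name
s3IpFinder.setAwsCredentials(new BasicAWSCredentials("<ACCESS_KEY>", "<SECRET_KEY>"));

TcpDiscoverySpi s3DiscoverySpi = new TcpDiscoverySpi();
s3DiscoverySpi.setIpFinder(s3IpFinder);

IgniteConfiguration serverCfg = new IgniteConfiguration();
serverCfg.setDiscoverySpi(s3DiscoverySpi);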

When deploying Ignite using CloudFormation, you should use Auto Scaling groups to launch the cluster across multiple Availability Zones for high availability. In addition, Apache Ignite lets you configure replicas for your data through a backup parameter. You set this parameter to 1 to maintain two copies of your data. Apache Ignite also lets you configure a replicated or partitioned cache. A replicated cache makes Ignite replicate every write across every node in the cluster. Use partitioned mode if you want to horizontally scale the cluster with data.
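For example, a partitioned order cache that keeps one backup copy of each entry (the cache name and value type below are assumptions) can be configured along these lines:

import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

// Partitioned cache with one backup per entry, so the cluster holds two copies
// of every key; use CacheMode.REPLICATED instead to copy every write to every node.
CacheConfiguration<String, orderdata> orderCacheCfg = new CacheConfiguration<>("<cacheName>");
orderCacheCfg.setCacheMode(CacheMode.PARTITIONED);
orderCacheCfg.setBackups(1);

IgniteConfiguration nodeCfg = new IgniteConfiguration();
nodeCfg.setCacheConfiguration(orderCacheCfg);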

Streaming data into Apache Ignite

Your order data is already available in DynamoDB Streams. You need to write a KCL app that consumes the stream and publishes the order data to Apache Ignite, using the DynamoDB Streams adapter for Amazon Kinesis so that the KCL can reliably consume and process the stream.

The sample code here will help you get started building a KCL app for Apache Ignite from DynamoDB Streams. Below is an excerpt from the code.

// Configure this process as an Ignite client node and join the cluster through
// static IP discovery (replace the placeholder addresses with your node IPs).
TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509", "<IP_ADDRESS1>:47500..47509", "<IP_ADDRESS2>:47500..47509"));
spi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(spi);
cfg.setClientMode(true);
cfg.setPeerClassLoadingEnabled(true);

// Start the client node and acquire a data streamer for the target cache.
Ignite ignite = Ignition.start(cfg);
IgniteDataStreamer<String, orderdata> cache = ignite.dataStreamer("<cacheName>");
LOG.info(">>> cache acquired");

// Wire up the KCL worker against the DynamoDB Streams adapter; the record
// processor pushes each stream record into the Ignite data streamer.
recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
workerConfig = new KinesisClientLibConfiguration("ddbstreamsprocessing",
        streamArn, streamsCredentials, "ddbstreamsworker")
    .withMaxRecords(1000)
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

System.out.println("Creating worker for stream: " + streamArn);
worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
System.out.println("Starting worker...");

int exitCode = 0;
try {
    worker.run();
} catch (Throwable t) {
    System.err.println("Caught throwable while processing data.");
    t.printStackTrace();
    exitCode = 1;
}

The KCL app loops in the background to continuously publish the order stream to Apache Ignite. In this case, leverage the Ignite Data Streamer API to push large data streams. The illustration below shows the data streamer in action and how the data can be consumed with SQL on the other side.
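On the consuming side, the record processor’s processRecords() callback is essentially a loop over each batch. In the fragment below, streamer is the IgniteDataStreamer acquired in the excerpt above, and parseOrder() and getOrderId() are hypothetical stand-ins for the sample’s own conversion logic.

// Inside IRecordProcessor.processRecords(...): convert each DynamoDB Streams
// record into the cache value type and hand it to the Ignite data streamer.
for (Record record : records) {
    String json = new String(record.getData().array()); // the adapter exposes the item image as bytes
    orderdata order = parseOrder(json);                  // hypothetical JSON-to-POJO mapping
    streamer.addData(order.getOrderId(), order);         // asynchronous, batched write into Ignite
}
streamer.flush(); // push any buffered entries before checkpointing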

Real-time SQL analytics

This architecture allows business users to seamlessly query the order data with ANSI SQL at very low latencies. Apache Ignite also integrates with Apache Zeppelin and can be used to visualize your SQL results using the Ignite interpreter for Zeppelin. The example below shows a simple visualization of a SQL query run on Apache Ignite through Zeppelin, followed by an interpreter configuration for Ignite.

Apache Ignite also allows you to join multiple tables if you have a highly normalized schema like a star schema. You can use affinity collocation to keep related cache keys on the same node, which makes joins efficient by avoiding data movement across the network.
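For example (the key class and field names here are hypothetical), annotating the part of a composite key you want to collocate on keeps, say, all order lines for the same order on one node:

import org.apache.ignite.cache.affinity.AffinityKeyMapped;

// Hypothetical composite key: order lines are collocated with their parent order,
// so joins on orderId can run without shuffling data between nodes.
// (equals() and hashCode() omitted for brevity.)
public class OrderLineKey {
    private String lineId;

    @AffinityKeyMapped
    private String orderId; // entries sharing an orderId map to the same partition

    public OrderLineKey(String lineId, String orderId) {
        this.lineId = lineId;
        this.orderId = orderId;
    }
}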

When users run a SQL query, the query runs across multiple nodes in the cluster, emulating a massively parallel processing (MPP) architecture. In partitioned cache mode, each node is responsible for its own data. This approach allows Ignite to parallelize SQL query execution in memory, resulting in significantly higher performance for analytics.
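Concretely, a query such as the following (the cache handle, value type, and column names are assumptions based on the order schema above) fans out across the nodes and aggregates in memory:

import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.SqlFieldsQuery;

// Revenue per product category, computed in parallel across the cluster.
IgniteCache<String, orderdata> orders = ignite.cache("<cacheName>");
SqlFieldsQuery query = new SqlFieldsQuery(
    "SELECT ProductCategory, SUM(OrderQty * UnitPrice) AS revenue " +
    "FROM orderdata GROUP BY ProductCategory");

for (List<?> row : orders.query(query).getAll()) {
    System.out.println(row.get(0) + ": " + row.get(1));
}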

You can also define indexes on your datasets to further improve performance, and you can configure Ignite to store these indexes as off-heap structures.
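Queryable fields and indexes are typically declared on the value class; a sketch for the order type used in these examples (field names assumed) might look like the following. The type then has to be registered on the cache, for example with orderCacheCfg.setIndexedTypes(String.class, orderdata.class).

import org.apache.ignite.cache.query.annotations.QuerySqlField;

// Fields annotated with @QuerySqlField become visible to SQL; index = true
// builds an index so filters and sorts on that column stay fast.
public class orderdata {
    @QuerySqlField(index = true)
    private long orderDate;

    @QuerySqlField(index = true)
    private String productCategory;

    @QuerySqlField
    private int orderQty;

    @QuerySqlField
    private int unitPrice;
}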

Consider running Apache Ignite clusters on R3 instance types on AWS. R3 instances are memory-optimized and are a great fit for memory-intensive workloads. We also expect to launch X1 instance types later this year. These instances will feature up to 2 TB of memory and might also be a great choice to run in-memory Ignite clusters in the future.

Sliding window analysis

It’s easy to configure sliding windows on Apache Ignite because you can define an expiration on your dataset. You can configure a time-based window to expire data after, say, 30 seconds to provide a 30-second sliding window of your data. You might need to create a separate cache for this and stream the data into this cache as well.
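A minimal sketch of such a window cache, assuming a dedicated cache name and the same order value type as before, uses a 30-second created-entry expiry:

import java.util.concurrent.TimeUnit;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.CacheConfiguration;

// Separate cache holding only the last 30 seconds of orders:
// every entry expires 30 seconds after it is created.
CacheConfiguration<String, orderdata> windowCfg = new CacheConfiguration<>("orderWindow");
windowCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 30)));

IgniteCache<String, orderdata> window = ignite.getOrCreateCache(windowCfg);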

The following illustration shows an Apache Ignite sliding window.

Using the cluster as an eventually consistent cache

In this design, we stream data continuously to Apache Ignite. Because the data writes are happening on DynamoDB, the Apache Ignite cluster can also be considered an eventually consistent cache for DynamoDB. So your OLTP read logic can be changed to something like the following for cases when you can use eventually consistent reads:

Read Key K1
	Read K1 from Apache Ignite
	If K1 not found
		Cache Miss
		Read from DynamoDB
		Populate Apache Ignite with K1
		Return K1 to client
	Else
		Return K1 to client
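Under the assumptions above, a sketch of that read path in Java (the ignite and dynamoDB handles come from the earlier snippets, and toOrderData() is a hypothetical Item-to-POJO mapping) is a simple cache-aside lookup:

// Cache-aside read: try Ignite first, fall back to DynamoDB on a miss,
// then populate Ignite so subsequent reads are served from memory.
public orderdata read(String orderId) {
    IgniteCache<String, orderdata> orders = ignite.cache("<cacheName>");

    orderdata order = orders.get(orderId);
    if (order == null) {
        // Cache miss: read the authoritative copy from DynamoDB.
        Item item = dynamoDB.getTable("orders").getItem("orderid", orderId);
        order = toOrderData(item);   // hypothetical Item-to-POJO mapping
        orders.put(orderId, order);  // populate the eventually consistent cache
    }
    return order;
}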

Conclusion

In this post, we looked at a business problem and how Apache Ignite can be applied to solve it through its support for an in-memory data fabric. Apache Ignite has other features, like ACID-compliant distributed transaction support; publish/subscribe (pub/sub) cluster-wide messaging; the Ignite resilient distributed dataset (RDD), an implementation of the native Spark RDD that lets you share Spark RDDs across applications; and many more.

To use Apache Ignite in this way, you need to deploy and manage it on AWS. Before you invest in this, consider whether an architecture based on managed services meets your needs. In this scenario you have to manage the Apache Ignite cluster, so you must be careful about choosing your cache size, the level of replication for your data, how you leverage off-heap memory, how to tune the eviction policy for your cache, and how to tune garbage collection for your Java virtual machine. Understand your data well and test thoroughly with different settings to arrive at an optimal configuration.

If you have questions or suggestions, please leave a comment below.

————————————–

Related

Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming