AWS Big Data Blog

Writing SQL on Streaming Data with Amazon Kinesis Analytics – Part 2

Ryan Nienhuis is a Senior Product Manager for Amazon Kinesis.

This is the second of two AWS Big Data posts on Writing SQL on Streaming Data with Amazon Kinesis Analytics. In the last post, I provided an overview of streaming data and key concepts, such as the basics of streaming SQL, and completed a walkthrough using a simple example. In this post, I cover more advanced stream processing concepts using Amazon Kinesis Analytics, and you complete an end-to-end application.


Amazon Kinesis Analytics allows you to easily write SQL on streaming data, providing a powerful way to build a stream processing application in minutes. The service allows you to connect to streaming data sources, process the data with sub-second latencies, and continuously emit results to downstream destinations for use in real-time alerts, dashboards, or further analysis.

This post introduces you to the different types of windows supported by Amazon Kinesis Analytics, the importance of time as it relates to stream data processing, and best practices for sending your SQL results to a configured destination. These are more advanced stream data processing concepts; for an introduction, see Writing SQL on Streaming Data with Amazon Kinesis Analytics Part 1.

Generating real-time analytics using windows

As I covered in the first post, streaming data is continuously generated; therefore, you need to specify bounds when processing data to make your result set deterministic. Some SQL statements operate on individual rows and have natural bounds, such as a continuous filter that evaluates each row based upon a defined SQL WHERE clause. However, SQL statements that process data across rows, such as calculating the average of a particular column, need to have set bounds. The mechanism that provides these bounds is a window.

Windows are important because they define the bounds for which you want your query to operate. The starting bound is usually the current row that Amazon Kinesis Analytics is processing, and the window defines the ending bound.

Windows are required with any query that works across rows, because the in-application stream is unbounded and windows provide a mechanism to bound the result set and make the query deterministic. Analytics supports three types of windows: tumbling, sliding, and custom.

Tumbling windows

Tumbling windows are useful for periodic reports. You can use a tumbling window to compute an average number of visitors to your website in the last 5 minutes, or the maximum over the past hour. For each key specified in the GROUP BY clause, a single result is emitted at the end of the defined window.

An important characteristic of a tumbling window is that the bounds do not overlap; the start of a new tumbling window begins with the end of the old window. The below image visually captures how a 5-minute tumbling window would aggregate results, in separate windows that are not overlapping. The blue, green, and red windows represent minutes 0 to 5, 5 to 10, and 10 to 15, respectively.

Tumbling windows are always included in a GROUP BY clause and use the FLOOR function. The FLOOR function rounds its input down, returning the largest whole value that is less than or equal to the input. To create a tumbling time window, convert a timestamp to an integer representation and put it in a FLOOR function to create the window bound. Three examples are shown below:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
	ticker_symbol varchar(4), 
	ticker_symbol_count integer);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, COUNT(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
/* 10 second tumbling window */
--FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND) ;
/* 1 minute tumbling window */
-- FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);
/* 2 minute tumbling window */
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 2 TO MINUTE);

This tumbling window syntax requires some explanation. Look at the first example:

FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND)

  • "SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00'

Timestamps in Amazon Kinesis Analytics are stored as signed 64-bit integers, with 0 representing the Unix epoch ('1970-01-01 00:00:00'). This statement calculates the number of seconds after the Unix epoch as an integer. For more information about the timestamp data type, see Data Types in the Amazon Kinesis Analytics SQL Reference Guide.

  • (<seconds since Unix epoch>) SECOND / 10 TO SECOND

This statement converts the integer back to seconds, divides by 10, and then specifies the precision to seconds. The result is the time elapsed since the Unix epoch divided by 10, so its whole-number part changes only once every 10 seconds.

  • FLOOR(<seconds since Unix epoch divided by 10>)

The FLOOR function rounds this value down, so every row in the same 10-second range produces the same value. The query gets to a deterministic result when it sees a row that is no longer part of the last range (that is, a new window has started). Therefore, the first row with a ROWTIME value that falls in the next 10-second range triggers a result for the previous window.
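To make the bucketing concrete, here is a minimal Python sketch of the same idea. It is only an illustration of the arithmetic, not how Amazon Kinesis Analytics is implemented; the function names and the sample rows are hypothetical.

```python
import math
from collections import Counter
from datetime import datetime, timezone

WINDOW_SECONDS = 10  # window size, matching the 10-second example


def window_id(row_time: datetime) -> int:
    """Index of the 10-second tumbling window that contains row_time.

    Mirrors the idea of FLOOR((ROWTIME - epoch) SECOND / 10 TO SECOND):
    seconds since the Unix epoch, divided by 10, rounded down.
    """
    return math.floor(row_time.timestamp() / WINDOW_SECONDS)


def count_per_window(rows):
    """Count rows per (ticker_symbol, window), like GROUP BY ticker_symbol, FLOOR(...)."""
    counts = Counter()
    for row_time, ticker_symbol in rows:
        counts[(ticker_symbol, window_id(row_time))] += 1
    return counts


# Hypothetical sample rows: (ROWTIME, ticker_symbol)
rows = [
    (datetime(2016, 9, 1, 12, 0, 1, tzinfo=timezone.utc), "AMZN"),
    (datetime(2016, 9, 1, 12, 0, 9, tzinfo=timezone.utc), "AMZN"),
    (datetime(2016, 9, 1, 12, 0, 10, tzinfo=timezone.utc), "AMZN"),
    (datetime(2016, 9, 1, 12, 0, 12, tzinfo=timezone.utc), "TBV"),
]
counts = count_per_window(rows)
first_window = window_id(rows[0][0])
```

The rows at 12:00:01 and 12:00:09 share a window, while the row at 12:00:10 is the first one in the next window. In the streaming query, that row is what tells the application the previous window is complete.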

Sliding windows

Sliding windows are useful in real-time monitoring use cases because they provide the most up-to-date information, and allow you to easily control the sensitivity of the metric through the window size. For example, you can use a sliding window to compute the average availability (or success rate) of a public API call.

Sliding windows are a fixed size like tumbling windows, but each subsequent window overlaps with the previous one, as shown in the image below. Sliding windows emit new results any time a new row enters the window.

For each new row entering the window, a new row is generated with the aggregated result. For this reason, sliding windows are typically used in conjunction with conditions in a WHERE clause; for example, only emitting the average availability of a public API action if the result breaches a specific threshold, as specified by the WHERE clause. An example of a sliding window with a WHERE clause is shown below.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
	ticker_symbol varchar(4), 
	avg_change double);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, avg_change
FROM (
    SELECT STREAM
        ticker_symbol, 
        AVG(change) OVER W1 as avg_change
    FROM "SOURCE_SQL_STREAM_001"
    WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING)
)
WHERE ABS(avg_change) > 1;

The above example is a nested query. The nested SELECT statement calculates the average change over a 10-second sliding window, defined as W1. This window partitions results by the ticker symbol (PARTITION BY ticker_symbol), and uses an interval to define the start and end of the window (RANGE INTERVAL '10' SECOND PRECEDING). The outside SELECT statement filters results where the absolute value of the average change is greater than 1.
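If it helps to see the mechanics outside of SQL, the following Python sketch imitates the query above: one trailing window per ticker symbol, a new average for every incoming row, and a filter on the result. It is a simplified model under stated assumptions. Times are plain seconds, rows arrive in time order, and a row exactly 10 seconds old is kept in the window (the service's exact boundary behavior is not covered in this post). The function name and sample data are hypothetical.

```python
from collections import defaultdict, deque

RANGE_SECONDS = 10  # RANGE INTERVAL '10' SECOND PRECEDING
THRESHOLD = 1.0     # WHERE ABS(avg_change) > 1


def sliding_alerts(rows):
    """Yield (ticker_symbol, avg_change) for rows whose trailing average breaches the threshold.

    rows: iterable of (time_in_seconds, ticker_symbol, change), ordered by time.
    """
    windows = defaultdict(deque)  # one window per ticker (PARTITION BY ticker_symbol)
    for now, ticker, change in rows:
        window = windows[ticker]
        window.append((now, change))
        # Drop rows that are older than the 10-second range.
        while window[0][0] < now - RANGE_SECONDS:
            window.popleft()
        # Every new row produces a new aggregate over the current window.
        avg_change = sum(c for _, c in window) / len(window)
        if abs(avg_change) > THRESHOLD:
            yield ticker, avg_change


# Hypothetical sample rows: (time in seconds, ticker_symbol, change)
rows = [
    (0, "AMZN", 0.5),
    (4, "AMZN", 2.5),
    (6, "TBV", -3.0),
    (20, "AMZN", 0.2),
]
alerts = list(sliding_alerts(rows))
```

Four rows go in, but only two results come out, which is why the filter matters: without it, a sliding window emits one output row for every input row.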

Custom windows

The final type of window is the custom window, which is useful for event correlation use cases such as sessionization. Sessionization is a common web and mobile analytics use case, and refers to grouping together all actions completed by a user during a session to describe their end-to-end experience. For example, it is extremely useful to determine the most successful route to purchasing a product on an ecommerce website. Custom windows are ideal for event correlation use cases because they do not have a fixed size; they end when specific criteria are met. I will cover custom windows in more detail in a later post.

Time semantics in Amazon Kinesis Analytics

In streaming data, there are different types of time, and how they are used is important to the analytics. All of the above examples use ROWTIME, or the processing time, which is great for some use cases. However, in many scenarios, you want a time that more accurately reflects when the event occurred, such as the event or ingest time.

Amazon Kinesis Analytics supports three different time semantics for processing data:

  • Event time

The timestamp is assigned when the event occurred, also called client-side time. An example is the timestamp that your client application, such as a web or mobile application, adds to each event. Event time is often desirable in analytics because it represents the actual time when the event occurred in the environment. However, many sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate timestamps. Moreover, connectivity issues can lead to records appearing in a stream out of order relative to when they occurred.

If you have a reasonably accurate clock on your source, event time is earlier than processing time.

  • Ingest time

The timestamp is assigned when a record is added to the stream, also called server-side time. Amazon Kinesis Streams includes a field called ApproximateArrivalTimestamp in every record that provides this timestamp. The ingest time is often a close approximation of event time. If there is a delay in the record being delivered to the stream, this can lead to inaccuracies; however, this rarely happens and usually only when devices lose connectivity. Finally, while the ingest time is rarely out of order, this can occur due to the distributed nature of streaming data. Therefore, ingest time should be considered a mostly accurate and in-order reflection of the event time.

For all practical purposes, ingest time is earlier than processing time.

  • Processing time

The timestamp is assigned when Amazon Kinesis Analytics inserts a row in the first in-application stream. Analytics provides this timestamp in the ROWTIME column that exists in each in-application stream. The processing time is always monotonically increasing, but it does not accurately reflect when the event occurred if your application falls behind. ROWTIME is very accurate in relation to the wall clock, but may not be the time when the event actually occurred.

A scenario may help you understand how these times manifest themselves in the real world. Say that you have a mobile application for which you want to count the number of unique users using your app every 1 minute and send the data to your live analytics store. You capture each user event, append a timestamp (event time), and send it to an Amazon Kinesis stream. In most cases, these events arrive as they occur, with less than a second delay. However, some customers have inaccurate clocks which cause the appended event timestamp to be wrong. Fortunately, another timestamp is added to each event in the stream that is accurate, giving you a good approximation of when the event occurred.

You now set up an Amazon Kinesis Analytics application to read from the stream. Your Analytics application computes the unique users of your mobile application using the ROWTIME (processing time). After your application has been in production for a week, you find that you need some additional logic to make the query more accurate, and now you need to re-process all records from the last 24 hours. When you start your application from earlier in the stream, the time attached to each result is based on ROWTIME, which is the current time rather than the time when the events occurred. This means the time associated with your computation is incorrect.

How do you accurately compute the unique users of your mobile application?

One of the recommended approaches for gracefully handling out-of-order events and achieving an accurate logical time for your analytics is to use two time windows. The first window uses the processing time, or ROWTIME, to trigger when results are emitted. The second window uses the ingest time, or ApproximateArrivalTimestamp, as the logical time that reflects when the event occurred. This strategy handles situations where data arrives late and where your application falls behind.
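As a rough illustration of the two-window idea, here is a small Python sketch. It counts events rather than unique users to keep the arithmetic simple, it uses plain epoch seconds for both timestamps, and the function names and sample rows are hypothetical. It models the grouping only, not the service itself.

```python
import math
from collections import defaultdict

WINDOW_SECONDS = 60  # 1-minute tumbling windows


def bucket(seconds: float) -> int:
    """Start (in seconds) of the tumbling window that contains the given time."""
    return math.floor(seconds / WINDOW_SECONDS) * WINDOW_SECONDS


def count_events(rows):
    """Count events per (processing window, ingest window).

    rows: iterable of (processing_time, ingest_time), both in epoch seconds.
    The processing window decides when a result is emitted; the ingest
    window is the logical time that the result belongs to.
    """
    counts = defaultdict(int)
    for processing_time, ingest_time in rows:
        counts[(bucket(processing_time), bucket(ingest_time))] += 1
    return dict(counts)


# Hypothetical rows. The third event was ingested at second 50, but the
# application had fallen behind and only processed it at second 61.
rows = [(55, 45), (58, 48), (61, 50), (65, 64)]
partials = count_events(rows)

# Two partial results share ingest window 0; adding them gives the full count.
total_for_ingest_0 = sum(n for (_, ingest), n in partials.items() if ingest == 0)
```

The late row does not corrupt an earlier result. It shows up as a second partial result carrying the same ingest window, and the partial results can be added together downstream.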

In the first post, you built an application that read from a demo stream containing simulated stock ticker data. The SQL code you wrote calculated the average price, average change, maximum price, and minimum price over a 10-second tumbling time window. In this post, you are going to update that application to use two windows to gracefully handle out-of-order events and applications falling behind. You then send those final results to an Amazon Redshift table through an Amazon Kinesis Firehose delivery stream. The final solution:

  1. Ingests and reliably stores streaming data in an Amazon Kinesis stream
  2. Parses and validates the raw data through the source configuration of your Amazon Kinesis Analytics application
  3. Computes useful metrics on tumbling time windows using your Analytics application SQL code
  4. Transforms SQL results into CSV format using the destination configuration of your Analytics application
  5. Ingests and reliably delivers CSV data to an Amazon Redshift table through Firehose

Calculating simple metrics quickly and making them available to business and IT users in minutes is a common use case for streaming data. Many customers use long ETL pipelines for this scenario that are difficult to maintain and have hours to days of delay before a stakeholder can get visibility into what is happening with the business. You will use the stock ticker demo stream to perform a count of the number of transactions and the average/maximum/minimum prices and change for every stock over 1 minute.

Walkthrough: Completing your first end-to-end application

You can jump right in if you completed the walkthrough in Part 1; go to the “Create the destination for your SQL results” section. If not, take five minutes to complete the following two setup steps.

Setup step 1: Create an Amazon Kinesis Analytics application

  1. Open the Amazon Kinesis Analytics console and choose Create a new application.

  2. Provide a name and (optional) description for your application and choose Continue.

You are taken to the application hub page.

Setup step 2: Create a streaming data source

Analytics supports two kinds of input: streaming data from Amazon Kinesis Streams or Amazon Kinesis Firehose, and reference data from Amazon S3. The primary difference between the two is that streaming data is read continuously, whereas reference data is read once. Reference data sources are used for joining against the incoming stream to enrich the data.

In Amazon Kinesis Analytics, choose Connect to a source.

If you have existing Amazon Kinesis streams or Firehose delivery streams, they are shown here.

For the purposes of this post, you use a demo stream, which creates and populates a stream with sample data on your behalf. The demo stream is created under your account with a single shard, which supports up to 1 MB/sec of write throughput and 2 MB/sec of read throughput. Analytics writes simulated stock ticker data to the demo stream directly from your browser and your application reads data from the stream in real time.

Next, choose Configure a new stream and Create demo stream.

Later, you refer to the demo stream in your SQL code as “SOURCE_SQL_STREAM_001”. Analytics calls the DiscoverInputSchema API operation, which infers a schema by sampling records from your selected input data stream. You can see the applied schema on your data in the formatted sample shown in the browser, as well as the original sample taken from the raw stream. You can then edit the schema to fine tune it to your needs.

Feel free to explore; when you are ready, choose Save and continue. You are taken back to the streaming application hub.

On the streaming application hub, choose Go to SQL Editor and Run Application.

Copy and paste the following code into the text editor.

CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) as avg_price,
                AVG(change) as avg_change,
                MAX(price) as max_price,
                MIN(price) as min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol, sector, ticker_symbol_count, avg_price, avg_change, max_price, min_price
FROM "INTERMEDIATE_SQL_STREAM"
WHERE sector = 'TECHNOLOGY';

Choose Save and run SQL.

Create the destination for your SQL results

Amazon Kinesis Analytics supports up to four destinations, which can be any combination of an Amazon S3 bucket, OpenSearch cluster, or Amazon Redshift table (each through a Firehose delivery stream), or an Amazon Kinesis stream. If your specific destination isn’t supported, you can write your SQL results to a stream and then use AWS Lambda to send them to your desired destination.

For this walkthrough, you are going to add an output that delivers data to an Amazon Redshift table using a Firehose delivery stream. There are many advantages to using Firehose to load data into Amazon Redshift, including the separation of the data processing and delivery.

In this context, Firehose serves as a buffer between the application and the eventual destination. The application doesn’t have to wait for data to be loaded to continue processing. If the Amazon Redshift cluster were to become temporarily unavailable for any reason, the application would still be able to process the latest data in the stream. This is especially important when you have multiple destinations; you don’t want to block all processing because a single destination is misconfigured or down.

Here are the steps for this walkthrough:

  1. Create a cluster, delivery stream, and destination table with a predefined CloudFormation template.
  2. Create a target table.
  3. Update the Firehose delivery stream and Analytics application.
  4. Update window timing.
  5. Set up the destination.
  6. Verify results.
  7. Clean up.

Create a cluster, delivery stream, and destination table

You need to create (a) an Amazon Redshift cluster, (b) a Firehose delivery stream, and (c) the Amazon Redshift table to which the SQL results are delivered. This would take about 10 minutes to do by hand, so I have created an AWS CloudFormation template that automates these steps for you.

AWS CloudFormation is a service that helps you model and set up your Amazon Web Services resources so that you can spend less time managing those resources and more time focusing on your applications that run in AWS. You provide a template to the service, and the service automatically provisions and sets up the resources for you.

  1. Open the CloudFormation console and choose Create stack.
  2. Choose Specify an Amazon S3 template URL and copy and paste the following link:

https://s3.amazonaws.com/amazon-kinesis-analytics/KA-demo-cf-template-v2.json

  3. Choose Next.
  4. Review the parameters and provide a stack name and Amazon Redshift password. Remember this password as you need it later. Choose Next.
  5. Review the optional parameters and choose Next.
  6. Review the final creation steps and choose Create. The template should take a couple of minutes to complete.

Create the target table in Amazon Redshift

After the stack has been created, you can use a tool like SQL Workbench to create the target table in Amazon Redshift. The target table will look very similar to the output in-application stream you create in your Analytics application. For more information about setting up SQL Workbench with Amazon Redshift, see Connect to Your Cluster by Using SQL Workbench/J.

Next, run the following SQL statement to create the target Amazon Redshift table:

CREATE TABLE stock_ticker_counts (
    process_time        TIMESTAMP,
    ingest_time         TIMESTAMP,
    ticker_symbol       VARCHAR(4),
    sector              VARCHAR(16),
    ticker_symbol_count INTEGER,
    average_price       DECIMAL(10,2),
    average_change      DECIMAL(10,2),
    maximum_price       DECIMAL(10,2),
    minimum_price       DECIMAL(10,2));

Update the Firehose delivery stream and Analytics application

After the Amazon Kinesis Firehose delivery stream “KADemoAggregatedTickerData” is created, open the Firehose console and edit the delivery stream so that its Amazon Redshift table columns match the following list, which mirrors the table you just created:

process_time, ingest_time, ticker_symbol, sector, ticker_symbol_count, average_price, average_change, maximum_price, minimum_price

Next, update the Amazon Kinesis Analytics application with the final SQL code to match the columns of the destination.

I built on the previous blog post for this walkthrough. The starting SQL code is as follows:

CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) as avg_price,
                AVG(change) as avg_change,
                MAX(price) as max_price,
                MIN(price) as min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol, sector, ticker_symbol_count, avg_price, avg_change, max_price, min_price
FROM "INTERMEDIATE_SQL_STREAM"
WHERE sector = 'TECHNOLOGY';

Remove the second step in the application code that performs the continuous filter and inserts data into “DESTINATION_SQL_STREAM”.

  • Delete the second CREATE PUMP statement.
  • Delete the second CREATE STREAM statement.
  • Update the first two SQL statements to reference “DESTINATION_SQL_STREAM” instead of “INTERMEDIATE_SQL_STREAM”.

Your code should now look like the following:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) AS avg_price,
                AVG(change) AS avg_change,
                MAX(price) AS max_price,
                MIN(price) AS min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

Add some additional statements to your query, including an additional window for the ingest time.

  • Make sure that the sector appears in both the SELECT and GROUP BY clauses (it already does if you started from the code above).
  • Add the process and ingest time to the SELECT clause:
ROWTIME AS process_time,
(TIMESTAMP '1970-01-01 00:00:00' + 10 * FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND)) AS ingest_time,

You add this last calculation for ingest_time in the SELECT and GROUP BY clauses in order to add the second window. Similar to the previous explanation, this calculation emits the 10-second intervals of the ingest time.

  • Add a window that uses ingest time to the GROUP BY clause:
FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

The second tumbling window you added uses the stream approximate arrival timestamp, also known as the ingest time. Now that you have added the second window, ingest_time becomes the logical timestamp that is used in the window. The previous window uses ROWTIME (the processing time), which is now simply a trigger to determine when results are emitted.
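The ingest_time expression does two things: it finds which 10-second window a timestamp belongs to, and then it turns that window back into a timestamp by multiplying by 10 and adding the result to the epoch. Here is a minimal Python sketch of that arithmetic; the function name and the sample timestamp are hypothetical, and this only illustrates the calculation.

```python
import math
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts: datetime, window_seconds: int = 10) -> datetime:
    """Timestamp of the start of the tumbling window that contains ts.

    Same shape as: epoch + 10 * FLOOR((ts - epoch) SECOND / 10 TO SECOND).
    """
    seconds = (ts - EPOCH).total_seconds()
    whole_windows = math.floor(seconds / window_seconds)
    return EPOCH + timedelta(seconds=window_seconds * whole_windows)


# Hypothetical arrival time of a record
arrival = datetime(2016, 9, 1, 12, 0, 17, 500000, tzinfo=timezone.utc)
ten_second_start = window_start(arrival)     # start of the 10-second window
one_minute_start = window_start(arrival, 60)  # start of the 1-minute window
```

Every record that arrives between 12:00:10 and 12:00:20 gets the same ingest_time, which is what makes it usable as a time-series key.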

  • Finally, update the target stream to match your new output.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    process_time		TIMESTAMP,    
    ingest_time		TIMESTAMP,
    ticker_symbol	VARCHAR(4),
    sector 		VARCHAR(16), 
    ticker_symbol_count	INTEGER,
    avg_price 		DECIMAL(10,2),
    avg_change 		DECIMAL(10,2),
    max_price 		DECIMAL(10,2),
    min_price 		DECIMAL(10,2));

Your application code should look like the following:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    process_time		TIMESTAMP,    
    ingest_time		TIMESTAMP,
    ticker_symbol		VARCHAR(4),
    sector 			VARCHAR(16), 
    ticker_symbol_count	INTEGER,
    avg_price 		DECIMAL(10,2),
    avg_change 		DECIMAL(10,2),
    max_price 		DECIMAL(10,2),
    min_price 		DECIMAL(10,2));


CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
        ROWTIME AS process_time,
        (TIMESTAMP '1970-01-01 00:00:00' + 10 * FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND)) AS ingest_time,
        ticker_symbol,
        sector,
        -- The aggregates are listed in the same order as the columns of "DESTINATION_SQL_STREAM"
        COUNT(*) AS ticker_symbol_count,
        AVG(price) AS avg_price,
        AVG(change) AS avg_change,
        MAX(price) AS max_price,
        MIN(price) AS min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector,
        -- 10-second window on processing time: triggers when results are emitted
        FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND),
        -- 10-second window on ingest time: the logical time of each result
        FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

Choose Save and Run and see the results.

ROWTIME is now just a trigger, and the ingest_time value becomes the time-series key used in the analysis, along with ticker_symbol. This approach gracefully handles rows that arrive late and can be combined for a final accurate result in the destination.

Update window timing

Make one final change before you set up the destination. I find that using short time windows while I am testing and developing code is much easier to work with because I see the results with little delay (in this example, it’s a 10-second window so that you have a 10-second delay). However, I don’t need that level of granularity in my Amazon Redshift table, so I update my windows to be 1 minute. The updated code is shown below.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    process_time		TIMESTAMP,    
    ingest_time		TIMESTAMP,
    ticker_symbol		VARCHAR(4),
    sector 			VARCHAR(16), 
    ticker_symbol_count	INTEGER,
    avg_price 		DECIMAL(10,2),
    avg_change 		DECIMAL(10,2),
    max_price 		DECIMAL(10,2),
    min_price 		DECIMAL(10,2));


CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
        ROWTIME AS process_time,
        FLOOR("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME TO MINUTE) AS ingest_time,
        ticker_symbol,
        sector,
        -- The aggregates are listed in the same order as the columns of "DESTINATION_SQL_STREAM"
        COUNT(*) AS ticker_symbol_count,
        AVG(price) AS avg_price,
        AVG(change) AS avg_change,
        MAX(price) AS max_price,
        MIN(price) AS min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector,
        FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE),
        FLOOR("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME TO MINUTE);

Choose Save and Run and see the results.

Add the destination to the application

In your application code, you write the output of SQL statements to one or more in-application streams. You can optionally add output configuration to your application to persist everything written to an in-application stream to an external destination such as an Amazon Kinesis stream or a Firehose delivery stream. This allows you to complete your use case, whether that is sending data to a downstream analytics tool or reacting to your data in real time.

  1. Set up the destination by choosing your application name at the top of the page and then choosing Add a destination. You are shown a list of your streams that can be used as a destination. The Firehose delivery stream that you created to deliver data to Amazon Redshift should be listed.
  2. Select the Firehose delivery stream “KADemoAggregatedTickerData” that you just created with CloudFormation.
  3. Choose CSV as output format, select a role, and choose Save.

Verify results

The end-to-end application is now running and emits results to the Amazon Redshift table continuously. Go back to SQL Workbench and see the results arrive. It takes approximately 2 minutes for the results to show up in your Amazon Redshift table. This is because you have a 1-minute tumbling window in your Analytics application code and a 1-minute buffer in Firehose.

In SQL Workbench, execute the following query:

SELECT * FROM stock_ticker_counts;

Combine all the results for the late-arriving rows, to get a final, accurate count.

SELECT  ingest_time,
        ticker_symbol,
        sector,
        SUM(ticker_symbol_count) AS ticker_symbol_count,
        SUM(ticker_symbol_count * average_price) / SUM(ticker_symbol_count) AS average_price,
        SUM(ticker_symbol_count * average_change) / SUM(ticker_symbol_count) AS average_change,
        MAX(maximum_price) AS maximum_price,
        MIN(minimum_price) AS minimum_price
FROM stock_ticker_counts
GROUP BY ingest_time, ticker_symbol, sector;

The query shows final and accurate results, and seamlessly handles out-of-order rows because of the added second window. For me, the results are making it to Amazon Redshift less than two minutes after the data was generated.
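The query above weights each partial average by its row count before dividing by the total count. A plain average of the partial averages would give a late row carrying one transaction the same weight as an on-time row carrying many. The following Python sketch shows the same merge on a couple of hypothetical partial results; the function name, dictionary keys, and sample values are made up for illustration.

```python
def combine(partials):
    """Merge partial aggregates that share (ingest_time, ticker_symbol).

    partials: iterable of dicts with keys ingest_time, ticker_symbol,
    count, avg_price, max_price, and min_price.
    """
    merged = {}
    for p in partials:
        key = (p["ingest_time"], p["ticker_symbol"])
        if key not in merged:
            merged[key] = {
                "count": 0,
                "price_sum": 0.0,
                "max_price": p["max_price"],
                "min_price": p["min_price"],
            }
        m = merged[key]
        m["count"] += p["count"]
        # count * average recovers the sum of prices behind each partial result
        m["price_sum"] += p["count"] * p["avg_price"]
        m["max_price"] = max(m["max_price"], p["max_price"])
        m["min_price"] = min(m["min_price"], p["min_price"])
    return {
        key: {
            "count": m["count"],
            "avg_price": m["price_sum"] / m["count"],
            "max_price": m["max_price"],
            "min_price": m["min_price"],
        }
        for key, m in merged.items()
    }


# Hypothetical partial results: an on-time row and a late row for the same minute
partials = [
    {"ingest_time": "12:00", "ticker_symbol": "AMZN", "count": 3,
     "avg_price": 10.0, "max_price": 12.0, "min_price": 8.0},
    {"ingest_time": "12:00", "ticker_symbol": "AMZN", "count": 1,
     "avg_price": 14.0, "max_price": 14.0, "min_price": 14.0},
]
final = combine(partials)
```

Here the combined average is (3 × 10.0 + 1 × 14.0) / 4 = 11.0, whereas averaging the two partial averages directly would give 12.0.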

Clean up

The final step is to clean up. Take the following steps to avoid incurring charges.

  1. Delete the Streams demo stream.
  2. Stop the Analytics application.
  3. Delete the CloudFormation stack for the Amazon Redshift cluster and Firehose delivery stream.

Summary

Previously, real-time stream data processing was only accessible to those with the technical skills to build and manage a complex application. With Amazon Kinesis Analytics, anyone familiar with ANSI-standard SQL can build and deploy a stream data processing application in minutes.

This end-to-end solution provided a managed and elastic data processing pipeline using Analytics that calculates useful results over streaming data. Results are calculated as they arrive, and Firehose delivers them to a persistent analytics store, Amazon Redshift.

It’s simple to get this solution working for your use case. All that is required is to replace the Amazon Kinesis demo stream with your own, and then set up data producers. From there, configure the analytics and you have an end-to-end solution for capturing, processing, and durably storing streaming data.

If you have questions or suggestions, please comment below.
