Analyze a Time Series in Real Time with AWS Lambda, Amazon Kinesis and Amazon DynamoDB Streams

This is a guest post by Richard Freeman, Ph.D., a solutions architect and data scientist at JustGiving. JustGiving in their own words: “We are one of the world's largest social platforms for giving that’s helped 26.1 million registered users in 196 countries raise $3.8 billion for over 27,000 good causes.”

Introduction

As more devices, sensors and web servers continuously collect real-time streaming data, there is a growing need to analyze, understand and react to events as they occur, rather than waiting for a report that is generated the next day. For example, your support staff could be immediately notified of sudden peaks in traffic, abnormal events, or suspicious activities, so they can quickly take the appropriate corrective actions to minimize service downtime, data leaks or financial loss.

Traditionally, this would have gone through a data warehouse or a NoSQL database, and the data pipeline code could be custom built or based on third-party software. These models resulted in analysis that had a long propagation delay: the time between a check out occurring and the event being available for analysis would typically be several hours. Using a streaming analytics architecture, we can provide analysis of events typically within one minute or less.

Amazon Kinesis Streams is a service that can continuously capture and store terabytes of data from hundreds or thousands of sources. This might include website clickstreams, financial transactions, social media feeds, application logs, and location-tracking events. A variety of software platforms can be used to build an Amazon Kinesis consumer application, including the Kinesis Client Library (KCL), Apache Spark Streaming, or Elastic MapReduce via Hive.

Using Lambda and DynamoDB gives you a truly serverless architecture, where all the infrastructure including security and scalability is managed by AWS. Lambda supports function creation in Java, Node.js, and Python; at JustGiving, we use Python to give us expressiveness and flexibility in building this type of analysis.

This post explains how to perform time-series analysis on a stream of Amazon Kinesis records, without the need for any servers or clusters, using AWS Lambda, Amazon Kinesis Streams, Amazon DynamoDB and Amazon CloudWatch. We demonstrate how to do time-series analysis on live web analytics events stored in Amazon Kinesis Streams and present the results in near real-time for use cases like live key performance indicators, ad-hoc analytics, and quality assurance, as used in our AWS-based data science and analytics RAVEN (Reporting, Analytics, Visualization, Experimental, Networks) platform at JustGiving.

In our architecture, various producers continuously write web events such as page views, clicks or impressions to Amazon Kinesis Streams. A Lambda function is then invoked with these events, and we perform the time-series calculations in memory. We then insert the results into a DynamoDB summary table. Another Lambda function is then invoked as new records are added to the summary table and this function instruments the metrics through Amazon CloudWatch, so that they can be charted in Metrics and Dashboards.

If the time-series data needs to be presented externally, then a chart dashboard using Chart.js or D3.js libraries can be used which reads directly from DynamoDB. In fact, the first Lambda function invoked for the time-series calculation can simultaneously write the time-series data to both DynamoDB and CloudWatch. However, in this post we demonstrate how one Lambda function can connect two streams sequentially, and highlight the differences in records between Amazon Kinesis Streams and DynamoDB Streams with a concrete example.

Set up the Amazon Kinesis event source and the Lambda time-series analysis function

Lambda invokes the function when it detects new data on our input Amazon Kinesis event source. First, create a DynamoDB table to store the time-series data and an IAM policy and role to allow the Lambda function to connect to AWS services.

Creating a target DynamoDB table

Assuming you have Python AWS Boto set up locally, you can create the table with the following:

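import boto.dynamodb2
from boto.dynamodb2.fields import HashKey, RangeKey
from boto.dynamodb2.table import Table
[...]
conn = boto.dynamodb2.connect_to_region('eu-west-1')  # replace with desired region
table_name = 'poc-raven-lambda-event-counters'
# EventName is the partition (hash) key and DateHour is the sort (range) key.
print(Table.create(table_name,
                   schema=[HashKey('EventName'),
                           RangeKey('DateHour')],
                   throughput={'read': 10, 'write': 100},
                   connection=conn))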
[...]

Alternatively you can use the AWS Management Console.

Create a DynamoDB table

  1. Open the DynamoDB console at https://console.aws.amazon.com/dynamodb/.
  2. On the dashboard, choose Create Table and configure the following:
    1. For Table name, enter poc-raven-lambda-event-counters.
    2. For Primary key (Partition key), enter EventName and choose String.
    3. Select Add sort key.
      1. For Sort key name (Range), enter DateHour and choose String.
    4. Under Table settings, clear Use default settings.
    5. Choose Create.
  3. Under tables, select the table poc-raven-lambda-event-counters.
    1. Select Capacity
    2. Under Provisioned Capacity:
      1. For Read capacity units, enter 10.
      2. For Write capacity units, enter 100.
    3. Choose Save

After the table has been created, you can enable an update stream by choosing Tables, Overview, and Manage Stream. DynamoDB update streams can include new, old, or both types of data when an item is created or modified. For this case, choose New and old images.
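If you prefer to script this step, the stream can also be enabled through the DynamoDB UpdateTable API. The following is a minimal sketch rather than part of the original setup: the helper name enable_update_stream is hypothetical, and the client is passed in as an argument so that the function can be exercised without AWS access.

```python
def enable_update_stream(dynamodb_client, table_name):
    """Enable a DynamoDB stream that carries both the new and the old item images."""
    return dynamodb_client.update_table(
        TableName=table_name,
        StreamSpecification={
            'StreamEnabled': True,
            'StreamViewType': 'NEW_AND_OLD_IMAGES',
        },
    )

# Example usage (assumes boto3 is installed and AWS credentials are configured):
# import boto3
# client = boto3.client('dynamodb', region_name='eu-west-1')
# enable_update_stream(client, 'poc-raven-lambda-event-counters')
```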

Setting up the IAM policy and role

Now, create an IAM policy and the role that references that policy, to allow your Lambda function to connect to AWS services.

Create an IAM policy

  1. Open the IAM console at https://console.aws.amazon.com/iam/.
  2. On the navigation bar, choose Policies, Create Policy.
  3. Choose Create Your Own Policy and configure the following:
    1. For Policy Name, enter poc-raven-lambda-time-series-analysis.
    2. For Description, enter Read/write access to DynamoDB, CloudWatch, and Lambda execution rights.
    3. For Policy Document, insert the following JSON: 
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:DescribeStream",
                "kinesis:ListStreams",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Stmt1433175691000",
            "Effect": "Allow",
            "Action": [
                "dynamodb:BatchGetItem",
                "dynamodb:BatchWriteItem",
                "dynamodb:DescribeStream",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams",
                "dynamodb:ListTables",
                "dynamodb:PutItem",
                "dynamodb:Query",
                "dynamodb:Scan",
                "dynamodb:UpdateItem",
                "dynamodb:UpdateTable"
            ],
            "Resource": [
                "arn:aws:dynamodb:eu-west-1:<999999999999>:table/poc-raven-lambda-event-counters"
            ]
        }
    ]
}

The Action array in each statement lists the actions allowed on the specified resource. In the last statement, that resource is the DynamoDB table created earlier. Replace <999999999999> with your AWS account ID.

Next, create an IAM role which uses this policy, so that Lambda can assume an identity with the required privileges of reading from Amazon Kinesis Streams, writing to DynamoDB, and writing to CloudWatch.

Create an IAM role

  1. Open the IAM console at https://console.aws.amazon.com/iam/.
  2. On the navigation pane, choose Roles, Create New Role.
  3. For Set Role Name, enter lambda_kinesis_streams and choose Next step.
  4. For Role Type, choose AWS Service Roles, AWS Lambda.
  5. Select the policy created earlier, poc-raven-lambda-time-series-analysis and choose Next step.
  6. Choose Review, Create Role.

Create the Lambda function

  1. Open the Lambda console at https://console.aws.amazon.com/lambda/.
  2. Choose Create a Lambda function and the kinesis-process-record-python blueprint.
  3. Configure the event sources:
    1. For Event Source type, choose Kinesis.
    2. For Kinesis Stream, enter poc-raven-webanalytics (or select your Amazon Kinesis stream).
    3. For Batch Size, enter 300 (this depends on the frequency that events are added to Amazon Kinesis).
    4. For Starting position, choose Trim horizon.
  4. Configure the function:
    1. For Name, enter PocLambdaEventCounter.
    2. For Runtime, choose Python 2.7.
    3. For Edit code inline, add the Lambda function source code (see the next section).
    4. For Handler, choose lambda_function.lambda_handler.
    5. For Role, select the role you created earlier, e.g., lambda_kinesis_streams. This grants the Lambda function access to Amazon Kinesis Streams, DynamoDB, and CloudWatch metrics and logs.
    6. For Memory (MB), choose 128.
    7. For Timeout, enter 5 min 0 sec. 
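The event source settings above can also be applied programmatically. This is a hedged sketch using the Lambda CreateEventSourceMapping API; the helper name map_kinesis_to_lambda is hypothetical, the stream ARN in the usage comment is a placeholder, and the client is injected so the function can be tested without AWS access.

```python
def map_kinesis_to_lambda(lambda_client, stream_arn, function_name, batch_size=300):
    """Attach a Kinesis stream to a Lambda function, reading from the oldest record."""
    return lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        BatchSize=batch_size,              # same value as entered in the console steps
        StartingPosition='TRIM_HORIZON',   # corresponds to "Trim horizon" in the console
    )

# Example usage (assumes boto3 and AWS credentials; replace the account ID placeholder):
# import boto3
# client = boto3.client('lambda', region_name='eu-west-1')
# map_kinesis_to_lambda(
#     client,
#     'arn:aws:kinesis:eu-west-1:<999999999999>:stream/poc-raven-webanalytics',
#     'PocLambdaEventCounter')
```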

A common streaming analytics use case is to analyze individual events in a time period and record a running count of the number of events by type. You can then visualize trends to find interesting patterns, or notify someone of anomalous issues. Here we explain how to do all three.

DynamoDB: persisting an hourly count on the events

To begin, create running counters for each (event, date hour) tuple. For example, you roll up the count of “page view” events that happened on 2015-10-31 between 12:00:00 PM and 12:59:59 PM as “2015-10-31T12:00:00”. This results in counters for each hour that can easily be analyzed and visualized.
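As a small illustration of the roll-up, the sketch below truncates a timestamp to the start of its hour. It assumes timestamps arrive as strings in the form YYYY-MM-DDTHH:MM:SS; the function name to_hour_bucket is made up for this example.

```python
def to_hour_bucket(utc_timestamp):
    """Truncate a 'YYYY-MM-DDTHH:MM:SS' string to the start of its hour."""
    # Everything before the first colon is the date and hour.
    return utc_timestamp.split(':', 1)[0] + ':00:00'

print(to_hour_bucket('2015-10-31T12:34:56'))  # 2015-10-31T12:00:00
```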

This Python Lambda code snippet keeps a running count of events by type. There are two functions: lambda_handler() and update_dynamo_event_counter(). lambda_handler() is invoked first; it iterates over all the events and counts the (event_type, hour_event_time) tuples using hour_event_counter, which is a defaultdict. This is an efficient Python data structure for counting distinct occurrences in a collection.

After all the records have been processed, update_dynamo_event_counter() is invoked for every (event_type, hour_event_time) tuple. This function performs a non-locking update of the running count in DynamoDB for that specific tuple as an atomic action, using an UpdateExpression. Batching up the writes this way incurs lower latency and gives higher throughput than incrementing the counter once per event from the client.

import base64
import json
from collections import defaultdict

import boto3


def update_dynamo_event_counter(tableName, event_name, event_datetime, event_count=1,
                                dynamodb=boto3.resource(service_name='dynamodb', region_name='eu-west-1')):
    # The default resource is created once when the module loads and is then reused.
    table = dynamodb.Table(tableName)
    # ADD increments EventCount atomically and creates the item if it does not exist yet.
    table.update_item(
        Key={
            'EventName': event_name,
            'DateHour': event_datetime
        },
        ExpressionAttributeValues={":value": event_count},
        UpdateExpression="ADD EventCount :value")


def lambda_handler(event, context):
    hour_event_counter = defaultdict(int)
    for record in event['Records']:
        # Kinesis delivers each record payload base64-encoded.
        payload = base64.b64decode(record['kinesis']['data'])
        payload_json = json.loads(payload)
        try:
            event_type = str(payload_json['event'])
        except Exception as e:
            print('Error no event type detected')
            event_type = 'NULL'
        try:
            # Truncate the timestamp to the hour, e.g. 2015-10-31T12:00:00.
            hour_event_time = str(payload_json['utc_timestamp'].split(':', 1)[0] + ':00:00')
        except Exception as e:
            print('Error no event time detected')
            hour_event_time = 'NULL'
        # Count inside the loop so that every record in the batch is tallied.
        hour_event_counter[(event_type, hour_event_time)] += 1
    # Write one atomic increment per (event_type, hour_event_time) tuple.
    for key, val in hour_event_counter.items():
        print("%s, %s = %s" % (str(key[0]), str(key[1]), str(val)))
        update_dynamo_event_counter('poc-raven-lambda-event-counters', key[0], key[1], int(val))
    return 'Successfully processed {} records.'.format(len(event['Records']))

For testing, you can log events from your Amazon Kinesis stream by printing them as JSON to standard out; Lambda automatically delivers this output to CloudWatch Logs. A logged event can then be used as the test event in the Lambda console.
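If you do not yet have logged events to copy, a test event can also be assembled by hand. The sketch below is an assumption-laden illustration: it builds only the fields that the handler above reads (Records, kinesis, data), whereas real Kinesis events carry additional metadata, and the helper name make_kinesis_test_event is hypothetical.

```python
import base64
import json

def make_kinesis_test_event(payloads):
    """Wrap JSON-serializable payloads in a minimal Kinesis-style Lambda event."""
    records = []
    for payload in payloads:
        raw = json.dumps(payload).encode('utf-8')
        # Kinesis record data is base64-encoded in the event passed to Lambda.
        records.append({'kinesis': {'data': base64.b64encode(raw).decode('ascii')}})
    return {'Records': records}

event = make_kinesis_test_event([
    {'event': 'page view', 'utc_timestamp': '2015-10-31T12:34:56'},
])

# Decode the first record the same way the handler does.
decoded = json.loads(base64.b64decode(event['Records'][0]['kinesis']['data']).decode('utf-8'))
print(decoded['event'])  # page view
```

The dictionary returned by make_kinesis_test_event() can be serialized with json.dumps() and pasted into the console as a test event.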

This pattern of in-memory calculation combined with DynamoDB atomic counters forms a basic building block for time-series calculations. More complex analytic variations are possible, such as looking for co-occurrences, applying filters, trending, or windowing.

As a disclaimer, be aware that atomic increments are not idempotent, and Lambda throttling and error conditions could occur in exceptional cases (these should be monitored in CloudWatch), which could mean that the counters could be slightly inaccurate. To reduce the possibility of this occurring, you should ensure that the concurrent Lambda executions limit for your account is greater than the Kinesis shard count, and the DynamoDB counter table has a sufficient write capacity.
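To see why non-idempotent increments matter, consider what happens if the same batch is applied twice, which could occur if a batch is retried after an error. The sketch below is a simulation only: a plain dictionary stands in for the DynamoDB table, and add_count() is a hypothetical helper that mimics the ADD update expression.

```python
from collections import defaultdict

table = defaultdict(int)  # in-memory stand-in for the DynamoDB counter table

def add_count(key, value):
    """Mimic 'ADD EventCount :value': increment whatever is already stored."""
    table[key] += value

key = ('page view', '2015-10-31T12:00:00')
batch_counts = {key: 4}

# First delivery of the batch.
for k, v in batch_counts.items():
    add_count(k, v)

# The same batch applied again, for example after a retry.
for k, v in batch_counts.items():
    add_count(k, v)

print(table[key])  # 8 rather than the true count of 4
```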

In addition, to reduce the risk of hot partitions, we recommend that this table is not used as a source of truth for all history but only for data from the past few months; anything older than that could be deleted or moved to a new table. However, both risks are minimized as there will not be as many concurrent writes to DynamoDB for the following reasons:

  • The number of concurrent Lambda functions is limited by the number of shard iterators in Amazon Kinesis Streams and DynamoDB Streams.
  • The count is accumulated first in memory before the write.
  • This table is storing summary aggregate data.
  • The data sizes and number of rows are small.
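The effect of accumulating counts in memory before writing can be illustrated with a small simulation. The batch below is made-up data, and aggregate() is a hypothetical helper that mirrors the counting loop in the handler.

```python
from collections import defaultdict

def aggregate(events):
    """Count occurrences of each (event_type, hour) tuple in a batch."""
    counter = defaultdict(int)
    for event_type, hour in events:
        counter[(event_type, hour)] += 1
    return counter

# A hypothetical batch of 300 records that all fall in the same hour.
batch = ([('page view', '2015-10-31T12:00:00')] * 250 +
         [('click', '2015-10-31T12:00:00')] * 50)
counts = aggregate(batch)

print('%d records -> %d DynamoDB updates' % (len(batch), len(counts)))
# 300 records -> 2 DynamoDB updates
```

In this example a batch of 300 records needs only two writes, one per distinct tuple, instead of 300.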

We have not had any issues with this pattern on our live website traffic, and results were always correct when compared to Amazon Redshift.

Here is an example table that can easily be used for visualization or pivoted in Excel for charting.

Depending on your use case, the code can easily be modified to keep running counts in buckets of 15-minute or even 1-minute intervals. We analyze our data in 1-minute buckets, but please be aware that more granular time buckets result in more data written to DynamoDB. For one-minute time resolution, you can change the above code as follows:

minute_event_counter = defaultdict(int)
[...]
minute_event_time=str(payload_json['utc_timestamp'].rsplit(':', 1)[0] + ':00')
[...]
minute_event_counter[(event_type, minute_event_time)] += 1
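For other bucket sizes, such as the 15-minute interval mentioned above, string slicing is not enough. One possible approach, shown here as a sketch and not as the code used at JustGiving, is to parse the timestamp and round the minutes down. It assumes timestamps in the form YYYY-MM-DDTHH:MM:SS and a bucket size that divides 60; the function name to_bucket is hypothetical.

```python
from datetime import datetime

TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'

def to_bucket(utc_timestamp, minutes=15):
    """Round a timestamp string down to the start of its N-minute bucket."""
    parsed = datetime.strptime(utc_timestamp, TIME_FORMAT)
    # Drop the minutes past the bucket boundary and zero the seconds.
    floored = parsed.replace(minute=parsed.minute - parsed.minute % minutes, second=0)
    return floored.strftime(TIME_FORMAT)

print(to_bucket('2015-10-31T12:34:56'))     # 2015-10-31T12:30:00
print(to_bucket('2015-10-31T12:34:56', 1))  # 2015-10-31T12:34:00
```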

Near real-time data visualization

Traditionally, in order to collect the metrics and display them in near real-time, you would implement your own Amazon Kinesis consumer and dashboard, for example as done in the Amazon Kinesis Data Visualization Sample Application.

To continue with the principle of serverless architectures in this post, you can make use of Amazon CloudWatch directly. You can create another Lambda function to push data to CloudWatch, but in this case, it is invoked by the updates made to your DynamoDB counter summary table. This data can then be easily visualized through the CloudWatch console. In another article, we have described how you could draw near real-time charts and tables, using the counters data stored earlier in DynamoDB. It is a serverless solution hosted on a static site in S3 using Chart.js for drawing charts and Amazon Cognito for authentication; however, it requires more development effort than the CloudWatch solution described next.

Creating the Lambda function

The same IAM policy poc-raven-lambda-time-series-analysis that you created earlier can be reused, or it can be edited for additional security (the Lambda function does not need write access to the poc-raven-lambda-event-counters DynamoDB table or read access to Amazon Kinesis Streams).

For simplicity, assume that you have not changed the policy and reuse the same role created earlier, called lambda_kinesis_streams.

  1. Open the Lambda console at https://console.aws.amazon.com/lambda/.
  2. Choose Create a Lambda Function and the dynamodb-process-stream-python blueprint.
  3. Configure the event sources:
    1. For Event Source type, choose DynamoDB.
    2. For DynamoDB Table, enter poc-raven-lambda-event-counters (or select your DynamoDB table).
    3. For Batch Size, enter 300 (this depends on the frequency that events are added to Amazon Kinesis and DynamoDB).
    4. For Starting position, choose Trim horizon.
  4. Configure the function:
    1. For Name, enter PocLambdaCloudWatch-Hour.
    2. For Runtime, choose Python 2.7.
    3. For Edit code inline, add the Lambda function source code (see the next section).
    4. For Handler, choose lambda_function.lambda_handler.
    5. For Role, choose the role you created earlier, lambda_kinesis_streams.
    6. For Memory (MB), choose 128.
    7. For Timeout, enter 5 min 0 sec.

import boto3
from datetime import datetime
from collections import defaultdict


def put_cloudwatch_metric(event_name, event_datetime, event_count=1,
                          cwc=boto3.client('cloudwatch', region_name='eu-west-1')):
    # One data point per event name, timestamped at the start of the period.
    metricData = [{
        'MetricName': event_name,
        'Timestamp': datetime.strptime(event_datetime, '%Y-%m-%dT%H:%M:%S'),
        'Value': event_count,
        'Unit': 'Count'
    }]
    cwc.put_metric_data(Namespace="PocEventCounterHour", MetricData=metricData)


def lambda_handler(event, context):
    hour_event_counter = defaultdict(int)
    for record in event['Records']:
        # DynamoDB Streams records wrap each attribute in a type descriptor ('S', 'N').
        try:
            event_type = record['dynamodb']['NewImage']['EventName']['S']
        except Exception as e:
            event_type = 'NULL'
        try:
            hour_event_time = record['dynamodb']['NewImage']['DateHour']['S']
        except Exception as e:
            hour_event_time = 'NULL'
        try:
            event_count_old = int(record['dynamodb']['OldImage']['EventCount']['N'])
        except Exception as e:
            # No old image means the item was just created.
            event_count_old = 0
        try:
            event_count_new = int(record['dynamodb']['NewImage']['EventCount']['N'])
        except Exception as e:
            event_count_new = 0

        if event_type != 'NULL' and hour_event_time != 'NULL':
            if event_count_new > event_count_old:
                # Only the increase since the previous image is sent to CloudWatch.
                hour_event_counter[(event_type, hour_event_time)] += event_count_new - event_count_old

    for key, val in hour_event_counter.items():
        print("%s, %s = %d" % (key[0], key[1], val))
        put_cloudwatch_metric(key[0], key[1], int(val))
    return 'Successfully processed {} records.'.format(len(event['Records']))

The sample code iterates over all DynamoDB Streams records, extracting the event_type, hour_event_time, event_count_old, and event_count_new values that you persisted with the previous Lambda function. Subtract the old value from the new to get the incremental change in the Item.

For example, suppose the record with EventName="page view" and DateHour="2015-10-31T12:00:00" has EventCount=5. You then count 4 more events for the same DateHour, so EventCount becomes 5 + 4 = 9. The old value is 5 and the new value is 9, so to keep an accurate count in the CloudWatch metric, you put the difference between the new and old EventCount values, i.e., 9 – 5 = 4.
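The same arithmetic can be checked against a hand-built DynamoDB Streams record. This sketch includes only the fields that the handler reads, so real stream records contain more metadata; count_delta() is a hypothetical helper, and it assumes the sort key attribute is named DateHour.

```python
def count_delta(record):
    """Return the increase in EventCount carried by one DynamoDB Streams record."""
    images = record['dynamodb']
    # A missing image or attribute is treated as a count of 0.
    old = int(images.get('OldImage', {}).get('EventCount', {}).get('N', 0))
    new = int(images.get('NewImage', {}).get('EventCount', {}).get('N', 0))
    return max(new - old, 0)

record = {
    'dynamodb': {
        'OldImage': {
            'EventName': {'S': 'page view'},
            'DateHour': {'S': '2015-10-31T12:00:00'},
            'EventCount': {'N': '5'},
        },
        'NewImage': {
            'EventName': {'S': 'page view'},
            'DateHour': {'S': '2015-10-31T12:00:00'},
            'EventCount': {'N': '9'},
        },
    }
}

print(count_delta(record))  # 4
```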

As with the previous function, the period counts are batched up in the defaultdict collection and then the function put_cloudwatch_metric() writes the metrics data to CloudWatch. This is now a core code base for streaming running counters with visualization, and provides the foundations for more complex transformation such as enrichment, projections, or event filters.

To access the chart:

  1. Open the CloudWatch console at https://console.aws.amazon.com/cloudwatch/.
  2. Choose Custom Metrics and PocEventCounterHour.
  3. Check any metrics you are interested in charting.

CloudWatch dashboards provide you with the flexibility to change the time range and select which metric to display, as well as the scale and time period. There is no dashboard development effort required, and it only takes two lines of Python code to put the metrics. In addition, CloudWatch also supports dimensions for additional filtering, and there are various ways to export the data to S3 or Amazon Kinesis Streams.
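As an illustration of dimensions, the sketch below builds a metric data point that carries one extra dimension. The dimension name Site and its value are invented for this example, and build_metric_datum() is a hypothetical helper; the resulting dictionary has the shape expected in the MetricData list passed to put_metric_data().

```python
from datetime import datetime

def build_metric_datum(event_name, event_datetime, event_count, site):
    """Build one CloudWatch metric data point tagged with a 'Site' dimension."""
    return {
        'MetricName': event_name,
        # Hypothetical dimension, used here to filter metrics by website.
        'Dimensions': [{'Name': 'Site', 'Value': site}],
        'Timestamp': datetime.strptime(event_datetime, '%Y-%m-%dT%H:%M:%S'),
        'Value': event_count,
        'Unit': 'Count',
    }

datum = build_metric_datum('page view', '2015-10-31T12:00:00', 4, 'uk')
print(datum['Dimensions'])

# Example usage (assumes boto3 and AWS credentials):
# import boto3
# cwc = boto3.client('cloudwatch', region_name='eu-west-1')
# cwc.put_metric_data(Namespace='PocEventCounterHour', MetricData=[datum])
```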

Here is an example of a CloudWatch metrics chart at a 1 minute resolution:

This chart can then be included in a dashboard which includes different views of data stored in CloudWatch.

To create a dashboard and add a graph:

  1. Open the CloudWatch console at https://console.aws.amazon.com/cloudwatch/.
  2. On the navigation pane, choose Dashboards, Create Dashboard.
  3. Enter a name for the dashboard and choose Create Dashboard.
  4. Choose Metrics Graph, Configure.
  5. Select your custom metric from CloudWatch Metrics by Category.
  6. Select the metrics to draw in the dashboard chart, e.g., page view.
  7. Choose Create widget. You see a preview of the chart.

You can now resize the widget or add new widgets to the dashboard. There is also a useful auto refresh option.

Using CloudWatch alarms on the time series data

The data is now stored in CloudWatch, and you can add an alarm that is triggered if some threshold is breached. This creates an Amazon SNS notification, which results in an email being sent or an action being taken. For example, if the number of page views drops below a certain level, which might indicate an issue with the website, an email is automatically sent to the support team.

Create a new SNS topic to receive alerts:

  1. Open the Amazon SNS console at https://console.aws.amazon.com/sns.
  2. Under Common Actions, choose Create Topic.
  3. For both Topic name and Display name, enter the name of the topic (e.g., pageview-low) and choose Create topic.
  4. On the Topics detail page, choose Create Subscription, and verify the Topic ARN value.
    1. For Protocol, choose Email.
    2. For Endpoint, enter the desired email address.
  5. Choose Create Subscription.
  6. You will receive a verification email.

Create a CloudWatch alarm:

  1. Open the CloudWatch console at https://console.aws.amazon.com/cloudwatch/.
  2. Choose Alarms, Create Alarm.
  3. For Custom Metric, enter the custom metric namespace (e.g., PocEventCounterHour), select the metrics for which to create an alarm, e.g. “page view”, and choose Next.
  4. On the Define Alarm page, enter the following values:
    1. For Name, enter pageview-low.
    2. Enter the following values: Whenever page view is >= 5 for 1 consecutive period(s) (to test that the alert works). The little graph shows you a red line indicating the threshold.
    3. For Action, choose State is ALARM.
    4. For Send notification to, choose pageview-low.
  5. Choose Create Alarm.

If you look at Alarms in CloudWatch, you see that pageview-low is in red and you should receive an email. You can now raise the alarm threshold level to the minimum expected number of events per minute. The alarm also has a history tab that gives chronological details of past events.
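The console steps for the test alarm could also be scripted with the CloudWatch PutMetricAlarm API. The following sketch makes several assumptions that are not stated in the steps above: the Sum statistic, a 60-second period, and the helper name create_pageview_alarm. The topic ARN must be the ARN of the SNS topic you created, and the client is passed in so the function can be tested without AWS access.

```python
def create_pageview_alarm(cloudwatch_client, topic_arn, threshold=5, period_seconds=60):
    """Create the test alarm: notify when page views are >= threshold in one period."""
    return cloudwatch_client.put_metric_alarm(
        AlarmName='pageview-low',
        Namespace='PocEventCounterHour',
        MetricName='page view',
        Statistic='Sum',                 # assumption: total events per period
        Period=period_seconds,           # assumption: one-minute evaluation period
        EvaluationPeriods=1,
        Threshold=threshold,
        ComparisonOperator='GreaterThanOrEqualToThreshold',
        AlarmActions=[topic_arn],        # SNS topic notified in the ALARM state
    )

# Example usage (assumes boto3 and AWS credentials):
# import boto3
# cwc = boto3.client('cloudwatch', region_name='eu-west-1')
# create_pageview_alarm(cwc, '<ARN of the pageview-low SNS topic>')
```

For a production alarm on low traffic, you would likely switch the comparison operator to LessThanThreshold and raise the threshold to the minimum expected number of events.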

Building applications with a serverless processing architecture

Using Lambda for time-series analysis is suitable for many use cases, but you must consider the following:

  • Joining different streams and chaining Lambda functions together can be complex. If this is a requirement, you can consider the use of Spark Streaming on Amazon EMR, or frameworks such as Apache Storm.
  • AWS Lambda functions can execute for up to 5 minutes. If you are unable to process the data pushed to your function in this time, then consider reducing the batch size on the Amazon Kinesis Streams or DynamoDB Streams event sources.
  • CloudWatch metrics only store data for two weeks. For longer term retention or when historical charting is required, you could use a custom dashboard which reads data from your DynamoDB table. Alternatively, you can export data to S3 and import it into Amazon Redshift for visualization with a custom solution or third-party tools.
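For the longer-term retention case, a custom dashboard would need to read the counters back from DynamoDB. The sketch below shows one way to do this with the low-level Query API. It assumes the sort key is named DateHour, ignores pagination of large result sets, and uses a hypothetical helper name, query_hourly_counts; the client is injected so the function can be tested without AWS access.

```python
def query_hourly_counts(dynamodb_client, event_name, start_hour, end_hour,
                        table_name='poc-raven-lambda-event-counters'):
    """Return (DateHour, EventCount) pairs for one event between two hours, inclusive."""
    response = dynamodb_client.query(
        TableName=table_name,
        KeyConditionExpression='EventName = :name AND DateHour BETWEEN :start AND :end',
        ExpressionAttributeValues={
            ':name': {'S': event_name},
            ':start': {'S': start_hour},
            ':end': {'S': end_hour},
        },
    )
    # Items come back in the low-level format, with type descriptors such as 'S' and 'N'.
    return [(item['DateHour']['S'], int(item['EventCount']['N']))
            for item in response['Items']]

# Example usage (assumes boto3 and AWS credentials):
# import boto3
# client = boto3.client('dynamodb', region_name='eu-west-1')
# query_hourly_counts(client, 'page view', '2015-10-31T00:00:00', '2015-10-31T23:00:00')
```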

There are many benefits of using a Lambda function for time-series analysis, including:

  • Flexibility – Analytics code can be updated and deployed very quickly, and visualized in near real-time.
  • High availability – Your Lambda function runs in multiple Availability Zones in a region.
  • Python libraries – Ability to include and execute any Python libraries.
  • Zero-maintenance – All services are supported by AWS so running instances are maintained and upgraded automatically.
  • Security – No use of keys or passwords, and IAM roles can be used to integrate with Amazon Kinesis Streams, DynamoDB, and CloudWatch.
  • Low cost – You only pay for the execution time of the Lambda function.
  • Ease of use – Functions are very simple; for example, this post iterates over and parses JSON records and writes the metrics to DynamoDB or CloudWatch.
  • Automatic scaling – The number of concurrent Lambda function invocations scales based on the number of shard iterators in Amazon Kinesis Streams or DynamoDB Streams.
  • Simple extensions to other transformation – Extensions can include event enrichment, event routing, fault notification, sliding windows, correlations, filtering, or trending.

A serverless architecture can reduce cost, as there is no cluster running if the stream is not processing events. When fewer records are being added to Amazon Kinesis Streams or DynamoDB Streams, then a reduced number of Lambda functions are invoked.

This architecture is also very simple to build, maintain, and understand, as it deals with operating on batches of JSON records, rather than having to think about the distribution of data across nodes. A developer, data scientist, or analyst can now write scalable streaming analytics, and create near real-time dashboards, without thinking about operating systems, instances, and associated operational tasks.

Using DynamoDB Streams, Amazon Kinesis Streams, and Lambda together for ad-hoc analytics also means that the processing can begin much faster, as there is no cluster to launch. This allows a quick turnaround on tasks like tracking new events of interest, doing ad-hoc analytics, quality assurance, and detecting issues in near real-time.

Summary

This pattern for time-series analysis on Amazon Kinesis Streams and DynamoDB Streams, including the persistence of running counters, output visualization, and notification in a few lines of Python code, has enabled JustGiving to improve time-to-market for new functionality. We will continue to use these building blocks for more complex transformations such as trending, co-occurrence, filtering and event enrichment. As next steps, consider checking out the following resources:

If you have questions or suggestions, please leave a comment below.
