AWS Big Data Blog

Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics

Chris Marshall is a Solutions Architect for Amazon Web Services

Analyzing web log traffic to gain insights that drive business decisions has historically been performed using batch processing.  While effective, this approach results in delayed responses to emerging trends and user activities.  There are solutions for processing data in real time using streaming and micro-batching technologies, but they can be complex to set up and maintain.  Amazon Kinesis Analytics is a managed service that makes it easy to identify and respond to changes in behavior in real time.

One use case where it’s valuable to have immediate insights is analyzing clickstream data.  In the world of digital advertising, an impression is when an ad is displayed in a browser, and a clickthrough is when a user clicks on that ad.  The clickthrough rate (CTR) is one way to monitor an ad’s effectiveness.  CTR is calculated as: CTR = Clicks / Impressions * 100.  Digital marketers monitor CTR to know when particular ads perform better than normal, which gives them a chance to optimize placements within the ad campaign.  They may also be interested in an anomalously low CTR, which could result from a bad image or bidding model.
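As a small worked example of the formula, the following Python sketch computes CTR as a percentage. The function name click_through_rate is hypothetical and not part of the pipeline; it guards against a window with zero impressions and uses true division so that integer counts still produce a fractional rate.

```python
def click_through_rate(clicks: int, impressions: int) -> float:
    """Return CTR = clicks / impressions * 100 as a percentage."""
    if impressions <= 0:
        raise ValueError("impressions must be a positive count")
    # True division (/) keeps the fractional part even for integer inputs
    return clicks / impressions * 100


# 25 clicks on 250 impressions is a 10% CTR; 125 clicks would be an unusual 50%
print(click_through_rate(25, 250))
print(click_through_rate(125, 250))
```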

In this post, I show an analytics pipeline that detects anomalies in a web traffic stream in real time, using the RANDOM_CUT_FOREST function available in Amazon Kinesis Analytics.

RANDOM_CUT_FOREST 

Amazon Kinesis Analytics includes a powerful set of analytics functions to analyze streams of data.  One such function is RANDOM_CUT_FOREST.  This function detects anomalies by scoring data flowing through a dynamic data stream. This novel approach identifies a normal pattern in streaming data, then compares new data points in reference to it. For more information, see Robust Random Cut Forest Based Anomaly Detection On Streams.

Analytics pipeline components

To demonstrate how the RANDOM_CUT_FOREST function can be used to detect anomalies in real-time clickthrough rates, I walk you through building an analytics pipeline and generating web traffic using a simple Python script.  When your injected anomaly is detected, you get an email or SMS message alerting you to the event.

This post first walks through the components of the analytics pipeline, then discusses how to build and test it in your own AWS account.  The accompanying AWS CloudFormation template builds the Amazon API Gateway API, Amazon Kinesis streams, AWS Lambda function, Amazon SNS components, and IAM roles required for this walkthrough. You then manually create the Amazon Kinesis Analytics application that uses the RANDOM_CUT_FOREST function in SQL.

Web Client

Because Python is a popular scripting language available on many clients, we’ll use it to generate web impressions and click data by making HTTP GET requests with the requests library.  This script mimics beacon traffic generated by web browsers.  Each GET request contains a browseraction query string value of Click or Impression to indicate the type of beacon being captured.  The script has a while loop that iterates 2500 times.  On each pass through the loop, an Impression beacon is sent.  Then random.random() is used to decide whether to also send a Click beacon.  For most passes (iterations 0 to 1949 and 2000 to 2499), a click is sent roughly 10% of the time.  For iterations 1950 to 1999, however, the click probability jumps to 50%, which represents the anomaly.  These CTRs are artificially high for the purpose of illustration; the functionality would be the same with realistic, much smaller values.

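import argparse
import random
import sys

import requests


def get_clicked(rate):
    # Return True with probability `rate` to simulate a user clicking the ad
    return random.random() < rate


def http_get_impression(target):
    # Send an impression beacon: <target>?browseraction=Impression
    requests.get(target, params={'browseraction': 'Impression'}, timeout=10)


def http_get_click(target):
    # Send a click beacon and print '+' to show that a click was sent
    requests.get(target, params={'browseraction': 'Click'}, timeout=10)
    sys.stdout.write('+')


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('target', help='the http(s) location to send the GET request')
    args = parser.parse_args()

    i = 0
    while i < 2500:
        http_get_impression(args.target)
        if i < 1950 or i >= 2000:
            # Normal traffic: roughly a 10% CTR ('_' marks each pass)
            clicked = get_clicked(0.1)
            sys.stdout.write('_')
        else:
            # Injected anomaly (passes 1950-1999): roughly a 50% CTR ('-' marks each pass)
            clicked = get_clicked(0.5)
            sys.stdout.write('-')
        if clicked:
            http_get_click(args.target)
        i += 1
        sys.stdout.flush()


if __name__ == '__main__':
    main()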
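Because the click decision is random, it can help to check what CTR each phase of the script should produce before sending any traffic. The following sketch reuses the script's rates and iteration boundaries but only counts simulated beacons instead of making HTTP requests; simulate_beacons and its seeded generator are assumptions for illustration, not part of ClickImpressionGenerator.py.

```python
import random


def simulate_beacons(iterations=2500, anomaly_start=1950, anomaly_end=2000,
                     normal_rate=0.1, anomaly_rate=0.5, seed=42):
    """Return the simulated CTR (percent) for the normal and anomaly phases."""
    rng = random.Random(seed)  # seeded so repeated runs give the same tally
    tally = {"normal": [0, 0], "anomaly": [0, 0]}  # phase -> [impressions, clicks]
    for i in range(iterations):
        phase = "anomaly" if anomaly_start <= i < anomaly_end else "normal"
        rate = anomaly_rate if phase == "anomaly" else normal_rate
        tally[phase][0] += 1  # every pass sends one impression
        if rng.random() < rate:
            tally[phase][1] += 1  # and sometimes a click
    return {phase: clicks / impressions * 100
            for phase, (impressions, clicks) in tally.items() if impressions}


print(simulate_beacons())
```

Keep in mind that the pipeline computes CTR per 10-second window rather than per phase, so how many beacons land in each window (and therefore how noisy each CTR value is) depends on how quickly your client can send requests.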

Web “front door”

Client web requests are handled by Amazon API Gateway, a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale.  API Gateway gets the web request traffic into your analytics pipeline by acting as a service proxy to an Amazon Kinesis stream: it takes the click and impression requests, converts the HTTP header and query string data into a JSON message, and places that message into your stream.  The CloudFormation template creates an API endpoint with a body mapping template similar to the following:

#set($inputRoot = $input.path('$'))
{
  "Data": "$util.base64Encode("{ ""browseraction"" : ""$input.params('browseraction')"", ""site"" : ""$input.params('Host')"" }")",
  "PartitionKey" : "$input.params('Host')",
  "StreamName" : "CSEKinesisBeaconInputStream"
}

(API Gateway service proxy body mapping template)

This tells API Gateway to take the Host header and the browseraction query string parameter, build a JSON payload from them, base64-encode it, and put it into the stream through a service proxy for Amazon Kinesis Streams.  The Host header also serves as the partition key.
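To make the transformation concrete, here is a minimal Python sketch of what the mapping template produces for a single beacon: a JSON record with browseraction and site fields, base64-encoded into the Data field of a Kinesis PutRecord request body. The function name build_put_record_body and the example host are hypothetical; API Gateway performs this step for you, and json.dumps spaces the JSON slightly differently from the template, which does not change its meaning.

```python
import base64
import json


def build_put_record_body(browseraction, host,
                          stream_name="CSEKinesisBeaconInputStream"):
    """Mirror the API Gateway mapping template for one beacon request."""
    # The record that Kinesis Analytics later sees as columns
    record = {"browseraction": browseraction, "site": host}
    # PutRecord expects the Data blob base64-encoded in the JSON request body
    data = base64.b64encode(json.dumps(record).encode("utf-8")).decode("ascii")
    return {
        "Data": data,
        "PartitionKey": host,  # the template partitions by the Host header
        "StreamName": stream_name,
    }


body = build_put_record_body("Click", "example.execute-api.us-east-1.amazonaws.com")
print(json.dumps(body, indent=2))
```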

Input stream

Amazon Kinesis Streams can capture terabytes of data per hour from thousands of sources.  In this case, you use it to deliver your JSON messages to Amazon Kinesis Analytics.

Analytics application

Amazon Kinesis Analytics processes the incoming messages and allows you to perform analytics on the dynamic stream of data.  You use SQL to calculate the CTR and pass it into the RANDOM_CUT_FOREST function to calculate an anomaly score.

First, calculate the counts of impressions and clicks using SQL with a tumbling window in Amazon Kinesis Analytics.  Kinesis Analytics uses streams and pumps in SQL to process data; see our earlier post for an overview of streaming data with Kinesis Analytics.  Inside the Analytics SQL, a stream is analogous to a table, and a pump is a continuously running query that inserts data into those streams.  Here are the definitions of the streams for the click and impression data:

CREATE OR REPLACE STREAM "CLICKSTREAM" ( 
   "CLICKCOUNT" DOUBLE
);


CREATE OR REPLACE STREAM "IMPRESSIONSTREAM" ( 
   "IMPRESSIONCOUNT" DOUBLE
);

Because you later divide clicks by impressions, you use a DOUBLE data type to hold the count values.  If you left them as integers, the division would truncate to 0 (or 1 in the unlikely case that every impression was clicked) instead of producing a fraction.  Next, you define the pumps that populate these streams using a tumbling window: a fixed, non-overlapping window of time in which all records received are treated as one group by the statement.

CREATE OR REPLACE PUMP "CLICKPUMP" AS 
INSERT INTO "CLICKSTREAM" ("CLICKCOUNT") 
SELECT STREAM COUNT(*) 
FROM "SOURCE_SQL_STREAM_001"
WHERE "browseraction" = 'Click'
GROUP BY FLOOR(
  ("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00')
    SECOND / 10 TO SECOND
);

CREATE OR REPLACE PUMP "IMPRESSIONPUMP" AS 
INSERT INTO "IMPRESSIONSTREAM" ("IMPRESSIONCOUNT") 
SELECT STREAM COUNT(*) 
FROM "SOURCE_SQL_STREAM_001"
WHERE "browseraction" = 'Impression'
GROUP BY FLOOR(
  ("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00')
    SECOND / 10 TO SECOND
);

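The GROUP BY clause uses FLOOR and ROWTIME to create a 10-second tumbling window: taking the floor of the time elapsed since the Unix epoch divided by 10 seconds assigns every record in the same 10-second span to the same group.  Each pump therefore outputs one record every ten seconds, as long as matching records arrive in the source stream during that window.  In this case, that record holds the total number of clicks or impressions received in the window.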
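The same bucketing idea can be sketched in Python: each event timestamp, given here as seconds since the Unix epoch and standing in for ROWTIME, is mapped to a 10-second window with floor(t / 10), and events are counted per window and browseraction. The function name and the event list are hypothetical; Kinesis Analytics applies this logic continuously to the stream rather than to a finished list.

```python
import math
from collections import Counter

WINDOW_SECONDS = 10


def tumbling_window_counts(events):
    """Count events per (window, browseraction).

    `events` is an iterable of (epoch_seconds, browseraction) tuples.
    """
    counts = Counter()
    for timestamp, action in events:
        # Same idea as FLOOR((ROWTIME - epoch) SECOND / 10 TO SECOND)
        window = math.floor(timestamp / WINDOW_SECONDS)
        counts[(window, action)] += 1
    return counts


events = [
    (1000.0, "Impression"), (1003.5, "Click"), (1009.9, "Impression"),
    (1010.0, "Impression"), (1015.2, "Impression"), (1019.0, "Click"),
]
for (window, action), count in sorted(tumbling_window_counts(events).items()):
    print(window * WINDOW_SECONDS, action, count)
```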

Next, use the output of these pumps to calculate the CTR:

CREATE OR REPLACE STREAM "CTRSTREAM" (
  "CTR" DOUBLE
);

CREATE OR REPLACE PUMP "CTRPUMP" AS 
INSERT INTO "CTRSTREAM" ("CTR")
SELECT STREAM "CLICKCOUNT" / "IMPRESSIONCOUNT" * 100 as "CTR"
FROM "IMPRESSIONSTREAM",
  "CLICKSTREAM"
WHERE "IMPRESSIONSTREAM".ROWTIME = "CLICKSTREAM".ROWTIME;
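The CTRPUMP pairs a click count with an impression count only when both were emitted for the same window (matching ROWTIME). A minimal Python sketch of that inner join follows; the per-window dictionaries and the function name ctr_per_window are hypothetical stand-ins for the two in-application streams, and the floating-point division plays the role of the DOUBLE columns.

```python
def ctr_per_window(click_counts, impression_counts):
    """Inner-join per-window counts and compute CTR = clicks / impressions * 100."""
    ctr = {}
    for window, clicks in click_counts.items():
        impressions = impression_counts.get(window)
        # Like the SQL join, skip windows without a matching impression count;
        # this also avoids dividing by zero
        if impressions:
            ctr[window] = float(clicks) / impressions * 100
    return ctr


click_counts = {100: 3, 101: 12, 102: 5}
impression_counts = {100: 30, 101: 24}
print(ctr_per_window(click_counts, impression_counts))
```

One consequence of this design worth knowing: a window with impressions but no clicks produces no CLICKSTREAM row, so it drops out of the join rather than producing a 0% CTR.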

Finally, these CTR values are used in RANDOM_CUT_FOREST to detect anomalous values.  The DESTINATION_SQL_STREAM is the output stream of data from the Amazon Kinesis Analytics application.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "CTRPERCENT" DOUBLE,
    "ANOMALY_SCORE" DOUBLE
);

CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
INSERT INTO "DESTINATION_SQL_STREAM" 
SELECT STREAM * FROM
TABLE (RANDOM_CUT_FOREST( 
             CURSOR(SELECT STREAM "CTR" FROM "CTRSTREAM"), --inputStream
             100, --numberOfTrees (default)
             12, --subSampleSize 
             100000, --timeDecay (default)
             1) --shingleSize (default)
)
WHERE ANOMALY_SCORE > 2; 

The RANDOM_CUT_FOREST function greatly simplifies the programming required for anomaly detection.  However, understanding your data domain is still paramount when performing data analytics: the RANDOM_CUT_FOREST function is a tool for data scientists, not a replacement for them.  Knowing whether your data follows a logarithmic, circadian, linear, or other pattern provides the insight necessary to select the right parameters for RANDOM_CUT_FOREST.  For more information about parameters, see the RANDOM_CUT_FOREST Function.

Fortunately, the default values work in a wide variety of cases.  In this case, use the default values for all but the subSampleSize parameter.  Typically, you would use a larger sample size (the default is 256) to increase the pool of random samples used to calculate the anomaly score; for this post, use 12 samples so that the function starts evaluating anomaly scores sooner.

Your SQL query outputs one CTR value every ten seconds from the tumbling window, so with a subSampleSize of 12 you have enough values to start calculating anomaly scores after about two minutes (12 × 10 seconds).  You also use the WHERE clause as a cutoff, so records are output to “DESTINATION_SQL_STREAM” only if the anomaly score from the function is greater than 2.  To help visualize the cutoff point, the following graph plots the data points from a few runs through the pipeline using the sample Python script.

As you can see, the majority of the CTR data points are grouped around a 10% rate.  As values move away from the 10% rate, their anomaly scores go up because such values occur less often.  In the graph, points with an anomaly score above the cutoff are shaded orange.  A cutoff value of 2 works well for this data set.

Output stream

The Amazon Kinesis Analytics application outputs the values from “DESTINATION_SQL_STREAM” to an Amazon Kinesis stream when the record has an anomaly score greater than 2.

Anomaly execution function

The output stream is configured as the event source for an AWS Lambda function with a batch size of 1, so the function is invoked once for each record put in the output stream.  This function is where you can react to anomalous events.  In a real-world scenario, you might instead update a real-time bidding system to react to current events.  In this example, you send a message to SNS so that you see a tangible response to the changing CTR.  The CloudFormation template creates a Lambda function similar to the following:

var AWS = require('aws-sdk');
var sns = new AWS.SNS({ region: "<region>" });

exports.handler = function(event, context) {
	// Our batch size is 1 record, so this loop is expected to run only once
	event.Records.forEach(function(record) {
		// Kinesis data is base64 encoded, so decode it here
		var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
		// Each CSV record is CTRPERCENT,ANOMALY_SCORE; trim any trailing newline first
		var rec = payload.trim().split(',');
		var ctr = rec[0];
		var anomaly_score = rec[1];
		var detail = 'Anomaly detected with a click through rate of ' + ctr + '% and an anomaly score of ' + anomaly_score;
		var subject = 'Anomaly Detected';
		var params = {
			Message: detail,
			Subject: subject,
			// Replace the placeholders with the ARN of the ClickStreamEvent topic in your account
			TopicArn: 'arn:aws:sns:<region>:<account-id>:ClickStreamEvent'
		};
		sns.publish(params, function(err, data) {
			if (err) context.fail(err.stack);
			else     context.succeed('Published Notification');
		});
	});
};
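If you prefer Python for the Lambda function, the decoding step translates directly. The following is a minimal sketch, not the function that the CloudFormation template deploys: build_notifications is a hypothetical helper that turns each base64-encoded CSV record (CTRPERCENT,ANOMALY_SCORE) into a subject and message. Publishing is deliberately left out; in a real handler you would pass each pair, along with your topic ARN, to the publish call of a boto3 SNS client.

```python
import base64


def build_notifications(event):
    """Turn a Kinesis-triggered Lambda event into (subject, message) pairs."""
    notifications = []
    for record in event["Records"]:
        # Kinesis record data arrives base64 encoded
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        # CSV output from DESTINATION_SQL_STREAM: CTRPERCENT,ANOMALY_SCORE
        ctr, anomaly_score = payload.strip().split(",")[:2]
        message = ("Anomaly detected with a click through rate of " + ctr
                   + "% and an anomaly score of " + anomaly_score)
        notifications.append(("Anomaly Detected", message))
    return notifications


# Hypothetical sample record, shaped like the Kinesis event that Lambda receives
sample = base64.b64encode(b"48.7,2.9\n").decode("ascii")
event = {"Records": [{"kinesis": {"data": sample}}]}
print(build_notifications(event))
```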

Notification system

Amazon SNS is a fully managed, highly scalable messaging platform.  For this walkthrough, you send messages to SNS with subscribers that send email and text messages to a mobile phone.

Analytics pipeline walkthrough

Sometimes it’s best to build out a solution so you can see all the parts working and get a good sense of how it works.   Here are the steps to build out the entire pipeline as described above in your own account and perform real-time clickstream analysis yourself.  Following this walkthrough creates resources in your account that you can use to test the example pipeline.

Prerequisites

To complete the walkthrough, you’ll need an AWS account and a place to execute Python scripts.

Configure the CloudFormation template

Download the CloudFormation template named CSE-cfn.json from the AWS Big Data Blog GitHub repository. In the CloudFormation console, set your region to N. Virginia, Oregon, or Ireland, and then upload that file.

When your script executes and an anomaly is detected, SNS sends a notification.  To see these in an email or text message, enter your email address and mobile phone number in the Parameters section and choose Next.

This CloudFormation template creates the IAM policies and roles necessary to process the incoming messages.  Acknowledge this by selecting the IAM acknowledgement check box; otherwise, stack creation fails because the template requires the CAPABILITY_IAM capability.

If you entered a valid email address or mobile phone number, you receive a confirmation message when CloudFormation creates the SNS subscriptions.  If you don’t want to receive email or text messages from your pipeline, you don’t need to complete the confirmation process.

Run the Python script

Copy ClickImpressionGenerator.py to a local folder.

This script uses the requests library to make HTTP requests.  If you don’t have that Python package installed globally, you can install it locally by running the following command in the same folder as the Python script:

pip install requests -t .

When the CloudFormation stack is complete, choose the Outputs tab, copy the value of ExecutionCommand, and run that command from the folder containing the Python script.

This starts generating web traffic and sends it to the API Gateway endpoint at the front of your analytics pipeline.  It is important to have data flowing into your Amazon Kinesis Analytics application during the next section so that a schema can be generated from the incoming data.  If the script completes before you finish the next section, simply restart it with the same command.

Create the Analytics pipeline application

At the end of this section, we show you how to deploy the Kinesis Analytics application with another CloudFormation template, but first we walk through creating it manually in the console. To begin, open the Amazon Kinesis console and choose Go to Analytics.

Choose Create new application to create the Kinesis Analytics application, enter an application name and description, and then choose Save and continue.

Choose Connect to a source to see a list of streams.  Select the stream that starts with the name of your CloudFormation stack and contains CSEKinesisBeaconInputStream, in the form <stack name>-CSEKinesisBeaconInputStream-<random string>.  This is the stream that your click and impression requests are sent to from API Gateway.

Scroll down and choose Choose an IAM role.  Select the role with the name in the form of <stack name>-CSEKinesisAnalyticsRole-<random string>.  If your ClickImpressionGenerator.py script is still running and generating data, you should see a stream sample.  Scroll to the bottom and choose Save and continue.

Next, add the SQL for your real-time analytics.  Choose Go to SQL editor, choose Yes, start application, and paste the contents of CSE-SQL.sql into the editor. Choose Save and run SQL.  After a minute or so, you should see data appear every ten seconds in the CLICKSTREAM stream.  Choose Exit when you are done editing.

Choose Connect to a destination and select the <stack name>-CSEKinesisBeaconOutputStream-<random string> from the list of streams.  Change the output format to CSV.

Choose Choose an IAM role and select <stack name>-CSEKinesisAnalyticsRole-<random string> again.  Choose Save and continue.

Alternatively, you can create the Kinesis Analytics application by using the CSE-KA-cfn.json template in CloudFormation.  Instead of manually creating the application in the console, create a CloudFormation stack using this template.  Give the new stack a name and use the name of the first stack you created as the only parameter needed for this stack.

Now your analytics pipeline is complete.  When your script reaches the injected anomaly section, you should get a text message or email notifying you of the anomaly.  To clean up the resources created here, stop and delete the Kinesis Analytics application (or delete its stack if you created it with CSE-KA-cfn.json), and then delete the first CloudFormation stack.

Summary

A pipeline like this can be used for many use cases where anomaly detection is valuable. What solutions have you enabled with this architecture? If you have questions or suggestions, please comment below.

 

