AWS Big Data Blog

Building Scalable and Responsive Big Data Interfaces with AWS Lambda

This is a guest post by Martin Holste, a co-founder of the Threat Analytics Platform at FireEye where he is a senior researcher specializing in prototypes.

Overview

At FireEye, Inc., we process billions of security events every day with our Threat Analytics Platform, running on AWS. In building our platform, one of the problems we had to solve was how to be efficient and responsive with user-driven event analysis at this scale. Our analysis falls into three basic categories: threat intelligence matching, anomaly detection, and user-driven queries. We relentlessly search for ways to improve our efficiency and responsiveness, and AWS Lambda is a solution that has shown significant value in fulfilling these goals by providing a simple platform for scaling user-driven workloads.

In April of 2015, AWS officially released AWS Lambda, which allows you to execute code on demand without any dedicated infrastructure. This capability takes container-oriented thinking (e.g., Docker) to the next level by stripping out everything in the architecture except the code itself. The code is referred to as a Lambda function; the name is derived from the lambda operator in functional programming, because the function is stateless and invoked only for short periods of time.

There are two ways to use AWS Lambda. You can configure a Lambda function to be triggered by events, such as new files being uploaded to Amazon Simple Storage Service (S3), or you can invoke a Lambda function on demand. Reactive data processing architectures using the event-driven configuration are compelling; however, this post will detail a lesser-known but increasingly valuable architecture for harnessing Lambda functions for large-scale, UI-driven data processing through direct Lambda invocation.
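For illustration, invoking a function on demand from the AWS CLI looks roughly like this (the function name and payload here are placeholders, not part of this post's application):

aws lambda invoke \
  --function-name my-function \
  --invocation-type RequestResponse \
  --payload '{"key": "value"}' \
  output.json

The response payload is written to output.json, which is the same request/response pattern the Node.js app in this post uses through the AWS SDK.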

Building a Simple Application

In this post, I’ll show you how to build an application that counts the words in files stored in S3. It streams the current totals back to the user interface in real time to demonstrate how a Lambda function can provide immediate results to users. The architecture uses Lambda functions to do all of the data processing.

The goals of this architecture are to:

  • Scale without configuration or infrastructure tuning.
  • Provide a responsive user interface with fluid output.
  • Use the simplest code and design possible.

To achieve these goals, I’ll walk through creating Lambda functions to receive commands from a Node.js web server, execute them in parallel in a map-reduce style flow, and stream the results back to the browser to update the user interface in real time.

Architecture

In this architecture, instead of a master-slave or job server model, AWS Lambda provides an elegant and robust mechanism for running map-reduce or scatter-gather style workflows. Node.js has native ways to connect streams or processes in a pipeline, and using that capability, a single lightweight Node.js process can orchestrate arbitrarily large workloads. The work is broken down into smaller, parallel operations on small chunks with Lambda functions doing the heavy lifting, as illustrated below.

Diagram: work is broken down into small parallel operations, with Lambda functions doing the heavy lifting.

The workflow starts with a single web request to the Node.js server process. It responds with the URL for the HTML5 server-sent events (EventSource) mechanism, which provides a simple, efficient means of updating the client as results become available. This update stream carries progress information and logging, as well as the results themselves. Node.js makes this easy by treating the web request’s response object as a native stream at the end of the processing pipeline.

The processing pipeline itself consists of the following components:

  • S3 key lister
  • AWS Lambda invoker/result aggregator
  • Web client response handler

The “S3 key lister” function finds relevant objects to process by listing the S3 bucket and matching key names against a given key prefix. The list of matching objects is passed through the pipeline to the Lambda function invoker, which aggregates the keys into large batches and invokes a cascading Lambda function for each batch, passing the batch as the parameter. The cascading Lambda function maps the work into smaller batches that are used as parameters for child Lambda functions. These functions process the S3 data and return the reduced results to the cascading invoker, which pipes them back to the original Node.js invoker. The Node.js invoker keeps track of the result state and creates a continually updated response to stream to the web client.
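To make this concrete, here is a minimal sketch of what the Lambda invoker stage could look like as a Node.js stream, using es.map from the event-stream library; the batching and error handling are simplified relative to the real app.js.

var es = require('event-stream');
var AWS = require('aws-sdk');
var lambda = new AWS.Lambda();

// Each chunk flowing through the stream is a batch of S3 keys; invoke the
// cascade function for the batch and emit its aggregated result downstream.
var lambdaStream = es.map(function(batch, cb){
  lambda.invoke({
    FunctionName: 'cascade',
    InvocationType: 'RequestResponse',
    Payload: JSON.stringify(batch)
  }, function(err, res){
    if (err) return cb(err);
    cb(null, JSON.parse(res.Payload));
  });
});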

Now that we know what the code will do, let’s start putting the pieces together one by one.

Getting Started

I’ll assume that you already have an AWS account. The first step in creating the necessary components is to install Node.js and the AWS CLI. On Ubuntu Linux, sudo apt-get install nodejs && sudo pip install awscli installs both.

Getting started with AWS Lambda is easy and is detailed in the AWS documentation. I will assume you have read that walkthrough, have configured an AWS account with permissions to create Lambda functions, and have allowed those Lambda functions access to an S3 bucket containing data to process. You will need to create a role for the Lambda functions to assume. It will look something like arn:aws:iam::<account ID>:role/<name of the role you create>.
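As a reminder, the trust policy that lets Lambda assume the role is the standard one below; the execution role also needs permissions for CloudWatch Logs and read access to your S3 bucket, as described in the AWS walkthrough.

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": { "Service": "lambda.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }]
}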

The complete code used for this post, along with instructions on how to install and set up the application, is available on the AWS Big Data Blog GitHub repository.

This application has three logical components:

  1. Node.js web app
  2. Cascade Lambda function
  3. Word count Lambda function

Node.js Web App

Express is a simple web server framework for Node.js. Once Node.js is installed, you can easily install it with the command npm install express.

The web app serves two purposes: it receives a query from the client, and it streams the response back. In this very simple app, only the response stream is implemented, and the query parameters provide an S3 prefix. The prefix has been hardcoded to an empty string in the client-side JavaScript, meaning that all S3 keys in the bucket will be examined. The user browses to the root of the URL, and a small bit of JavaScript initiates the streaming request. A basic query initiated by the JavaScript from a web client looks like http://myserver/stream?prefix=, which executes a word count on all files found in the configured bucket. By default, the public awssampledb bucket is used. You can substitute your own bucket by providing Node.js with a BUCKET=mybucket environment variable at runtime.

The code snippet below from app.js shows the basic Node.js event stream pipeline creation and execution. The listStream, lambdaStream, serverSentStream, and HTTP response objects are connected in a pipeline that streams keys into Lambda functions for processing and streams the partial results returned from the functions into the HTTP event stream.

listStream
  .pipe(lambdaStream, { end: false }) // Do not propagate end
  .pipe(serverSentStream)
  .pipe(res); // Pipe results to HTTP response object

// Initiate the pipeline by writing bucket/prefix
listStream.write({
  bucket: BUCKET,
  prefix: req.query.prefix
});
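For context, the Express route wrapping this pipeline might look roughly like the sketch below; the header setup is the usual server-sent events preamble, and the details are assumptions rather than the exact code from the repository.

var express = require('express');
var app = express();

app.get('/stream', function(req, res){
  // Standard server-sent events headers keep the connection open so
  // partial results can be streamed as they arrive.
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  // ...construct listStream, lambdaStream, and serverSentStream here,
  // then run the pipeline shown above, ending in res.
});

app.listen(process.env.PORT || 3000);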

Now that we’ve seen the Node.js app’s role, let’s look at the Lambda functions themselves and how they are created.

Cascade Lambda Function

The cascade.js Lambda function is simple but serves the important role of ensuring that even very large workloads requiring thousands of Lambda functions will not overwhelm a single Node.js app. Instead, these functions serve as Lambda function “multiplier” proxies that handle the work of invoking the low-level Lambda functions and aggregating their results. Behind the scenes, the Lambda API uses HTTP transactions to perform operations, so spinning up thousands of concurrent Lambda functions requires thousands of HTTP transactions; this middle tier distributes that overhead.

This cascade Lambda function is provided a bucket, a list of keys, and optionally, the number of S3 keys to be sent to each of the word count Lambda functions (the batchSize). The batches are invoked in parallel, and when complete, the aggregate result is returned.

function invoke(keys, cb){
  var tail = 'Tail';
  var params = {
    FunctionName: 'wordcount',
    InvocationType: 'RequestResponse',
    LogType: tail, //None or Tail
    Payload: JSON.stringify({
      bucket: bucket,
      keys: keys
    })
  };
  var lambda_id = lambda.invoke(params, function(err, obj){
    if (err){
      cb(err, null);
      return;
    }
    if (tail === 'Tail'){
      var log = (new Buffer(obj.LogResult, 'base64')).toString();
      var matches = log.match(/Billed Duration: (\d+) ms/);
      if (matches && matches.length > 1){
        lambdaBilledMS += parseInt(matches[1], 10);
      }
    }
    cb(null, JSON.parse(obj.Payload));
  });
}

// Chop our given list of keys up into batchSize'd batches
var batches = [];
var batch = [];
for (var i = 0, len = allKeys.length; i < len; i++){
  batch.push(allKeys[i]);
  if (batch.length >= batchSize){
    batches.push(batch.slice());
    batch = [];
  }
}
if (batch.length){
  batches.push(batch.slice());
}

// Invoke each batch in parallel, returning aggregated result when
//   all are finished.
async.map(batches, invoke,
  function (err, results) {
    if (err) {
      console.error('error on invoke', err);
      context.fail('async.map error: ' + err.toString());
      return;
    }
    
    var totalWords = 0;
    var totalLines = 0;
    for (var i = 0, len = results.length; i < len; i++){
      totalWords += results[i][0];
      totalLines += results[i][1];
    }
    context.succeed([
      totalWords,
      totalLines,
      lambdaBilledMS
    ]);
});

In addition to processing the results themselves, we can request the Lambda execution log and parse out the billed milliseconds, producing a complete report of the total execution cost of the Lambda functions.

Word Count Lambda Function

The word count Lambda function takes a list of S3 keys, downloads and unzips them, splits the lines into words, and returns the aggregate count. It will do this in parallel for each key as it is retrieved in a stream.

The wordcount.js Lambda function uses the async, event-stream, and AWS SDK libraries, along with Node.js’s built-in zlib module. The AWS SDK and zlib are available in the Lambda Node.js runtime; async and event-stream are bundled into the function’s deployment package. The pipeline code doing the word count looks like this:

var lineCount = 0;
var totalWords = 0;

// Create our processor that splits lines/words
var lineSplitter = es.split();
lineSplitter.on('data', function(line){
  lineCount++;
  totalWords += line.split(/\W/).length;
})
.on('end', function(err){
  if (err) context.fail(err);
  console.log('Finished ' + srcKey + ' with '
    + totalWords + ' words in ' + lineCount + ' lines');
  cb(null, [totalWords, lineCount, warnings]);
});

// Create our pipeline
s3.getObject({
  Bucket: srcBucket,
  Key: srcKey
})
.createReadStream()
.pipe(zlib.createGunzip())
.pipe(lineSplitter);

There are a few important things to note about this design. By streaming the file downloads, we prevent the Lambda functions from idling while the download from S3 completes, and the files in S3 may be of arbitrary size as the Node.js event stream only operates on a given chunk at a time. This means even very large files will not cause out of memory errors. It’s also important to note that it is more efficient to process several files in parallel to avoid starving the Lambda function during the download, since the word count Lambda function can generally process faster than it can download a compressed file.
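A sketch of how that per-key parallelism might be arranged is shown below; countKey is a hypothetical wrapper around the download/gunzip/split pipeline above, and the real wordcount.js differs in detail.

var async = require('async');

// Process every key in the batch concurrently; each countKey call builds
// its own S3 download, gunzip, and line-splitting pipeline.
async.map(event.keys, countKey, function(err, perKeyCounts){
  if (err) return context.fail(err);
  // perKeyCounts is an array of [words, lines, warnings] triples that is
  // summed before the function returns.
});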

At the end of the Lambda function, context.succeed() is called with the final word and line counts as the return value, along with any generated warnings. This result is serialized as JSON and returned to the invoker, cascade.js.

Putting It All Together

Now that we have seen each individual component up close, let’s look at the total output. When we browse to the root of the URL, the Node.js app serves the index.html page. This page creates the server-sent events (EventSource) object, which opens a long-lived HTTP request to the Node.js app’s stream endpoint.

var source = new EventSource('stream?prefix=');
source.addEventListener('message', function(e) {
  console.log(e.data);
  var result = JSON.parse(e.data);
  // ...update the DOM with the parsed result (handling elided here)
});

Once this connection is established, the browser will keep the HTTP session alive, reading newline-separated data and interpreting it as JSON. This is the transport mechanism used to update the web page in real time, showing progress and logging as the words are counted.
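Concretely, each update the Node.js app writes to the response follows the server-sent events wire format, along these lines (the field names are illustrative):

// One SSE message: a "data:" line followed by a blank line. The browser's
// EventSource fires a 'message' event for each such block.
res.write('data: ' + JSON.stringify({ words: 1234, lines: 56 }) + '\n\n');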

Click the image below for a demonstration.

As the log messages scroll, warnings propagate, showing that certain files were too big to be processed in the maximum allotted processing time. This illustrates one important aspect of using Lambda functions: they are limited to one minute of execution time. Your app must be aware of timeouts and handle them accordingly. In this application, the word count Lambda function (wordcount.js) constantly checks whether it is about to time out, and exits with partial results and a warning.

var timeDiff = process.hrtime(startTime);
if (timeDiff[0] >= MAX_EXEC_TIME){
  var msg = 'Hit max execution time of ' + MAX_EXEC_TIME;
  // ...record the warning and return partial results (remainder elided)
}

One approach to address this would be to split large files into ranges and use S3’s capability of retrieving given content ranges to share the load across multiple Lambda functions. Keep in mind that many compression algorithms such as gzip do not allow such splitting, so data must be compressed using a format that can be split, such as LZ4, which allows splitting a file into parts and decompressing the ranges in isolation.
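With a splittable format, each Lambda function could then fetch just its own slice of the object with a ranged GET. A minimal sketch follows, where the byte boundaries would be computed by the invoker and sliceDecompressor is a hypothetical LZ4-aware decompressor:

// S3 supports standard HTTP Range requests on GetObject, so each Lambda
// function can download only its assigned byte range.
s3.getObject({
  Bucket: srcBucket,
  Key: srcKey,
  Range: 'bytes=0-1048575' // e.g., the first 1 MB slice
})
.createReadStream()
.pipe(sliceDecompressor); // hypothetical decompressor for this slice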

As the Node.js app executes its pipeline and streams results back to the browser, the client-side event handling code updates the browser’s DOM, changing the text and shade of the indicator boxes. Rapid updates produce fluid changes in the user interface, shifting the white background of the counter boxes through a gray scale until it is black at 100%. The EventSource object reduces the number of JavaScript objects and HTTP transactions required, which keeps the activity from adversely affecting the browser’s performance.

Dynamic Visualizations

A map-reduce framework is a good way to aggregate a large amount of data, but sometimes a simple scatter-gather pattern is necessary. In the next example, we’ll reuse the Lambda functions in an alternate mode in which the individual key names, word counts, and line counts are reported and visualized.

This alternate behavior is achieved by passing the no_agg=1 parameter to the Node.js app’s stream endpoint. The parameter is passed through to the cascading Lambda function as well as the word count Lambda function itself, instructing them not to aggregate the results as they propagate. This yields a fully granular response suitable for display in a scatter plot. The chart is rendered with the D3 visualization library and is accessed by browsing to the /scatter URL.
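As a rough sketch, the flag might change the cascade function’s return value along these lines (the variable names are illustrative; see cascade.js in the repository for the real logic):

if (event.no_agg){
  // Return one [key, words, lines] entry per file so the client can plot
  // each file as an individual point.
  context.succeed(perFileResults);
} else {
  context.succeed([totalWords, totalLines, lambdaBilledMS]);
}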

Click the image below for a demonstration.

The scatter plot is dynamically updated as individual Lambda functions complete. The results are streamed using server-sent events, as before. The user can hover over individual points to see the file name, word count, and line count.

Going Further

In this post, I’ve shown you how to scale up arbitrary data processing initiated via a user interface with results presented in real time. This is a blank slate on which you can build any application by making appropriate modifications to the Lambda functions and supporting code. It is particularly suited to apps that have a visual, user-driven component and need to support arbitrary sizes of computation for user queries. You can fork the example code repository as a starting place for your own applications.

If you have questions or suggestions, please leave a comment below.

Martin Holste is not an Amazon employee and does not represent Amazon.


Related:

How Expedia Implemented Near Real-time Analysis of Interdependent Datasets