Node.js Streaming MapReduce with Amazon EMR

Ian Meyers is a Solutions Architecture Senior Manager with AWS

Introduction

Node.js is a JavaScript runtime for building high-performance server-side applications, based on non-blocking I/O and an asynchronous, event-driven processing model.

When customers need to process large volumes of complex data, Node.js offers a runtime that natively supports the JSON data structure. Languages such as Python and Ruby have excellent support for JSON data, but map it onto their own native types, such as lists, arrays, dictionaries, and hashes. Because JSON is JavaScript's own object notation, Node.js offers a high-performance and scalable alternative that represents JSON data directly as objects. An AWS SDK is now available for Node.js, which allows for integration between Node.js applications and AWS services.

In this post, you'll learn how to install Node.js on Amazon Elastic MapReduce (Amazon EMR); how to build and integrate a Node.js application with the Hadoop Streaming parallel processing architecture; and finally, how to deploy and run our Node.js MapReduce application on AWS. For more information about Amazon EMR and Hive, see the tutorials at http://aws.amazon.com/articles/2854.

This article assumes you have a good understanding of Hadoop Streaming, JavaScript, and Amazon EMR.

Use Case

In this article, we will consume data from Twitter that contains a complex graph of information about tweets, re-tweets, replies, and direct messages. Each file contains a single block of Twitter interactions that we read from and write to Amazon Simple Storage Service (Amazon S3). We want to transform the data so that Hive can aggregate metrics such as re-tweet rate, tweets per second, and direct messages per user.

Sample Input Data

Our data represents a complex interaction graph between multiple users of Twitter, for both re-tweets and replies, as shown here:

For the purposes of data discovery, we could investigate this data using Hive with the JsonSerde (as outlined in http://aws.amazon.com/articles/2854). However, because this graph is complex and self-referential, most Hive JsonSerdes are unable to expose this data as a table. By processing this data in Node.js, we can easily navigate the data graph with simple syntax.
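To illustrate what navigating this graph looks like in Node.js, here is a minimal sketch that uses a hypothetical, heavily trimmed record. Its shape is inferred from the fields the appendix mapper reads: a top-level `id` and `hash`, plus an `interactions` array whose entries each carry an `interaction` with `author` and `created_at`. It is not real DataSift output, and all values are placeholders.

```javascript
// Hypothetical, trimmed record: field names mirror those read by the appendix
// mapper; all values are placeholders, not real Twitter/DataSift data.
var record = {
	id: 'record-1',
	hash: 'hash-1',
	interactions: [
		{ interaction: { id: 'i1', created_at: 'Thu, 14 Feb 2013 10:00:00 +0000',
			content: 'hello', author: { id: 1, username: 'alice' } } },
		{ interaction: { id: 'i2', created_at: 'Thu, 14 Feb 2013 10:05:00 +0000',
			content: 'RT hello', author: { id: 2, username: 'bob' } } }
	]
};

// Nested objects are reached with ordinary property access; no schema or
// SerDe is needed to walk from the record down to each author.
function authorUsernames(rec) {
	return rec.interactions.map(function(item) {
		return item.interaction.author.username;
	});
}

console.log(JSON.stringify(authorUsernames(record))); // ["alice","bob"]
```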

Installing Node.js

We can install Node.js on an Amazon EMR cluster node with a bootstrap action:

--bootstrap-actions Path=s3://github-emr-bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js

(If you've never used a bootstrap action, the Amazon EMR documentation shows you how.)

Writing MapReduce Code

Now that Node.js is on the Amazon EMR cluster and Hadoop Streaming is correctly configured, we need to be able to run our Node.js application for map and reduce operations. Using the Hadoop Streaming processing architecture, we can specify the mapper and reducer code to use for a streaming job.

Like any other Hadoop Streaming compatible language, our scripts receive their input (which Hadoop reads from Amazon S3 or HDFS) on standard input (stdin). In Node.js, stdin is accessible in a variety of ways, including through the process global object (http://nodejs.org/api/process.html) and through the readline module (https://nodejs.org/api/readline.html).
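For comparison with the readline approach used in this article, here is a minimal sketch of reading stdin line by line through the `process` global. It accumulates `data` chunks, emits each complete line, and flushes any trailing partial line on `end`. `createLineSplitter` is a hypothetical helper name, not part of Node.js.

```javascript
// Hypothetical helper: turns arbitrary text chunks into complete lines.
function createLineSplitter(onLine) {
	var buffered = '';
	return {
		// Append a chunk and emit every complete line it finishes.
		write: function(chunk) {
			buffered += chunk;
			var lines = buffered.split('\n');
			buffered = lines.pop(); // keep the incomplete tail for the next chunk
			lines.forEach(function(line) { onLine(line); });
		},
		// Emit whatever is left once the input ends (a last line with no '\n').
		end: function() {
			if (buffered.length > 0) {
				onLine(buffered);
				buffered = '';
			}
		}
	};
}

// In a mapper, this could be wired to stdin as follows (handleLine stands in
// for your own per-line function):
//   process.stdin.setEncoding('utf8');
//   var stdinSplitter = createLineSplitter(handleLine);
//   process.stdin.on('data', stdinSplitter.write);
//   process.stdin.on('end', stdinSplitter.end);

// Demonstration with in-memory chunks that break lines in the middle.
var seen = [];
var splitter = createLineSplitter(function(line) { seen.push(line); });
splitter.write('first li');
splitter.write('ne\nsecond line\nthi');
splitter.write('rd');
splitter.end();
console.log(JSON.stringify(seen)); // ["first line","second line","third"]
```

Setting the encoding to UTF-8 makes the chunks strings and prevents a multi-byte character from being split across two chunks. This is the bookkeeping that readline otherwise does for you.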

There are a few main functions that our MapReduce program must perform to read data from Hadoop Streaming and output results.

Configure and Buffer Standard Input

In our example, we'll use the readline module to buffer events from stdin into our Node.js application. To do this, we create a readline interface and bind it to stdin:

var readline = require('readline');
var rl = readline.createInterface({
	input : process.stdin
});

We can then receive input one line at a time by handling the 'line' event:

rl.on('line', function(line) {
	exports.extractTweetInfo(line);
});

We call an exported function, `extractTweetInfo`, rather than putting the logic inline, because this makes the application much simpler to test: we can call the function directly with test data to check that it behaves as expected.
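Here is a minimal sketch of that testing approach. It assumes the function writes its results with `process.stdout.write`, as the mapper in this article does. The sketch temporarily swaps in a capturing `write`, calls the function with a test line, and restores the original. `captureStdout` and the cut-down `extractTweetInfo` below are hypothetical stand-ins for illustration, not the appendix code.

```javascript
// Hypothetical helper: runs fn and returns everything it wrote to stdout.
function captureStdout(fn) {
	var originalWrite = process.stdout.write;
	var captured = '';
	process.stdout.write = function(chunk) {
		captured += chunk;
		return true;
	};
	try {
		fn();
	} finally {
		process.stdout.write = originalWrite; // always restore the real stdout
	}
	return captured;
}

// Cut-down stand-in for a mapper function that emits "key\tvalue" lines.
var extractTweetInfo = function(line) {
	var obj = JSON.parse(line);
	obj.interactions.forEach(function(item) {
		process.stdout.write(item.interaction.id + '\t' + JSON.stringify(item) + '\n');
	});
};

var output = captureStdout(function() {
	extractTweetInfo('{"interactions":[{"interaction":{"id":"i1"}},{"interaction":{"id":"i2"}}]}');
});
var keys = output.trim().split('\n').map(function(l) { return l.split('\t')[0]; });
console.log(JSON.stringify(keys)); // ["i1","i2"]
```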

Implement your data processing logic

Our application is going to extract a subset of tweet information and output it in a format that can be used by reduce functions. One reason to use Node.js for Hadoop processing is that it makes working with JSON data very simple. We can turn a text line of JSON into a Node.js object using JSON.parse:

// function which extracts and re-emits the bits of the tweet we want, in a format which is suitable for Hadoop MapReduce
exports.extractTweetInfo = function(line) {
	var obj = JSON.parse(line); // declare locally rather than as an implicit global

We can then extract the information that we want to include in our calculation, treating each interaction in the parsed record as a Node.js object:

	obj.interactions.forEach(function(item) {
		var a = item.interaction.author;
		// keep the day, month, and year components of created_at as the key
		var dateComponents = item.interaction.created_at.split(' ');
		var d = [ dateComponents[1], dateComponents[2], dateComponents[3] ].join(' ');

		var interaction = {
			objectId : obj.id,
			hash : obj.hash,
			...
		};

When we implement a reducer, we want to ensure that it receives all of the data for a given key. To enable this, each line of mapper output must begin with a key, which Hadoop uses to partition and sort the output before calling the reducer. In plain text processing, Hadoop Streaming by default treats everything up to the first tab character (\t) as the key and the rest of the line as the value.

To write data for storage or reduction, write to stdout using process.stdout:

	process.stdout.write(d + '\t' + JSON.stringify(interaction) + '\n');
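Because Hadoop Streaming by default splits each line at the first tab, it can help to keep formatting and parsing in two small helpers. Here is a minimal sketch; `toRecord` and `fromRecord` are hypothetical names. Splitting at the first tab, rather than on every tab, keeps the value intact even if it contains a literal tab, although `JSON.stringify` already escapes tabs inside strings. As in Hadoop Streaming's default behavior, a line with no tab is treated as all key and no value.

```javascript
// Hypothetical helpers for Hadoop Streaming's default "key<TAB>value" lines.
function toRecord(key, value) {
	// A tab or newline inside the key would make Hadoop split the line wrongly.
	if (/[\t\n]/.test(key)) {
		throw new Error('key must not contain a tab or newline: ' + key);
	}
	return key + '\t' + JSON.stringify(value) + '\n';
}

function fromRecord(line) {
	var tab = line.indexOf('\t');
	if (tab === -1) {
		// No tab: the whole line is the key and the value is empty.
		return { key: line, value: null };
	}
	return {
		key: line.slice(0, tab),
		value: JSON.parse(line.slice(tab + 1))
	};
}

var line = toRecord('14 Feb 2013', { id: 'i1', content: 'a\tb' });
// readline delivers lines without the trailing newline, so strip it here too.
var parsed = fromRecord(line.replace(/\n$/, ''));
console.log(parsed.key); // 14 Feb 2013
console.log(parsed.value.content === 'a\tb'); // true
```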

Make the File Executable

Amazon EMR runs mapper and reducer scripts by calling them with command line syntax, such as './mapper.js'. As such, we need to ensure that the Node.js module we've built can be called from the command line. To do this, we add a standard 'shebang' line at the top of the mapper and reducer files so that the operating system uses Node.js to run the contents of the script:

#!/usr/bin/env node

Next, you can test your mapper code (in a file called mapper.js, for example) from the command line. Make the file executable first (for example, with chmod +x mapper.js), and then run:

./mapper.js < input-file-path
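Between the map and reduce phases, Hadoop sorts mapper output by key, so a reducer sees all lines for one key next to each other. When testing locally, piping the mapper output through sort before the reducer (for example, ./mapper.js < input-file-path | sort | ./reducer.js) approximates this. The following minimal sketch shows the same idea in-process. `mapLine` and `reduceSorted` are hypothetical functions that count input lines per day.

```javascript
// Hypothetical mapper: emits "day<TAB>1" for a created_at-style input line.
function mapLine(line) {
	var fields = line.split(' ');
	return [fields[1], fields[2], fields[3]].join(' ') + '\t1';
}

// Hypothetical reducer: assumes lines arrive sorted by key, as Hadoop
// guarantees, and counts consecutive lines that share a key.
function reduceSorted(lines) {
	var results = [];
	var current = null;
	lines.forEach(function(line) {
		var key = line.slice(0, line.indexOf('\t'));
		if (current && current.day === key) {
			current.count += 1;
		} else {
			current = { day: key, count: 1 };
			results.push(current);
		}
	});
	return results;
}

var input = [
	'Thu, 14 Feb 2013 10:00:00 +0000',
	'Fri, 15 Feb 2013 09:00:00 +0000',
	'Thu, 14 Feb 2013 11:30:00 +0000'
];

// map -> sort (standing in for Hadoop's shuffle) -> reduce
var mapped = input.map(mapLine);
mapped.sort();
console.log(JSON.stringify(reduceSorted(mapped)));
// [{"day":"14 Feb 2013","count":2},{"day":"15 Feb 2013","count":1}]
```

If you skip the sort() call, 14 Feb 2013 is split into two separate groups. The sort between the phases exists to prevent exactly that.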

Deploying & Running

After we've written our mappers and reducers, we transfer them to Amazon S3, and run MapReduce against some input data using Amazon EMR.

The following example shows how to perform the steps using the AWS CLI (https://aws.amazon.com/cli), but you can also achieve the same results using the equivalent commands on the Amazon EMR console (console.aws.amazon.com/elasticmapreduce), the Amazon EMR API (http://docs.aws.amazon.com/ElasticMapReduce/latest/API/Welcome.html?r=8857), or AWS Data Pipeline (https://aws.amazon.com/datapipeline).

We can launch a new Amazon EMR cluster with the create-cluster command, which starts a cluster and runs the Node.js bootstrap action:

aws emr create-cluster --ami-version 3.9.0 --enable-debugging --visible-to-all-users --name MyNodeJsMapReduceCluster --instance-groups InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge --no-auto-terminate --log-uri s3://<my-log-bucket>/EMR/logs --bootstrap-actions Path=s3://github-emr-bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js --ec2-attributes KeyName=<my-ec2-key>,InstanceProfile=EMRJobflowDefault --service-role EMRRole --region <region>

This creates a long-running cluster (it does not terminate automatically) with the 3.9.0 AMI, 2 core nodes, and 1 master node, all using the m3.xlarge instance type. It also enables debugging and logging to the specified bucket, and installs Node.js on startup with a bootstrap action. Finally, it specifies an Amazon EC2 key pair so that you can SSH into the cluster's master node.

Next, we’ll add a Hadoop Streaming step to process our input data. We add a step to the cluster <my cluster ID> with:

aws emr add-steps --cluster-id <my cluster ID> --steps Name=NodeJSStreamProcess,Type=Streaming

We add our mapper and reducer JavaScript files to the job by listing them with the --files argument, which makes them available in each task's working directory, and then reference their base filenames in -mapper and -reducer:

Args=--files,"s3://<path to mapper>/mapper.js\,s3://<path to reducer>/reducer.js",-mapper,mapper.js,-reducer,reducer.js 

Then we add the location of the input and output files:

-input,s3://<path-to-input-files>,-output,s3://<path-to-output-files>

This gives us the full command line invocation:

aws emr add-steps --cluster-id <my cluster ID> --steps Name=NodeJSStreamProcess,Type=Streaming,Args=[--files,"s3://github-aws-big-data-blog/aws-blog-nodejs-on-emr/src/sample-mapper.js\,s3://github-aws-big-data-blog/aws-blog-nodejs-on-emr/src/sample-reducer.js",-input,s3://github-aws-big-data-blog/aws-blog-nodejs-on-emr/sample/tweets,-output,s3://<my output bucket>/node_sample,-mapper,sample-mapper.js,-reducer,sample-reducer.js]

No other interaction with the Hadoop cluster is required to generate the data. However, because this cluster was created with --no-auto-terminate, it keeps running, and incurring charges, after the step completes. When you have finished running the above example, make sure you terminate the cluster. You can do this with the AWS CLI:

aws emr terminate-clusters --cluster-ids <my cluster ID>

Conclusion

Node.js can provide fast execution of MapReduce applications with terse, native syntax for processing complex JSON data. With Amazon EMR configuration options, you can easily run Node.js-based applications that scale over time and as input data volumes grow.

Appendix - Sample MapReduce Application

The following MapReduce program outputs the tweet count by day for complex JSON-structured data; in this case, Twitter data collected using DataSift (datasift.com). The mapper escapes special characters such as newlines and tabs in the tweet content, and outputs the date portion of each tweet's created_at field (for example, 14 Feb 2013) as the key. The reducer then rolls up this data by date, outputting the total number of tweets per day.

Mapper

#!/usr/bin/env node

// use RL to read lines from stdin
var readline = require('readline');
var rl = readline.createInterface({
	input : process.stdin
});

// escape all control characters so that they are plain text in the output;
// regular expressions with the g flag replace every occurrence, whereas a
// string pattern would replace only the first one
String.prototype.escape = function() {
	return this.replace(/\n/g, '\\n').replace(/'/g, '\\\'').replace(/"/g, '\\"').replace(/&/g, '\\&').replace(/\r/g, '\\r')
			.replace(/\t/g, '\\t').replace(/\x08/g, '\\b').replace(/\f/g, '\\f');
};

// function which extracts and re-emits the bits of the tweet we want, in a format which is suitable for Hadoop MapReduce
exports.extractTweetInfo = function(line) {
	var obj = JSON.parse(line);

	obj.interactions.forEach(function(item) {
		var a = item.interaction.author;
		var dateComponents = item.interaction.created_at.split(' ');
		var d = [ dateComponents[1], dateComponents[2], dateComponents[3] ].join(' ');

		var interaction = {
			objectId : obj.id,
			hash : obj.hash,
			id : item.interaction.id,
			author_id : a.id,
			author_avatar : a.avatar,
			author_link : a.link,
			author_name : a.name,
			author_username : a.username,
			content : item.interaction.content.escape(),
			created_at : item.interaction.created_at,
			link : item.interaction.link,
			schema_version : item.interaction.schema.version,
			source : item.interaction.source
		};

		process.stdout.write(d + '\t' + JSON.stringify(interaction) + '\n');
	});
};

// fire an event on each line read from RL
rl.on('line', function(line) {
	exports.extractTweetInfo(line);
});
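When a string is passed as the pattern, `String.prototype.replace` replaces only the first match. Escaping every occurrence of a character therefore needs a regular expression with the `g` flag. Here is a minimal sketch of a standalone helper; the name `escapeControlChars` is hypothetical. It escapes backslashes first and then each control character, so the result reads unambiguously as plain text. Note also that `JSON.stringify` already escapes newlines and tabs inside string values, which is what keeps each emitted record on a single line.

```javascript
// Hypothetical helper: replace every occurrence (note the g flag) of a
// backslash or control character with its two-character escape sequence.
function escapeControlChars(text) {
	return String(text)
		.replace(/\\/g, '\\\\') // escape existing backslashes first
		.replace(/\n/g, '\\n')
		.replace(/\r/g, '\\r')
		.replace(/\t/g, '\\t')
		.replace(/\x08/g, '\\b') // backspace; in a regex, \b means a word boundary
		.replace(/\f/g, '\\f');
}

var raw = 'line one\nline two\n\tindented';
console.log(escapeControlChars(raw)); // prints: line one\nline two\n\tindented

// JSON.stringify on its own also keeps the record free of raw newlines.
console.log(JSON.stringify({ content: raw }).indexOf('\n') === -1); // true
```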

Reducer

#!/usr/bin/env node

// use RL to read lines from stdin
var readline = require('readline');
var rl = readline.createInterface({
	input : process.stdin
});

// variable used as a daily accumulator
var interactionSummary = {
	day : '',
	count : 0
};

// generate a JSON object from the captured input data, and then generate
// the required output
exports.processMapData = function(data) {
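	var lineElements = data.split('\t');
	var keyDate = lineElements[0];
	// the parsed interaction is not needed for a simple count, but is
	// available for richer summaries
	var obj = JSON.parse(lineElements[1]);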

	if (interactionSummary.day === '') {
		interactionSummary.day = keyDate;
		interactionSummary.count = 1;
	} else {
		if (keyDate !== interactionSummary.day) {
			process.stdout.write(JSON.stringify(interactionSummary) + '\n');

			interactionSummary.day = keyDate;
			interactionSummary.count = 1;
		} else {
			interactionSummary.count += 1;
		}
	}
};

// fire an event on each line read from RL
rl.on('line', function(line) {
	exports.processMapData(line);
});

// final event when the input is closed, to flush the final accumulated value
// (skipped if this reducer received no input at all)
rl.on('close', function() {
	if (interactionSummary.day !== '') {
		process.stdout.write(JSON.stringify(interactionSummary) + '\n');
	}
});
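The reducer's accumulator logic can also be pulled out of the readline handlers so that it can be tested without stdin. Here is a minimal sketch; `createDailyCounter` is a hypothetical name. Its `add` and `flush` functions mirror the 'line' and 'close' handlers. `add` expects key-sorted `day<TAB>json` lines and skips blank ones. `flush` only emits when at least one line was seen, so a reducer that receives no keys writes nothing.

```javascript
// Hypothetical streaming accumulator mirroring the reducer's 'line' and
// 'close' handlers; emit is called once per completed day.
function createDailyCounter(emit) {
	var summary = null; // null until the first line arrives

	return {
		add: function(line) {
			if (line.trim() === '') {
				return; // ignore blank lines instead of failing on them
			}
			var tab = line.indexOf('\t');
			var day = tab === -1 ? line : line.slice(0, tab);
			if (summary && summary.day === day) {
				summary.count += 1;
				return;
			}
			if (summary) {
				emit(summary); // the key changed, so the previous day is complete
			}
			summary = { day: day, count: 1 };
		},
		flush: function() {
			if (summary) {
				emit(summary); // only emit when at least one line was seen
				summary = null;
			}
		}
	};
}

var written = [];
var counter = createDailyCounter(function(s) { written.push(JSON.stringify(s)); });
['14 Feb 2013\t{}', '14 Feb 2013\t{}', '15 Feb 2013\t{}'].forEach(counter.add);
counter.flush();
console.log(written.join('\n'));
// {"day":"14 Feb 2013","count":2}
// {"day":"15 Feb 2013","count":1}

var empty = [];
createDailyCounter(function(s) { empty.push(s); }).flush();
console.log(empty.length); // 0
```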

Running the Sample

Create the cluster as described earlier in this article, and then use the following command to add a processing step:

aws emr add-steps --cluster-id <my cluster ID> --steps Name=NodeJSStreamProcess,Type=Streaming,Args=--files,"s3://github-aws-big-data-blog/aws-blog-nodejs-on-emr/scripts/sample-mapper.js\,s3://github-aws-big-data-blog/aws-blog-nodejs-on-emr/scripts/sample-reducer.js",-input,s3://github-aws-big-data-blog/aws-blog-nodejs-on-emr/tweets,-output,s3://<my output bucket>/node_sample,-mapper,sample-mapper.js,-reducer,sample-reducer.js

Here, <my output bucket> is the name of the bucket where you would like the output to be created. When the step completes, the configured output bucket and path will contain multiple files, which together hold a rollup of the number of tweets for the single day in the sample data set:

{"day":"14 Feb 2013","count":1071}
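Because each reducer writes its own output file, you may want to combine the files after downloading them. Here is a minimal sketch, assuming each file holds one JSON summary per line as shown above. `combineDailyCounts` is a hypothetical helper, and the file contents below are placeholders rather than real job output. Summing per day is safe even though Hadoop normally sends every line for a given day to a single reducer, and empty files are simply skipped.

```javascript
// Hypothetical helper: merge the JSON-lines contents of several output files
// into a single { day: count } map, skipping blank lines and empty files.
function combineDailyCounts(partContents) {
	var totals = {};
	partContents.forEach(function(content) {
		content.split('\n').forEach(function(line) {
			if (line.trim() === '') {
				return;
			}
			var summary = JSON.parse(line);
			totals[summary.day] = (totals[summary.day] || 0) + summary.count;
		});
	});
	return totals;
}

// Placeholder contents standing in for downloaded output files.
var parts = [
	'{"day":"14 Feb 2013","count":3}\n',
	'',
	'{"day":"15 Feb 2013","count":2}\n'
];
console.log(JSON.stringify(combineDailyCounts(parts)));
// {"14 Feb 2013":3,"15 Feb 2013":2}
```

In practice, each string would come from reading one downloaded file, for example with fs.readFileSync(path, 'utf8').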

If you have questions or suggestions, please leave a comment below.

---------------------------------------------------------------

Article updated October 30, 2015

---------------------------------------------------------------
