AWS Big Data Blog

Streaming Analytics with DataTorrent RTS and Amazon EMR

Nick Durkin is a Senior Solution Engineer for DataTorrent. DataTorrent is an AWS Technology Partner.

In this blog post, we introduce fast big data and provide context about the DataTorrent RTS streaming analytics platform. We then show you how to implement a real-time streaming analytics application that captures social media trends from Twitter using DataTorrent RTS. The post walks through all the required steps, including how to spin up an Amazon EMR cluster, install DataTorrent RTS, configure Twitter as an input source, create an application to consume tweets, and display the top trending URLs in real time.

Fast Big Data and Streaming Analytics

Data today is generated not only in unprecedented volume and variety, but also at unprecedented velocity. Machine-generated data (for example, sensor data, mobile device data, and transaction data) is rapidly outpacing human-created data. We refer to this as “fast big data”. Fast big data can give organizations valuable business insights and help them act on those insights in a timely manner.

Organizations have long used big data, batch processing, and enterprise data warehouses (EDW) to gather business information and to drive decisions with the insights gained from that data. Today, organizations are looking beyond batch processing to generate insights faster and react to them appropriately. Streaming analytics is rapidly becoming the norm as enterprises rush to deliver differentiated offerings that generate revenue, or to automate operations to cut costs.

DataTorrent RTS

DataTorrent RTS provides a real-time, fast big data, stream-processing solution that delivers fault tolerance, linear scalability, and guaranteed event processing. This enables organizations to process data in motion, harness the full potential of real-time big data, and take opportune action.

DataTorrent RTS is a Hadoop 2.x native application, and as such runs unmodified on EMR. It is fully integrated via YARN and HDFS.  Streaming applications on DataTorrent RTS are built using Java modules called operators.  There are more than 450 operators in the open source code available via the Malhar project (https://github.com/DataTorrent/Malhar).

Building an Application for Streaming Analytics on Fast Big Data

To illustrate real-time stream processing with DataTorrent RTS, we’ll use Twitter as the source of events. While websites, consumer mobile devices, and IoT-enabled devices might generate millions of events per second in varying lengths and formats, Twitter generates events (tweets) at a rate of roughly 6,000 to 7,000 per second, each with a maximum size of 140 characters. Despite this comparatively low velocity, Twitter is still a great real-time, event-based data source for demonstrating the basic concepts behind streaming analytics.
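To put the “low velocity” remark in perspective, here is a quick upper-bound calculation that uses only the figures above. It assumes every tweet uses the full 140 characters and ignores the metadata the Twitter API attaches to each tweet, so it measures tweet text only; the class and method names are ours, for illustration.

```java
// Rough upper bound on raw tweet-text throughput, using the rates quoted
// above. Assumes every tweet is a full 140 characters and ignores the
// metadata attached by the Twitter API, so real payloads are larger.
class TweetRateEstimate {
    static final int MAX_CHARS_PER_TWEET = 140;

    // Characters of tweet text per second at a given tweet rate.
    static long charsPerSecond(long tweetsPerSecond) {
        return tweetsPerSecond * MAX_CHARS_PER_TWEET;
    }

    public static void main(String[] args) {
        System.out.println(charsPerSecond(6_000)); // 840000
        System.out.println(charsPerSecond(7_000)); // 980000
    }
}
```

Even at the upper end, the raw tweet text stays below one million characters per second.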

The steps below show how to implement an application that finds the top URLs referenced in tweets, using EMR with Hadoop 2.4.0 and DataTorrent 2.0.

Prior to starting, ensure that you have:

  1. An AWS login with an Amazon EC2 key pair for you to connect using SSH into the EMR master node
  2. A Twitter account with a consumer key (API key), consumer secret (API secret), access token, and access token secret

Downloading DataTorrent

Go to http://www.datatorrent.com, click Download DataTorrent in the upper right corner of the page, and then click Download in the Evaluation Edition field. Provide your contact information on the next page and then click Submit. On the subsequent page, open the evaluation edition page. At the bottom of that page is a command line similar to the one in the screenshot below, which you can use to download DataTorrent RTS from the EMR master node.

Amazon EMR Setup

Launch a new cluster using the AWS CLI command listed below:

aws emr create-cluster --name DT-EMR-LAB --ami-version 3.3.1  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m1.medium  InstanceGroupType=CORE,InstanceCount=2,InstanceType=m1.medium --ec2-attributes  KeyName=keyPairName

Note: Replace the string keyPairName with the name of your EC2 key pair before running the command above. The unique identifier returned, in the form j-XXXXXXXXXXXX, is your cluster-id value.

You can also use the EMR console to launch a cluster with the default settings. After your cluster is running, you should be able to see the cluster in the EMR console with a status of “Waiting”:

Installing DataTorrent RTS

Follow the steps provided by the SSH link next to the Master public DNS field on the EMR console page to connect using SSH into the EMR cluster master node. Run the previously captured command to download the DataTorrent RTS package:

After the download completes, run the following command to install DataTorrent on the EMR master node:

	sh ./datatorrent-rts*

This installs the DataTorrent platform and starts the DTGateway on port 9090.

To connect to the DTGateway, you need to set up a proxy to the EMR master node. To do so, follow the proxy configuration process outlined by the Enable Web Connection link next to the Connections field on the EMR console. After the proxy is set up, refresh the cluster details page; the Enable Web Connection link changes to Resource Manager, which confirms that the proxy is working.

Now you can connect to the DTGateway by pointing your browser to port 9090 of the master public DNS (http://master-public-dns:9090), to display a welcome screen similar to the following screenshot:

Click Continue through the Welcome, Hadoop, License, and Summary pages to finish the installation with the default parameters. After the setup completes, you should see the DataTorrent operations page similar to the following screenshot:

The diagram below shows the DataTorrent RTS application deployed on the EMR cluster. The core nodes host the YARN node managers, and the master node hosts the resource manager. DataTorrent RTS is installed on the master node, which hosts the DT Gateway and acts as the client from which applications are launched into the cluster. There is no need to install anything on the core nodes.

Launch Twitter Demo

Click Develop, and then click import demos, marked as steps 1 and 2 in the following screenshot:

Select DataTorrent Twitter Demo and then click Import. From the list of packages displayed, select DataTorrent Twitter Demo and then click Launch.

For the application to read tweets from Twitter, select “Specify custom properties” and then click on “add required properties”. Insert your consumer key (API key), consumer secret (API secret), access token, and access token secret into the appropriate value field, and click Save this configuration as. Provide a name for the file to save your custom properties, and allow other applications to use the same properties.

After that’s done, click Launch to start the application. You should see a confirmation similar to the one below:

The state of the application should turn to RUNNING and statistics should start counting. To visualize output tuples, click physical, select topURLs, and then click record a sample, marked as steps 1, 2, and 3 in the following screenshot:

The TwitterDemo application was launched through DT Gateway, which acts as the YARN client that submits the application to the resource manager. YARN takes over and allocates the application master (AM) container. After DT AppMaster launches, it translates the application into a physical plan and requests the resources required for execution from YARN.

After the YARN containers are allocated, DT AppMaster deploys the operators into the containers. The deployed operators establish the data flow (shown as black arrows), which does not involve the AM. The processing is decentralized and asynchronous. The AM is responsible for monitoring the execution and orchestrating recovery in the event of failure. Operators may interface with external systems (here, the Twitter API for input and WebSocket for publishing results).

Code Walkthrough

As you can see, DataTorrent RTS has an intuitive, easy-to-use user interface. Now let’s look at the code used in the Twitter URL demo. All operator code is available as open source on GitHub (https://github.com/DataTorrent/Malhar/). The Twitter-specific demo code is at https://github.com/DataTorrent/Malhar/tree/release-2.0.0/demos/twitter/src/main/java/com/datatorrent/demos/twitter/.

There are two key concepts: operators and applications. Operators define the processing logic, and applications define how operators are connected by streams. Each corresponds to a different type of development.

The first is operator logic: the code that acts on the data directly. Look at the operator named “URLExtractor” in the application (shown in yellow in the diagram). This operator receives Twitter API Status objects from the TweetSampler operator. The following code snippet shows the logic that extracts URLs from these objects and emits them on the output port to the downstream UniqueCounter operator.
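As a rough, framework-free illustration of that extraction logic, the Java sketch below pulls URLs out of a tweet and emits each one to a callback that stands in for the output port. It is not the Malhar code: the real operator works on Status objects, whereas this sketch assumes plain tweet text and finds URLs with a simple regular expression. The class name UrlExtractorSketch and its process method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the URLExtractor operator's logic. Assumption:
// input is plain tweet text rather than a Twitter API Status object.
// The Consumer plays the role of the output port to the downstream counter.
class UrlExtractorSketch {
    // Matches http:// or https:// followed by any non-whitespace characters.
    private static final Pattern URL = Pattern.compile("https?://\\S+");

    private final Consumer<String> outputPort;

    UrlExtractorSketch(Consumer<String> outputPort) {
        this.outputPort = outputPort;
    }

    // Called once per incoming tuple (one tweet); emits every URL found.
    void process(String tweetText) {
        Matcher m = URL.matcher(tweetText);
        while (m.find()) {
            outputPort.accept(m.group());
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        UrlExtractorSketch extractor = new UrlExtractorSketch(emitted::add);
        extractor.process("Reading https://example.com/a and http://example.org today");
        System.out.println(emitted); // [https://example.com/a, http://example.org]
    }
}
```

A regex this simple also captures trailing punctuation, such as a final period, which is one reason to prefer the URL data already parsed into the Status object when it is available.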

The second is application logic. This strings together all of the operators and defines the streams between them. This logic builds the DAG (Directed Acyclic Graph).
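To make the DAG idea concrete, the following Java sketch models an application as named operators joined by streams and uses Kahn’s topological sort to confirm that the wiring contains no cycles. This is a hypothetical model, not the DataTorrent application API. TweetSampler, URLExtractor, and UniqueCounter are the operator names used above; TopURLs and WebSocketOutput are illustrative names for the downstream stages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free model of application logic: operators are
// nodes, streams are directed edges. NOT the DataTorrent DAG API.
class MiniDag {
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();

    void addOperator(String name) {
        downstream.putIfAbsent(name, new ArrayList<>());
    }

    // A stream carries tuples from one operator's output to another's input.
    void addStream(String from, String to) {
        addOperator(from);
        addOperator(to);
        downstream.get(from).add(to);
    }

    // Kahn's algorithm: returns an order in which every operator appears
    // after all of its upstream operators, or throws if there is a cycle.
    List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        downstream.keySet().forEach(op -> inDegree.put(op, 0));
        downstream.values().forEach(targets ->
                targets.forEach(t -> inDegree.merge(t, 1, Integer::sum)));

        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((op, deg) -> { if (deg == 0) ready.add(op); });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String op = ready.poll();
            order.add(op);
            for (String next : downstream.get(op)) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (order.size() != downstream.size()) {
            throw new IllegalStateException("streams form a cycle; not a DAG");
        }
        return order;
    }

    public static void main(String[] args) {
        MiniDag dag = new MiniDag();
        dag.addStream("TweetSampler", "URLExtractor");
        dag.addStream("URLExtractor", "UniqueCounter");
        dag.addStream("UniqueCounter", "TopURLs");
        dag.addStream("TopURLs", "WebSocketOutput");
        System.out.println(dag.topologicalOrder());
    }
}
```

Adding a stream that points back upstream, such as UniqueCounter to TweetSampler, would make topologicalOrder() throw, because a cycle has no valid processing order.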

Installing Web App Visualizations for Demos

Web App Visualizations require “nodejs” to render the visualizations and “forever” to run the web app as a background service. The Sandbox comes preconfigured with the Web App Visualizations for Demos page running; however, the installation we used here requires manual setup. This avoids overwriting any installations or configurations that already exist on a system.

Use SSH to connect to the EMR cluster, and run the following commands:

wget https://rpm.nodesource.com/setup
sudo sh ./setup
sudo yum install -y nodejs
sudo npm install -g forever
cd datatorrent/current/demos/ui/
sed -i s/node/forever/ start.sh
./start.sh

After that’s complete, verify that the Web App Visualizations for Demos page is up and running by pointing your browser to port 3003 of the master public DNS (http://master-public-dns:3003/). It should display the top URLs referenced in tweets over the last 5 minutes, updated every half-second.
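One common way to maintain a “last 5 minutes, refreshed every half-second” ranking is to split the window into fixed time slots and drop the oldest slot on each refresh. The Java sketch below assumes that slot-based design for illustration only; it is not necessarily how the demo’s operators are implemented, and SlidingTopCounter with its add, tick, and top methods are hypothetical names. A 5-minute window refreshed every 500 ms needs 300,000 / 500 = 600 slots.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Assumed slot-based sliding window: one count map per refresh interval,
// plus running totals so each refresh only subtracts the expired slot.
class SlidingTopCounter {
    private final int slotsInWindow;
    private final Deque<Map<String, Long>> slots = new ArrayDeque<>();
    private final Map<String, Long> totals = new HashMap<>();

    SlidingTopCounter(int slotsInWindow) {
        this.slotsInWindow = slotsInWindow;
        slots.addLast(new HashMap<>());
    }

    // Count one occurrence of a URL in the current (newest) slot.
    void add(String url) {
        slots.getLast().merge(url, 1L, Long::sum);
        totals.merge(url, 1L, Long::sum);
    }

    // Advance time by one slot (e.g. every 500 ms) and expire the oldest
    // slot once the window holds more than slotsInWindow slots.
    void tick() {
        slots.addLast(new HashMap<>());
        if (slots.size() > slotsInWindow) {
            Map<String, Long> expired = slots.removeFirst();
            expired.forEach((url, n) -> {
                long remaining = totals.get(url) - n;
                if (remaining == 0) totals.remove(url); else totals.put(url, remaining);
            });
        }
    }

    // The n most frequent URLs in the window; ties broken alphabetically.
    List<String> top(int n) {
        return totals.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 5-minute window refreshed every 500 ms: 300_000 / 500 = 600 slots.
        SlidingTopCounter counter = new SlidingTopCounter(300_000 / 500);
        counter.add("https://example.com/a");
        counter.add("https://example.com/a");
        counter.add("https://example.com/b");
        System.out.println(counter.top(1)); // [https://example.com/a]
    }
}
```

This design trades a little memory (one small map per slot) for cheap refreshes: each tick subtracts only the expired slot’s counts instead of recounting the whole window.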

Cleanup

After you’re finished, you can choose Shutdown App from the DataTorrent console and terminate the EMR cluster using the AWS Management Console.

Conclusion

In this post, we walked through the deployment of a real-time streaming analytics application (the Twitter top URLs demo) to demonstrate the power, flexibility, and ease of use of DataTorrent RTS on EMR. We also looked at the code behind the application.

In addition to supporting EMR, DataTorrent also provides integration with Amazon Kinesis for reading from and writing to Amazon Kinesis streams. You can access the Amazon Kinesis application (https://github.com/DataTorrent/Malhar/tree/release2.0.0/contrib/src/main/java/com/datatorrent/contrib/kinesis), and use it to build real-time analytics applications in a cost-effective, scalable, and efficient way.

If you have a question or suggestion, please leave a comment below.
