AWS Big Data Blog

Presto-Amazon Kinesis Connector for Interactively Querying Streaming Data

This is a guest post by Sivaramakrishnan Narayanan, Member of Technical Staff at Qubole, and Xing Quan, Director of Product Management at Qubole. Qubole is an AWS Advanced Technology Partner.

Amazon Kinesis is a scalable and fully managed service for streaming large, distributed data sets. As applications (particularly on mobile and wearable devices) collect more and more data, Amazon Kinesis is becoming the starting point for data ingestion into AWS. However, all of that data is only useful when it can be analyzed and used to influence business decisions. Many solutions can consume Kinesis streams and process them in various ways, but none offered near real-time querying of Kinesis data using SQL. We at Qubole saw this gap and built the Presto connector for Kinesis.

Presto is designed to meet that interactive and near real-time SQL querying use case. Originally developed and open sourced by Facebook, Presto has since been widely adopted by organizations such as Netflix, Dropbox, and Airbnb to interactively query petabyte-scale datasets. At Qubole, we offer Presto as a Service on AWS so our end customers, typically data scientists and business analysts, can easily get started with interactive SQL querying. Qubole manages the underlying Presto cluster configuration and automatically provisions and scales the cluster to meet jobs’ demands.

Most of our customers store data in Amazon Simple Storage Service (S3) and use a combination of Presto, Hive, Spark, and MapReduce (all of which Qubole also offers as services) for interactive and batch workloads. However, given Presto's fast, interactive processing, we've increasingly heard from customers who want to query a real-time streaming source, such as Amazon Kinesis, directly. To meet this demand, we built an open source connector for Presto that reads and queries directly from Kinesis. The illustration below shows how the connector works (sketch inspired by Martin Kleppmann's blog).

Open-source connector for Presto that reads directly from Kinesis

User applications push data into Kinesis shards. The Presto cluster consists of a number of workers and one coordinator process. The Kinesis connector maps each shard to one worker, which is responsible for reading data from that shard using the GetRecords API. Every record in Kinesis is just a blob, and the connector must be told how to interpret these blobs as rows of a logical table. The user, as part of the table definition (described later), tells the connector how to interpret the blob. The connector then translates each Kinesis record into a table record, which is processed by the different query stages within the Presto engine. Each worker can also optionally record how much data it has read from each shard in a DynamoDB table. This supports the checkpointing feature described later in this article.
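To make the per-shard read concrete, here is a minimal, hypothetical sketch (not the connector's actual code) of what a worker assigned to one shard does with the AWS SDK for Java: obtain a shard iterator, call GetRecords, and treat each record's payload as an opaque blob. The stream and shard names are placeholders, and error handling, pagination, and checkpointing are omitted.

import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

public class ShardReaderSketch {
    public static void main(String[] args) {
        // Credentials are resolved through the DefaultAWSCredentialsProviderChain.
        AmazonKinesisClient kinesis = new AmazonKinesisClient();

        // Start reading the shard from its oldest available record.
        String iterator = kinesis.getShardIterator(new GetShardIteratorRequest()
                .withStreamName("nasahttp")
                .withShardId("shardId-000000000000")
                .withShardIteratorType(ShardIteratorType.TRIM_HORIZON))
                .getShardIterator();

        // One GetRecords call; a real reader loops, feeding the returned
        // next shard iterator back in until its limit is reached.
        GetRecordsResult result = kinesis.getRecords(new GetRecordsRequest()
                .withShardIterator(iterator)
                .withLimit(100));

        for (Record record : result.getRecords()) {
            // Each record is an opaque blob; the table definition tells the
            // connector how to decode it (here we assume UTF-8 JSON).
            String blob = StandardCharsets.UTF_8.decode(record.getData()).toString();
            System.out.println(blob);
        }
    }
}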

We demonstrate the capabilities of the connector within Qubole using a sample application that takes us back to the 1990s, when NASA published the logs of HTTP requests made to the NASA Kennedy Space Center WWW server in Florida. To travel back in time, follow these steps.

  1. You will need an AWS account with access to Kinesis streams. Ensure that the machine on which you're going to generate data has access to Kinesis via the DefaultAWSCredentialsProviderChain. For example, you can set the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, as shown below.
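export AWS_ACCESS_KEY_ID=<your access key id>
export AWS_SECRET_ACCESS_KEY=<your secret access key>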
  2. Download, build, and untar the kinesis-data-gen tool. Use this tool to create an Amazon Kinesis stream called nasahttp and push HTTP logs in JSON form into the stream. You can use sample_data.txt as seed data.
[localhost datagen-0.0.1-SNAPSHOT]$ ./bin/kinesis-datagen -f etc/sample_data.txt -create -n nasahttp -s 1
Jun 29, 11:01:26 AM com.qubole.kinesis.nasa.DataGenerator safeDeleteStream INFO: trying to delete nasahttp
Jun 29, 11:01:32 AM com.qubole.kinesis.nasa.DataGenerator safeDeleteStream INFO: stream nasahttp not found
Jun 29, 11:01:36 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: waiting for stream to become active
Jun 29, 11:01:36 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: CREATING
Jun 29, 11:01:38 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: CREATING

Jun 29, 11:02:25 AM com.qubole.kinesis.nasa.DataGenerator safeCheckStream INFO: ACTIVE
Jun 29, 11:02:25 AM com.qubole.kinesis.nasa.DataGenerator pushRecords INFO: opening file etc/sample_data.txt
Jun 29, 11:02:25 AM com.qubole.kinesis.text.FileLineReader end INFO: records = 9
Jun 29, 11:02:25 AM com.qubole.kinesis.executor.StreamProducerRunnable run INFO: producer is done
Jun 29, 11:02:25 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: producer is done
Jun 29, 11:02:25 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: waiting for consumers to finish
Jun 29, 11:02:25 AM com.qubole.kinesis.nasa.RecordParser parseRecord SEVERE: unable to parse:
Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamConsumerRunnable run INFO: no-op count 1
Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Produced 9 records
Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Consumed 9 records
Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Total time 1.16748948 seconds
Jun 29, 11:02:26 AM com.qubole.kinesis.executor.StreamExperiment runExperiment INFO: Records per second 7.7088489054308225
  3. To work with a much larger stream, download the NASA HTTP requests dataset from here. You can use these files to seed the data generator.
  4. Sign up for an account in Qubole to use Qubole's Presto as a Service. You can enter your AWS credentials so that Qubole can access the Amazon Kinesis stream you created. This short guide walks you through how to set up your Presto cluster.
  5. Once you've created an account with Qubole, you will see a Presto cluster auto-configured on your Clusters page. Click the edit icon to see the cluster configuration details and scroll down to the Presto Settings section.

  6. In this section, enter the following text. You can read through the Connector Configuration and Table Definitions sections of the wiki to understand these settings in detail. A sample record matching this table definition follows the configuration below.
config.properties:
datasources=jmx,hive,kinesis

catalog/kinesis.properties:
connector.name=kinesis
kinesis.table-names=nasahttp

kinesis/nasahttp.json:
{
    "tableName": "nasahttp",
    "schemaName": "default",
    "streamName": "nasahttp",
    "message": {
        "dataFormat": "json",
        "fields": [
            {"name": "host", "mapping": "host", "type": "VARCHAR"},
            {"name": "at", "mapping": "timestamp", "type": "VARCHAR"},
            {"name": "request", "mapping": "request", "type": "VARCHAR"},
            {"name": "code", "mapping": "code", "type": "BIGINT"},
            {"name": "bytes", "mapping": "bytes", "type": "BIGINT"}
        ]
    }
}
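For reference, here is a made-up example of a single Kinesis record payload in the format this table definition expects; note that the JSON key timestamp is mapped to the table column at:

{"host": "piweba3y.prodigy.com", "timestamp": "01/Jul/1995:00:00:01 -0400", "request": "GET /history/apollo/ HTTP/1.0", "code": 200, "bytes": 6245}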
  7. Go back to the Analyze page and compose a Presto query against this table. The screenshot below shows the different components of the Analyze page. The first Presto query launches the cluster, which may take about two minutes. You will see logs with progress information while the query is executing, and results when it finishes. The query in this example finds the hosts that have requested the most bytes from the HTTP server.
select host, sum(bytes) as total_bytes from kinesis.default.nasahttp group by host order by total_bytes desc limit 2;

Congratulations! You have just run your first SQL query against an Amazon Kinesis stream using Presto! You can use the same kinesis-datagen tool to push additional data into Kinesis (omit the -create option this time) and rerun the same query; you will see the total_bytes values change. For example, reusing the flags from step 2 without -create:
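[localhost datagen-0.0.1-SNAPSHOT]$ ./bin/kinesis-datagen -f etc/sample_data.txt -n nasahttp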

The Presto-Amazon Kinesis connector also supports checkpointed queries. If you want to run queries only against the incremental data that has arrived since your last query, use this feature. For example, you could use it to build a dashboard that shows the top five requesters every minute, as sketched below. See the checkpointing documentation to learn how to use this feature.
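Such a dashboard could run a query like the following each minute; this is just a sketch, and with checkpointing enabled it would count only the records that arrived since the previous run:

select host, count(*) as requests from kinesis.default.nasahttp group by host order by requests desc limit 5;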

The connector is open source (Apache license) and available on GitHub. The README file describes how to build the connector against a specific version of Presto and install the binaries. The configuration wiki page walks you through the configuration elements needed to connect to Amazon Kinesis. The table definitions wiki page shows you how to describe a Presto-Amazon Kinesis table, including the data format and field mappings. You can also find instructions on how to install and run Presto on your laptop or in a cluster.

Finally, Amazon Kinesis holds data only for a fixed period of time, so it is often desirable to offload data to S3 on a daily basis in formats better suited for analytics, such as ORC or Parquet. Hive is well suited for this use case, so we've built and open sourced an Amazon Kinesis storage handler for Hive (also available on GitHub). This storage handler lets Hive query directly from Amazon Kinesis and write to partitioned tables in HDFS or S3 in optimized formats like ORC or Parquet. It also supports checkpointed access to Amazon Kinesis to ensure that data is written out exactly once for historical analysis.
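As a rough sketch of the offload: assume an external Hive table kinesis_nasahttp has already been defined over the stream through the storage handler (see the project's documentation for the exact table properties; all table, column, and path names below are illustrative). The daily job is then a standard Hive insert into a partitioned ORC table:

-- Archive table in ORC, partitioned by day; the S3 path is a placeholder.
CREATE TABLE IF NOT EXISTS nasahttp_archive (
  host STRING,
  ts STRING,
  request STRING,
  code BIGINT,
  bytes BIGINT
)
PARTITIONED BY (dt STRING)
STORED AS ORC
LOCATION 's3://your-bucket/nasahttp-archive/';

-- kinesis_nasahttp is the assumed storage-handler-backed table.
INSERT OVERWRITE TABLE nasahttp_archive PARTITION (dt = '2015-06-29')
SELECT host, ts, request, code, bytes FROM kinesis_nasahttp;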

We would like to thank Shubham Agarwal, Shubham Tagra and Akhilesh Anand for their contributions to this project. We’re excited to make interactive querying of Amazon Kinesis streaming data sets available to our customers.

If you have questions or suggestions, please leave a comment below.


Related:

Snakes in the Stream: Feeding and Eating Amazon Kinesis Streams with Python