AWS Big Data Blog

Ensuring Consistency When Using Amazon S3 and Amazon Elastic MapReduce for ETL Workflows

February 2023 Update: Console access to the AWS Data Pipeline service will be removed on April 30, 2023. On this date, you will no longer be able to access AWS Data Pipeline through the console. You will continue to have access to AWS Data Pipeline through the command line interface and API. Please note that the AWS Data Pipeline service is in maintenance mode and we are not planning to expand the service to new regions. For information about migrating from AWS Data Pipeline, please refer to the AWS Data Pipeline migration documentation.

The EMR File System (EMRFS) is an implementation of HDFS that allows Amazon Elastic MapReduce (Amazon EMR) clusters to store data on Amazon Simple Storage Service (Amazon S3). Many Amazon EMR customers use it to inexpensively store massive amounts of data with high durability and availability. However, Amazon S3 was designed for eventual consistency, which can cause issues for certain multi-step, extract-transform-load (ETL) data processing pipelines. For instance, if you list objects in an Amazon S3 bucket immediately after adding new objects from another step in the pipeline, the list may be incomplete. Because this list is the input for the next step, the set of files being processed in that stage of the job will be incomplete.

Creating a Consistent View of Amazon S3 for Amazon EMR

To address the challenges presented by Amazon S3’s eventual consistency model, the Amazon EMR team has released a new feature called “consistent view” for EMRFS. Consistent view is an optional feature that allows Amazon EMR clusters to check for list and read-after-write consistency for new Amazon S3 objects written by or synced with EMRFS. If it detects that Amazon S3 is inconsistent during a file system operation, it retries that operation according to user-defined rules. Consistent view does this by storing metadata in Amazon DynamoDB to keep track of Amazon S3 objects. This makes ETL pipelines more reliable by ensuring that the output from a previous step is completely listed as the input for the current step. By default, an Amazon DynamoDB table is created to hold the EMRFS metadata with 500 read capacity units and 100 write capacity units, so there is a small Amazon DynamoDB charge associated with enabling consistent view. The table's read and write capacity settings are configurable, depending on how many objects EMRFS is tracking and the number of concurrent nodes reading from the metadata.
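The check-and-retry idea described above can be illustrated with a small simulation. This is a conceptual sketch only, not the EMRFS implementation: `consistent_list`, `list_store`, `max_retries`, and `retry_period_s` are hypothetical names, the metadata is modeled as a plain set of keys rather than a DynamoDB table, and the "store" is a stand-in whose listing lags behind the writes.

```python
import time
from typing import Callable, Iterable, Set


def consistent_list(
    list_store: Callable[[], Iterable[str]],
    tracked_keys: Set[str],
    max_retries: int = 5,
    retry_period_s: float = 0.0,
) -> Set[str]:
    """Return a listing only once it contains every key the metadata tracks.

    list_store   -- hypothetical callable that lists keys in the object store
    tracked_keys -- keys the metadata says were written by earlier steps
    """
    missing: Set[str] = set()
    for attempt in range(max_retries + 1):
        listed = set(list_store())
        missing = tracked_keys - listed
        if not missing:
            return listed
        if attempt < max_retries:
            time.sleep(retry_period_s)  # wait before listing again
    raise RuntimeError(
        f"listing still missing {sorted(missing)} after {max_retries} retries"
    )


# Simulated store: the first two listings are incomplete, the third is complete.
responses = iter([
    {"part-0"},
    {"part-0", "part-1"},
    {"part-0", "part-1", "part-2"},
])
tracked = {"part-0", "part-1", "part-2"}
result = consistent_list(lambda: next(responses), tracked)
```

Without the comparison against `tracked`, the first (incomplete) listing would have been handed to the next step as its input set.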

One of the many strengths of Hadoop is its ecosystem of applications, which provide higher-level languages and analytic functions for expressing operations on your data. We see customers using Hive, which uses a SQL-like language called HiveQL, in conjunction with Pig, which uses a syntax called Pig Latin, to create complex ETL pipelines for data stored in Amazon S3. In many cases, Amazon EMR users store the output of certain portions of the workflow in Amazon S3 so it can be durably stored throughout a long pipeline or be available for processing by a different Amazon EMR cluster for the next step. Consistent view checks that these intermediate outputs in Amazon S3 are consistent for all steps in the pipeline.

When consistent view is enabled, Amazon EMR also has better performance when listing Amazon S3 prefixes with over 10,000 objects. In fact, we’ve seen a 5x increase in list performance on prefixes with over 1 million objects. This speed-up is due to using the EMRFS metadata, which is required for consistent view, to make listing large numbers of objects more efficient.

Using EMRFS Consistent View in an ETL Pipeline

Below is an example of how you can set up a multi-step Amazon EMR workflow using Hive and Pig with Amazon S3 in a consistent way using EMRFS consistent view. The data used in this example is a sample set of logs with information about website visitors such as IP address, time, search term, and referrer. You can download one of the log files to view the raw data. We will use Hive and Pig to transform these raw logs into a useful report, showing hourly traffic, top browsers used, how many visitors were referred from Google or Bing, and the top 10 Google keywords when referred.

Create an EMR Cluster

We will use the AWS Management Console to create the Amazon EMR cluster. On the EMR Create Cluster page, create a cluster with these configuration changes:

  1. Termination protection: off
  2. Logging: disabled
  3. AMI version: 3.2.1
  4. Consistent view: enabled. This creates an instance of EMRFS metadata called “EmrFSMetadata” (or uses an existing instance of the EMRFS metadata with that name) and uses it to track files written by EMRFS.

Add a Pig Step for the First Part of the Workflow

Next we add the steps for the Amazon EMR cluster to execute. For the first step of the ETL pipeline, we use Pig to read the raw logs from Amazon S3 and create two new output files in Amazon S3.

  1. Go to the bottom of the Create Cluster page to the Steps section, and add and configure a Pig program.
    1. In the Add Step dialog box that appears when you click Configure and add, change the following:
      1. Name to “Pig Program_Step 1”
      2. Script S3 location to s3://emr-workshop/scripts/pig_sample.pig
      3. Input S3 location to s3://elasticmapreduce/samples/pig-apache/input
      4. Output S3 location to s3://mybucket/myfolder (change “mybucket” to an existing Amazon S3 bucket in your AWS account and “myfolder” to anything you want)
      5. Action on Failure to Terminate cluster
    2. Click Add.

Let’s take a closer look at the script you provided in this step (download the script to check it out). To start, we load the raw logs into a sequence of tuples, and convert each log string into a structure with named fields:

raw_logs =
  LOAD '$INPUT' USING TextLoader AS (line:chararray);

logs_base =
  FOREACH
    raw_logs
  GENERATE
    FLATTEN (
      EXTRACT(
        line,
        '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
      )
    )
    AS (
      remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray,
      request: chararray, status: int, bytes_string: chararray, referrer: chararray,
      browser: chararray
    );
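To see what the extraction step produces, here is the same idea as a minimal Python sketch. It assumes the pattern has single spaces between fields, as in a standard combined-format access log; the sample line is made up for illustration and is not taken from the sample data set. Unlike the Pig schema, which casts `status` to int, every field here stays a string.

```python
import re

# Same capture groups as the Pig EXTRACT call, written as a Python raw string.
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
)

# Field names copied from the AS (...) clause of the Pig statement.
FIELDS = (
    "remoteAddr", "remoteLogname", "user", "time", "request",
    "status", "bytes_string", "referrer", "browser",
)


def parse_line(line):
    """Return a dict of named fields, or None when the line does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    return dict(zip(FIELDS, match.groups()))


# Hypothetical log line, for illustration only.
sample = (
    '192.0.2.10 - - [22/Oct/2014:13:05:42 -0700] "GET /index.html HTTP/1.1" '
    '200 1024 "http://www.google.com/search?q=emr+consistent+view" "Mozilla/5.0"'
)
record = parse_line(sample)
```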

Next, Pig cleans up each log entry by formatting the time value to DateTime and the bytes value to integer.

logs =
  FOREACH
    logs_base
  GENERATE
    *,
    FORMAT_DT('yyyy-MM-dd HH:mm:ss', DATE_TIME(time, 'dd/MMM/yyyy:HH:mm:ss Z', 'UTC')) AS dtime,
    (int)REPLACE(bytes_string, '-', '0') AS bytes;
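The two clean-up expressions can be sketched in Python as follows. This is an approximation of what the Pig functions do, not a port of them: `clean_time` and `clean_bytes` are hypothetical helper names, and the sketch assumes the log timestamps carry a numeric UTC offset and English month abbreviations.

```python
from datetime import datetime, timezone


def clean_time(raw):
    """Parse 'dd/MMM/yyyy:HH:mm:ss Z' and format it as 'yyyy-MM-dd HH:mm:ss' in UTC."""
    parsed = datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def clean_bytes(raw):
    """Replace the '-' placeholder with '0' and convert the result to an integer."""
    return int(raw.replace("-", "0"))


utc_time = clean_time("22/Oct/2014:13:05:42 -0700")
no_bytes = clean_bytes("-")
some_bytes = clean_bytes("1024")
```

Normalizing to UTC at this stage is what lets the later hourly report group records from different time zones into the same buckets.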

Finally, Pig filters the logs for entries with the referrer value matching Google or Bing and records the search term associated with that record.

google_and_bing_urls =
  FILTER (
    FOREACH logs GENERATE
      FORMAT_DT('yyyy-MM-dd HH:mm:ss', DATE_TIME(time, 'dd/MMM/yyyy:HH:mm:ss Z', 'UTC')) AS (time:chararray),
      referrer,
      FLATTEN(EXTRACT(referrer, '.*[&\?]q=([^&]+).*')) AS (term:chararray),
      browser
  )
  BY
    referrer matches '.*bing.*'
  OR
    referrer matches '.*google.*';
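The filter and the search-term extraction can be tried out in isolation with a short Python sketch. The helper names `search_term` and `from_search_engine` are hypothetical, and the referrer URLs are made up; the regular expression is the one from the Pig statement.

```python
import re

# Same expression as in the Pig EXTRACT call: capture the value of the q= parameter.
TERM_PATTERN = re.compile(r'.*[&\?]q=([^&]+).*')


def search_term(referrer):
    """Return the raw (still URL-encoded) q= value, or None if there is none."""
    match = TERM_PATTERN.match(referrer)
    return match.group(1) if match else None


def from_search_engine(referrer):
    """Mirror the two 'matches' conditions: keep referrers containing bing or google."""
    return "bing" in referrer or "google" in referrer


# Hypothetical referrers, for illustration only.
google_term = search_term("http://www.google.com/search?q=emr+view&hl=en")
no_term = search_term("http://example.com/")
```

Note that the captured term is not URL-decoded, so a search for "emr view" is recorded as `emr+view`.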

EMRFS writes the output files from this job to the Amazon S3 location you provided in the step. It also records these files in the EMRFS metadata, which is stored in an Amazon DynamoDB table called “EmrFSMetadata” (created automatically when the cluster starts), so EMRFS can track their consistency.

Add a Hive Step for the Second Part of the Workflow

Now, shift attention back to the Create Cluster page to add the next step in the ETL pipeline. For the second and final step, we use Hive to process the output from the previous Pig job located in Amazon S3 to create the final reports.

  1. Add and configure a Hive program under the Steps section.
    1. In the dialog box that appears when you click Configure and add, change the following:
      1. Name to “Hive Program_Step 2”
      2. Script S3 location to s3://emr-workshop/scripts/hive_sample.q
      3. Input S3 location to the output location of the previous step (e.g. s3://mybucket/myfolder)
      4. Output S3 location to s3://mybucket/myfolder/finaloutput (changing “mybucket” and “myfolder” to the values you used in the last step with Pig)
      5. Arguments to “-d DATE=2014-10-22”
      6. Action on Failure to Terminate cluster
    2. Click Add.

When this Hive step starts, it invokes EMRFS to use the Amazon S3 list operation on “s3://mybucket/myfolder” to identify the objects in the input set. Because the output of the previous Pig job is being tracked in the consistent view, Amazon EMR can make sure that all of the objects are present in the list before continuing the job (instead of continuing the job with an inconsistent list).

Let’s take a closer look at this Hive script (download the script to check it out). The script will first create two tables: one from the cleaned up log reports and one from the subset of data about Google and Bing search terms (both were outputs from the Pig step).

DROP TABLE IF EXISTS logs;
CREATE EXTERNAL TABLE logs
(
remoteAddr string, remoteLogname  string, user  string, time_local  string, request  string, status  int, bytes_string  string, referrer  string, browser  string, time_UTC string, bytes int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '${INPUT}/pig_output_logs_all/';

DROP TABLE IF EXISTS searchEngine;
CREATE EXTERNAL TABLE searchEngine
(
time string,
referrer string,
term string,
browser string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '${INPUT}/pig_output_logs_bing_google/';

The first Hive query creates a report showing the hourly traffic:

	INSERT OVERWRITE DIRECTORY '${OUTPUT}/Hourly_Traffic_${DATE}'
	Select HOUR(time_UTC), count(remoteAddr)
	from logs
	group by HOUR(time_UTC)
	order by 1 desc; 
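For readers less familiar with HiveQL, the aggregation in this query can be sketched in Python. The function name `hourly_traffic` and the sample timestamps are hypothetical; the sketch counts every row, whereas `count(remoteAddr)` in Hive skips rows where `remoteAddr` is NULL.

```python
from collections import Counter
from datetime import datetime


def hourly_traffic(timestamps):
    """Count records per hour of day and order the result by hour, descending."""
    counts = Counter(
        datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").hour for ts in timestamps
    )
    return sorted(counts.items(), reverse=True)


# Hypothetical UTC timestamps in the format written by the Pig step.
sample_times = ["2014-10-22 20:05:42", "2014-10-22 20:17:03", "2014-10-22 09:00:00"]
report = hourly_traffic(sample_times)
```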

The second query generates an ordered list of the top browsers accessing the website:

	INSERT OVERWRITE DIRECTORY '${OUTPUT}/Report_Top_browsers_${DATE}'
	select browser, count(remoteAddr)
	from logs
	group by browser
	order by 2 desc;

The third query creates a report showing the counts of keywords from Google and Bing referrals:

	INSERT OVERWRITE DIRECTORY '${OUTPUT}/Google_Bing_refer_${DATE}'
	select case when referrer like '%bing%' then 'Bing'
	when referrer like '%google%' then 'Google' end, count(time) as Number_References
	from searchEngine
	group by case
	when referrer like '%bing%' then 'Bing'
	when referrer like '%google%' then 'Google'
	end;

And the final report shows the top 10 keywords from Google referrals:

	INSERT OVERWRITE DIRECTORY '${OUTPUT}/Top_10_KeyWords_Google_${DATE}'
	select term, count(time)
	from searchEngine
	where referrer like '%google%'
	group by term
	order by 2 desc
	limit 10;
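The filter, group, order, and limit sequence of this last query maps onto a few lines of Python. This is a sketch with a hypothetical function name and made-up rows; ties between equally frequent terms may be ordered differently than in Hive.

```python
from collections import Counter


def top_google_terms(rows, limit=10):
    """rows: iterable of (referrer, term) pairs; return the most frequent terms."""
    counts = Counter(term for referrer, term in rows if "google" in referrer)
    return counts.most_common(limit)


# Hypothetical rows, for illustration only.
rows = [
    ("http://www.google.com/search?q=emr", "emr"),
    ("http://www.google.com/search?q=emr", "emr"),
    ("http://www.bing.com/search?q=emr", "emr"),
    ("http://www.google.com/search?q=s3", "s3"),
]
top_terms = top_google_terms(rows)
```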

Now that you’ve added the two steps in the ETL pipeline and looked at the work being done in the Pig and Hive scripts, we’re almost ready to launch the cluster. Make sure to select “yes” for Auto-terminate to shut down your cluster after your workflow is complete, and click “Create cluster.” Your Amazon EMR cluster will start up and begin processing the logs, and you can check on the status of your cluster on the Cluster Details page.

View the Output and Clean Up Your EMRFS Metadata Store

The final outputs of this job are written to Amazon S3 using EMRFS, and the Amazon EMR cluster shuts down after the workflow is complete. Check the Amazon S3 output location from the Hive step, and your reports should be ready for consumption.

Your EMRFS metadata will remain in Amazon DynamoDB after your Amazon EMR cluster terminates, since it could be used with other Amazon EMR clusters in later workflows. In this example, we do not need a consistent view of these objects in Amazon S3 for other EMR clusters or future workloads, so we can delete the EMRFS metadata. To do this, go into the Amazon DynamoDB console, select the table EmrFSMetadata, and delete it.

Note: you can automate this process by adding a final step to the Amazon EMR workflow which calls the EMRFS CLI to delete the EMRFS metadata. From the Steps section on the Create Cluster page, you would add a Custom JAR with these settings:

  1. JAR location as s3://elasticmapreduce/libs/script-runner/script-runner.jar
  2. Arguments as “/home/hadoop/bin/emrfs delete-metadata”
  3. Action on Failure as Terminate cluster
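If you add steps programmatically instead of through the console, the same settings can be expressed as a step definition. The sketch below only builds the dictionary, in the shape used by the Amazon EMR AddJobFlowSteps API; the function name and step name are hypothetical, and it assumes the EMRFS CLI is installed under the hadoop user's home directory at /home/hadoop/bin/emrfs.

```python
def emrfs_cleanup_step(name="Delete EMRFS metadata"):
    """Build a step definition that runs 'emrfs delete-metadata' via script-runner."""
    return {
        "Name": name,  # hypothetical step name
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
            # script-runner receives the command and its argument as separate items
            "Args": ["/home/hadoop/bin/emrfs", "delete-metadata"],
        },
    }


step = emrfs_cleanup_step()
```

With the AWS SDK for Python installed and credentials configured, a dictionary like this could be passed in the `Steps` list of `add_job_flow_steps` on an EMR client, together with the ID of your own cluster.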

This was a simple example using two steps on a single cluster to process log data. For real production ETL workflows, you can have daily or hourly pipelines with many steps and multiple clusters to process your data. AWS Data Pipeline, another service on AWS, is great for constructing recurring, complex workflows using Amazon EMR and other AWS services. Now, go forth and process!


About the author

Jonathan Fritz is a Senior Product Manager for Amazon EMR. AWS Solutions Architect Manjeet Chayel also contributed to this post.