Turning Amazon EMR into a Massive Amazon S3 Processing Engine with Campanile

Michael Wallman is a senior consultant with AWS ProServ.

Have you ever had to copy a huge Amazon S3 bucket to another account or region? Or create a list based on object name or size? How about mapping a function over millions of objects? Amazon EMR to the rescue! EMR allows you to deploy large managed clusters, tightly coupled with S3, transforming the problem from a single, unscalable process/instance to one of orchestration.

The Campanile framework is a collection of scripts and patterns that use standard building blocks like Hadoop MapReduce, Streaming, Hive, and Python Boto. Customers have used Campanile to migrate petabytes of data from one account to another, run periodic sync jobs and large Amazon Glacier restores, enable server-side encryption (SSE), create indexes, and sync data before enabling cross-region replication (CRR).

Traditionally, you could perform these tasks with simple shell commands, such as aws s3 ls s3://bucket | awk '{ if($3 > 1024000) print $0 }'. More recently, you could use complex threaded applications in which a single process makes hundreds of requests a second. Now, with object counts reaching the billions, these patterns no longer scale. For example, how long would it take to list a billion objects with a process listing 1,000 objects/sec? More than 11 days, without interruption.
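The estimate above is simple arithmetic, sketched below in Python. The function name is only illustrative.

```python
SECONDS_PER_DAY = 86400


def listing_days(total_objects, objects_per_second):
    """Days needed to list total_objects at a constant, uninterrupted rate."""
    return total_objects / objects_per_second / SECONDS_PER_DAY


# One billion objects at 1,000 objects/sec
print(round(listing_days(1_000_000_000, 1000), 1))  # 11.6
```

At a million objects per second, the same listing would take 1,000 seconds, or under 17 minutes.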

This post examines how the Campanile framework can be used to implement a bucket copy at speeds in excess of 100 Gbps or tens of thousands of transactions per second. Campanile also helps streamline the deployment of EMR by providing a number of bootstrap actions that install system dependencies like Boto, write instance-type-specific configuration files, and provide additional useful development and reporting tools like SaltStack and Syslog.

Each block below corresponds to an EMR step, and in most cases a single S3 API request.

Bucketlist

The first challenge of processing a large number of objects is getting the list of objects themselves. As mentioned earlier, it quickly becomes a scaling problem, and any manageable listing operation has to be distributed across the cluster. Therefore, the first and most important pattern of Campanile is the use of a part file, which is a newline-separated text file in <delimiter>,<prefix> format. Relying on the lexicographically sorted S3 index and the ability to list keys hierarchically, Hadoop splits the part file across mapper tasks and lists the entire bucket in parallel. The more parts and nodes there are, the more concurrent listing operations. Using this model, can you list more than a million objects per second? Yes.
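To make the pattern concrete, here is a minimal sketch of what a streaming mapper for one slice of the part file could look like. It is not Campanile's code. It assumes a client object with the boto3 S3 `list_objects_v2` interface (the original scripts use Boto), and the function names and the tab-separated output layout (key, ETag, size, modification time) are illustrative choices.

```python
import sys


def parse_part_line(line):
    """Split one '<delimiter>,<prefix>' line at the first comma."""
    delimiter, prefix = line.rstrip("\n").split(",", 1)
    return delimiter, prefix


def list_prefix(client, bucket, prefix, delimiter):
    """Yield (key, etag, size, mtime) for the keys returned under one prefix.

    `client` is assumed to behave like boto3.client("s3").
    """
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    if delimiter:
        kwargs["Delimiter"] = delimiter
    while True:
        page = client.list_objects_v2(**kwargs)
        for obj in page.get("Contents", []):
            yield (obj["Key"], obj["ETag"].strip('"'), obj["Size"],
                   obj["LastModified"].isoformat())
        if not page.get("IsTruncated"):
            break
        # Each response holds at most 1,000 keys; keep paging until done
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


def run_mapper(client, bucket, lines, out=sys.stdout):
    """List every prefix in this mapper's share of the part file."""
    for line in lines:
        if not line.strip():
            continue
        delimiter, prefix = parse_part_line(line)
        for record in list_prefix(client, bucket, prefix, delimiter):
            out.write("\t".join(str(field) for field in record) + "\n")
```

Because each mapper only sees its own lines of the part file, adding parts and nodes increases the number of listings that run at once.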

For a bucket containing objects in UUID format, a part file might look like the one below. Upon the completion of the BucketList step, a list of objects is written to HDFS.
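As a rough illustration of a part file for UUID-style keys, the sketch below generates one line per hexadecimal prefix. The "/" delimiter and the two-character prefix width are assumptions made for the example, not Campanile defaults.

```python
import itertools

HEX_DIGITS = "0123456789abcdef"


def hex_part_lines(delimiter="/", width=2):
    """Return '<delimiter>,<prefix>' lines covering every hex prefix of `width` characters."""
    return [f"{delimiter},{''.join(chars)}"
            for chars in itertools.product(HEX_DIGITS, repeat=width)]


lines = hex_part_lines()
print(len(lines))             # 256
print("\n".join(lines[:3]))   # /,00  /,01  /,02 (one per line)
```

A wider prefix produces more, smaller parts, which gives Hadoop more units of work to spread across mappers.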

Distributed diff using Hive

Now that you have an easy method for listing very large buckets, deriving the difference between two buckets is no longer a tedious, difficult, or time-consuming task. If a diff is required, simply list the destination and run the Hive script below. Finding one missing or changed object among millions or billions is no problem.

NOTE: The Hive script relies on Hive variables, an additional feature of EMR where you can include variables in scripts dynamically by using the dollar sign and curly braces.

DROP TABLE IF EXISTS src;  
DROP TABLE IF EXISTS dst;  
DROP TABLE IF EXISTS diff;  
CREATE EXTERNAL TABLE src(key STRING, etag STRING, size BIGINT, mtime STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION '${SRC}';  
CREATE EXTERNAL TABLE dst(key STRING, etag STRING, size BIGINT, mtime STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION '${DST}';  
CREATE EXTERNAL TABLE diff(key STRING, etag STRING, size BIGINT, mtime STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION '${DIFF}';  
INSERT OVERWRITE TABLE diff SELECT src.key,src.etag,src.size,src.mtime FROM src LEFT OUTER JOIN dst ON (src.key = dst.key) WHERE (dst.key IS NULL) OR (src.etag != dst.etag);
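For intuition, the join logic of the final INSERT statement can be mirrored on small in-memory listings. This Python sketch only illustrates the semantics. The dictionaries stand in for the src and dst tables, and the function name is hypothetical.

```python
def diff_listings(src, dst):
    """Return source keys that are missing from dst or whose ETag differs.

    src and dst map object key -> ETag. Keys that exist only in dst are
    ignored, matching the LEFT OUTER JOIN from src.
    """
    return sorted(key for key, etag in src.items()
                  if key not in dst or dst[key] != etag)


src = {"a": "e1", "b": "e2", "c": "e3"}
dst = {"a": "e1", "b": "changed", "z": "e9"}
print(diff_listings(src, dst))  # ['b', 'c']
```

Hive performs the same comparison, but distributed across the cluster rather than in one process's memory.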

Multipartlist

Probably the most complex function of the group, multipartlist pre-processes objects that were uploaded to S3 using the MultiPart API operation (for more information, see Multipart Upload Overview). These objects have ETag values suffixed with a hyphen and the number of parts that made up the original object. While S3 supports a maximum of 5 GB per PUT operation, you can use MultiPart to tie together between 1 and 10,000 parts and create an object up to 5 TB in size.
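The ETag suffix can be detected with a few lines of Python, sketched below. The second function reproduces the commonly observed way a multipart ETag is derived (the MD5 of the concatenated per-part MD5 digests, followed by the part count). That derivation is a widely seen convention rather than a documented guarantee, and it does not hold for every encryption mode, so treat it as an assumption.

```python
import hashlib


def parse_etag(etag):
    """Return (digest, part_count); part_count is None for non-multipart ETags."""
    value = etag.strip('"')
    digest, sep, count = value.partition("-")
    return (digest, int(count)) if sep else (digest, None)


def multipart_etag(parts):
    """Compute a multipart-style ETag from an ordered list of part bodies (bytes)."""
    digests = b"".join(hashlib.md5(part).digest() for part in parts)
    return f"{hashlib.md5(digests).hexdigest()}-{len(parts)}"


print(parse_etag('"d41d8cd98f00b204e9800998ecf8427e"'))
print(parse_etag(multipart_etag([b"first part", b"second part"])))
```

The result depends on where the part boundaries fall, which is why the destination upload must use the same part sizes as the source.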

NOTE: Uploading 5 GB in a single PUT is both inefficient and error-prone. This is why many of the S3 tools, including the AWS CLI and console, start using MultiPart on objects greater than 8 MB.

NOTE: MultipartUploadInitiate is the operation where the destination object’s metadata, cache control headers, ACLs, and encryption parameters are set.

To replicate ETags across buckets, you must mimic the original object's part map. Campanile currently supports a few functions for this calculation (and is looking to expand). Splitting larger objects also has a positive effect on performance. While non-multipart objects are serviced by a single mapper, multipart objects can be processed asynchronously across independent mapper tasks, so thousands of parts of the same object can be processed at the same time.
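One way to approach the part map, sketched here under stated assumptions, is to look for part sizes that would yield the part count found in the ETag and then derive byte ranges from the chosen size. This is not one of Campanile's functions. The restriction to whole-MiB part sizes is a heuristic, and several sizes can match the same count, so the candidates may need to be checked against the ETag.

```python
MIB = 1024 * 1024


def candidate_part_sizes(size, parts, max_mib=5120):
    """Whole-MiB part sizes that split `size` bytes into exactly `parts` parts."""
    return [m * MIB for m in range(1, max_mib + 1)
            if -(-size // (m * MIB)) == parts]  # ceiling division


def part_ranges(size, part_size):
    """Return (part_number, first_byte, last_byte) tuples with inclusive byte offsets."""
    ranges = []
    start, number = 0, 1
    while start < size:
        end = min(start + part_size, size) - 1
        ranges.append((number, start, end))
        start, number = end + 1, number + 1
    return ranges


# A 20 MiB object whose ETag ends in "-3"
print([s // MIB for s in candidate_part_sizes(20 * MIB, 3)])  # [7, 8, 9]
print(part_ranges(20 * MIB, 8 * MIB))
```

Each tuple is an independent unit of work, which is what lets separate mapper tasks copy parts of the same object in parallel.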

NOTE: Objects without a multipart ETag value are simply passed through, as seen in object 00 below.

Objectcopy and Multipartcomplete

Corresponding to the S3 API operations GET and PUT, objectcopy does most of the work in terms of transactions per second (tps) and overall network throughput. Reading the output of Multipartlist, it downloads each object/part in the list and immediately uploads it to the destination bucket. For single-part objects, the destination’s metadata, cache control headers, ACLs, and encryption parameters are set in this step.
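The download-then-upload step for a single part might look like the following sketch. It assumes source and destination clients with the boto3 S3 `get_object` and `upload_part` interfaces (the original code uses Boto), and the function name and returned record layout are hypothetical.

```python
def copy_part(src, dst, src_bucket, dst_bucket, key,
              upload_id, part_number, start, end):
    """Copy one byte range of an object as one part of a multipart upload.

    `src` and `dst` are assumed to behave like boto3.client("s3").
    Returns a record the reducer can later use to complete the upload.
    """
    # Ranged GET: fetch only this part's bytes (inclusive offsets)
    response = src.get_object(Bucket=src_bucket, Key=key,
                              Range=f"bytes={start}-{end}")
    body = response["Body"].read()  # held in memory in this simplified sketch

    # Upload the bytes as the matching part number at the destination
    result = dst.upload_part(Bucket=dst_bucket, Key=key, UploadId=upload_id,
                             PartNumber=part_number, Body=body)
    return key, upload_id, part_number, result["ETag"]
```

Keeping the part number and returned ETag is what makes the later completion request possible.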

To maximize performance, objectcopy relies on two configuration parameters set by Campanile's EMR bootstrap action. The first setting is ephemeral, which is a comma-separated list of the instance’s ephemeral mount points. To distribute load across disks, one is randomly selected as the temporary location of the downloaded object/part. But in most cases, a downloaded object/part never reaches the disk because of the second setting. The maxtmpsize setting (dependent on the instance's memory size) tells the tempfile class to flush bytes to disk only when this size is reached. Objects or parts that are smaller than maxtmpsize stay entirely in memory. See an example of Campanile’s configuration below.

$ cat /etc/campanile.cfg
[DEFAULT]
maxtmpsize=134217728
ephemeral=/mnt,/mnt1
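A minimal sketch of how these two settings could be consumed is shown below. It assumes the tempfile class in question behaves like Python's `tempfile.SpooledTemporaryFile`, which buffers in memory and rolls over to a real file once its contents exceed `max_size`. The helper names are hypothetical.

```python
import configparser
import random
import tempfile


def load_settings(text):
    """Parse maxtmpsize and the ephemeral mount list from config text."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    defaults = parser["DEFAULT"]
    return int(defaults["maxtmpsize"]), defaults["ephemeral"].split(",")


def open_buffer(maxtmpsize, ephemeral_dirs):
    """Open an in-memory buffer that spills to a randomly chosen mount point."""
    return tempfile.SpooledTemporaryFile(max_size=maxtmpsize,
                                         dir=random.choice(ephemeral_dirs))


config_text = "[DEFAULT]\nmaxtmpsize=134217728\nephemeral=/mnt,/mnt1\n"
maxtmpsize, mounts = load_settings(config_text)
print(maxtmpsize // (1024 * 1024), mounts)  # 128 ['/mnt', '/mnt1']
```

With the values shown, any object or part of up to 128 MiB would be copied without touching disk.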

For single-part objects, the copy completes here. Multipart uploads require a final UploadComplete request, which incorporates information about each individual part. Therefore, objectcopy outputs the results of each uploaded part for the reducer(s) to process. Remember, parts are being uploaded and processed asynchronously, so Campanile relies on Hadoop’s Reducer function to sort and group the parts that constitute a single object. Multipartcomplete orders the part data and signals that the upload is complete. This completes the copy process.
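The grouping and ordering that the reducer performs can be sketched in plain Python. The record layout (key, upload ID, part number, ETag) is an assumption for illustration. The resulting dictionaries follow the shape that the boto3 `complete_multipart_upload` call expects, once a Bucket argument is added.

```python
from itertools import groupby
from operator import itemgetter


def build_completions(records):
    """Group part records by object and upload, ordered by part number.

    records: iterable of (key, upload_id, part_number, etag) tuples,
    arriving in any order.
    """
    ordered = sorted(records, key=itemgetter(0, 1, 2))
    for (key, upload_id), group in groupby(ordered, key=itemgetter(0, 1)):
        parts = [{"PartNumber": number, "ETag": etag}
                 for _, _, number, etag in group]
        yield {"Key": key, "UploadId": upload_id,
               "MultipartUpload": {"Parts": parts}}


records = [("big.bin", "u1", 2, '"e2"'), ("big.bin", "u1", 1, '"e1"')]
for request in build_completions(records):
    print(request)
```

In a real job, Hadoop's shuffle delivers the records already grouped by key, so the reducer only needs to order the parts and issue the completion request.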

Conclusion

Within the aws-big-data-blog repo, you can find Campanile code samples, test files, detailed documentation, and a test procedure to exercise the steps covered above. From these samples, you can unlock the power of S3 and EMR. So clone the repo and get going!

If you have any questions or suggestions, please leave a comment below.
