AWS Big Data Blog

Use Sqoop to Transfer Data from Amazon EMR to Amazon RDS

Sai Sriparasa is a consultant with AWS Professional Services

Customers commonly process and transform vast amounts of data with Amazon EMR and then transfer and store summaries or aggregates of that data in relational databases such as MySQL or Oracle. This allows the storage footprint in these relational databases to be much smaller, yet retain the ability to process larger, more granular datasets using the Hadoop ecosystem in Amazon EMR.

In this post, I will show you how to transfer data using Apache Sqoop, which is a tool designed to transfer data between Hadoop and relational databases. Support for Apache Sqoop is available in Amazon EMR releases 4.4.0 and later.

Tutorial

This post contains step-by-step instructions on how to create an EMR cluster with Sqoop, process a sample dataset on Hive, build the aggregates that are ready to be exported, create a MySQL database instance on Amazon RDS, and then use Sqoop to export the data into RDS from EMR. The following graphic shows the computing resources used for the tutorial.

Prerequisites

You need to pick a region and VPC to launch these instances into. For this tutorial, I used the us-west-2 region, and the AWS CLI to launch the computing resources.

The scripts for the Hive and MySQL tables are in the GitHub repository in the hive-tables.hql and mysql-tables.sql files. The Sqoop options file is available in options-file.txt on GitHub. The CLI commands are available in the audience-commands.txt file in the same repository.

Step 1: Build a cluster with Sqoop

You have a VPC and security groups, so you can use the create-cluster CLI command to build the EMR cluster with Sqoop and receive the cluster ID as part of the response.

In the following command, make these changes:

  • Replace “your-key” with the name of your EC2 key pair (.pem key).
  • Replace “your-bucket” with your S3 bucket.
  • Replace “sg-masterid” and “sg-slaveid” with the EMR-managed master and slave security group IDs.
aws emr create-cluster --applications Name=Hadoop Name=Hive Name=Hue Name=Sqoop-Sandbox --tags 'Purpose=Sqoop-Blog' --ec2-attributes '{"KeyName":"your-key","InstanceProfile":"EMR_EC2_DefaultRole","EmrManagedMasterSecurityGroup":"sg-masterid","EmrManagedSlaveSecurityGroup":"sg-slaveid"}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.5.0 --log-uri 's3://your-bucket/logs' --name 'Sqoop-Demo' --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master instance group - 1"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core instance group - 2"}]' --region us-west-2

You should receive a JSON response that contains the cluster ID (the “ClusterId” field). Note this ID for later steps.
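
The cluster takes a few minutes to provision. One way to check on it (assuming your cluster ID is j-your-clusterid) is to poll its state with describe-cluster; the cluster is ready when the state reaches WAITING:

aws emr describe-cluster --cluster-id j-your-clusterid --query 'Cluster.Status.State' --output text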

Step 2: Create a Hive table

The cluster is running, so you can log on to the master node and create a Hive table. For this tutorial, you will use an hour’s worth of data that contains page view statistics.

The following script builds an external table on an hour’s worth of data and then creates aggregates to be stored in your bucket. Replace “your-bucket” in the script with your S3 bucket.

DROP TABLE IF EXISTS pageviews;

CREATE EXTERNAL TABLE pageviews(
  code STRING,
  page STRING,
  views INT,
  bytes STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://aws-bigdata-blog/artifacts/sqoop-blog/';

DROP TABLE IF EXISTS pv_aggregates;

CREATE TABLE  pv_aggregates(
        dt string,
        code string,
        views int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://your-bucket/data/sqoop/hive/pv-aggregates/';

WITH
 q1 as (SELECT DISTINCT split(split(INPUT__FILE__NAME, '/')[7], '-')[1] FROM pageviews),
 q2 as (SELECT code, sum(views) as sm FROM pageviews GROUP BY code ORDER BY sm DESC LIMIT 10)
INSERT OVERWRITE TABLE pv_aggregates
SELECT * FROM q1,q2;
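
One way to run this script is to save it on the master node as hive-tables.hql (the file name used in the GitHub repository) and execute it with the Hive CLI:

hive -f hive-tables.hql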

Step 3: Launch an RDS instance

The EMR cluster is running and the dataset to export to RDS is ready.

To launch an RDS instance, you need to create a subnet group or use an existing subnet group. In the following command, replace “subnetid1”, “subnetid2”, and “subnetid3” with the IDs of the subnets in your VPC. You can also use a custom group name, or keep ‘sqoop-blog’.

aws rds create-db-subnet-group --db-subnet-group-name sqoop-blog --db-subnet-group-description sqoop-blog --subnet-ids subnet-subnetid1 subnet-subnetid2 subnet-subnetid3 --tags Key=Purpose,Value=Blog

You should receive a JSON response that describes the new DB subnet group.
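
If you want to confirm the subnet group later, you can also describe it by name:

aws rds describe-db-subnet-groups --db-subnet-group-name sqoop-blog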

Step 4: Create a security group

Another prerequisite for launching an RDS instance is creating an Amazon EC2 security group that can be assigned to it. In the following command, replace “vpc-yourvpcid” with your VPC ID.

aws ec2 create-security-group --group-name blog-rds --description "SG for Sqoop Blog" --vpc-id vpc-yourvpcid
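
The command returns the new group’s ID, which is used as “sg-securitygroupid” in the following steps. If you need to look it up again later, you can query it by name and VPC:

aws ec2 describe-security-groups --filters Name=group-name,Values=blog-rds Name=vpc-id,Values=vpc-yourvpcid --query 'SecurityGroups[0].GroupId' --output text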

Step 5: Add security group rules

You need to add rules to the new security group so that the required resources can access the RDS instance. MySQL listens on port 3306, and you open that port only to instances in the EMR master and slave security groups. In the following commands, replace “sg-securitygroupid” with the group ID returned in Step 4, and replace “sg-masterid” and “sg-slaveid” with the EMR master and slave security group IDs.

aws ec2 authorize-security-group-ingress --group-id sg-securitygroupid --protocol tcp --port 3306 --source-group sg-masterid
aws ec2 authorize-security-group-ingress --group-id sg-securitygroupid --protocol tcp --port 3306 --source-group sg-slaveid
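
To double-check that both rules were added, you can describe the group’s ingress permissions:

aws ec2 describe-security-groups --group-ids sg-securitygroupid --query 'SecurityGroups[0].IpPermissions'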

Step 6: Build the RDS instance with MySQL

Now that you have the subnet group and security group, use the create-db-instance command to build the RDS instance with a MySQL database engine.

aws rds create-db-instance --db-instance-identifier sqoop-blog  --db-instance-class db.t1.micro --engine MySQL --db-name  sqoopblog --db-subnet-group-name sqoop-blog --vpc-security-group-ids sg-securitygroupid --allocated-storage 5 --backup-retention-period 0 --master-username your-username --master-user-password your-password
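
The instance takes several minutes to become available. You can block until it is ready with the RDS waiter before moving on:

aws rds wait db-instance-available --db-instance-identifier sqoop-blog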

Step 7: Get the instance endpoint

After creating the RDS instance, use the describe-db-instances command to retrieve the endpoint so that you can connect to the RDS instance. Use the DB instance identifier from Step 6, ‘sqoop-blog’ or your custom name. The RDS endpoint is in the “Endpoint” section of the response.

aws rds describe-db-instances --db-instance-identifier sqoop-blog
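
If you only need the endpoint host name, a --query expression extracts it directly:

aws rds describe-db-instances --db-instance-identifier sqoop-blog --query 'DBInstances[0].Endpoint.Address' --output text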

Step 8: Access the instance

You have the endpoint, but you can access the RDS instance only from the EMR cluster. Log in to the EMR master node and use the “mysql” command line utility to access the database engine.

In the following command, replace “db-instance-endpoint” with the endpoint retrieved in Step 7 and “your-username” with the master user name from Step 6. When you are prompted for a password, enter the “master-user-password” value from Step 6.

mysql -u your-username -h db-instance-endpoint -p

Step 9: Create the tables

You can now switch to the database that was created when the RDS instance was launched (“sqoopblog” from Step 6). Use the following script to create the table that will house the aggregates, along with a staging table that has the same schema. After creating the tables, disconnect from MySQL.

USE sqoopblog;

CREATE TABLE `pv_aggregates` (
  `dt` varchar(10) DEFAULT NULL,
  `code` varchar(20) DEFAULT NULL,
  `views` int(11) DEFAULT NULL
);

CREATE TABLE pv_aggregates_stg LIKE pv_aggregates;
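
To confirm that both tables exist before moving on, you can run a quick check from the master node (placeholders as in Step 8; “sqoopblog” is the database name from Step 6):

mysql -u your-username -h db-instance-endpoint -p sqoopblog -e "SHOW TABLES;"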

Step 10: Install the JDBC driver

Sqoop requires MySQL’s JDBC driver to be installed in order to talk to the MySQL database engine. Download the driver and install it in /usr/lib/sqoop/lib.

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.38.tar.gz
tar -xvzf mysql-connector-java-5.1.38.tar.gz
sudo cp mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar /usr/lib/sqoop/lib/
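
You can verify that Sqoop will pick up the driver by listing the library directory:

ls /usr/lib/sqoop/lib/ | grep mysql-connector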

Step 11: Get the tables available

Now that you have a table created, use Sqoop to get a list of tables that are available in the database.

In the following command, use the -P option to be prompted to enter a password. Replace “db-instance-endpoint” with the RDS endpoint retrieved in Step 7, and replace “yourdatabase” with the name of the database in which the “pv_aggregates” table was created (“sqoopblog” in this tutorial).

sqoop list-tables --connect jdbc:mysql://db-instance-endpoint/yourdatabase --username your-username -P

You can also use the “--password” option with Sqoop, but then the password must be provided in clear text on the command line. In the next step, I show two more methods by which passwords can be provided.

You should receive a response that lists the tables in the database: pv_aggregates and pv_aggregates_stg.

Step 12: Store encrypted passwords

One way to avoid entering the password on every prompt is to store it in a password file. However, the password stored in that file will be in clear text; hence, it is not considered secure.
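
If you do use a password file (for example, in a development environment), a minimal sketch looks like the following; the file name mysql-pass.password is only an illustration, and the file should be locked down with restrictive permissions:

# write the password with no trailing newline, copy it to HDFS, and restrict access
echo -n 'your-password' > mysql-pass.password
hadoop fs -put mysql-pass.password /user/hadoop/
hadoop fs -chmod 400 /user/hadoop/mysql-pass.password
sqoop list-tables --connect jdbc:mysql://db-instance-endpoint/yourdatabase --username your-username --password-file /user/hadoop/mysql-pass.password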

In this step, you instead use a Java KeyStore (JKS) to store the password in encrypted form. Hadoop’s CredentialProvider API separates applications from the way their required passwords are stored. When you run the following command, you are prompted to enter a password; enter the master password for the RDS instance.

hadoop credential create sqoop-blog-rds.password -provider jceks://hdfs/user/hadoop/sqoop-blog-rds.jceks

You should receive a confirmation that the credential was created in the provider.
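
You can confirm that the alias was stored by listing the aliases in the provider:

hadoop credential list -provider jceks://hdfs/user/hadoop/sqoop-blog-rds.jceks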

Step 13: Export aggregates

Use the Sqoop tool to export the aggregates that were built in Step 2.

Here are a few best practices for exporting with Sqoop:

  • Options file—Because Sqoop export and import commands tend to be long, I recommend storing them in an options file. By keeping the command in an options file, you can even make it part of a version control pipeline to monitor changes to the command.
  • Field termination—With Sqoop export, I recommend providing field termination metadata using the “--fields-terminated-by” option. Other formatting options, such as “--lines-terminated-by”, “--enclosed-by”, and “--escaped-by”, can be used as required.
  • Mapper tuning—When an export job is submitted, Sqoop creates a Java class and submits a MapReduce job based on input splits; each mapper then connects to the database to export the data. The default number of mappers is 4, so I recommend tuning the number of mappers based on the processors available on the cluster. Too many mappers can increase the load on the database, so monitor the number of connections and keep track of the process list on MySQL (see the example command after this list).
  • Staging table—The Sqoop export job is broken down into multiple transactions based on the mappers. Each transaction is atomic and has no dependencies on other transactions. I recommend using the “--staging-table” option, which acts as a buffer table for the separate transactions. After all transactions have been committed, a single transaction moves the data to the final destination table. Use the “--clear-staging-table” option to clean up the staging table after the export job.
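
For the mapper-tuning point above, one way to watch the connections that the export opens is to check the MySQL process list from the master node while the job runs (placeholders as in Step 8):

mysql -u your-username -h db-instance-endpoint -p -e "SHOW FULL PROCESSLIST;"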

In the following file, replace “your-db-endpoint” with the RDS endpoint from Step 7 and “your-db-name” with the database name from Step 6 (“sqoopblog”). Replace “your-username” with the master user name from Step 6, and “your-bucket” with your S3 bucket.

export
-Dhadoop.security.credential.provider.path=jceks://hdfs/user/hadoop/sqoop-blog-rds.jceks
--connect
jdbc:mysql://your-db-endpoint/your-db-name
--username
your-username
--password-alias
sqoop-blog-rds.password
--table
pv_aggregates
--staging-table
pv_aggregates_stg
--fields-terminated-by
' '
--export-dir
s3://your-bucket/data/sqoop/hive/pv-aggregates/

Use the following command to export the aggregates:

sqoop --options-file options-file.txt
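
Sqoop combines the arguments in the options file with any additional arguments on the command line, so you can, for example, lower the number of mappers or clear the staging table without editing the file:

sqoop --options-file options-file.txt -m 2 --clear-staging-table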

Step 14: Verify data export

Log back into RDS and check the table to verify that the data export was successful. Use the following command:

SELECT * FROM pv_aggregates;

You should see the aggregate rows (dt, code, views) that were exported from Amazon EMR.

Cleaning up

After you finish the tutorial, use the following commands to delete the AWS resources that were created so that you no longer accrue charges. Replace “sg-securitygroupid” with the security group ID from Step 4 and “j-your-clusterid” with the cluster ID from Step 1.

aws rds delete-db-instance --db-instance-identifier sqoop-blog --skip-final-snapshot
aws ec2 delete-security-group --group-id sg-securitygroupid
aws rds delete-db-subnet-group --db-subnet-group-name sqoop-blog
aws emr modify-cluster-attributes --cluster-id j-your-clusterid --no-termination-protected
aws emr terminate-clusters --cluster-ids j-your-clusterid
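
Note that the RDS instance takes a few minutes to delete, and the security group and subnet group cannot be removed while the instance still references them. You can wait for the deletion to finish (after delete-db-instance and before the two group deletions) with the following command:

aws rds wait db-instance-deleted --db-instance-identifier sqoop-blog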

Summary

That’s it! You’ve learned how to use Apache Sqoop on EMR to transfer data between Hadoop and relational databases. You created an EMR cluster with Sqoop, processed a sample dataset on Hive, built aggregates that were ready to be exported, created a MySQL database instance on Amazon RDS, and then used Sqoop to export the data from EMR into RDS.

This process allows the storage footprint in your relational databases to be much smaller while providing you with the ability to process larger, more granular datasets using the Hadoop ecosystem in Amazon EMR.

In a future post, I plan to show how these operations with Sqoop can be scheduled and automated with Oozie.

If you have questions or suggestions, please comment below.

———————–

Related

Using Spark SQL for ETL