AWS Big Data Blog

Using AWS Data Pipeline’s Parameterized Templates to Build Your Own Library of ETL Use-case Definitions

February 2023 Update: Console access to the AWS Data Pipeline service will be removed on April 30, 2023. On this date, you will no longer be able to access AWS Data Pipeline through the console. You will continue to have access to AWS Data Pipeline through the command line interface and API. Please note that the AWS Data Pipeline service is in maintenance mode and we are not planning to expand the service to new regions. For information about migrating from AWS Data Pipeline, please refer to the AWS Data Pipeline migration documentation.

In an earlier post, we introduced you to ETL processing using AWS Data Pipeline and Amazon EMR. This post shows how to build ETL workflow templates with AWS Data Pipeline and how to assemble a library of recipes that implement common use cases. It is an introduction to parameterized templates, which serve as proven recipes and can be shared as a library of reusable pipeline definitions with other parts of your organization or contributed to the larger community.

Data Pipeline allows you to define complex workflows for data movement and transformation. You can easily create complex data processing workloads, manage inter-task dependencies, configure retries for transient failures, and set up failure notifications for individual tasks.
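
As an illustrative fragment (the object names, the command, and the topic ARN below are placeholders, not part of the template built later in this post), the snippet shows how an activity can limit retries with maximumRetries and send an Amazon SNS notification on failure through onFail. SampleEc2Resource stands in for an Ec2Resource or EmrCluster object defined elsewhere in the pipeline.

{
  "id": "FailureAlarm",
  "name": "FailureAlarm",
  "type": "SnsAlarm",
  "role": "DataPipelineDefaultRole",
  "topicArn": "arn:aws:sns:us-east-1:111122223333:pipeline-alerts",
  "subject": "Pipeline activity failed",
  "message": "A pipeline activity failed after exhausting its retries."
},
{
  "id": "SampleActivity",
  "name": "SampleActivity",
  "type": "ShellCommandActivity",
  "command": "echo Hello",
  "runsOn": { "ref": "SampleEc2Resource" },
  "maximumRetries": "2",
  "onFail": { "ref": "FailureAlarm" }
}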

Creating Pipelines Using Built-in Templates

Data Pipeline supports a library of parameterized templates for common ETL use cases. The Create Pipeline page on the AWS Management Console provides an option to create your pipeline using templates.

Create Pipeline page on AWS Management Console

Select the template, fill out the parameters, and the pipeline is ready to launch. In a few clicks, you can create and launch pipelines for complex ETL use cases. Currently, this library has parameterized templates for common use cases related to Amazon DynamoDB, Amazon Redshift, Amazon RDS, and Amazon EMR. For more information, see the related blog post AWS Data Pipeline Update – Parameterized Templates on the AWS blog.

Authoring New Templates

Data Pipeline also supports the use of custom templates. You can author a template for your ETL use case and use it for launching pipelines.

The JSON-formatted template consists of three sections: "objects", "parameters", and "values". The objects section is a list of pipeline objects that define your pipeline workflow, the parameters section defines the attributes of each parameter, and the values section supplies the values for those parameters.

{
  "objects": [],
  "parameters": [],
  "values": {}
}

Parameters are referenced within a pipeline object using a parameter ID, for example #{myParameterId}. The parameter ID always starts with the prefix "my". Other attributes within the parameter object help provide more context for the parameter, such as "description", "type", "optional", "watermark", "default", and "isArray". For more details, see Template Parameter Objects.
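
For example, the fragment below (a minimal sketch, not the full template built later in this post) shows a pipeline object field referencing #{mySourceTableName} and the matching parameter object that describes it:

{
  "objects": [
    {
      "id": "DDBSourceTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{mySourceTableName}"
    }
  ],
  "parameters": [
    {
      "id": "mySourceTableName",
      "description": "Source table name",
      "type": "String",
      "optional": "false",
      "watermark": "Enter the source DynamoDB table name"
    }
  ]
}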

The values section in the template is a list of parameter values that are used by the pipeline at runtime, after the pipeline is activated. Values must be present either in the values section or as the default field in the parameters section. Values can also be applied externally to the pipeline definition as arguments to the PutPipelineDefinition or ActivatePipeline API actions (see the AWS Data Pipeline API documentation to learn more). The default field of a parameter object is used only if no value for that parameter is present in the values section. These actions return errors for empty or null values in the parameters or values sections.
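
As a minimal sketch of that behavior, the fragment below declares a default of 0.2 for myDestWriteThroughputRatio in the parameters section and overrides it with 0.3 in the values section; at runtime the pipeline uses 0.3, and 0.2 would apply only if the values entry were omitted:

{
  "parameters": [
    {
      "id": "myDestWriteThroughputRatio",
      "type": "Double",
      "default": "0.2",
      "description": "Destination table write throughput ratio"
    }
  ],
  "values": {
    "myDestWriteThroughputRatio": "0.3"
  }
}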

Let’s look at how to build a template for copying DynamoDB tables from one AWS region to another using EMR.

Copying DynamoDB tables across regions using EMR

DynamoDB tables can be copied from one region to another using the Hive DynamoDBStorageHandler on an EMR cluster. To learn how to export and import DynamoDB data using EMR, see Hive Command Examples for Exporting, Importing, and Querying Data in DynamoDB.

The pipeline objects and dataflow for this use case do not change for different users. However, the source and destination tables, region, and read and write throughput can vary, and are therefore parameterized. You can also parameterize other object fields, such as pipelineLogUri and EMR cluster fields.

The following JSON template copies a DynamoDB table from one AWS region to another.

The pipeline objects in the definition below define a workflow with the DynamoDB table fields parameterized. The parameters section defines the attributes for the parameterized field values. The values section in the template is optional but, if provided, supplies the default values for the parameters.

{
  "objects": [
    {
      "id": "Default",
      "scheduleType": "cron",
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "name": "Default",
      "pipelineLogUri": "#{myPipelineLogUri}",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "region": "#{mySourceTableRegion}",
      "id": "EmrClusterForCopy",
      "terminateAfter": "2 Hours",
      "amiVersion": "3.3.2",
      "masterInstanceType": "m1.medium",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "name": "EmrClusterForCopy",
      "type": "EmrCluster"
    },
    {
      "region": "#{mySourceTableRegion}",
      "id": "DDBSourceTable",
      "tableName": "#{mySourceTableName}",
      "name": "DDBSourceTable",
      "dataFormat": {
        "ref": "DDBExportFormat"
      },
      "type": "DynamoDBDataNode",
      "readThroughputPercent": "#{mySourceReadThroughputRatio}"
    },
    {
      "id": "DefaultSchedule",
      "startDateTime": "2015-04-07T21:55:00",
      "occurrences": "1",
      "name": "RunOnce",
      "type": "Schedule",
      "period": "1 days"
    },
    {
      "resizeClusterBeforeRunning": "true",
      "id": "TableCopyActivity",
      "input": {
        "ref": "DDBSourceTable"
      },
      "name": "TableCopyActivity",
      "runsOn": {
        "ref": "EmrClusterForCopy"
      },
      "type": "HiveCopyActivity",
      "output": {
        "ref": "DDBDestinationTable"
      }
    },
    {
      "id": "DDBExportFormat",
      "name": "DDBExportFormat",
      "type": "DynamoDBExportDataFormat"
    },
    {
      "region": "#{myDestTableRegion}",
      "id": "DDBDestinationTable",
      "writeThroughputPercent": "#{myDestWriteThroughputRatio}",
      "tableName": "#{myDestTableName}",
      "name": "DDBDestinationTable",
      "dataFormat": {
        "ref": "DDBExportFormat"
      },
      "type": "DynamoDBDataNode"
    }
  ],
  "parameters": [
    {
      "id": "myDestWriteThroughputRatio",
      "watermark": "Enter value between 0.1-1.0",
      "default": "0.2",
      "description": "Destination table write throughput ratio",
      "type": "Double"
    },
    {
      "id": "mySourceTableName",
      "description": "Source table name",
      "type": "String"
    },
    {
      "id": "mySourceTableRegion",
      "watermark": "us-east-1",
      "description": "Source table region",
      "optional": "true",
      "type": "String"
    },
    {
      "id": "myDestTableRegion",
      "watermark": "us-east-1",
      "description": "Destination table region",
      "optional": "true",
      "type": "String"
    },
    {
      "id": "myDestTableName",
      "description": "Destination table name",
      "type": "String"
    },
    {
      "id": "mySourceReadThroughputRatio",
      "watermark": "Enter value between 0.1-1.0",
      "default": "0.2",
      "description": "Source table read throughput ratio",
      "type": "Double"
    },
    {
      "id": "myPipelineLogUri",
      "description": "Pipeline log URI",
      "type": "AWS::S3::ObjectKey"
    }
  ],
  "values": {
    "myDestWriteThroughputRatio": "0.2",
    "myDestTableName": "DestinationTable",
    "myDestTableRegion": "us-west-2",
    "mySourceTableName": "SourceTableName",
    "mySourceTableRegion": "us-east-1",
    "mySourceReadThroughputRatio": "0.2",
    "myPipelineLogUri": "s3://<>/"
  }
}

Importing a Template from the Console

AWS Data Pipeline template support helps separate the users who author templates from those who ultimately use them. You can create templates for common use cases and share them within your organization or community, using Amazon S3 or your own local file system.

Importing a template from the console

After the template is defined, it can be used to create pipelines. The Create Pipeline page on the AWS Management Console provides an option to import definitions from S3 or your local file system.

After you choose and import the template, parameters in the template are rendered as a form. This allows you to launch pipelines in a few clicks.

Launching a pipeline

You can further customize the pipeline workflow by navigating to the Architect page. If you would like to change parameter values after the pipeline has been activated, change them on the Architect page and activate the pipeline again. Saving the changes only updates the stored pipeline definition; the new values take effect the next time the pipeline is activated.

Note: Currently, the console can import templates from S3 buckets in your own account and from your local disk. To import an S3 template shared across accounts, first download the file locally and then import it using the Load local file option on the console.
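
For example, if the shared template lived at a hypothetical location such as s3://shared-templates/DynamoDBCrossRegionCopyTemplate.json, you could first copy it to your machine with the AWS CLI and then import it with the Load local file option:

$ aws s3 cp s3://shared-templates/DynamoDBCrossRegionCopyTemplate.json ./DynamoDBCrossRegionCopyTemplate.json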

Importing Templates from the AWS CLI

Templates can also be used with the AWS CLI and SDKs. A template imported using the CLI can contain all three sections in a single JSON file, or the sections can be supplied as separate arguments to the CLI command. You can also overwrite parameter values when activating a pipeline without having to edit the pipeline definition again.

PutPipelineDefinition with parameters using the AWS CLI

Parameter values can be provided as arguments with the put-pipeline-definition command. For list-type parameters, use the same key name and specify each value as a key-value pair (for example, arrayValue=value1 arrayValue=value2). In JSON, list-type parameter values are expressed as an array of values (for example, "arrayValue": ["value1", "value2"]).
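
For example, for a hypothetical list parameter with the ID myRegions (declared with "isArray": "true" in the parameters section), the two forms would look like this:

--parameter-values myRegions=us-east-1 myRegions=us-west-2

"values": {
  "myRegions": ["us-east-1", "us-west-2"]
}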

$ aws datapipeline put-pipeline-definition --pipeline-definition file://./DynamoDBCrossRegionCopyTemplate.json --parameter-values myDestWriteThroughputRatio=0.2 myDestTableName=DestinationTable myDestTableRegion=us-west-2 mySourceTableName=SourceTableName mySourceTableRegion=us-east-1 mySourceReadThroughputRatio=0.2 --pipeline-id df-002163815JNZCB8XOUQS

OR

$ aws datapipeline put-pipeline-definition --pipeline-definition file://./DynamoDBCrossRegionCopyTemplate.json --parameter-values-uri file://./DynamoDBCrossRegionCopyTemplateParameters.json --pipeline-id df-002163815JNZCB8XOUQS

Below is the DynamoDBCrossRegionCopyTemplateParameters.json file content:

{
  "values": {
    "myDestWriteThroughputRatio": "0.2",
    "myDestTableName": "DestinationTable",
    "myDestTableRegion": "us-west-2",
    "mySourceTableName": "SourceTableName",
    "mySourceTableRegion": "us-east-1",
    "mySourceReadThroughputRatio": "0.2"
  }
}

ActivatePipeline with parameters using the AWS CLI

You can overwrite parameter values with the activate-pipeline command. Parameter values provided during activation are applied to executions triggered after the activation of the pipeline.

$ aws datapipeline activate-pipeline --parameter-values myDestWriteThroughputRatio=0.2 myDestTableName=DestinationTable myDestTableRegion=us-west-2 mySourceTableName=SourceTableName mySourceTableRegion=us-east-1 mySourceReadThroughputRatio=0.2 --pipeline-id df-002163815JNZCB8XOUQS

OR

$ aws datapipeline activate-pipeline --parameter-values-uri file://./DynamoDBCrossRegionCopyTemplateParameters.json --pipeline-id df-002163815JNZCB8XOUQS

Conclusion

AWS Data Pipeline parameterized templates make it easy to implement complex ETL workflows, and to author pipelines once and share them within your organization or with the larger community.

Resources:

AWS CLI reference (AWS Data Pipeline)

AWS Data Pipeline API reference

AWS SDK for Java API Reference

If you have questions or suggestions, please comment below.


Related:

ETL Processing Using AWS Data Pipeline and Amazon EMR



About the author

Leena Joseph is an SDE for AWS Data Pipeline.