AWS Big Data Blog

Using AWS Lambda for Event-driven Data Processing Pipelines

February 2023 Update: Console access to the AWS Data Pipeline service will be removed on April 30, 2023. On this date, you will no longer be able to access AWS Data Pipeline through the console. You will continue to have access to AWS Data Pipeline through the command line interface and API. Please note that the AWS Data Pipeline service is in maintenance mode and we are not planning to expand the service to new regions. For information about migrating from AWS Data Pipeline, please refer to the AWS Data Pipeline migration documentation.

Some big data customers want to analyze new data in response to a specific event, and they might already have well-defined pipelines to perform batch processing, orchestrated by AWS Data Pipeline. One example of event-triggered pipelines is when data analysts must analyze data as soon as it arrives, so that they can immediately respond to partners. Scheduling is not an optimal solution in this situation. The main question is how to run data processing at an arbitrary time using Data Pipeline, which relies on schedules.

Here’s a solution. First, create a simple pipeline and test it with data from Amazon S3, then add an Amazon SNS topic to notify the customer when the pipeline is finished so data analysts can review the result. Lastly, create an AWS Lambda function to activate Data Pipeline when new data is successfully committed into an S3 bucket—without managing any scheduling activity. This post will show you how.

Solution that activates Data Pipeline when new data is committed to S3

When Data Pipeline activity can be scheduled, customers can define preconditions that check whether data exists on S3 before allocating resources. However, Lambda is a good mechanism when Data Pipeline needs to be activated at an arbitrary time.
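
For illustration, such a precondition is just another pipeline object. The following is a minimal sketch of an S3KeyExists precondition in the object format accepted by the putPipelineDefinition API used later in this post; the IDs and S3 path are placeholders, not values from a real pipeline:

// Hypothetical S3KeyExists precondition object (putPipelineDefinition format)
var inputReadyPrecondition = {
    id: 'InputReadyPrecondition',       // placeholder object ID
    name: 'InputReadyPrecondition',
    fields: [
        { key: 'type', stringValue: 'S3KeyExists' },
        { key: 's3Key', stringValue: 's3://example-bucket/input/ready.txt' } // placeholder key
    ]
};

A scheduled activity that references this precondition runs only after the key exists, which is the behavior Lambda replaces in the event-driven approach described next.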

Cloning pipelines for future use

In this scenario, the customer’s pipeline has been activated through some scheduled activity but the customer wants to be able to invoke the same pipeline in response to an ad-hoc event such as a new data commit to an S3 bucket. The customer has already developed a “template” pipeline that has reached the Finished state.

One way to re-initiate the pipeline is to keep the JSON file with the pipeline definition on S3 and use it to create a new pipeline. Some customers have multiple versions of the same pipeline stored on S3 but want to clone and reuse only the version that was most recently executed. A lightweight way to accommodate this request is to get the pipeline definition from the finished pipeline itself and create a clone. This approach relies on the recently executed pipeline and does not require the customer to keep a registry of pipeline versions on S3 or track which version ran most recently.

Even if customers want to maintain such a registry of pipelines on S3, they might also want to get a pipeline definition on the fly from an existing pipeline by calling the Data Pipeline API from a Lambda function. They could have complicated, event-driven workflows where they need to clone finished pipelines, re-run them, and then delete the cloned pipelines. That’s why it is important to first detect pipelines in the Finished state.

In this post, I demonstrate how you can accomplish such on-the-fly pipeline cloning. There is no direct clone API in Data Pipeline, so you implement this by making several API calls. I also provide code for deleting old clones that have finished.
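
As a minimal sketch of that detection step (assuming the aws-sdk for Node.js and a placeholder pipeline ID), you can read the pipeline state from the @pipelineState field returned by describePipelines:

var AWS = require('aws-sdk');
var datapipeline = new AWS.DataPipeline();

// Placeholder ID of the template pipeline you want to clone
var params = { pipelineIds: ['df-EXAMPLE'] };
datapipeline.describePipelines(params, function(err, data) {
    if (err) return console.log(err, err.stack);
    // The pipeline state is reported in the @pipelineState field
    var fields = data.pipelineDescriptionList[0].fields;
    var state = fields.filter(function(f) { return f.key === '@pipelineState'; })[0];
    if (state && state.stringValue === 'FINISHED') {
        console.log('Pipeline is finished and can be cloned');
    }
});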

Three-step workflow

  1. Create a simple pipeline for testing.
  2. Create an SNS notification to notify analysts that the pipeline has finished.
  3. Create a Lambda function to activate the pipeline when new data is committed to an S3 bucket.

Step 1: Create a simple pipeline


  1. Open the AWS Data Pipeline console.
  2. If you haven’t created a pipeline in this region, the console displays an introductory screen. Choose Get started now. If you’ve already created a pipeline in this region, the console displays a page that lists your pipelines for the region. Choose Create new pipeline.
  3. Enter a name and description.
  4. Select an Elastic MapReduce (EMR) template and choose Run once on pipeline activation.
  5. In the Step field, enter the following:
/home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,s3n://elasticmapreduce/samples/wordcount/input,-output,s3://example-bucket/wordcount/output/#{@scheduledStartTime},-mapper,s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate

You can adjust the number of Amazon EMR cluster nodes and select distributions. For more information about creating pipelines, see Getting Started with AWS Data Pipeline.

Step 2: Create an SNS topic

To create an SNS topic:

  1. In a new browser tab, open the Amazon SNS console.
  2. Choose Create topic.
  3. In the Topic name field, type a topic name.
  4. Choose Create topic.
  5. Select the new topic and then choose the topic ARN. The Topic Details page appears.

Topic Details page

  6. Copy the topic ARN for the next task.
  7. Create the subscription for that topic and provide your email address. AWS sends an email to confirm your subscription.
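
If you prefer to script this step, the following is a minimal sketch using the aws-sdk for Node.js; the topic name and email address are placeholders:

var AWS = require('aws-sdk');
var sns = new AWS.SNS();

// Placeholder topic name
sns.createTopic({ Name: 'myDataPipelineTopic' }, function(err, data) {
    if (err) return console.log(err, err.stack);
    var topicArn = data.TopicArn;
    console.log('Topic ARN: ' + topicArn);
    // Subscribe an email address; AWS sends a confirmation email
    sns.subscribe({
        TopicArn: topicArn,
        Protocol: 'email',
        Endpoint: 'analyst@example.com' // placeholder address
    }, function(err, sub) {
        if (err) console.log(err, err.stack);
        else console.log('Subscription pending confirmation: ' + sub.SubscriptionArn);
    });
});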

To configure the topic notification action in the pipeline:

  1. In the AWS Data Pipeline console, open your pipeline in the Architect window.
  2. In the right pane, choose Others.
  3. Under DefaultAction1, do the following:
    1. Enter the name for your notification (for example, MyEMRJobNotice).
    2. In the Type field, choose SnsAlarm.
    3. In the Subject field, enter the subject line for your notification.
    4. In the Topic Arn field, enter the ARN of your topic.
    5. In the Message field, enter the message content.
    6. Leave Role set to the default value.

Save and activate your pipeline to ensure that it can be executed successfully.
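
If you later script these steps instead of using the console, the Data Pipeline API also offers validatePipelineDefinition to check a definition before activation. A minimal sketch (the pipeline ID is a placeholder) that re-validates an existing pipeline’s definition might look like this:

var AWS = require('aws-sdk');
var datapipeline = new AWS.DataPipeline();
var pipelineId = 'df-EXAMPLE'; // placeholder pipeline ID

// Fetch the current definition, then ask the service to validate it
datapipeline.getPipelineDefinition({ pipelineId: pipelineId }, function(err, definition) {
    if (err) return console.log(err, err.stack);
    var params = {
        pipelineId: pipelineId,
        pipelineObjects: definition.pipelineObjects
    };
    datapipeline.validatePipelineDefinition(params, function(err, result) {
        if (err) return console.log(err, err.stack);
        if (result.errored) console.log('Validation errors:', result.validationErrors);
        else console.log('Pipeline definition is valid');
    });
});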

Step 3: Create a Lambda function

On the Lambda console, choose Create a Lambda function. You can select a blueprint or just skip the first step and proceed with Step 2: Configure function, where you provide a function name (such as LambdaDP) and a description, and choose Node.js as the value for the Runtime field.

Your test pipeline is now finished. Rerunning a finished pipeline is not currently supported, so to re-run it you clone the pipeline from the template and let Lambda trigger the new clone. You also need Lambda to clean up old clones before creating a new one each time. Below are helpful functions to do that. On the Lambda console, use the Code entry type and Edit code inline fields, and start with the following:

console.log('Loading function');
var AWS = require('aws-sdk');
exports.handler = function(event, context) {
    var datapipeline = new AWS.DataPipeline();
    var pipeline2delete = 'None';
    var pipeline = 'df-02….T'; // truncated ID; see the appendix for the full value
    // … the rest of the handler is built up in the snippets below
};

Define your pipeline ID and create a variable for your cloned pipeline IDs, such as pipeline2delete. Then, add a function to check for existing clones left from previous runs, as follows:

//Iterate over the list of pipelines and check whether the pipeline clone already exists
    var paramsall = { marker: '' }; // pagination marker (see the appendix)
    datapipeline.listPipelines(paramsall, function(err, data) {
        if (err) { console.log(err, err.stack); } // an error occurred
        else { console.log(data);                 // successful response
            for (var i in data.pipelineIdList) {
                if (data.pipelineIdList[i].name == 'myLambdaSample') {
                    pipeline2delete = data.pipelineIdList[i].id;
                    console.log('Pipeline clone id to delete: ' + pipeline2delete);
                    // invoke the delete call here (see the next snippet)
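
Note that listPipelines is paginated: the response includes hasMoreResults and a marker for fetching the next page, so accounts with many pipelines may need to loop. A minimal sketch of that pattern (listAllPipelines is a hypothetical helper, not part of the SDK, and assumes the datapipeline client defined earlier) might look like this:

// Hypothetical helper that follows the pagination marker and collects all pipeline IDs
function listAllPipelines(marker, accumulated, callback) {
    datapipeline.listPipelines({ marker: marker || '' }, function(err, data) {
        if (err) return callback(err);
        var all = accumulated.concat(data.pipelineIdList);
        if (data.hasMoreResults) {
            listAllPipelines(data.marker, all, callback); // more pages remain
        } else {
            callback(null, all); // all {id, name} entries collected
        }
    });
}

You could then search the collected list for the myLambdaSample clone instead of relying on a single page of results.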

If the finished clone from a previous run has been identified, you must invoke the delete function within this loop. The sample code to do that is as follows:

var paramsd = {pipelineId: pipeline2delete /* required */};
        datapipeline.deletePipeline(paramsd, function(err, data) {
            if (err) { console.log(err, err.stack); } // an error occurred
            else console.log('Old clone deleted ' + pipeline2delete + ' Create new clone now');
});

Finally, you need to make three API calls to create a new clone from your original Data Pipeline template. The APIs you can use are as follows:

  • getPipelineDefinition (for the finished pipeline)
  • createPipeline
  • putPipelineDefinition (using the definition returned by getPipelineDefinition)

Below are examples of those three calls.

1. Use this pipeline’s definition to create the next clone:

    var params = {pipelineId: pipeline};
    datapipeline.getPipelineDefinition(params, function(err, definition) {
      if (err) console.log(err, err.stack); // an error occurred
      else {
          var params = {
          name: 'myLambdaSample', /* required */
          uniqueId: 'myLambdaSample' /* required */
        };

2. Create the new clone pipeline (the definition object returned in step 1 contains the pipeline definition to apply):

        datapipeline.createPipeline(params, function(err, pipelineIdObject) {
          if (err) console.log(err, err.stack); // an error occurred
          else     { //new pipeline created with id=pipelineIdObject.pipelineId
              console.log(pipelineIdObject);           // successful response
              //Create and activate pipeline
              var params = {
                  pipelineId: pipelineIdObject.pipelineId,
                  pipelineObjects: definition.pipelineObjects //(you can add parameter objects and values)
              };

3. Use the definition from the getPipelineDefinition API result:

              datapipeline.putPipelineDefinition(params, function(err, data) {
               if (err) console.log(err, err.stack);
               else {
                   datapipeline.activatePipeline(pipelineIdObject, function(err, data) { //Finally, activate the pipeline
                      if (err) console.log(err, err.stack);
                      else console.log(data);
                   });
               }
              });
        }});
    }});

Now you have all of the function calls for the Lambda function; the appendix shows them wrapped together as a complete handler. Finish configuring the Lambda function as follows:

  • In the Handler field, enter the file name and exported handler of your function (for example, LambdaDP.handler).
  • Select a Role that grants the function access to resources such as S3 and Data Pipeline.
  • Keep the default Memory and Timeout values.
  • Choose Next, review your function, and choose Create function.
  • In the Event source field, choose S3.
  • Provide the bucket name used by the pipeline.
  • In Event type, choose Put, which activates your pipeline when a new file is committed to the bucket (the handler receives an S3 event like the one in the sketch after this list).
  • Save the pipeline and upload a data file to your S3 bucket.
  • Check the Data Pipeline console to make sure that the new pipeline has been created and activated (you should get an SNS notification when the pipeline is finished).
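
For reference, here is a minimal sketch of how the handler can read the bucket name and object key from the incoming S3 Put event before cloning the pipeline; the logged path is only illustrative:

exports.handler = function(event, context) {
    // The S3 event notification carries the bucket name and (URL-encoded) object key
    var bucket = event.Records[0].s3.bucket.name;
    var key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    console.log('New object committed: s3://' + bucket + '/' + key);
    // ... clone and activate the pipeline as shown above ...
};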

Conclusion

Congratulations! You have successfully cloned and launched your pipeline from a Lambda function to perform data processing after successfully committing new data to the S3 bucket. You can continue evolving your workflow to include other AWS services, such as Amazon Redshift, Amazon RDS for MySQL, and Amazon DynamoDB.

If you have questions or suggestions, please leave a comment below.

Appendix

Below is a template of the Lambda function that uses all function calls discussed above. This template is only a starting point and isn’t meant for a production environment.

console.log('Loading function');
 
var AWS = require('aws-sdk');
//var s3 = new aws.S3({ apiVersion: '2012-10-29' });
 
exports.handler = function(event, context) {
    var datapipeline = new AWS.DataPipeline();
    var pipeline2delete ='None';
    var pipeline ='df-02364022NP3BYIO2UPBT';

    var paramsall = {
        marker:''
    };
     
    //Check whether a pipeline clone already exists
    datapipeline.listPipelines(paramsall, function(err, data) {
        if (err) {
            console.log(err, err.stack); // an error occurred
            context.fail('Error', "Error getting list of pipelines: " + err);
        }
        else {
            console.log(data);           // successful response
            for (var i in data.pipelineIdList){
                if (data.pipelineIdList[i].name =='myLambdaSample') {
                    pipeline2delete = data.pipelineIdList[i].id;
                    console.log('Pipeline id to delete: ' + pipeline2delete); 
                    
                    var paramsd = {
                        pipelineId: pipeline2delete /* required */
                    };
                    datapipeline.deletePipeline(paramsd, function(err, data) {
                    if (err) {
                        console.log(err, err.stack); // an error occurred
                        context.fail('Error', "Error deleting pipelines: " + err);
                    }
                    else console.log('Old clone deleted ' + pipeline2delete + ' Create new clone now');           // successful response
                    }); 
                }
                else     console.log('No clones to delete');              
            }
        }
    });

    var params = {
      pipelineId: pipeline
    }; //Using this pipeline's definition to create the next
    datapipeline.getPipelineDefinition(params, function(err, definition) {
      if (err) {
          console.log(err, err.stack); // an error occurred
          context.fail('Error', "Error getting pipeline definition: " + err);
      }
      else { 
           
          var params = {
          name: 'myLambdaSample', /* required */
          uniqueId: 'myLambdaSample' /* required */
        }; //definition object contains pipeline definition
        datapipeline.createPipeline(params, function(err, pipelineIdObject) {
          if (err) {
              console.log(err, err.stack); // an error occurred
              context.fail('Error', "Error creating pipeline: " + err);
          }
          else     { //new pipeline created with id=pipelineIdObject.pipelineId
              console.log(pipelineIdObject);           // successful response
              //Create and activate pipeline
              var params = {
                  pipelineId: pipelineIdObject.pipelineId,
                  pipelineObjects: definition.pipelineObjects //(you can add parameter objects and values too)
              }; //Use the definition from the getPipelineDefinition API result
              datapipeline.putPipelineDefinition(params, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   context.fail('Error', "Error putting pipeline definition: " + err);
               }
               else { 
                   datapipeline.activatePipeline(pipelineIdObject, function(err, data) { //Activate the pipeline finally
                      if (err) {
                          console.log(err, err.stack);
                          context.fail('Error', "Error activating pipeline: " + err);
                      }
                      else {
                          console.log(data);
                          context.succeed();
                      }
                   });
               }
              });
        }});
    }});
};


Related

Automating Analytic Workflows on AWS