Triggering ETL from an S3 Event via AWS Lambda

Overview

This example links the arrival of a new object in S3 to the automatic triggering of a Matillion ETL job, which loads the file, transforms it, and appends the transformed data to a fact table.

S3 File Lands

A file could be uploaded to the bucket by a third-party service such as Amazon Kinesis, AWS Data Pipeline or Attunity, or by an application uploading the file directly using the API.
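
For example, an application could upload a file with the AWS SDK for Python. A minimal sketch, assuming a hypothetical bucket and key (substitute your own names):

import boto3

s3 = boto3.client('s3')

# Upload a local file into the bucket that will trigger the Lambda function.
# 'my-etl-landing-bucket' and the key are placeholders - use your own names.
s3.upload_file(
    Filename='flights_2017_01.csv',
    Bucket='my-etl-landing-bucket',
    Key='incoming/flights_2017_01.csv'
)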
 

Trigger an AWS Lambda Function

  1. Lambda functions can be triggered whenever a new object lands in S3. The function is passed some metadata too, including the object path (an abridged example of the event is sketched after this list).

  2. Matillion ETL can run a job whenever a message arrives on an SQS queue. The message tells the tool which job to run, and any variables it needs. (In this case, the filename that just arrived in S3 is the only variable we need.)
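
For reference, the event passed to the Lambda function is a JSON document containing one or more records, and the object key is the path we need. An abridged sketch with placeholder bucket and key names:

# Abridged S3 event as received by the Lambda function (names are placeholders)
event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-etl-landing-bucket"},
                "object": {"key": "incoming/flights_2017_01.csv"}
            }
        }
    ]
}
# The path to pass to Matillion ETL is record['s3']['object']['key'] for each record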

The diagram below outlines the basic architecture:
 


 

This looks quite complex, but in fact only a very simple Lambda function is needed to glue these processes together.

The Lambda Function

To get started:-

  1. In the AWS Management Console, navigate to Services → Lambda.
  2. Select Create a Lambda Function.
  3. Choose s3-get-object-python.
  4. Configure the correct S3 source for your bucket.

  5. Click Next and enter a Name for the function.
  6. Change the Python handler name to lambda_handler. The line should now read "def lambda_handler(event, context):".
  7. The function needs a role that can monitor the S3 bucket and send the SQS message. This can all be done with simple managed policies from the IAM console: create or attach an IAM Role with permission to write to SQS queues (e.g. the AmazonSQSFullAccess policy).
  8. Add the code below as a starting point for your function (you can edit it further later).

 

AWS Lambda supports Python and includes the Python API for AWS, so no compilation or third-party libraries are required for this function; it can even be written directly in the AWS console. Below is an example Lambda function to get you started. The values you will need to edit for your environment are explained below.
 


import boto3
import json

# The SQS queue that Matillion ETL listens on
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='ETL_Launchpad')

def lambda_handler(event, context):
    # An S3 event can contain more than one record, so send one message per object
    for record in event['Records']:
        filename_from_event = record['s3']['object']['key']
        sqs_msg = {
            "group":       "AWS",
            "project":     "Lambda",
            "version":     "default",
            "environment": "live",
            "job":         "RunETL",
            "variables": {
                "file_to_load": filename_from_event
            }
        }
        # Queue the message that tells Matillion ETL which job to run
        queue.send_message(MessageBody=json.dumps(sqs_msg))
    return event



In the above example the values are used as follows. All must exactly match the names used in Matillion ETL.

  • QueueName - The name of the Amazon SQS Queue that will be used to store and pass the messages. The queue must already exist - if you need to create it, see the sketch after this list.
  • Group - The project group name in Matillion ETL.
  • Project - The project name in Matillion ETL.
  • Version - The name of the version of the project to execute in Matillion ETL.
  • Environment - The name of the environment to execute against in Matillion ETL.
  • Job - The name of the orchestration job in Matillion ETL.
  • Variables - The variables that are passed to the job in Matillion ETL. In this case we just pass one variable, “file_to_load”, however it is possible to pass as many as required. Later we will show how this variable is used.
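
Note that get_queue_by_name will fail if the queue does not exist yet. If you have not already created it, a minimal sketch using boto3 (run once, with the same queue name used in the function):

import boto3

# Create the queue that the Lambda function writes to and Matillion ETL listens on.
# Safe to re-run if the queue already exists with the same attributes.
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName='ETL_Launchpad')
print(queue.url)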

 

It might be sensible to add some more checking, commenting and so on. The key to writing these functions effectively is to understand the format of the event and context objects passed in, and that depends to some extent on the services that trigger the function.
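
For example, object keys arrive URL-encoded in the S3 event, and you may want to skip objects that are not data files. A sketch of a hypothetical helper that could replace the direct key lookup inside the loop (assuming a Python 3 runtime; the .csv filter is purely illustrative):

from urllib.parse import unquote_plus

def extract_filename(record):
    # Keys containing spaces or special characters arrive URL-encoded
    key = unquote_plus(record['s3']['object']['key'])
    # Skip anything that is not a data file (illustrative check only)
    if not key.endswith('.csv'):
        return None
    return key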

As noted above, the function needs a role that can monitor the S3 bucket and send the SQS message. That can all be done with simple managed policies from the IAM console.

  1. In the Role dropdown, create a new S3 Execution Role. Once this is created, we also need to modify the role to allow SQS access:-
    1. In Services → IAM → Roles, select the role you created.
    2. Click Attach Policy and add AmazonSQSFullAccess (a scripted alternative is sketched after this list).
  2. Click Next, review and then Create Function.
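
If you prefer to script the policy attachment, the same result can be achieved with boto3. A sketch, assuming a placeholder role name:

import boto3

iam = boto3.client('iam')

# Attach the managed SQS policy to the Lambda execution role created above.
# 'lambda_s3_exec_role' is a placeholder - use the name of the role you created.
iam.attach_role_policy(
    RoleName='lambda_s3_exec_role',
    PolicyArn='arn:aws:iam::aws:policy/AmazonSQSFullAccess'
)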


Trigger an ETL Job to Extract, Load and Transform the Data

Here is the Matillion ETL job that will load the data each time a file lands. It maintains the target table, and on each run truncates it and loads the latest file into it. It then runs a data transformation on the loaded data which adds some calculated fields, looks up some details of the airline and airport, and finally appends the results to the final fact table.

The exported job and data files are available at the bottom of this page. Be sure to download the json that applies to your platform (named RS_ for Redshift, SF_ for Snowflake).

 


 

Each time this runs we want to load a different file, so we can define a variable for that. We already saw in the Lambda function that we intend to pass a variable named “file_to_load”, so we should define that within Matillion ETL and provide a default value we can use to test the job. To do this:-

  1. Select Project → Edit Environment Variables.
  2. Create an environment variable as below.



 

This is referenced in the component Load Latest File (an S3 Load Component) as the S3 Object Prefix parameter. In this case, the entire path to the file is provided by the lambda function.
 


 

Since all variables must have a default value, the job can be tested in isolation. Then, to actually run this whenever a new Queue message arrives (via our Lambda function) we can configure SQS within Matillion ETL.

To do this select Project → SQS.

The SQS Configuration has a Listen section:


 

Notice the Listen Queue is the queue that our Lambda function writes to. Notice also that Matillion ETL is using the Instance Credentials attached to the EC2 Instance - it could have used a manual access key and secret key, but either way the credentials must be able to read from SQS to pick up the message, and read from S3 to read the data files.
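
If you prefer not to grant broad managed policies to the instance role, an inline policy along these lines would cover both requirements. A sketch with placeholder queue, bucket and role names:

import boto3
import json

iam = boto3.client('iam')

# Minimal permissions for Matillion ETL: consume messages from the queue and
# read objects from the landing bucket. All names below are placeholders.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage",
                       "sqs:GetQueueUrl", "sqs:GetQueueAttributes"],
            "Resource": "arn:aws:sqs:*:*:ETL_Launchpad"
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::my-etl-landing-bucket",
                         "arn:aws:s3:::my-etl-landing-bucket/*"]
        }
    ]
}

iam.put_role_policy(
    RoleName='matillion_instance_role',
    PolicyName='matillion-sqs-s3-read',
    PolicyDocument=json.dumps(policy)
)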

Now you are ready to add some files to the bucket and trigger the job. You can see the job executing in the Task panel or via Project → Task History.

Further Considerations

In a production environment you should probably also consider:

  • What if the ETL job can’t keep up with the arrival rate?
    • There is some overhead loading a file, so each run will take a certain minimum amount of time. If jobs are stacking up behind one another, you probably need a smaller number of larger files to make that overhead worthwhile.
    • Take a clone of the EC2 instance running Matillion ETL and launch more instances from it. The ETL job, the connection details and the SQS listen configuration will all be cloned, and you will now have two instances that can listen for and process arrival messages.
  • What if it fails?
    • The SQS configuration can also post return messages on success and failure. These could be handled by Lambda functions that move the S3 files into “success” and “failed” folders.
    • We can also post an SNS message notifying that a load succeeded or failed (a minimal sketch follows this list).
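
The SNS notification mentioned above could be published from whichever Lambda function handles the success or failure message. A minimal sketch, assuming a placeholder topic ARN:

import boto3

sns = boto3.client('sns')

# Publish a simple status notification. The topic ARN is a placeholder -
# subscribe an email address or another consumer to receive the alerts.
sns.publish(
    TopicArn='arn:aws:sns:eu-west-1:123456789012:etl-load-status',
    Subject='Matillion ETL load status',
    Message='Load of incoming/flights_2017_01.csv completed successfully.'
)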

Below are links for the exported job and data files. Be sure to download the json that applies to your platform (named RS_ for Redshift, SF_ for Snowflake).