
From following this question, AWS DynamoDB Stream into Redshift

DynamoDB --> DynamoDBStreams --> Lambda Function --> Kinesis Firehose --> Redshift.

How do I configure my Kinesis function to pick up the Lambda function source?

I created a DynamoDB table (Purchase Sales), and Added DynamoDB Streams. Then I configured the Lambda function to pickup the DynamoDB Stream. My question is how do I configure Kinesis to pick up the Lambda function Source? I know how to configure Lambda Transformation, however would like to pick up as Source. Not sure how to configure the Direct Put Source below.

Thanks.

Performed these steps (console screenshots omitted).


  • With my understanding, you do not know how the data gets from Lambda to Firehose. In this case, you need to put the record from Lambda to Firehose yourself by using the PutRecord API (https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html#putRecord-property) – kkpoon Jun 11 '18 at 09:47
  • would I write the Put function in Lambda or firehose console? Can you give example syntax, reading the page, firehose.putRecord("PurchaseSalesFirehose",), what is record in the sync? I was hoping Amazon would make this plug-n-play instead of writing scripts, etc –  Jun 11 '18 at 14:45

1 Answer

In your case, you would stream DynamoDB to Redshift:

DynamoDB --> DynamoDBStreams --> Lambda Function --> Kinesis Firehose --> Redshift.

First, you need a Lambda function that handles the DynamoDB Stream. For each DynamoDB Stream event, use the Firehose PutRecord API to send the data to Firehose. From the SDK example:

var AWS = require('aws-sdk'); // aws-sdk is preinstalled in the Lambda Node.js runtime
var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'STRING_VALUE', /* required */
  Record: { /* required */
    Data: new Buffer('...') || 'STRING_VALUE' /* Strings will be Base-64 encoded on your behalf */ /* required */
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

Next, we have to know how the data is inserted into Redshift. From the Firehose documentation:

For data delivery to Amazon Redshift, Kinesis Firehose first delivers incoming data to your S3 bucket in the format described earlier. Kinesis Firehose then issues an Amazon Redshift COPY command to load the data from your S3 bucket to your Amazon Redshift cluster.

So, we need to know what data format lets the COPY command map the data onto the Redshift schema. We have to follow the data format requirements of the Redshift COPY command.

By default, the COPY command expects the source data to be character-delimited UTF-8 text. The default delimiter is a pipe character ( | ).

So, you could program the Lambda to take the DynamoDB Stream event as input, transform it into a pipe-separated (|) line record, and write it to Firehose.

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
firehose.putRecord({
  DeliveryStreamName: 'YOUR_FIREHOSE_NAME',
  Record: { /* required */
    Data: "RED_SHIFT_COLUMN_1_DATA|RED_SHIFT_COLUMN_2_DATA\n"
  }
}, function(err, data) {
  if (err) console.log(err, err.stack); // an error occurred
  else     console.log(data);           // successful response
});

Remember to add \n, as Firehose will not append a newline for you.

kkpoon
  • I am getting error, firehose is not defined; please fix or add /*global firehose */ firehose.putRecord({ DeliveryStreamName: 'PurchaseSalesKinesis', Record: { /* required */ Data: "PurchaseSalesId\n" } }, function(err, data) { if (err) console.log(err, err.stack); // an error occurred else console.log(data); // successful response }); –  Jun 11 '18 at 20:21
  • by reading the document at here https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Firehose.html, you need to create a firehose API client `var firehose = new AWS.Firehose();` and in your run time environment, you need aws-sdk npm package installed https://www.npmjs.com/package/aws-sdk – kkpoon Jun 12 '18 at 00:58