20

I have a Firehose stream that is intended to ingest millions of events from different sources and of different event types. The stream should deliver all data to one S3 bucket as a store of raw/unaltered data.

I was thinking of partitioning this data in S3 based on metadata embedded within the event message, such as event-source, event-type and event-date.

However, Firehose follows its default partitioning based on record arrival time. Is it possible to customize this partitioning behavior to fit my needs?

Update: The accepted answer has been changed, as a newer answer notes that this feature has been available since September 2021.

mowienay
  • Similar to: [Partitioning AWS Kinesis Firehose data to s3 by payload](https://stackoverflow.com/q/45432265/174777) – John Rotenstein Jul 13 '18 at 04:33
  • @JohnRotenstein Unfortunately the answers there do not address the question. Both suggest attaching a Lambda function that routes the incoming data to different streams based on a particular ID. Both this question and the other one ask whether it is possible to define the partitioning methodology for Firehose itself. Thank you for the reference, though! – mowienay Jul 13 '18 at 13:23

6 Answers

14

As of this writing, the dynamic partitioning feature Vlad mentioned is still fairly new, and its use in a CloudFormation template was not yet properly documented. I had to add DynamicPartitioningConfiguration to get it working properly, and the MetadataExtractionQuery syntax was also not properly documented.

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: "clients/client_id=!{partitionKeyFromQuery:client_id}/dt=!{timestamp:yyyy-MM-dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            - Type: AppendDelimiterToRecord
            - Type: MetadataExtraction
              Parameters:
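                # The jq query below creates a partition key named "client_id",
                # which the Prefix above references as !{partitionKeyFromQuery:client_id}.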
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{client_id:.client_id}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6

Yves M.
Murali Varma
  • It is April 2022 and there is still no documentation I could find on MetadataExtractionQuery. This answer was really helpful to figure out the syntax. Thanks! – BjornO Apr 22 '22 at 08:54
  • There are some examples here: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html ...but they are rather unclear compared to the simple partitioning in this answer – KC54 May 18 '23 at 23:52
  • Great to have a working example! Note that the partition key has to be referenced through the `partitionKeyFromQuery` namespace, i.e. `Prefix: "clients/client_id=!{partitionKeyFromQuery:client_id}/dt=!{timestamp:yyyy-MM-dd}/"`, maybe it's useful for someone else. – Gabriele Jun 09 '23 at 15:18
  • For anyone else not being able to figure it out on the spot: the "Dynamic partitioning keys" in the UI correspond to the "ProcessingConfiguration" here in the template. – Gabriele Jun 09 '23 at 15:21
10

Since September 1st, 2021, AWS Kinesis Firehose supports this feature. Read the announcement blog post here.

From the documentation:

You can use the Key and Value fields to specify the data record parameters to be used as dynamic partitioning keys and jq queries to generate dynamic partitioning key values. ...

Here is how it looks in the UI:

[Screenshots: dynamic partitioning key configuration in the Kinesis Firehose console]
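To show how the pieces fit together end to end, here is a minimal producer-side sketch in TypeScript. The stream name, region, and the customer_id field are assumptions for the example, not part of the original answer: the producer just sends plain JSON, and Firehose extracts the partition key on delivery using the jq query configured in the console, referenced in the S3 prefix as !{partitionKeyFromQuery:customer_id}.

import { FirehoseClient, PutRecordCommand } from "@aws-sdk/client-firehose";

// Hypothetical stream name and event shape, for illustration only.
const firehose = new FirehoseClient({ region: "us-east-1" });

async function sendEvent(): Promise<void> {
  const event = {
    customer_id: "customer-42", // extracted by Firehose as the partition key (jq: .customer_id)
    event_type: "order_created",
    payload: { amount: 19.99 },
  };

  // The trailing newline keeps records separated in the delivered S3 objects
  // if no AppendDelimiterToRecord processor is configured.
  await firehose.send(
    new PutRecordCommand({
      DeliveryStreamName: "my-delivery-stream",
      Record: { Data: Buffer.from(JSON.stringify(event) + "\n") },
    })
  );
}

sendEvent().catch(console.error);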

Vlad Holubiev
5

No. You cannot 'partition' based upon event content.

Some options are:

  • Send to separate Firehose streams
  • Send to a Kinesis Data Stream (instead of Firehose) and write your own custom Lambda function to process and save the data (see: AWS Developer Forums: Athena and Kinesis Firehose, and the sketch below)
  • Use Kinesis Analytics to process the message and 'direct' it to different Firehose streams

If you are going to use the output with Amazon Athena or Amazon EMR, you could also consider converting it into Parquet format, which has much better performance. This would require post-processing of the data in S3 as a batch rather than converting the data as it arrives in a stream.
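To sketch the second option above, a custom Lambda consumer on a Kinesis Data Stream can write each record to S3 under a key derived from the event's own metadata. This is only a rough outline under assumed names: the RAW_BUCKET environment variable, the event_source/event_type/event_date fields, and the key layout are not part of the original answer.

import { KinesisStreamEvent } from "aws-lambda";
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});
const BUCKET = process.env.RAW_BUCKET!; // hypothetical bucket configuration

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    // Kinesis delivers the payload base64-encoded.
    const body = Buffer.from(record.kinesis.data, "base64").toString("utf8");
    const parsed = JSON.parse(body);

    // Field names are illustrative; adapt them to the actual event schema.
    const source = parsed.event_source ?? "unknown";
    const type = parsed.event_type ?? "unknown";
    const date = parsed.event_date ?? new Date().toISOString().slice(0, 10);

    const key = `raw/source=${source}/type=${type}/dt=${date}/${record.kinesis.sequenceNumber}.json`;
    await s3.send(new PutObjectCommand({ Bucket: BUCKET, Key: key, Body: body }));
  }
};

In practice you would buffer records and write them out in batches rather than one S3 object per record, given the volume mentioned in the question.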

John Rotenstein
2

To build on John's answer, if you don't have near-real-time streaming requirements, we've found batch processing with Athena to be a simple solution for us.

Kinesis streams into a table, unpartitioned_event_data, which can make use of the native record-arrival-time partitioning.

We define another Athena table, partitioned_event_table, with custom partition keys, and make use of Athena's INSERT INTO capability. Athena will automatically repartition your data in the format you want without requiring any custom consumers or new infrastructure to manage. This can be scheduled with a cron, SNS, or something like Airflow.

What's cool is you can create a view that does a UNION of the two tables to query historical and real-time data in one place.
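As a rough sketch of the scheduled repartitioning step (the database name, result location, column list, and date filter are assumptions; only the two table names come from the description above), the job can simply start the INSERT INTO query through the Athena SDK:

import { AthenaClient, StartQueryExecutionCommand } from "@aws-sdk/client-athena";

const athena = new AthenaClient({});

// Moves one day of raw rows into the partitioned table; Athena writes the
// repartitioned files to S3 itself, so no custom consumer is needed.
export async function repartition(day: string): Promise<string | undefined> {
  const query = `
    INSERT INTO partitioned_event_table
    SELECT event_source, event_type, payload, event_date
    FROM unpartitioned_event_data
    WHERE event_date = DATE '${day}'
  `;

  const result = await athena.send(
    new StartQueryExecutionCommand({
      QueryString: query,
      QueryExecutionContext: { Database: "events_db" },                        // assumed database
      ResultConfiguration: { OutputLocation: "s3://my-athena-query-results/" }, // assumed bucket
    })
  );
  return result.QueryExecutionId;
}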

We actually dealt with this problem at Radar and talk about more trade-offs in this blog post.

J Kao
2

To expand on Murali's answer, we have implemented it in CDK:

Our incoming JSON data looks something like this:

{
    "data": {
        "timestamp": 1633521266990,
        "defaultTopic": "Topic",
        "data": {
            "OUT1": "Inactive",
            "Current_mA": 3.92
        }
    }
}

The CDK code looks as follows:

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
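    // Each !{partitionKeyFromQuery:...} reference in the prefix must match a key
    // produced by the MetadataExtractionQuery processor below.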
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              parameterValue: '{defaultTopic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            },
          ],
        },
      ],
    },
  },
})
  • Do you know how to use 2 fields as 2 separate values? – ArielB Dec 15 '21 at 09:20
  • sample parameter with 2 fields: { parameterName: 'MetadataExtractionQuery', parameterValue: '{Topic:.data.defaultTopic,out1:.data.data.OUT1}' } – MikA Apr 25 '22 at 16:14
  • Thank you, @MikA! I think this is the only place on the web where this is documented. – sambol Nov 21 '22 at 16:38
0

My scenario is:

Firehose needs to deliver data to S3, tied to a Glue table, with Parquet as the output format and dynamic partitioning enabled, since I want to use the year, month, and day from the data I push to Firehose instead of the default arrival-time partitioning.

Below is the working code:

  rawdataFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Join ["-", [rawdata, !Ref AWS::StackName]]
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt rawdataS3bucket.Arn
        Prefix: parquetdata/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        ErrorOutputPrefix: errors/
        RoleARN: !GetAtt FirehoseRole.Arn
        DynamicPartitioningConfiguration:
          Enabled: true
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{year:.year,month:.month,day:.day}"
                - ParameterName: "JsonParsingEngine"
                  ParameterValue: "JQ-1.6"
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              HiveJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt FirehoseRole.Arn
            DatabaseName: !Ref rawDataDB
            TableName: !Ref rawDataTable
            Region:
              Fn::ImportValue: AWSRegion
            VersionId: LATEST

  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: !Sub firehose-glue-${Envname}
          PolicyDocument: |
            {
              "Version": "2012-10-17",
              "Statement":
                [
                  {
                    "Effect": "Allow",
                    "Action":
                      [
                        "glue:*",
                        "iam:ListRolePolicies",
                        "iam:GetRole",
                        "iam:GetRolePolicy",
                        "tag:GetResources",
                        "s3:*",
                        "cloudwatch:*",
                        "ssm:*"
                      ],
                    "Resource": "*"
                  }
                ]
            }

Note:

  • rawDataDB is a reference to the Glue database
  • rawDataTable is a reference to the Glue table
  • rawdataS3bucket is a reference to the S3 bucket

U Swaroop