
I have an AWS Kinesis Firehose stream set up to feed data into an AWS Elasticsearch cluster. I can successfully insert documents by sending them to the Firehose stream, which loads them into Elasticsearch.

However, I would like to manually set a document's id value when sending it to the Firehose stream. I'm successfully using the AWS PHP SDK to send data to Firehose, but I can't figure out whether there's a way to set a document's id myself.

$firehoseParams = [
    'DeliveryStreamName' => 'myStreamName', // REQUIRED
    'Record' => [ // REQUIRED
        'Data' => '{"json_encoded": "data", ...}', // REQUIRED
    ],
];
$firehoseResult = $this->_firehoseClient->putRecord($firehoseParams);

I've tried setting id, _id, and esDocumentId values in the JSON data, all to no avail.

Anyone have any ideas?

John Rotenstein
Alex Coleman
  • I tried changing the id once a few years back, and that resulted in some queries not returning the correct values, like when using avg. So you might want to double check it works when you figure it out. – WoodyDRN Jul 16 '21 at 12:17
  • To manually set the document IDs for Elasticsearch indexing, you would need to manipulate the data before it reaches Elasticsearch. One option is to use an AWS Lambda function to process the records before they are indexed by Elasticsearch. By utilizing Lambda, you can modify the records and set a custom document ID. – Arian Sakhaei Jul 03 '23 at 06:59

2 Answers

0

Firehose delivery stream destinations are append-only and, in the case of OpenSearch (AWS Elasticsearch), do not support upserts. Firehose generates a unique ID for each record it streams and uses that as the document ID. This cannot be user-configured at this time. If you are an AWS Enterprise Support customer, you can request that this feature be added to Firehose by talking with your Solutions Architect (SA) or Technical Account Manager (TAM).

One possible short-term solution is to use a Kinesis stream that triggers a Lambda function, which upserts documents to OpenSearch through the OpenSearch APIs. The Python client would push JSON data to the Kinesis stream. Rather than only performing transformations, the Lambda function would be triggered by records in the stream, perform the transformation, and handle the upsert to OpenSearch.
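As a rough illustration of the Lambda side of that approach, the sketch below turns a Kinesis-triggered event into a body for the `_bulk` API, using `update` actions with `doc_as_upsert` so each document is created or updated under an ID you choose. The function name, the `my-index` index, and the assumption that every record carries an `id` field are placeholders for illustration; sending the body to your domain's `_bulk` endpoint (with request signing) is left out.

```python
import base64
import json


def build_bulk_upsert_body(event, index_name="my-index", id_field="id"):
    """Build an NDJSON _bulk body from a Kinesis-triggered Lambda event.

    index_name and id_field are hypothetical defaults; adjust to your data.
    """
    lines = []
    for record in event["Records"]:
        # Kinesis delivers each payload base64-encoded under kinesis.data.
        doc = json.loads(base64.b64decode(record["kinesis"]["data"]))
        doc_id = str(doc[id_field])
        # Action line: the update targets an explicit _id.
        lines.append(json.dumps({"update": {"_index": index_name, "_id": doc_id}}))
        # Source line: doc_as_upsert creates the document if it does not exist.
        lines.append(json.dumps({"doc": doc, "doc_as_upsert": True}))
    # The _bulk API expects every line, including the last, to end with a newline.
    return ("\n".join(lines) + "\n") if lines else ""


# Example: a single-record event shaped like the one Lambda receives from Kinesis.
payload = json.dumps({"id": "order-42", "total": 9.5}).encode()
sample_event = {"Records": [{"kinesis": {"data": base64.b64encode(payload).decode()}}]}
print(build_bulk_upsert_body(sample_event))
```

Because the ID is taken from the record itself, replaying the same record updates the existing document instead of creating a duplicate.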

-1

You can use Kinesis Data Streams for this purpose: send your documents to the stream and, in a Lambda function, provide the _id property when indexing through the official Elasticsearch API.
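For a single document, the Elasticsearch index API takes the ID in the request path (`PUT /<index>/_doc/<_id>`) rather than in the JSON body. The sketch below only builds the method, path, and body for such a request; the function name, the `my-index` index, and the `id` field are assumptions for illustration, and the HTTP call itself is omitted.

```python
import json
from urllib.parse import quote


def build_index_request(doc, index_name="my-index", id_field="id"):
    """Return (method, path, body) for indexing doc under an explicit _id.

    index_name and id_field are hypothetical defaults; adjust to your data.
    """
    doc_id = str(doc[id_field])
    # Percent-encode the ID so characters such as "/" cannot alter the path.
    path = f"/{quote(index_name, safe='')}/_doc/{quote(doc_id, safe='')}"
    return "PUT", path, json.dumps(doc)


method, path, body = build_index_request({"id": "order-42", "total": 9.5})
print(method, path)
```

Indexing to the same path again replaces the stored document, which is what makes the ID useful for deduplication.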

rounak tadvi