I'm trying to prepare flowfiles for a bulk call with versioning to Elasticsearch using the standard InvokeHTTP processor, since I couldn't get it to work with the PutElasticsearch* processors.
The Elasticsearch bulk endpoint accepts NDJSON data that looks like the following:
{"index": {"_index":"foo","id":"bar", "version":1, "version_type":"external}}\n
{"id":"bar", "version":1, "field1":"field1_value", "field2":"field2_value"}\n
{"index": {"_index":"foo","id":"bizz", "version":2, "version_type":"external}}\n
{"id":"bizz", "version":2, "field1":"field3_value", "field2":"field4_value"}\n
My current processor setup looks like this: [screenshot: "Change to elasticsearch bulk format" flow]
I'm extracting the id and the version to attributes, and the flowfile content to a message.body attribute, using an ExtractText processor with the simple regex .*
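A minimal sketch of the ExtractText dynamic properties I mean, assuming flat JSON input with a string id and a numeric version (the regexes are illustrative assumptions; in ExtractText the base attribute name receives the first capture group):

id = "id"\s*:\s*"([^"]+)"
version = "version"\s*:\s*(\d+)
message.body = (?s)(.*)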
With the content in an attribute, I can then use a ConvertRecord processor with RecordReader: JsonTreeReader
and RecordWriter: FreeFormTextRecordSetWriter,
with the Text property set to:
{"index": {"_index":"foo","_id":"${id}", "version":"${version", "version_type":"external"}}}
${message.body}
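For a single incoming document, this template should reproduce the target format shown above; a sketch assuming id=bar and version=1 were extracted:

{"index": {"_index":"foo","_id":"bar", "version":1, "version_type":"external"}}
{"id":"bar", "version":1, "field1":"field1_value", "field2":"field2_value"}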
With the flowfiles converted, I merge them with a MergeContent processor and send them to Elasticsearch with an InvokeHTTP processor, with the URL set to ${elastic.url}/foo/_bulk and Content-Type set to application/x-ndjson.
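Roughly these properties (a sketch of the intent rather than a verified template; the newline Demarcator is an assumption so merged flowfiles stay on separate lines):

MergeContent: Merge Format = Binary Concatenation, Demarcator = \n (newline)
InvokeHTTP: HTTP Method = POST, Remote URL = ${elastic.url}/foo/_bulk, Content-Type = application/x-ndjson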
Is it possible to do this better? I have a feeling my solution isn't the best one, but I'm a newbie to NiFi and couldn't solve it any other way.