I am receiving two kinds of records, A and B, in Streamsets v3.21. Each parent record of type A and its multiple child records of type B share a common field called correlationid. Type A always arrives first. Type A and type B are written to separate Elasticsearch indices on the same cluster from the same pipeline. The sending and composition of type A and type B are not within my control. They are pre-processed by Logstash 7.81 using a filter group to which I can add new files, but in which I cannot alter existing ones.
There is a field X on type A that I need to put in the type B records that get written to Elasticsearch. Does anyone know a way of making Elasticsearch update the type B records when they arrive by looking up the matching type A record? Alternatively, can anyone tell me a way of looking up the type A record in Elasticsearch (from Streamsets) before the type B records are written, and applying value X to them?
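To make the lookup option concrete, this is roughly what I have in mind, sketched in plain Python rather than in any Streamsets stage. The function names are made up for illustration, and I'm assuming correlationid is mapped as a keyword field so that a term query matches it exactly. The query body would be sent to the search endpoint of whichever index holds the type A records.

```python
import json


def build_lookup_query(correlation_id):
    """Build a search body that fetches only field X of the matching type A record."""
    return {
        "size": 1,               # there is one parent per correlationid
        "_source": ["X"],        # only X is needed, not the whole document
        "query": {"term": {"correlationid": correlation_id}},
    }


def extract_x(search_response):
    """Pull X out of a search response, or return None if no type A record matched."""
    hits = search_response.get("hits", {}).get("hits", [])
    if not hits:
        return None
    return hits[0].get("_source", {}).get("X")


if __name__ == "__main__":
    # "abc-123" is a placeholder correlationid.
    print(json.dumps(build_lookup_query("abc-123"), indent=2))
```

The drawback I can see is one extra search per type B record, unless the result is cached per correlationid.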
Alternatively, I've considered using an environment variable named after the correlationid, with value X, so that I can look it up. However, I'm concerned about blowing the heap: I can never know when to remove the env var, as there can be up to N type B records.
Alternatively, maybe Logstash could cache the value of correlationid and X somehow. There is a filter called "environment" which would allow me to store environment variables for type A and apply them to type B, but I can find no way to clear it down periodically.
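What I'm really after with the caching ideas is something that expires entries on its own, so the heap stays bounded even though I never know when the last type B record has arrived. Here is a minimal sketch of that behaviour in plain Python. It is not Streamsets or Logstash code; the class name, the TTL and the size limit are all hypothetical.

```python
import time
from collections import OrderedDict


class CorrelationCache:
    """Bounded, time-limited map of correlationid -> X."""

    def __init__(self, ttl_seconds, max_entries, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self.clock = clock
        self._entries = OrderedDict()  # correlationid -> (expiry time, X)

    def put(self, correlation_id, x):
        """Store X when a type A record arrives."""
        self._evict_expired()
        self._entries.pop(correlation_id, None)  # re-insert so order tracks expiry
        self._entries[correlation_id] = (self.clock() + self.ttl, x)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # drop the oldest entry

    def get(self, correlation_id):
        """Return X for a type B record, or None if unknown or expired."""
        entry = self._entries.get(correlation_id)
        if entry is None:
            return None
        expiry, x = entry
        if self.clock() >= expiry:
            del self._entries[correlation_id]
            return None
        return x

    def _evict_expired(self):
        # All entries share one TTL, so the oldest entries expire first.
        now = self.clock()
        while self._entries:
            key, (expiry, _) = next(iter(self._entries.items()))
            if expiry > now:
                break
            del self._entries[key]
```

With something like this, type A would call put(correlationid, X) and each type B would call get(correlationid). The TTL would have to be longer than the longest gap between a type A record and its last type B record, which I can only estimate.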