2

I am accepting two kinds of records, A and B, in StreamSets v3.21. There is a common field called correlationid shared between the parent type A and multiple child type B records. Type A always arrives first. Type A and type B get written to separate Elasticsearch indices on the same cluster from the same pipeline. The sending and composition of type A and type B are not within my control. They are pre-processed by Logstash 7.8.1 via a filter group to which I can add new files, but whose existing files I cannot alter.

There is a field X on type A that I need to put in the type B records that get written to Elasticsearch. Does anyone know a way of making Elasticsearch update the type B records when they arrive, by looking up type A? Alternatively, can anyone tell me a way of looking up the type A on Elasticsearch (from StreamSets) before the type B records are written, and applying value X to them?

_Alternatively_, I've considered using an environment variable named after the correlationid, with value X, so that I can look it up. But I'm concerned about blowing the heap, as I can never know when to remove the env var: there can be up to N type B records.

Alternatively, maybe Logstash could cache the value of correlationid and X somehow. There is a filter called "environment" which would allow me to store env vars for type A and apply them to type B, but I can find no way to clear it down periodically.

bigbadmouse
  • I'm experimenting with a timer-driven ES origin that reads all Bs with a blank job name, then looks up their parent A using an HTTP processor stage, and then uses an ES destination to PUT the Bs back with updated fields from the A. – bigbadmouse Mar 02 '21 at 12:47

3 Answers

1

How about using the Jython Evaluator and its 'state' object? You can (carefully) use the state object as a cache, and just add a field to each record before sending it to Elasticsearch.
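
A minimal sketch of the idea, assuming the records carry a type discriminator field and the field names `correlationid` and `X` (all assumptions; adjust to your schema). `records`, `state`, `output` and `error` are provided by the evaluator, and `state` persists for the life of the pipeline, so it needs some eviction policy to address the heap concern in the question:

```python
# Jython Evaluator script: cache X from each parent A, apply it to child Bs.
# Field names ('type', 'correlationid', 'X') are assumptions.
MAX_CACHE = 10000  # crude cap so the cache cannot grow without bound

for record in records:
    try:
        rtype = record.value['type']
        cid = record.value['correlationid']
        if rtype == 'A':
            if len(state) >= MAX_CACHE:
                state.clear()  # crude eviction; a real pipeline needs something smarter
            state[cid] = record.value['X']   # remember the parent's X
        elif rtype == 'B' and cid in state:
            record.value['X'] = state[cid]   # enrich the child from the cache
        output.write(record)
    except Exception as e:
        error.write(record, str(e))
```

This relies on type A always arriving first, as stated in the question; any B whose A was evicted (or never seen) simply passes through unenriched.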

eze
  • Thanks for answering - in the end I just gave up (even StreamSets consultants didn't know) and wrote it myself in threaded Java using ES API classes. I've upvoted you as your answer is a good solution, even if I'm not familiar with it. It _would_ work. – bigbadmouse Apr 21 '21 at 07:56
0

In the end I just gave up (even StreamSets consultants didn't know) and wrote it myself in threaded Java using Elasticsearch API classes.
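
The answer doesn't include that code, but the lookup-and-update shape is roughly as below, sketched with the Python client (elasticsearch-py 7.x) rather than the Java API classes actually used, and with index and field names (`index_a`, `index_b`, `correlationid`, `X`) as placeholders:

```python
# Standalone enrichment sketch: find un-enriched Bs, look up the parent A by
# correlationid, and write X back onto the B. All names are assumptions.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://your-es-host:9200")

# B documents that do not yet have an X field.
pending = es.search(index="index_b",
                    body={"query": {"bool": {"must_not": {"exists": {"field": "X"}}}},
                          "size": 100})

for hit in pending["hits"]["hits"]:
    cid = hit["_source"]["correlationid"]
    # Look up the parent A by correlationid.
    parent = es.search(index="index_a",
                       body={"query": {"term": {"correlationid": cid}}, "size": 1})
    if parent["hits"]["hits"]:
        x = parent["hits"]["hits"][0]["_source"]["X"]
        # Partial update: set X on the B document.
        es.update(index="index_b", id=hit["_id"], body={"doc": {"X": x}})
```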

bigbadmouse
0

You can also set up the JDBC driver for Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/master/sql-jdbc.html

Then use the JDBC Lookup stage in your pipeline. The JDBC Lookup stage supports providing a JDBC driver class: https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JDBCLookup.html
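
For illustration, the lookup configuration might look something like the following, assuming the Elasticsearch SQL JDBC driver and using the StreamSets expression language to bind the record's correlationid (host, index and field names are placeholders, and the driver class name varies by Elasticsearch version):

```
JDBC Connection String:  jdbc:es://http://your-es-host:9200
JDBC Driver Class Name:  org.elasticsearch.xpack.sql.jdbc.EsDriver
SQL Query:               SELECT X FROM index_a
                         WHERE correlationid = '${record:value('/correlationid')}'
```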

To be 100% honest, I didn't try it with the Elasticsearch driver, but I did try it with other JDBC drivers and it worked great.

Another way of doing this is to use the Scala or Groovy Evaluator and perform the lookup inside the Scala or Groovy code.
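
The scripting evaluators all expose the same record model, so the same idea can be sketched in Jython as well, here querying Elasticsearch over HTTP for each B record (host, index and field names are assumptions):

```python
# Scripting-evaluator sketch: look up the parent A over HTTP for each B.
# Names ('type', 'correlationid', 'X', index_a, the host) are assumptions.
import json
import urllib
import urllib2

for record in records:
    try:
        if record.value['type'] == 'B':
            cid = str(record.value['correlationid'])
            url = ('http://your-es-host:9200/index_a/_search?size=1'
                   '&q=correlationid:' + urllib.quote(cid))
            hits = json.load(urllib2.urlopen(url))['hits']['hits']
            if hits:
                record.value['X'] = hits[0]['_source']['X']  # copy X from the parent A
        output.write(record)
    except Exception as e:
        error.write(record, str(e))
```

A per-record HTTP call is slow for large batches; combining this with the state cache from the Jython answer above would cut the number of lookups considerably.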

Andrey E