5

I have a Processor-API Processor, which internally forwards to several separate sinks (think of an event classifier, although it also has stateful logic between the events). I was thinking of having a join later between two of those topics. Once a join is made, I forward an updated (enriched) version of the elements to those topics I'm actually joining.

How would you mix DSL if in your Processor API code you forward to more than one sink(sink1, sink2) that in turn are sent to topics?

I guess you could you create separate streams, like

val stream1 = builder.stream(outputTopic) 
val stream2 = builder.stream(outputTopic2)

and build from there? However this creates more subtopologies - which are the implications here?

Another possibility is to have your own state store in the Processor API and manage it there, in the same Processor (I'm actually doing that). It adds complexity to the code, but wouldn't it be more efficient? For example, you can delete data you no longer use (once a join is made, you can forward new joined data to sinks and it is no longer eligible for a join). Any other efficiency gotcha?

xmar
  • 1,729
  • 20
  • 48

1 Answers1

5

The simplest way might be to mix Processor API with the DSL by starting with a StreamsBuilder and use transform()

StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
                           .transform(/* put your processor API code here */)
                           .branch(...);

KStream joined = streams[0].join(streams[1], ...);

Writing the intermediate streams into topic first and read them back is also possible. The fact that you get more sub-topologies should be of no concern.

Doing the join manually via states is possible but hard to code correctly. If possible, I would recommend to use the provided join operator from the DSL.

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • For this, the output of transform should be of the same type, right? Right now, using process() I can create and send object of each type to their sink (e.g. a "pageview" event comes that will create a Pageview object as well as create/update a Session object) Separately, is there any example of ProcesorAPI-made join? I am using some mutable stores. I guess also, if you can mutate your stores you can optimize space too - e.g. deleting from a KV store data you no longer need. – xmar May 18 '18 at 18:01
  • 2
    Same type would be required -- if you have different types, you would wrap all those with a POJO though that always has only one member set... I am not aware of a join example. However, you can use the DSL join as example (the DSL internally compiles down to PAPI: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L657) – Matthias J. Sax May 18 '18 at 20:23