
I have a stream whose messages are passed to two different map() calls; each result is then filtered and written to a different topic.

```java
KStream<String, byte[]> stream = builder.<String, byte[]>stream("source-topic");

// Branch 1: map with logic1OnData, drop null results, write to topic1
stream.map(logic1OnData).filter(
                (key, value) -> {
                    if (key == null || value == null)
                        return false;
                    return value.data() != null;
                }).to("topic1", Produced.with(Serdes.String(), Serdes.String()));

// Branch 2: map with logic2OnData, drop null results, write to topic2
stream.map(logic2OnData).filter(
                (key, value) -> {
                    if (key == null || value == null)
                        return false;
                    return value.data() != null;
                }).to("topic2", Produced.with(Serdes.String(), Serdes.String()));
```

Is there a way I can run stream.map(logic1OnData)... and stream.map(logic2OnData)... in parallel? It looks like they run one after the other, i.e. the first map is executed and written to topic1, and then the second map is executed and written to topic2. FYI, I don't want to use num.stream.threads, because my stream input comes from a single topic and I am running multiple instances of the same application that read from source-topic to achieve parallelism while consuming.

What I am looking for is parallelism while executing the two branches and writing to the different topics.



What you are looking at is the order in which your operations are added to the topology. Once the topology is executed, the records flow through the topology in the order they arrive, but logic2OnData does not wait for logic1OnData to finish processing all of them before it runs.
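To see what that ordering looks like on a single thread, here is a minimal sketch, assuming Kafka Streams 2.4+ plus the `kafka-streams-test-utils` artifact (for `TopologyTestDriver` and `TestInputTopic`). The two `peek()` calls are hypothetical stand-ins for logic1OnData and logic2OnData; they only record which branch saw which record, and in what order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class BranchOrderDemo {

    /** Pipes the values through a two-branch topology and returns the order in which branches ran. */
    public static List<String> run(String... values) {
        List<String> log = new ArrayList<>();
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Hypothetical stand-in for map(logic1OnData) -> filter -> to("topic1")
        stream.peek((key, value) -> log.add("logic1:" + value))
              .to("topic1", Produced.with(Serdes.String(), Serdes.String()));
        // Hypothetical stand-in for map(logic2OnData) -> filter -> to("topic2")
        stream.peek((key, value) -> log.add("logic2:" + value))
              .to("topic2", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "branch-order-demo");
        // The test driver never contacts a broker; the value only satisfies config validation.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "source-topic", new StringSerializer(), new StringSerializer());
            for (String value : values) {
                input.pipeInput("key", value); // processed synchronously, one record at a time
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b"));
    }
}
```

Running `main` prints `[logic1:a, logic2:a, logic1:b, logic2:b]`: each record goes through branch 1 and then branch 2 on the same thread before the next record is picked up, so the branches interleave per record but never run concurrently.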

If you are concerned about performance, you can look into increasing the number of stream threads to get more parallelism.

EDIT: it seems I may have misunderstood the question.

A single sub-topology does not let you run its branches in parallel. However, you can use repartition() to move logic2OnData into its own sub-topology; everything after the repartition() call will then be able to run in parallel with everything before it.
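A minimal sketch of that suggestion, assuming Kafka Streams 2.6+ (where `KStream#repartition()` was added) and String keys and values. The `logic1(...)`/`logic2(...)` mappers are hypothetical placeholders for logic1OnData and logic2OnData. The same two-branch topology is built with and without `repartition()`, and the sub-topologies are counted via `Topology#describe()`.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;

public class SubTopologyComparison {

    /** Builds the two-branch topology; if split is true, branch 2 is placed behind repartition(). */
    public static Topology build(boolean split) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Branch 1 (hypothetical placeholder for map(logic1OnData)) stays in the first sub-topology.
        stream.map((key, value) -> KeyValue.pair(key, "logic1(" + value + ")"))
              .filter((key, value) -> key != null && value != null)
              .to("topic1", Produced.with(Serdes.String(), Serdes.String()));

        // repartition() writes to an internal topic and reads it back, so everything
        // downstream of it becomes a separate sub-topology with its own tasks.
        KStream<String, String> branch2Input = split
                ? stream.repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
                : stream;

        // Branch 2 (hypothetical placeholder for map(logic2OnData)).
        branch2Input.map((key, value) -> KeyValue.pair(key, "logic2(" + value + ")"))
                    .filter((key, value) -> key != null && value != null)
                    .to("topic2", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println("without repartition(): "
                + build(false).describe().subtopologies().size() + " sub-topology");
        System.out.println("with repartition():    "
                + build(true).describe().subtopologies().size() + " sub-topologies");
        System.out.println(build(true).describe());
    }
}
```

Without `repartition()` there is one sub-topology; with it there are two. The repartition topic is an internal topic that Kafka Streams creates itself, so every record of branch 2 takes an extra round trip through the broker, which is the price of the extra parallelism.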

wcarlson
  • Could you please elaborate? I want logic2OnData to execute in parallel with logic1OnData, with each branch writing to its own topic. I see them running one after the other rather than in parallel. I am running multiple instances of the same app, but I want these two sub-tasks to run in parallel within each instance. – Sudarshan sridhar Aug 27 '20 at 19:07
  • As explained by @wcarlson, your program is a single, fully connected `Topology` and thus it's executed by a single thread. Check the output of `Topology#describe()` to see that there is only one sub-topology. A sub-topology is a unit of parallelism that is only parallelized across partitions: for each partition a so-called task is created, and a task is executed by a single thread; there is no parallelism within a task. (Cf. https://docs.confluent.io/platform/current/streams/architecture.html) – Matthias J. Sax Dec 20 '20 at 18:03
  • If you really want more parallelism, you need to split your topology into multiple sub-topologies: sub-topologies are parts of your overall topology that are connected to each other only via topics. If you insert a `repartition()` as suggested by @wcarlson, you will get two sub-topologies, and tasks will be created for each sub-topology, so those tasks can be executed in parallel. You still need to increase `num.stream.threads`, though: if you have only one thread, it can execute only one task at a time and would just "round robin" through all assigned tasks. – Matthias J. Sax Dec 20 '20 at 18:07
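Putting both pieces of advice together, here is a sketch of an application that splits the branches with `repartition()` and runs two stream threads per instance. It assumes Kafka Streams 2.6+; the application id, bootstrap servers, the thread count of 2 and the `logic1(...)`/`logic2(...)` mappers are all hypothetical.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ParallelBranchesApp {

    /** Two branches, split into two sub-topologies by repartition(). */
    public static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()));
        stream.mapValues(value -> "logic1(" + value + ")")   // hypothetical logic1OnData
              .to("topic1", Produced.with(Serdes.String(), Serdes.String()));
        stream.repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
              .mapValues(value -> "logic2(" + value + ")")   // hypothetical logic2OnData
              .to("topic2", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "parallel-branches-app"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // hypothetical
        // With one thread, the tasks of both sub-topologies are executed in turn;
        // with more threads, tasks assigned to different threads can run concurrently.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        return props;
    }

    public static void main(String[] args) {
        KafkaStreams streams = new KafkaStreams(topology(), config());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Tasks are distributed over all threads of all instances that share the same application id, so whether the branch 1 and branch 2 tasks for a partition actually land on different threads of one instance depends on the task assignment.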