
If there is a shared variable in my Kafka Streams application that is updated by multiple threads in the processing code, how is that handled? Do I have to make that shared variable thread-safe, or is that somehow handled by the Kafka Streams library? Somewhere in the docs, I read that there is no need to coordinate between threads when running a Kafka Streams app. For example, here is some pseudocode:

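static int counter = 0; // field of the enclosing class: a lambda cannot modify a local variable

KStream<byte[], byte[]> input = ...;

KStream<byte[], byte[]> processed = input.map(
    (k, v) -> {
      // ... processing ...
      counter++; // updated by multiple threads
      return KeyValue.pair(k, v);
    });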

What will happen to "counter" if this code is executed by multiple stream tasks in the same app instance? What about the variable "processed", since it could also be updated by multiple threads? In a normal Java program this would require some kind of synchronization. I am curious whether the Kafka Streams library handles that.

Thank you!

sobychacko

1 Answer


It depends on how many threads you've configured to execute your tasks (the `num.stream.threads` setting). If a single thread executes all of your tasks, you don't need to make that shared variable thread-safe. If you have more than one thread, you do, because the tasks inside your application instance are distributed among those threads. Your Kafka Streams application is just a JVM process that you start with main(). The Kafka Streams framework orchestrates processing across the number of threads you specify, but it's still a regular Java runtime, and concurrent access is still concurrent access.
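To illustrate the multi-thread case, here is a minimal sketch that leaves Kafka out entirely: a fixed thread pool stands in for several stream threads (roughly what you get with `num.stream.threads` above 1), and every worker increments the same static counters. `SharedCounterDemo` and its `run` method are hypothetical names written only for this illustration. The plain `int` can lose increments when threads race on `++`, while `AtomicLong` performs an atomic read-modify-write and cannot.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SharedCounterDemo {

    // Stands in for a variable shared by every task on every "stream thread".
    public static final AtomicLong safeCounter = new AtomicLong();
    // Unsynchronized int: concurrent ++ is a read-modify-write race and can lose updates.
    public static int unsafeCounter = 0;

    // Runs `threads` workers, each "processing" `recordsPerThread` records.
    public static long run(int threads, int recordsPerThread) throws InterruptedException {
        safeCounter.set(0);
        unsafeCounter = 0;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < recordsPerThread; i++) {
                    unsafeCounter++;               // data race between threads
                    safeCounter.incrementAndGet(); // atomic increment
                }
            });
        }
        pool.shutdown();
        // Waiting for termination also makes the workers' writes visible to this thread.
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return safeCounter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long safe = run(4, 100_000);
        System.out.println("atomic counter: " + safe);          // 400000
        System.out.println("plain int:      " + unsafeCounter); // may be less than 400000
    }
}
```

If you really do need an instance-wide value in a topology, an atomic type (or another `java.util.concurrent` class) is the lightest-weight fix. The race on `unsafeCounter` is nondeterministic, so lost updates may or may not show up on a given run.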

More regarding threads and tasks here: Kafka Streams thread number

More regarding threads and tasks and shared state: Kafka stream processor thread safe?

Generally speaking, though, you probably want to avoid the pattern in your code sample unless it's really just counting something application-local. In a production deployment with multiple application instances, tasks get redistributed whenever an instance starts or stops, so your shared variable probably won't mean much. That's what makes the Kafka Streams state store mechanism so useful: your state moves with the tasks.

Dmitry Minkovsky
  • Thank you for the answer. Agreed on the "counter" example; I just used it as an illustration. Your answer confirms what I was thinking: basically, I need to synchronize access to any shared data structure. Then I found this comment - https://stackoverflow.com/questions/39985048/kafka-streaming-concurrency#comment79840080_39992430. That comment seems to indicate that it is okay to share resources among threads. – sobychacko Feb 01 '18 at 22:59
  • I think it says that since a value joiner belongs to at most one thread (i.e., it will only ever be executed by one thread), any state it holds is thread-safe. That is, if instead of a lambda you implement the value joiner as a full class with a private field, its `apply` function will only ever be called by one thread, so that private field does not need to be synchronized. But the state in your example could be shared by many threads, because it can be accessed by many value joiners running on different threads, so it needs to be synchronized. – Dmitry Minkovsky Feb 01 '18 at 23:04
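The distinction drawn here, between state owned by a single instance and state reachable from many, can also be sketched without Kafka. In the Processor API, `ProcessorSupplier#get()` is expected to return a new instance on every call, so each task gets its own object and its private fields are only touched by the thread running that task. The sketch below assumes that model and mimics it with `java.util.function.Supplier`; `PerTaskStateDemo`, `CountingProcessor` and `runTasks` are hypothetical names, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class PerTaskStateDemo {

    // Hypothetical stand-in for a per-task processor holding private state.
    public static class CountingProcessor {
        private long seen = 0; // only ever touched by the one thread running this instance

        public void process(String value) {
            seen++;
        }

        public long seen() {
            return seen;
        }
    }

    // Simulates `tasks` tasks: each gets its own instance from the supplier and its own thread.
    public static List<CountingProcessor> runTasks(Supplier<CountingProcessor> supplier,
                                                   int tasks, int recordsPerTask)
            throws InterruptedException {
        List<CountingProcessor> processors = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            CountingProcessor p = supplier.get(); // fresh instance per task
            processors.add(p);
            Thread th = new Thread(() -> {
                for (int i = 0; i < recordsPerTask; i++) {
                    p.process("record-" + i);
                }
            });
            threads.add(th);
            th.start();
        }
        for (Thread th : threads) {
            th.join(); // join() also makes each thread's writes visible here
        }
        return processors;
    }

    public static void main(String[] args) throws InterruptedException {
        // The supplier returns a NEW instance on every call, so no locking is needed.
        List<CountingProcessor> ps = runTasks(CountingProcessor::new, 4, 50_000);
        for (CountingProcessor p : ps) {
            System.out.println(p.seen()); // 50000 for every task
        }
    }
}
```

Writing the supplier as `() -> sharedInstance` would hand every task the same object and put you back in the situation from the question, where the field is shared across threads and needs synchronization.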