
Recently I was implementing a Kryo serializer for a Kafka Streams DSL application. Kryo is not thread-safe by default, and serialization methods were throwing exceptions most likely caused by unsynchronized access from multiple threads. Adding synchronization solved the problem, but raised some questions.

What is the threading model of a Kafka Streams application with respect to its different processing objects? Which objects are shared between threads, and which are used by a single thread only? Is it safe to keep unsynchronised local state (fields, not state stores) in these objects?

I'm especially interested in Processor / Transformer and Serializer / Deserializer objects.

I've seen this answer, but it's still not clear to me. Note I'm not trying to share any state between threads, but rather to avoid having such shared state.

Both the DSL and the Processor API (PAPI) require a supplier (i.e. a factory) for Processors / Transformers, and the Serde interface is also a factory, so I assumed that a single instance is created per thread or per task. This assumption seems to be false. At the same time, it seems very odd to accept a factory, create multiple instances, and then access them from multiple threads at the same time.


My Serde implementation basically looks like this (a new Kryo instance is created per Serializer / Deserializer instance):

public class MySerde<T> implements Serde<T> {
    @Override
    public Serializer<T> serializer() {
        final Kryo kryo = new Kryo();
        return (topic, data) -> { /* use kryo instance */ };
    }

    @Override
    public Deserializer<T> deserializer() {
        final Kryo kryo = new Kryo();
        return (topic, data) -> { /* use kryo instance */ };
    }
}

The serdes were invoked when reading from / writing to the repartition topic:

stream
    .groupByKey(Grouped.with(/* set serdes here */))
    .windowedBy(...)
    .aggregate(...)
Boris Sukhinin

1 Answer


I've put together a small demo application to detect concurrent access to serdes and transformers: https://github.com/sukhinin/kafka-streams-threading.
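For illustration, here is a minimal sketch of one way such detection can work. It is not necessarily how the linked demo does it. The idea is to count how many threads are inside a guarded section at once and flag any overlap. `ConcurrencyDetector` and `ConcurrencyDetectorDemo` are hypothetical names, and the worker threads only stand in for stream threads; in a real application the `enter()` / `exit()` calls would wrap the body of `serialize()` or `transform()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: call enter() at the start and exit() at the end of the guarded method.
class ConcurrencyDetector {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicBoolean overlapSeen = new AtomicBoolean();

    void enter() {
        // More than one thread inside the guarded section means concurrent access.
        if (active.incrementAndGet() > 1) {
            overlapSeen.set(true);
        }
    }

    void exit() {
        active.decrementAndGet();
    }

    boolean overlapSeen() {
        return overlapSeen.get();
    }
}

class ConcurrencyDetectorDemo {
    // Starts `threads` workers that are all inside the guarded section at the same time.
    static boolean detectOverlap(int threads) throws InterruptedException {
        ConcurrencyDetector detector = new ConcurrencyDetector();
        CountDownLatch allInside = new CountDownLatch(threads);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                detector.enter();
                try {
                    allInside.countDown();
                    // Stay inside until every worker has entered, to force the overlap.
                    allInside.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    detector.exit();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return detector.overlapSeen();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single thread overlap: " + detectOverlap(1));
        System.out.println("four threads overlap: " + detectOverlap(4));
    }
}
```

A detector like this can only prove that concurrent access happened. If it never fires, that may just mean the threads did not happen to overlap during the run.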

  1. Serializers and deserializers are accessed concurrently from multiple threads and thus must be thread-safe.

  2. Transformers are accessed from a single thread at a time.
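Given the first point, one alternative to synchronizing every call is to give each thread its own instance of the non-thread-safe object. Below is a minimal sketch of that pattern using `ThreadLocal`. To keep it runnable without the Kryo and Kafka dependencies, `UnsafeCodec` is a hypothetical stand-in for `Kryo`, and `PerThreadSerializer` only mirrors the shape of `serialize(topic, data)`. I have not verified this approach inside a real Kafka Streams application.

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a non-thread-safe encoder such as Kryo:
// it reuses one mutable buffer across calls.
class UnsafeCodec {
    private final StringBuilder buffer = new StringBuilder();

    byte[] encode(String value) {
        buffer.setLength(0);
        buffer.append(value.length()).append(':').append(value);
        return buffer.toString().getBytes(StandardCharsets.UTF_8);
    }
}

// Mirrors the shape of a serializer's serialize(topic, data) method, without the Kafka dependency.
class PerThreadSerializer {
    // Each calling thread lazily gets its own codec, so no codec instance is shared.
    private final ThreadLocal<UnsafeCodec> codec = ThreadLocal.withInitial(UnsafeCodec::new);

    byte[] serialize(String topic, String data) {
        return codec.get().encode(data);
    }

    // Exposed only so the demo can count distinct codec instances.
    UnsafeCodec codecForCurrentThread() {
        return codec.get();
    }
}

class PerThreadSerializerDemo {
    // Returns how many distinct codec instances `threads` threads end up using.
    static int distinctCodecs(PerThreadSerializer serializer, int threads) throws InterruptedException {
        Set<UnsafeCodec> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> seen.add(serializer.codecForCurrentThread()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return seen.size();
    }

    public static void main(String[] args) throws InterruptedException {
        PerThreadSerializer serializer = new PerThreadSerializer();
        byte[] bytes = serializer.serialize("some-topic", "abc");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        System.out.println("distinct codecs: " + distinctCodecs(serializer, 4));
    }
}
```

The trade-off compared with a `synchronized` block is memory for contention: every thread that touches the serializer keeps its own codec for as long as the thread lives, but threads never wait on each other.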

Still I would be grateful to hear from someone with more Kafka Streams experience.

Boris Sukhinin