Recently I was implementing a Kryo serializer for a Kafka Streams DSL application. Kryo is not thread-safe by default, and serialization methods were throwing exceptions most likely caused by unsynchronized access from multiple threads. Adding synchronization solved the problem, but raised some questions.
What is the threading model of a Kafka Streams application with respect to its different processing objects? Which objects are shared between threads and which are used by a single thread only? Is it safe to keep unsynchronized local state (fields, not state stores) in these objects?
I'm especially interested in Processor / Transformer and Serializer / Deserializer objects.
I've seen this answer, but it's still not clear to me. Note I'm not trying to share any state between threads, but rather to avoid having such shared state.
Both the DSL and the PAPI require a supplier (i.e. a factory) for Processors / Transformers, and the Serde interface is also a factory, so I assumed that a single instance is created per thread or per task. This assumption seems to be false, but at the same time it seems very odd to accept a factory, create multiple instances, and then access them from multiple threads at the same time.
My Serde implementation basically looks like this (a new Kryo instance is created per Serializer / Deserializer instance):
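import com.esotericsoftware.kryo.Kryo;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class MySerde<T> implements Serde<T> {

    @Override
    public Serializer<T> serializer() {
        // One Kryo instance per Serializer instance, captured by the lambda.
        final Kryo kryo = new Kryo();
        return (topic, data) -> { /* use kryo instance */ };
    }

    @Override
    public Deserializer<T> deserializer() {
        // One Kryo instance per Deserializer instance, captured by the lambda.
        final Kryo kryo = new Kryo();
        return (topic, data) -> { /* use kryo instance */ };
    }
}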
The serdes were invoked when reading from / writing to the repartition topic:
stream
    .groupByKey(Grouped.with(/* set serdes here */))
    .windowedBy(...)
    .aggregate(...)
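To make concrete what I mean by avoiding shared state rather than synchronizing it, here is a minimal, dependency-free sketch. It is not my actual code: UnsafeCodec is a hypothetical stand-in for a non-thread-safe object such as a Kryo instance, and PerThreadSerializer is a hypothetical wrapper that gives each calling thread its own codec via ThreadLocal. It assumes nothing about how Kafka Streams really assigns serializers to threads, which is exactly what I'm asking about.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerThreadCodecDemo {

    /** Hypothetical stand-in for a non-thread-safe codec (e.g. Kryo): reuses one mutable buffer. */
    static final class UnsafeCodec {
        private final StringBuilder scratch = new StringBuilder();

        byte[] encode(String value) {
            scratch.setLength(0);
            scratch.append(value.length()).append(':').append(value);
            return scratch.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Serializer-like wrapper: every calling thread lazily gets its own codec instance. */
    static final class PerThreadSerializer {
        private final ThreadLocal<UnsafeCodec> codec = ThreadLocal.withInitial(UnsafeCodec::new);

        byte[] serialize(String data) {
            return codec.get().encode(data);
        }
    }

    /** Calls ONE shared wrapper from several threads and returns how many results were corrupted. */
    static int countCorrupted(int threads, int callsPerThread) {
        PerThreadSerializer shared = new PerThreadSerializer();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int id = t;
                Callable<Integer> task = () -> {
                    int bad = 0;
                    for (int i = 0; i < callsPerThread; i++) {
                        String value = "thread-" + id + "-msg-" + i;
                        String expected = value.length() + ":" + value;
                        String actual = new String(shared.serialize(value), StandardCharsets.UTF_8);
                        if (!expected.equals(actual)) {
                            bad++;
                        }
                    }
                    return bad;
                };
                results.add(pool.submit(task));
            }
            int corrupted = 0;
            for (Future<Integer> f : results) {
                corrupted += f.get();
            }
            return corrupted;
        } catch (Exception e) {
            // Wrap checked exceptions so callers need no throws clause.
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("corrupted results: " + countCorrupted(4, 10_000));
    }
}
```

The wrapper itself is shared, but the mutable codec behind it never is, so no lock is needed. Whether something like this is necessary, or whether a plain field would already be confined to a single thread, depends on the threading guarantees I'm asking about.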