I'm obviously a beginner with Kafka and Kafka Streams. I just need to read specific messages from a few topics, given their IDs. While our actual topology is fairly complex, this Streams app only needs to achieve this one simple goal.
This is how the store is created:
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    streamsBuilder.table(
        topic,
        Materialized.<String, String>as(persistentKeyValueStore(storeNameOf(topic)))
            .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
            .withCachingDisabled()
        // In-memory alternative (this one works):
        // Materialized.<String, String>as(inMemoryKeyValueStore(storeNameOf(topic)))
        //     .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
        //     .withCachingDisabled()
    );
    KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), new Properties() {{ /* config items go here */ }});
    kafkaStreams.start();
    // Logic for awaiting the `RUNNING` state, as well as InvalidStateStoreException
    // handling (by retrying), is omitted for simplicity:
    ReadOnlyKeyValueStore<String, String> replyStore = kafkaStreams.store(storeNameOf(topicName), QueryableStoreTypes.keyValueStore());
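For context, the retry logic omitted above looks roughly like the following. This is only a minimal sketch written without any Kafka dependency so that it runs on its own: `StoreRetry`, `retry`, and `StoreNotReadyException` are hypothetical names, and `StoreNotReadyException` merely stands in for `InvalidStateStoreException`. In the real app the supplier would wrap the `kafkaStreams.store(...)` call.

```java
import java.util.function.Supplier;

public class StoreRetry {

    /** Hypothetical stand-in for Kafka Streams' InvalidStateStoreException. */
    static class StoreNotReadyException extends RuntimeException {
        StoreNotReadyException(String message) {
            super(message);
        }
    }

    /**
     * Calls fetch until it succeeds, retrying only when it throws an exception
     * of type retryOn. Any other exception is rethrown immediately, and the last
     * retryable exception is rethrown once maxAttempts is exhausted.
     */
    public static <T> T retry(Supplier<T> fetch,
                              Class<? extends RuntimeException> retryOn,
                              int maxAttempts,
                              long backoffMs) throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not a "store not ready" condition, so do not hide it
                }
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated store lookup that only succeeds on the third call.
        String store = retry(() -> {
            if (++calls[0] < 3) {
                throw new StoreNotReadyException("store not ready yet");
            }
            return "store-handle";
        }, StoreNotReadyException.class, 5, 10L);
        System.out.println(store + " after " + calls[0] + " attempts");
        // prints "store-handle after 3 attempts"
    }
}
```

Only the "not ready" exception type is retried here, so any other failure is rethrown right away instead of being swallowed by the retry loop.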
When using the commented-out `inMemoryKeyValueStore` materialization, `replyStore` is successfully created and I can query its values without a problem.
With `persistentKeyValueStore`, the last line fails with `java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR`. Note that I do check that KafkaStreams is in the `RUNNING` state before the `store` call; the `ERROR` state is somehow reached during the call itself.
Do you think I might have missed anything when setting up the persistent store? Debugging hints would also help greatly; I must confess I'm quite stuck here. Thanks!
Edit: The execution happens inside a Docker container. This is quite relevant, but I initially omitted it.
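Since a persistent store writes to local disk under the Kafka Streams `state.dir` setting, one thing that can be checked inside the container is whether that directory is usable. The following is a minimal sketch and only a guess at one possible cause, not a confirmed diagnosis: `StateDirCheck` and `isUsable` are hypothetical names, and the path built in `main` is an assumption that should be replaced with whatever `state.dir` is actually configured to.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class StateDirCheck {

    /**
     * Returns true if dir exists (creating it if needed) and a file can be
     * created and then deleted inside it.
     */
    public static boolean isUsable(Path dir) {
        try {
            Files.createDirectories(dir);
            Path probe = Files.createTempFile(dir, "probe", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException | SecurityException e) {
            System.err.println("State directory not usable: " + dir + " (" + e + ")");
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed location; pass the real state.dir value as the first argument.
        Path dir = args.length > 0
                ? Paths.get(args[0])
                : Paths.get(System.getProperty("java.io.tmpdir"), "kafka-streams");
        System.out.println(dir + " usable: " + isUsable(dir));
    }
}
```

Running this with the same user and container image as the Streams app would at least rule the file system in or out.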