I am getting the following exception when I try to retrieve an entry from a Kafka Streams persistent state store: org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store kafka-state-dir because the stream thread is STARTING, not RUNNING
I am using Spring Boot and Kafka Streams. Here is my code:
The configuration class:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "applicationId");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.messageContextSerde().getClass().getName());
props.put(STATE_DIR_CONFIG, "/app/kafka-state-dir");
return new KafkaStreamsConfiguration(props);
}
}
The Kafka Streams processor:
@Component
public class Processor {
private static final Logger logger = LoggerFactory.getLogger(Processor.class);
private static final Serde<String> STRING_SERDE = Serdes.String();
public static final String FIN_CACHE = "kafka-state-dir";
@Value("${kafka.topic.fin.message.ctx}")
private String finMessageContextTopic;
@Autowired
private void process(StreamsBuilder streamsBuilder) {
logger.info("Starting MessageFinProcessor on topic {}", finMessageContextTopic);
streamsBuilder.table(finMessageContextTopic,
Materialized.<String, MessageContext, KeyValueStore<Bytes, byte[]>>as(FIN_CACHE)
.withKeySerde(STRING_SERDE)
.withValueSerde(CustomSerdes.messageContextSerde()));
}
}
The service where I retrieve the entry from the cache:
@Service
public class KafkaStreamsStorageService {
private final StreamsBuilderFactoryBean streamsFactoryBean;
public static final String FIN_CACHE = "kafka-state-dir";
public KafkaStreamsStorageService(StreamsBuilderFactoryBean streamsFactoryBean) {
this.streamsFactoryBean = streamsFactoryBean;
}
public MessageContext get(String correlationId) {
KafkaStreams kafkaStreams = streamsFactoryBean.getKafkaStreams();
if (kafkaStreams != null) {
ReadOnlyKeyValueStore<String, MessageContext> keyValueStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(
FIN_CACHE, QueryableStoreTypes.keyValueStore()));
return keyValueStore.get(correlationId);
}
return null;
}
}
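Because the exception says the stream thread is still STARTING, one thing that may be worth trying is to retry the lookup for a short while instead of failing on the first call. The following is only a minimal sketch in plain Java, not part of the code above: `StoreRetry` and `callWithRetry` are hypothetical names, and the exception type to retry on is passed in so the sketch has no Kafka dependency.

```java
import java.util.function.Supplier;

// Hypothetical helper (not from the original code): retries a call while it
// throws a given RuntimeException type, sleeping between attempts.
final class StoreRetry {

    static <T> T callWithRetry(Supplier<T> call,
                               Class<? extends RuntimeException> retryOn,
                               int maxAttempts,
                               long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                // Anything other than the expected "not ready" exception is rethrown at once.
                if (!retryOn.isInstance(e)) {
                    throw e;
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for the store", ie);
                }
            }
        }
        // All attempts failed with the retryable exception; surface the last one.
        throw last;
    }
}
```

In the service above, the supplier would wrap the `kafkaStreams.store(...)` call and `retryOn` would be `InvalidStateStoreException.class`. Note that this only helps if the thread eventually reaches RUNNING; if it stays in STARTING, the last exception is still thrown after the final attempt.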
Inside the container where my Java app runs, I see only the following files in the store dir:
ls -a /app/kafka-state-dir/applicationId
.lock
kafka-streams-process-metadata
Here is my Dockerfile:
FROM ...
ENV JVM_MEM_ARGS=-Xms128m\ -Xmx2g
ARG JAR_FILE
ADD ./target/${JAR_FILE} /app/myapp.jar
WORKDIR /app
CMD ["./run.sh", "myapp.jar"]
Here are also the volumes that I pass to the service in docker-compose.yml:
...
volumes:
- /home/myuser/kafka-streams/:/app/kafka-state-dir/
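The `.lock` file in the container listing suggests the mounted directory is writable, but to rule out a permissions problem on the bind mount, a small probe could be run from inside the JVM at startup. This is a sketch under assumptions: `StateDirProbe` and `canCreateFilesIn` are hypothetical names, and the check only tests whether the current process can create and delete a file in the given directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical diagnostic (not from the original code): reports whether the
// current process can create and delete a file inside the given directory.
final class StateDirProbe {

    static boolean canCreateFilesIn(Path dir) {
        if (!Files.isDirectory(dir)) {
            return false; // Missing path, or not a directory.
        }
        try {
            Path probe = Files.createTempFile(dir, "probe", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException | SecurityException e) {
            // Creation or deletion was refused, e.g. due to ownership of the mount.
            return false;
        }
    }
}
```

For the setup above, the argument would be `Paths.get("/app/kafka-state-dir")`, logged once when the application starts.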
By contrast, when I run my app as a standalone Java jar from IntelliJ (with a different profile), the whole procedure works as expected (I can retrieve the entry from the persistent store), and I see the following files inside the store dir:
ls -a /app/kafka-state-dir/applicationId
0_0
0_1
0_2
0_3
kafka-streams-process-metadata
.lock
ls kafka-state-dir/applicationId/0_0
.checkpoint
rocksdb
I have tried many different paths for state.dir so that the Kafka Streams library can find it, but none of them worked. Do you have any ideas?
Thanks