
I am getting the following exception when I try to retrieve an entry from a Kafka Streams persistent state store: org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state store kafka-state-dir because the stream thread is STARTING, not RUNNING

I am using Spring Boot and Kafka Streams. Here is my code:

Configuration class:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "applicationId");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.messageContextSerde().getClass().getName());
        props.put(STATE_DIR_CONFIG, "/app/kafka-state-dir");

        return new KafkaStreamsConfiguration(props);
    }
}

The Kafka Streams processor:

@Component
public class Processor {

    private static final Logger logger = LoggerFactory.getLogger(Processor.class);
    private static final Serde<String> STRING_SERDE = Serdes.String();
    public static final String FIN_CACHE = "kafka-state-dir";

    @Value("${kafka.topic.fin.message.ctx}")
    private String finMessageContextTopic;

    @Autowired
    private void process(StreamsBuilder streamsBuilder) {
        logger.info("Starting MessageFinProcessor on topic {}", finMessageContextTopic);

        streamsBuilder.table(finMessageContextTopic,
                Materialized.<String, MessageContext, KeyValueStore<Bytes, byte[]>>as(FIN_CACHE)
                        .withKeySerde(STRING_SERDE)
                        .withValueSerde(CustomSerdes.messageContextSerde()));
    }

}

The service where I am retrieving the entry from the cache:

@Service
public class KafkaStreamsStorageService {

    private final StreamsBuilderFactoryBean streamsFactoryBean;
    public static final String FIN_CACHE = "kafka-state-dir";
    public KafkaStreamsStorageService(StreamsBuilderFactoryBean streamsFactoryBean) {
        this.streamsFactoryBean = streamsFactoryBean;
    }

    public MessageContext get(String correlationId) {
        KafkaStreams kafkaStreams = streamsFactoryBean.getKafkaStreams();
        if (kafkaStreams != null) {
            ReadOnlyKeyValueStore<String, MessageContext> keyValueStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(
                    FIN_CACHE, QueryableStoreTypes.keyValueStore()));
            return keyValueStore.get(correlationId);
        }
        return null;
    }
}
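The exception says the store was queried while the stream thread was still STARTING, i.e. before the instance reached RUNNING. Checking kafkaStreams.state() == KafkaStreams.State.RUNNING before the lookup, or retrying the lookup for a bounded time, are common ways to handle that start-up window. Below is a minimal, stdlib-only sketch of such a bounded retry; the class StoreRetry, its method retryOn, and the timeout values are hypothetical and are not part of Kafka Streams or Spring Kafka.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka Streams or Spring API): retries a lookup
// while it keeps failing with a given "not ready yet" exception type.
final class StoreRetry {

    private StoreRetry() {}

    static <T> T retryOn(Class<? extends RuntimeException> retryable,
                         Supplier<T> lookup,
                         Duration timeout,
                         Duration backoff) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Any other exception is a real error: propagate it immediately.
                if (!retryable.isInstance(e)) {
                    throw e;
                }
                // Out of time: surface the last "not ready" exception to the caller.
                if (System.nanoTime() - deadline >= 0) {
                    throw e;
                }
                try {
                    Thread.sleep(backoff.toMillis());
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

In the service above, the supplier would wrap the kafkaStreams.store(StoreQueryParameters.fromNameAndType(FIN_CACHE, QueryableStoreTypes.keyValueStore())) call, with InvalidStateStoreException.class as the retryable type. This only helps if the thread eventually reaches RUNNING; if it never gets partitions assigned (for example because it cannot talk to the brokers), the retry just delays the same exception.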

Inside the container where my Java app runs, I see only the following files in the state dir:

ls -a /app/kafka-state-dir/applicationId
.lock
kafka-streams-process-metadata
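In the IDE run the application directory contains one directory per task (0_0 to 0_3); in the container it contains only .lock and kafka-streams-process-metadata. Kafka Streams appears to create a task directory only once a task is assigned to the instance, so an empty listing here would be consistent with the stream thread never receiving a partition assignment. The hypothetical helper TaskDirs.list below lists task directories under <state.dir>/<application.id>; the <subtopology>_<partition> naming is assumed from the IDE listing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical diagnostic helper: lists task directories under <state.dir>/<application.id>.
final class TaskDirs {

    // Assumed naming of task directories, e.g. "0_0" (<subtopology>_<partition>).
    private static final Pattern TASK_DIR = Pattern.compile("\\d+_\\d+");

    private TaskDirs() {}

    static List<String> list(Path appDir) {
        try (Stream<Path> entries = Files.list(appDir)) {
            return entries
                    .filter(Files::isDirectory)                      // skips .lock and other plain files
                    .map(p -> p.getFileName().toString())
                    .filter(name -> TASK_DIR.matcher(name).matches()) // keeps only "N_M" names
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Pointed at /app/kafka-state-dir/applicationId, this would be expected to return an empty list for the container listing above and [0_0, 0_1, 0_2, 0_3] for the IDE run.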

Here is my Dockerfile:

FROM ...
ENV JVM_MEM_ARGS=-Xms128m\ -Xmx2g
ARG JAR_FILE
ADD ./target/${JAR_FILE} /app/myapp.jar
WORKDIR /app
CMD ["./run.sh", "myapp.jar"]

Here are also the volumes that I pass to the service in docker-compose.yml:

...
volumes:
      - /home/myuser/kafka-streams/:/app/kafka-state-dir/

By contrast, when I run my app as a standalone Java JAR from IntelliJ (with a different profile), the whole procedure works as expected (I can retrieve the entry from the persistent store), and I see the following files inside the state dir:

ls -a /app/kafka-state-dir/applicationId
0_0
0_1
0_2
0_3
kafka-streams-process-metadata
.lock
ls kafka-state-dir/applicationId/0_0
.checkpoint
rocksdb
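For reference, the IDE listing is consistent with a layout of <state.dir>/<application.id>/<taskId>/rocksdb/<storeName> for persistent key-value stores. The sketch below, using a hypothetical helper StorePaths.storeDir, just builds that path from the question's settings. Note that kafkaStreams.store(...) does not search the file system for this path: it asks the running KafkaStreams instance for the store of its assigned tasks, so changing state.dir alone is unlikely to fix an exception about the thread not being RUNNING.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: where a persistent (RocksDB) key-value store is expected on disk,
// assuming the layout <state.dir>/<application.id>/<taskId>/rocksdb/<storeName>.
final class StorePaths {

    private StorePaths() {}

    static Path storeDir(String stateDir, String applicationId, String taskId, String storeName) {
        return Paths.get(stateDir, applicationId, taskId, "rocksdb", storeName);
    }
}
```

With state.dir=/app/kafka-state-dir, application.id=applicationId and the store also named kafka-state-dir, task 0_0 would map to /app/kafka-state-dir/applicationId/0_0/rocksdb/kafka-state-dir. Reusing the directory name as the store name is legal, just easy to confuse when reading listings.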

I have tried many different paths for state.dir so that the Kafka Streams library could find the store, but none of them worked. Do you have any ideas?

Thanks

  • Why do you need to mount a directory from your host? Have you tried using a managed Docker volume instead? – OneCricketeer Aug 24 '22 at 21:19
  • No, I haven't. I don't know whether the problem is the Docker environment or the Kafka brokers themselves. In my logs I keep getting the following: : WARN 1 --- [read-1-producer] org.apache.kafka.clients.NetworkClient : [Producer clientId=applicationIdr-450a24af-de07-40c8-9324-744da1bd3671-StreamThread-1-producer] Bootstrap broker :IP (id: -2 rack: null) disconnected – Efth Efthim Aug 25 '22 at 07:23
  • For that particular error, please see https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker – OneCricketeer Aug 26 '22 at 17:48
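The disconnect warning above could mean the container cannot reach the broker at all, or that it reaches it but the broker's listener setup (the case covered in the linked question) prevents a usable connection. If the stream thread cannot complete its first poll against the brokers, it plausibly stays in STARTING, which would tie this warning to the exception in the question. A plain TCP check from inside the container only rules out the first case; a minimal sketch follows, where BootstrapCheck, parse, reachable, the timeout and the example host names are all hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: checks plain TCP reachability of each bootstrap.servers entry.
// It does not speak the Kafka protocol, so it cannot detect listener misconfiguration.
final class BootstrapCheck {

    private BootstrapCheck() {}

    /** Parses a simple "host1:port1,host2:port2" list into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection to the address succeeds within timeoutMillis. */
    static boolean reachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolve the host now; an unknown host surfaces as an IOException from connect.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Run from inside the container against the value of spring.kafka.bootstrap-servers, a false result points at networking between the container and the broker, while a true result with the warning still present suggests looking at the broker's listener configuration rather than basic reachability.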
