0

I am trying to access the inMemoryStore that I am creating with in the same java program. But returning a exception as "Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, storeName, may have migrated to another instance."

When I am using the persistentKeyValueStore it is working fine and able to create the store and return the values.

package com.bakdata.streams_store.demo;

import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.stre7ams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;

public class InMemoryStore {

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-id-0001");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    String storeName = "sample";
    KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore(storeName);
    StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(stateStore, Serdes.String(), Serdes.String());

    StreamsBuilder builder = new StreamsBuilder();
    builder.addStateStore(storeBuilder);
    KStream<String, String> inputStream = builder.stream("material_test1");
    KafkaStreams streams = new KafkaStreams(builder.build(), props);

    try {
        streams.start();
        Thread.sleep(30000);
    } catch (final Throwable e) {
        System.exit(1);
    }
    final ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store(storeName, QueryableStoreTypes.keyValueStore());
    KeyValueIterator<String, String> range = keyValueStore.all();
    while (range.hasNext()) {
        KeyValue<String, String> next = range.next();
        System.out.println("Key: " + next.key + ", value: " + next.value);
    }
}
}

Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, sample, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:62) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1067) at com.bakdata.streams_store.demo.InMemoryStore.main(InMemoryStore.java:59)

I am expecting to print the values from the ReadOnlyStoreQuery.

Vadim Kotov
  • 8,084
  • 8
  • 48
  • 62
Patan
  • 21
  • 1
  • 6
  • "adding" a store itself is not sufficient. You also need to "connect" it to a `Processor`. Otherwise the store in "dangling" and not accessible. – Matthias J. Sax Jul 17 '19 at 21:38

1 Answers1

0

You cannot have a StateStore on a stream as there could be multiple values for a single key. You need to turn it into a KTable (streams.table(...)) or GlobalKtable (streams.globalTable(...)) first.

Kotlin example:

val businessObjects = streamsBuilder.globalTable("topic", eventStore("store-name"))

where eventStore is:

fun eventStore(name: String) = Materialized.`as`<String, String>(Stores.inMemoryKeyValueStore(name))
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.String())

after you started the streams:

var store: ReadOnlyKeyValueStore<String, String> = streams.store("store-name", keyValueStore<String, String>())

Note: There is also an interface KafkaStreams.StateListener for when the streams are ready

 override fun onChange(newState: KafkaStreams.State?, oldState: KafkaStreams.State?) =
    Option.fromNullable(newState)
        .filter { REBALANCING == oldState && RUNNING == it }
        .map { store = streams.store("store-name", keyValueStore<String, String>()) }
        .getOrElse { log.info("Waiting for Kafka being in REBALANCING -> RUNNING, but it is $oldState -> $newState") }

Alternatively you also could turn your stream into a KTable with

stream.groupByKey().reduce(...)

like described here.

sschrass
  • 7,014
  • 6
  • 43
  • 62
  • Thank you for the answer that is what actually I am looking for. Also I would like to know is there anyway that we can access the stateStore as well in global way to perform the lookup activity. – Patan Jul 15 '19 at 09:06
  • depends on what table you chose. if its a globaltable you create a store on, you can access in it with `store.get("id")` globally. – sschrass Jul 15 '19 at 09:12
  • Thank you for your response, but I would like to access the store from another instance of streams. Ex: I have created a global table in a separate streams application (streams instance_1) and then from another application (streams instance_2) by calling the store.get("id") method is it possible to get the store content? if not how to to achieve this. Thank you in advance. – Patan Jul 22 '19 at 07:58