I have a stream that I want to partition by a key and then run through several transformations, each of which uses state. When I call keyBy() I get a KeyedStream, and the first transformation can access partitioned state correctly, but a second transformation chained after it throws an exception when it tries to access partitioned state. The exception is:

State key serializer has not been configured in the config. This operation cannot use partitioned state

It seems that the key information is only passed to the first transformation and not further down the chain.

The code I am trying to run is along these lines (the real code actually does something useful):

DataStream<Long> newStream = eventsStream
    .keyBy("username")
    .filter(new RichFilterFunction<Event>() {
        private ValueState<Boolean> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE1", Boolean.class, Boolean.TRUE));
        }

        @Override
        public boolean filter(Event value) throws Exception {
            return stateStore.value();
        }
    })
    .map(new RichMapFunction<Event, Long>() {
        private ValueState<Long> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            stateStore = getRuntimeContext().getState(new ValueStateDescriptor<>("VALUE2", Long.class, 0L));
        }

        @Override
        public Long map(Event value) throws Exception {
            return Long.parseLong(value.data) + stateStore.value();
        }
    });

This code will throw an exception at the second getState() call.

I can call keyBy() again, but then I lose the ability to chain the operations. Can I manually manipulate the objects of the stream graph so that the key information is passed on, or is this sort of chaining not supported?

Matthias J. Sax

1 Answer

You cannot.

Even if you called keyBy() a second time (or somehow passed the "keyed" information downstream), you would get a new state, because a state is associated with a single operator only.

As a workaround, you need to merge both operators into one.
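To make the idea of merging concrete, here is a minimal plain-Java sketch with no Flink dependency. It only models the shape of the workaround: one operator owns both pieces of per-key state and performs the filter and the map in a single step. The class name, the Event fields, and the two maps standing in for the "VALUE1" and "VALUE2" states are assumptions for illustration. In Flink this would probably correspond to a single rich function (for example a RichFlatMapFunction) applied directly after keyBy(), with both states obtained in its open() method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of one merged operator; not Flink code. */
final class MergedOperatorSketch {

    /** Hypothetical stand-in for the Event type used in the question. */
    public static final class Event {
        public final String username;
        public final String data;

        public Event(String username, String data) {
            this.username = username;
            this.data = data;
        }
    }

    // Stand-in for the "VALUE1" state (default TRUE), scoped per key.
    private final Map<String, Boolean> passFlagByKey = new HashMap<>();
    // Stand-in for the "VALUE2" state (default 0), scoped per key.
    private final Map<String, Long> offsetByKey = new HashMap<>();

    public void setPassFlag(String key, boolean value) {
        passFlagByKey.put(key, value);
    }

    public void setOffset(String key, long value) {
        offsetByKey.put(key, value);
    }

    /**
     * Filter and map in one step. An empty result means the event was
     * filtered out; otherwise the mapped value is returned.
     */
    public Optional<Long> process(Event event) {
        boolean pass = passFlagByKey.getOrDefault(event.username, Boolean.TRUE);
        if (!pass) {
            return Optional.empty();
        }
        long offset = offsetByKey.getOrDefault(event.username, 0L);
        return Optional.of(Long.parseLong(event.data) + offset);
    }

    public static void main(String[] args) {
        MergedOperatorSketch op = new MergedOperatorSketch();
        op.setOffset("alice", 2L);
        op.setPassFlag("bob", false);
        System.out.println(op.process(new Event("alice", "40")));
        System.out.println(op.process(new Event("bob", "1")));
    }
}
```

Because both values live in the same operator, they are looked up under the same key, and no second keyBy() is needed between the filter logic and the map logic.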

If you think this feature would be helpful, feel free to suggest it on the dev@flink.apache.org mailing list.

Matthias J. Sax