
An application that consumes data from two Kafka topics and joins related events keeps failing whenever TTL cleanup removes entries from its list state. The flatMap logic is a little different from most examples I have seen: every event from the first stream may join with multiple events from the second stream, so this is a one-to-many join.

This is the code of the join:

val join = streamA
    .connect(streamB)
    .flatMap(
      new JoinOneToManyStreams[StreamA, StreamB](
        "streamA",
        "streamB",
        Time.minutes(3)
      )
    ).uid("join")
    .keyBy(_.streamB.id)

This is the class that implements the logic:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// StateIn and StateOut are the application's own case classes (not shown here)
class JoinOneToManyStreams[A, B] (stateDescriptorA: String, stateDescriptorB: String, time: Time) extends RichCoFlatMapFunction[A, B, StateOut[A, B]] {
    val ttlConfig: StateTtlConfig = StateTtlConfig
        .newBuilder(time)
        // Refresh the TTL timestamp on creation and write only, never on read
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInRocksdbCompactFilter(1000)
        .build

    // Latest A event for the current key
    val streamAStateDescriptor = new ValueStateDescriptor[StateIn[A]](stateDescriptorA, classOf[StateIn[A]])
    streamAStateDescriptor.enableTimeToLive(ttlConfig)
    lazy val streamAState: ValueState[StateIn[A]] = getRuntimeContext.getState(streamAStateDescriptor)

    // B events buffered until a matching A is available
    val streamBStateDescriptor = new ListStateDescriptor[StateIn[B]](stateDescriptorB, classOf[StateIn[B]])
    streamBStateDescriptor.enableTimeToLive(ttlConfig)
    lazy val streamBState: ListState[StateIn[B]] = getRuntimeContext.getListState(streamBStateDescriptor)

    override def flatMap1(streamA: A, out: Collector[StateOut[A, B]]): Unit = {
        streamAState.update(StateIn[A](stateValue = streamA))
        val stateB = streamBState.get
        // ListState.get returns a java.lang.Iterable, so check it through its iterator
        if (stateB.iterator.hasNext) {
            streamBState.clear()
            stateB.forEach(row => {
                out.collect(StateOut(streamA, row.stateValue))
            })
        }
    }

    override def flatMap2(streamB: B, out: Collector[StateOut[A, B]]): Unit = {
        streamBState.add(StateIn(streamB))
        val stateA = streamAState.value
        if (stateA != null) {
            // An A is already known: flush every buffered B for this key
            val stateB = streamBState.get
            streamBState.clear()
            stateB.forEach(row => {
                out.collect(StateOut(stateA.stateValue, row.stateValue))
            })
        }
    }
}
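To make the one-to-many semantics of the two flatMap methods concrete, here is a plain-Scala model of the same per-key logic. It is only a sketch under stated assumptions: no Flink, no TTL, and in-memory maps stand in for the keyed ValueState and ListState. `OneToManyJoinModel`, `onA` and `onB` are hypothetical names used for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of JoinOneToManyStreams for illustration only:
// no Flink, no TTL. One A is remembered per key; B events wait in a
// per-key buffer until an A for that key exists.
final class OneToManyJoinModel[K, A, B] {
  private val aState = mutable.Map.empty[K, A]       // stands in for the ValueState
  private val bState = mutable.Map.empty[K, List[B]] // stands in for the ListState

  // Mirrors flatMap1: remember A, then flush any buffered Bs for the key.
  def onA(key: K, a: A): List[(A, B)] = {
    aState(key) = a
    val buffered = bState.remove(key).getOrElse(Nil)
    buffered.map(b => (a, b))
  }

  // Mirrors flatMap2: buffer B; if an A is known, flush the whole buffer.
  def onB(key: K, b: B): List[(A, B)] = {
    bState(key) = bState.getOrElse(key, Nil) :+ b
    aState.get(key) match {
      case Some(a) =>
        val buffered = bState.remove(key).getOrElse(Nil)
        buffered.map(buffB => (a, buffB))
      case None => Nil
    }
  }
}
```

For example, two B events that arrive before their A are emitted together when the A arrives, and any later B for the same key is emitted immediately against the stored A.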

As you can see, I use cleanupInRocksdbCompactFilter, but I also tried the full-snapshot cleanup strategy and saw the same behavior.

Error:

    Exception in thread "Thread-19" java.lang.IllegalArgumentException: classLoader cannot be null.
        at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:477)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:337)
        at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:151)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:193)
        at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:180)

I know the list state is the problem because when I disabled TTL for that state, the problem stopped. Is there any way to make it work?
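One possible direction, sketched under the assumption (as FLINK-16686 suggests) that the crash comes only from Kryo being called inside the RocksDB compaction filter: `classOf[StateIn[B]]` makes Flink fall back to a Kryo-backed `GenericTypeInfo`, whereas a descriptor built from `TypeInformation` produced by Flink's Scala API describes the case class without Kryo. `StreamBStateDescriptors`, `EventB` and `DescriptorCheck` are hypothetical names, `StateIn` is redefined here only so the example compiles, and the `flink-streaming-scala` dependency is assumed. This only helps if the element type itself (here `EventB`) is also described without Kryo.

```scala
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-ins for the question's wrapper and event types.
case class StateIn[T](stateValue: T)
case class EventB(id: String, amount: Long)

// The context bound asks the caller for TypeInformation[B]; with it in scope,
// Flink's Scala type macro can describe StateIn[B] as a case class type
// instead of a generic (Kryo-serialized) type.
class StreamBStateDescriptors[B: TypeInformation](name: String) {
  val elementInfo: TypeInformation[StateIn[B]] = createTypeInformation[StateIn[B]]
  val listDescriptor = new ListStateDescriptor[StateIn[B]](name, elementInfo)
}

object DescriptorCheck {
  def main(args: Array[String]): Unit = {
    val descriptors = new StreamBStateDescriptors[EventB]("streamB")
    // Prints false if Flink would still serialize the list elements with Kryo.
    println(!descriptors.elementInfo.isInstanceOf[GenericTypeInfo[_]])
  }
}
```

The same pattern would apply to the ValueStateDescriptor for A, by adding an `A: TypeInformation` context bound to the function class and passing `createTypeInformation[StateIn[A]]` to its constructor.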

  • Are streamA and streamB both keyed streams? How are they keyed? – David Anderson Apr 29 '21 at 20:58
  • Yes, they are: val streamA = env.addSource(streamAConsumer).keyBy(_.id) – Ygor de Fraga Apr 29 '21 at 22:37
  • This is the bug reported and discussed in https://stackoverflow.com/questions/60745711/flink-kryo-serializer-because-chill-serializer-couldnt-be-found. See also https://issues.apache.org/jira/browse/FLINK-16686. – David Anderson Apr 30 '21 at 07:40
  • Got it. The suggested workaround is to use either primitive types or POJOs to avoid this problem. The element type in my list state is StateIn, which has just one field, stateValue. Since StateIn is a case class, isn't it considered a POJO? – Ygor de Fraga Apr 30 '21 at 14:05
  • No. Case classes are not POJOs, and moreover, generic types (and type erasure) cause problems for Flink's type inference system. – David Anderson Apr 30 '21 at 14:52
  • OK, but how can I turn my case classes into POJOs? – Ygor de Fraga Apr 30 '21 at 16:05
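A minimal sketch of the Scala POJO pattern shown in the Flink documentation (a class with `var` fields plus a public no-argument constructor), assuming the list elements can be specialized to one concrete type. `StateInB`, `EventB` and `PojoCheck` are hypothetical names. The type parameter is dropped because, as the comment above notes, an erased generic field would still be treated as a generic type and serialized with Kryo. `flink-core` is assumed to be on the classpath.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo

// Hypothetical stand-in for the real StreamB event type. It follows the same
// POJO pattern, because a Kryo-serialized field would bring Kryo back.
class EventB(var id: String, var amount: Long) {
  // Public no-argument constructor required by Flink's POJO rules.
  def this() = this(null, 0L)
}

// Hypothetical POJO replacement for the case class StateIn[B], specialized to
// EventB so the field type is not erased to Object.
class StateInB(var stateValue: EventB) {
  def this() = this(null)
}

object PojoCheck {
  def main(args: Array[String]): Unit = {
    // A PojoTypeInfo means Flink will use its PojoSerializer rather than Kryo.
    val info = TypeInformation.of(classOf[StateInB])
    println(info.isInstanceOf[PojoTypeInfo[_]])
  }
}
```

With such a class, the descriptor in the question could become `new ListStateDescriptor[StateInB](stateDescriptorB, classOf[StateInB])`, since the element class would then be analyzed as a POJO instead of a generic type.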
