
I'm having problems running a Flink job on one specific small cluster and on my local machine. The same job runs smoothly on larger machines. I'm using:

  • com.twitter "chill-protobuf" 0.7.6
    • .excluding com.esotericsoftware.kryo "kryo"
  • com.google.protobuf "protobuf-java" 3.18.1

For the RocksDB state I'm mounting this volume:

volumeMounts:
  - name: rocksdb-volume
    volume:
      emptyDir:
        sizeLimit: 5Gi
      name: rocksdb-volume
    volumeMount:
      mountPath: /opt/flink/rocksdb

After logging into one TaskManager, I can see that the path /opt/flink/rocksdb contains 1 GB, and Kubernetes does not report DiskPressure.

The job runs with the following resources:

numberOfTaskManagers: 2
parallelism: 4
resources:
   taskmanager:
      cpu: 0.5
      memory: 6G

Two TaskManagers with 6 GB each is already generous compared with other clusters, which run the same job without problems under a heavier load.

I'm getting the following error:

[FlinkOperatorName] (2/2)#15 (c2f42784dab9c9046192b067c0115499) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Could not create class [ProtobufClassName]
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:76)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:29)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:126)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
    at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 23 more
Caused by: java.io.EOFException: No more bytes left.
    ... 26 more

The combination of Protobuf, a Kryo read and an EOFException suggests that the problem might lie in reading either checkpoints or savepoints.
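For context, the innermost frames (ProtobufSerializer.read calling Input.readBytes, then "EOFException: No more bytes left.") are consistent with a length-prefixed read: the serializer reads a size, then asks for exactly that many bytes, and the input runs out first. Below is a minimal, stdlib-only Java sketch of that failure mode. It uses DataOutputStream/DataInputStream as a hypothetical stand-in for Kryo's Output/Input and does not reproduce Kryo's actual wire format; LengthPrefixDemo, write and read are illustrative names, not part of Kryo, chill or Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

class LengthPrefixDemo {
    // Frame a message as [4-byte length][message bytes] (simplified stand-in, not Kryo's format).
    static byte[] write(byte[] message) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(message.length);
        out.write(message);
        out.flush();
        return bytes.toByteArray();
    }

    // Read a frame back: the reader trusts the prefix and requests exactly that many bytes.
    static byte[] read(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        int size = in.readInt();
        byte[] message = new byte[size];
        in.readFully(message); // throws EOFException if fewer than `size` bytes remain
        return message;
    }

    public static void main(String[] args) throws IOException {
        byte[] frame = write(new byte[] {8, 1, 16, 2});
        System.out.println(Arrays.toString(read(frame))); // [8, 1, 16, 2]

        // Drop the last two bytes: the prefix still promises 4 bytes, only 2 are left.
        byte[] truncated = Arrays.copyOf(frame, frame.length - 2);
        try {
            read(truncated);
        } catch (EOFException e) {
            System.out.println("EOFException: the prefix promised more bytes than were left");
        }
    }
}
```

In other words, the exception says the byte stream and the length the reader expected disagree; by itself it does not say where those bytes came from.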

What should I be looking at?

Juancki

1 Answer


The issue was caused by malformed tombstones being written to the Kafka topic that is the input to this job.

Kafka distinguishes between a null value and an empty "" value: only a record whose value is null is a tombstone. I was incorrectly producing "" instead of null.
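To make the distinction concrete: on the consumer side a tombstone arrives as a null value, whereas a producer that sends "" ends up writing a zero-length byte array, which is an ordinary, non-tombstone record. Here is a minimal Java sketch of a guard a consumer could apply to the raw value before parsing it. TombstoneCheck, ValueKind and classify are hypothetical names, and the byte[] is assumed to be the record value exactly as it comes off the topic.

```java
import java.nio.charset.StandardCharsets;

class TombstoneCheck {
    // Hypothetical classification of a raw record value as seen by a consumer.
    enum ValueKind { TOMBSTONE, EMPTY, PAYLOAD }

    static ValueKind classify(byte[] value) {
        if (value == null) {
            return ValueKind.TOMBSTONE; // a real tombstone: the record value is null
        }
        if (value.length == 0) {
            return ValueKind.EMPTY;     // what "" becomes on the wire: NOT a tombstone
        }
        return ValueKind.PAYLOAD;       // anything else is a regular message
    }

    public static void main(String[] args) {
        byte[] fromEmptyString = "".getBytes(StandardCharsets.UTF_8);
        System.out.println(classify(null));              // TOMBSTONE
        System.out.println(classify(fromEmptyString));   // EMPTY
        System.out.println(classify(new byte[] {8, 1})); // PAYLOAD
    }
}
```

The EMPTY case matters because protobuf-java parses a zero-length array into a message with every field at its default (for proto3, or proto2 messages without required fields), so an empty value does not fail at parse time and instead flows further down the job.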


Judging from the stack trace, the failure does not happen inside my deserialiser code but later in the pipeline. I'm still not sure how it manages to build a Protobuf object that Kryo then cannot serialise and read back.

Juancki