7

First of all, this is very similar to Kafka consuming the latest message again when I rerun the Flink consumer, but it's not the same. The answer to that question does NOT appear to solve my problem. If I missed something in that answer, then please rephrase the answer, as I clearly missed something.

The problem is the exact same, though -- Flink (the kafka connector) re-runs the last 3-9 messages it saw before it was shut down.

My Versions

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

My Code

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

My SBT Dependencies

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

My Process

(3 terminals)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

My Expectations

When there are no errors in the system, I expect to be able to turn flink on and off without reprocessing messages that successfully completed the stream in a prior run.

My Attempts to Fix

I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs Zookeeper. That didn't seem to help.

I've used different sinks, RollingFileSink, print(); hoping that maybe the bug was in kafka. That didn't seem to help.

I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea drfloob). That didn't seem to help.

My Plea

Help!

Community
  • 1
  • 1
mbarlocker
  • 1,310
  • 10
  • 16
  • 1
    Just for fun, try setting EXACTLY_ONCE explicitly. env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) – drfloob Sep 13 '16 at 06:23
  • 1
    I have exactly the same problem, the same event is consumend again after starting a flink streaming job. Maybe the offset is not correctly incremented when saving the checkpoint? – static-max Sep 13 '16 at 16:27
  • The explicit checkpointing mode didn't work. I've updated the post. Good idea, though. – mbarlocker Sep 13 '16 at 19:57

2 Answers2

10

(I've posted the same reply in the JIRA, just cross-posting the same here)

From your description, I'm assuming you're manually shutting down the job, and then resubmitting it, correct?

Flink does not retain exactly-once across manual job restarts, unless you use savepoints (https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). The exactly-once guarantee refers to when the job fails and then automatically restores itself from previous checkpoints (when checkpointing is enabled, like what you did with env.enableCheckpointing(500) )

What is actually happening is that the Kafka consumer is simply start reading from existing offsets committed in ZK / Kafka when you manually resubmitted the job. These offsets were committed to ZK / Kafka the first time you executed the job. They however are not used for Flink's exactly-once semantics; Flink uses internally checkpointed Kafka offsets for that. The Kafka consumer commits those offsets back to ZK simply to expose a measure of progress of the job consumption to the outside world (wrt Flink).

Gordon Tai
  • 319
  • 1
  • 7
  • Hi Gordon, what if I update/change the job and re-submit it? Why can't the correct offset from ZK be used for thatt? It seems rather easy to fix by incrementing the offset by one, or am I missing a point? I don't want to use the commandline or add additinal de-duplication in my downstream apps. Or am I using flink wrong? – static-max Sep 14 '16 at 09:42
  • Like I said, the offsets in ZK are only committed back to expose progress. Flink achieves exactly-once with its internal checkpointing mechanism; the state backends are where the correct offset snapshots are kept. The "external" offsets committed to ZK may not necessarily be the correct offsets, because they do not really cooperate with Flink's checkpointing. – Gordon Tai Sep 14 '16 at 10:59
  • If you want to update/change job, usually this is managed downtime of a streaming job, and Flink provides savepoints for this. When resubmitting a job, you can provide previous savepoints as the starting point. There's also ongoing work in Flink to support "savepoint and resubmit" a job in a single command. – Gordon Tai Sep 14 '16 at 11:02
  • 1
    Another thing worth pointing out so that we don't get confused here: "Exactly-once" means that all records are accounted in Flink's user / windows state exactly one time, not that records are processed only once. If failure happens, and a job needs to replay a bit of data, Flink's checkpointing makes sure that even the internal states of operators are replayed back so that they seem to not have included any changes from the to-be-replayed data. It is important to know that Flink's exactly-once is a combo of replaying data, and at the same time rolling the internal states back – Gordon Tai Sep 14 '16 at 11:08
  • Using manual savepoints did the trick in my small test, thanks for helping me out and clarifying. Maybe the documentation can be updated to make that behavior a bit more clear? – static-max Sep 14 '16 at 13:56
  • I was running locally, and shutting down the entire system. That's good to know. Thanks! – mbarlocker Sep 15 '16 at 03:32
  • 1
    I agree the documentation can be more informative regarding this. Let's open up a ticket for that ;) – Gordon Tai Sep 15 '16 at 06:22
  • Hi @melmoth and mbarlocker, I've revisited this issue, and I think this may actually be a bug. Like what melmoth mentioned, the Kafka consumer is either committing 1 less offset back to ZK, or that it's supposed to start from the next record when starting from offsets found in ZK. Note that this still has nothing to do with Flink's exactly-once guarantee though; this is simply that the consumer isn't starting at the right place when starting with offsets found from ZK. I'm sorry for making a wrong conclusion on this bug in the first place. – Gordon Tai Sep 15 '16 at 16:35
1

Update 2: I fixed the bug with the offset handling, it got merged in the current MASTER.

Update: Not an issue, use manual savepoints before canceling the job (thanks to Gordon)

I checked the logs and it seems like a bug in the offset handling. I filed a report under https://issues.apache.org/jira/browse/FLINK-4618. I will update this answer when I got feedback.

static-max
  • 739
  • 10
  • 19