
In my Spark Structured Streaming application, I am reading messages from Kafka, filtering them, and finally persisting them to Cassandra. I am using Spark 2.4.1. From the Structured Streaming documentation:

Fault Tolerance Semantics Delivering end-to-end exactly-once semantics was one of key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

But I am not sure how Spark actually achieves this. In my case, if the Cassandra cluster is down and the write operation fails, will the checkpoint for Kafka still record those offsets?

Is the Kafka checkpoint offset based only on successful reads from Kafka, or is the entire operation, including the write, considered for each message?
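For reference, the pipeline described above looks roughly like this (a minimal sketch only; the broker address, topic, keyspace, and table names are placeholders, and the Cassandra write via foreachBatch assumes the spark-cassandra-connector is on the classpath):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.appName("kafka-to-cassandra").getOrCreate()

// Read from Kafka; Spark tracks the consumed offsets itself in the checkpoint.
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder topic
  .load()

// Filter, then persist each micro-batch to Cassandra.
val query = input
  .selectExpr("CAST(value AS STRING) AS value")
  .filter("value IS NOT NULL")
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", "ks")  // placeholder
      .option("table", "events") // placeholder
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "/tmp/checkpoints/kafka-to-cassandra")
  .start()
```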

  • I recently found that the checkpoint is not responsible only for storing the offsets; it stores the current state of the Spark application. Since I don't have any aggregations, I might not need the checkpoint at all. But is the offset commit by Spark to Kafka based only on a successful read, or does it consider the entire read and write to the external sink as one operation? – raizsh Apr 24 '19 at 06:05

2 Answers


Spark Structured Streaming does not commit offsets to Kafka the way a "normal" Kafka consumer would. Spark manages the offsets internally with its checkpointing mechanism.

Have a look at the first answer to the following question, which gives a good explanation of how the state is managed with the checkpoint and commits log: How to get Kafka offsets for structured query for manual and reliable offset management?
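To make "managed internally" concrete: the files under checkpoint/offsets/ are small text logs, a version header followed by JSON lines, where the last line maps each topic's partitions to the next offsets to process. A rough parser for the partition map (illustrative only; the exact layout is a Spark-internal detail and may differ between versions):

```scala
// Illustrative parse of one offsets-log entry; the file format is a
// Spark-internal detail and may differ between versions.
def parsePartitionOffsets(fileContent: String): Map[Int, Long] = {
  // The last non-empty line holds the source offsets,
  // e.g. {"mytopic":{"0":42,"1":17}}
  val last = fileContent.split("\n").filter(_.trim.nonEmpty).last
  val pair = """"(\d+)"\s*:\s*(\d+)""".r
  pair.findAllMatchIn(last)
    .map(m => m.group(1).toInt -> m.group(2).toLong)
    .toMap
}

// A made-up sample entry in the observed shape.
val sample =
  """v1
    |{"batchWatermarkMs":0,"batchTimestampMs":1556086805000}
    |{"mytopic":{"0":42,"1":17}}""".stripMargin
```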

pgras

Spark uses multiple log files to ensure fault tolerance. The ones relevant to your question are the offset log and the commit log. From the StreamExecution class doc:

 /**
   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
   * that a given batch will always consist of the same data, we write to this log *before* any
   * processing is done.  Thus, the Nth record in this log indicated data that is currently being
   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
   */
  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))

  /**
   * A log that records the batch ids that have completed. This is used to check if a batch was
   * fully processed, and its output was committed to the sink, hence no need to process it again.
   * This is used (for instance) during restart, to help identify which batch to run next.
   */
  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))

So when Spark reads from Kafka, it writes the offsets to the offsetLog; only after processing the data and writing it to the sink (in your case Cassandra) does it record the batch as completed in the commitLog. On restart, any batch present in the offsetLog but missing from the commitLog is re-run with exactly the same offset range, which is why a failed Cassandra write does not cause those offsets to be skipped.
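The restart decision this implies can be sketched as a pure function (an illustration of the idea, not Spark's actual code): given the latest batch id in the offset log and the latest in the commit log, decide which batch to run next.

```scala
// Illustrative sketch of the recovery decision, not Spark's implementation.
// latestOffsetBatch: highest batch id found in checkpoint/offsets (None if empty)
// latestCommitBatch: highest batch id found in checkpoint/commits (None if empty)
def nextBatchToRun(latestOffsetBatch: Option[Long],
                   latestCommitBatch: Option[Long]): Long =
  (latestOffsetBatch, latestCommitBatch) match {
    // Nothing was ever planned: start from batch 0.
    case (None, _) => 0L
    // A batch was planned (offsets written) but never committed,
    // e.g. the Cassandra write failed: re-run that same batch.
    case (Some(o), None) => o
    case (Some(o), Some(c)) if c < o => o
    // The last planned batch was committed to the sink: move on.
    case (Some(o), Some(_)) => o + 1
  }
```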

user_s