
I have a simple Spark Streaming application which reads data from Kafka and then, after transformation, sends this data to an HTTP endpoint (or another Kafka topic - for this question let's consider HTTP). I am submitting jobs using job-server.

I am currently starting consumption from the source Kafka with "auto.offset.reset"="smallest" and a batch interval of 3s. In the happy case everything looks good. Here's an excerpt:

kafkaInputDStream.foreachRDD(rdd => {
  rdd.foreach(item => {
    // This will throw an exception if the HTTP endpoint isn't reachable
    httpProcessor.process(item._1, item._2)
  })
})

Since "auto.offset.reset"="smallest", this processes about 200K messages in one job. If I stop http server mid-job (simulating some issue in POSTing) and httpProcessor.process throws exception, that Job fails and whatever is unprocessed is lost. I see it keeps on polling every 3 seconds after that.

So my questions are:

  1. Is my assumption right that if the job for the next 3-second interval gets X messages and only Y could be processed before hitting an error, the remaining X-Y will not be processed?
  2. Is there a way to pause the stream/consumption from Kafka? For instance, in case there's an intermittent network issue, most likely all messages consumed during that time will be lost. Something that keeps retrying (maybe with exponential backoff) and, whenever the HTTP endpoint is up, starts consuming again.

Thanks

K P
  • If you have network trouble while processing a particular stage of the job, you can propagate (throw) the exception to fail the entire job, and replay the entire batch that failed. This does have some overhead and only works if your DAG is referentially transparent. – Yuval Itzchakov May 01 '16 at 11:55

2 Answers

3

Yes, your assumption is correct: if processing of a partition fails, the remaining events are not processed for the moment.

However, there are quite a few parameters you can tune to get your desired behavior (if you use DirectKafkaInputDStream).

Let's start with "auto.offset.reset"="smallest": this parameter tells Kafka to begin from the earliest available offset when there is no stored commit for the current group. Since you mentioned that your RDD contains a lot of messages after starting, I assume that you do not commit your offsets properly. If you expect exactly-once semantics, you should definitely consider keeping track of your offsets yourself, as the DirectKafkaInputDStream explicitly does not do that:

Starting offsets are specified in advance, and this DStream is not responsible for committing offsets, so that you can control exactly-once

Comment in the DirectKafkaInputDStream, branch 1.6

That said, your messages are currently reprocessed every time you restart your streaming job.

If you commit your processed offsets and pass them into the InputDStream on startup, the listener will continue from the last committed offset.
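With the Spark 1.6 / Kafka 0.8 direct API this could look roughly like the sketch below; loadOffsets and saveOffsets are hypothetical helpers backed by a store of your choice (ZooKeeper, Cassandra, ...), while ssc, kafkaParams and httpProcessor come from the question:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// loadOffsets()/saveOffsets() are hypothetical helpers backed by your own store
// (ZooKeeper, Cassandra, ...); they are not part of Spark.
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD(rdd => {
  // The direct stream exposes the Kafka offset ranges that back this RDD.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach(item => httpProcessor.process(item._1, item._2))
  // Persist offsets only after the whole batch succeeded, so a failed batch
  // is re-read on the next start instead of being skipped.
  saveOffsets(offsetRanges)
})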

Regarding backpressure, the DirectKafkaInputDStream already uses a RateController which estimates how many events should be processed in one batch.

To use it, you have to enable backpressure:

"spark.streaming.backpressure.enabled": true

You can also set "spark.streaming.kafka.maxRatePerPartition" to add an upper bound on the batch size.
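Both settings go on the Spark configuration before the StreamingContext is created; a small sketch (the app name and values are only illustrative):

import org.apache.spark.SparkConf

// Illustrative values only; tune them to your workload.
val conf = new SparkConf()
  .setAppName("kafka-to-http")
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap: at most 1000 records per second per Kafka partition per batch.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")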

If you want to control backpressure on your own (and perhaps stop the consumer completely for a while), you may want to implement some methods of StreamingListener and use it in your job. For example, with a StreamingListener you can decide after each completed batch whether or not to stop your streaming job.
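A rough sketch of that idea (the listener class and the stop-on-failure policy are my own assumptions, not an established pattern): stop the StreamingContext once a batch reports a failed output operation, and let whatever submitted the job (e.g. job-server) resubmit it when the endpoint is reachable again.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener: stop the streaming job once a batch has a failed output operation.
class StopOnFailureListener(ssc: StreamingContext) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchFailed = batchCompleted.batchInfo.outputOperationInfos.values
      .exists(_.failureReason.isDefined)
    if (batchFailed) {
      // Stop from a separate thread; stopping inside the listener callback can block.
      new Thread("stop-streaming-context") {
        override def run(): Unit = ssc.stop(stopSparkContext = false, stopGracefully = false)
      }.start()
    }
  }
}

ssc.addStreamingListener(new StopOnFailureListener(ssc))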

  • All good points. You forgot to mention checkpointing too, which I've tested and it works really well if you wish to restart the driver for an upgrade etc. I was more worried about intermittent failures, like a network failure for 2 minutes ~ 10K potentially failed messages. If I track offsets somewhere, say Zookeeper/Cassandra, it'll be tricky to replay messages when the network recovers and the stream resumes processing messages again. Need to think about it a little more, thanks for the answer. If there's no better answer, I'll award you bounty points. – K P Apr 29 '16 at 06:20
  • 1
  • @KP If you're already partially isolated by backpressure and you want to handle only intermittent failures, just don't fail. Since each partition is handled sequentially, using a `Try recoverWithDelay recoverWithDelay ... fail` chain for each element should be enough to effectively "pause" the stream for a short duration of time (see the sketch after these comments). – zero323 Jun 29 '16 at 09:11
  • How to pause and resume using streaming listener? – user1870400 Aug 05 '19 at 09:25
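A minimal sketch of the per-element retry idea from the comment above; withBackoff is a hypothetical helper, kafkaInputDStream and httpProcessor come from the question, and the retry count and delays are arbitrary:

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper: retry an action with exponential backoff before giving up.
// In a real job this should live in a small serializable object so the closure ships cleanly.
@tailrec
def withBackoff[T](retriesLeft: Int, delayMs: Long)(action: => T): T =
  Try(action) match {
    case Success(result) => result
    case Failure(_) if retriesLeft > 0 =>
      Thread.sleep(delayMs) // blocking here effectively "pauses" this partition
      withBackoff(retriesLeft - 1, delayMs * 2)(action)
    case Failure(e) => throw e // still failing: let the task (and the batch) fail
  }

kafkaInputDStream.foreachRDD(rdd => {
  rdd.foreach(item => {
    // Retry the POST a few times before failing the batch.
    withBackoff(retriesLeft = 5, delayMs = 1000) {
      httpProcessor.process(item._1, item._2)
    }
  })
})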
1

I think Spring Cloud Stream could solve your problem. Kafka is the source, Spark Streaming is the processor, and HTTP is the sink. Spark Streaming only processes when there is input from Kafka, so you needn't stop or resume the input from Kafka. Hope it helps.

kevin king
  • I don't understand - you needn't stop/resume input from Kafka? What if Http Sink isn't responding ~ maybe it's down or temporary network outage? – K P Apr 27 '16 at 17:54
  • The source, processor and sink work like a Linux pipeline. It's OK without the sink, and the processor would still work fine. You can learn more from [spring-cloud-stream](http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC2/reference/htmlsingle/index.html) and [spring-cloud-data-flow](http://docs.spring.io/spring-cloud-dataflow/docs/1.0.0.M2/reference/html/) – kevin king Apr 28 '16 at 05:35
  • If the Http Sink is down and you don't want to lose data from Spark Streaming, you can add Kafka or Redis between Spark Streaming and the Http Sink. When the Http Sink is up, it gets the data from Kafka. – kevin king Apr 28 '16 at 05:46
  • I considered that option too, but I can't add another layer like that. I guess the only way is to stop consuming data or make the application die after some time. – K P Apr 28 '16 at 19:16