5

I'm using Spark Streaming 1.5.2 and I am ingesting data from Kafka 0.8.2.2 using the Direct Stream approach.

I have enabled the checkpoints so that my Driver can be restarted and pick up where it left off without loosing unprocessed data.

Checkpoints are written to S3 as I'm on Amazon AWS and not running on top of a Hadoop cluster.

The batch interval is 1 second as I want a low latency.

Issue is, it takes from 1 to 20 seconds to write a single checkpoint to S3. They are backing up in memory and, eventually, the application fails.

2016-04-28 18:26:55,483 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6071 bytes and 1724 ms
2016-04-28 18:26:58,812 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6024 bytes and 3329 ms
2016-04-28 18:27:00,327 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882408000', took 6068 bytes and 1515 ms
2016-04-28 18:27:06,667 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882408000', took 6024 bytes and 6340 ms
2016-04-28 18:27:11,689 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882409000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882409000', took 6067 bytes and 5022 ms
2016-04-28 18:27:15,982 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882409000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882409000', took 6024 bytes and 4293 ms

Is there a way to increase the interval between checkpoints without increasing the batch interval?

Alexis Seigneurin
  • 1,433
  • 2
  • 15
  • 20
  • 1
    you can avoid using checkpoints, you need to store the offsets in Kafka in order for your executors to "only" commit the offsets to kafka when they're done and re-process in case of failure. – Olivier Girardot May 02 '16 at 19:39
  • What do you mean by "store the offsets in Kafka"? Do you suggest using the receiver-based approach or rather the direct approach and writing specific code to store the offsets in Zookeeper? – Alexis Seigneurin May 02 '16 at 20:16
  • I would also recommend to store the offsets in Kafka (or in another DB) and use the direct approach Interesting slides about it : http://www.slideshare.net/jjkoshy/offset-management-in-kafka – Paul Leclercq May 02 '16 at 20:36
  • @AlexisSeigneurin - Don't use S3, it's way too slow for checkpointing. You should try using a much faster datastore for checkpoints like DynamoDB. You might also consider using Kinesis if you're already in AWS, which has some nice utility methods that handle the checkpoint storage optimization in AWS. – Myles Baker May 04 '16 at 15:18
  • 3
    We kept offsets in Cassandra with a reverse time-based index, so that we could also recover time-based windows. – maasg May 04 '16 at 16:06
  • Ok, thanks for the input. I an now storing the offsets in Zookeeper and it's running smoothly. – Alexis Seigneurin May 04 '16 at 19:36
  • Direct approach and Zookeeper or Kafka directly :) – Olivier Girardot May 06 '16 at 14:47
  • Alexis Seigneurin I suggest you to prefer kafka offset manager to zookeeper for offset storage. @MylesBaker Do DynamoDB can be use for checkpointing easily instead of S3 in the way to improve performances? – crak Jul 29 '16 at 14:08

1 Answers1

0

Yes, you can achieve that using checkpointInterval parameter. You can set the duration while doing checkpoint like shown in below doc.

Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may cause an increase in the processing time of those batches where RDDs get checkpointed. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which may have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.

avr
  • 4,835
  • 1
  • 19
  • 30
  • I see this interval can be set when setting up the checkpointing of the DStream itself. I assume this comes in addition to the metadata checkpointing but doesn't change the main checkpointing operation, right? – Alexis Seigneurin May 02 '16 at 20:08
  • Yes you are right. You can achieve checkpointing in 2 ways. 1) metadata checkpointing which is checkpointed for every batch. 2) data checkpointing which is checkpointed for every `checkpointInterval`. I though you were doing data checkpointing, but as per my experience metadata( which contains only sparkConf, code and tasks info) checkpointing shouldn't take ~20 seconds! you can try upgrading to better network! – avr May 03 '16 at 17:24
  • I'm on AWS and writing to S3, so there's nothing I can do on the side of the network. – Alexis Seigneurin May 03 '16 at 20:40