
I am trying to read from a Kafka topic in my Spark batch job and publish to another topic. I am not using streaming because it does not fit our use case. According to the Spark docs, a batch job starts reading from the earliest Kafka offsets by default, so when I run the job again it reads from the earliest offsets all over again. How do I make sure that each run picks up from the offset where the last run stopped reading?

According to the Spark Kafka integration docs, there are options to specify "startingOffsets" and "endingOffsets". But how do I figure out what values to pass for them?

I am using the spark.read.format("kafka") API to read the data from Kafka as a Dataset, but I could not find any option to get the start and end offset range from this Dataset read.
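For reference, this is roughly what the batch read looks like when the offsets are specified explicitly (a minimal sketch; the broker address, topic name, and offset values are placeholders, and the per-partition JSON follows the integration docs, where -2 means earliest and -1 means latest):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-batch-read").getOrCreate()

// Batch read pinned to an explicit per-partition offset range.
// Per the integration docs, in the JSON -2 means "earliest" and -1 means "latest".
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")                 // placeholder
  .option("subscribe", "input-topic")                                // placeholder
  .option("startingOffsets", """{"input-topic":{"0":42,"1":42}}""")
  .option("endingOffsets", """{"input-topic":{"0":100,"1":100}}""")
  .load()
```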

white-hawk-73
  • @cricket_007 Thanks for your reply. Yes, they do. But my question is how do I figure out what they are. Is there an API to get the offset range for the Dataset I read from Kafka? – white-hawk-73 Sep 29 '19 at 14:03
  • The starting offset is always stored per consumer group id. If you can find and persist that ID between runs, you can look up the start offset, then calculate the number of records you want to scan per partition in your batch – OneCricketeer Sep 29 '19 at 16:11
  • Thanks @cricket_007. The starting offset for a batch should be equal to the end offset of the last batch + 1. But how do I know where it stops reading? Ideally, I would want to read up to the latest offsets available, except when the job has failed multiple times and many records have queued up in Kafka for processing. Then, I would want to read only X records, or else the job might fail with not enough resources. – white-hawk-73 Sep 29 '19 at 16:31
  • I tried maxOffsetsPerTrigger, but it only works for streaming queries – white-hawk-73 Sep 29 '19 at 16:34
  • The `kafka-consumer-groups` command is a shell wrapper around some Scala code to look up offsets for a particular group... Otherwise, when offsets are stored back in Kafka, groups always start at the (last committed offset + 1). If you always want to read to the end, you can set end offsets to latest. If you only want a specific number without going off the end of the partition, you'll need to look up the latest available offset as well (also part of that script via the lag) – OneCricketeer Sep 29 '19 at 16:40
  • Hmm... but according to the Spark docs (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#kafka-specific-configurations), the Kafka source doesn't commit any offsets. So, after we are done with our processing, we only have to store the last processed offset and, in the next batch, start reading from that offset + 1. Is my understanding correct? – white-hawk-73 Sep 29 '19 at 16:45
  • Maybe not because it says a group id will be unique for each query, and you can't set it... I'll be honest, I've not done batching from Kafka in Spark, and I've also tried to find examples of how offsets are managed. Outside of the Kafka DStream docs https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets there is this Cloudera blog https://blog.cloudera.com/offset-management-for-apache-kafka-with-apache-spark-streaming/ – OneCricketeer Sep 29 '19 at 17:49
  • Hi @cricket_007, thanks a lot for your help. The way I am now doing this is to use the https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) API to get the end offsets for all the partitions and use them for the endingOffsets option that Spark expects. Starting offsets for the first run = end offsets - lookback; for later runs, it is simply the previous run's end offset + 1 (which I am storing in HDFS) – white-hawk-73 Oct 12 '19 at 20:30 (a sketch of this approach follows the comment thread)
  • Cool. Feel free to answer your own question with your implementation – OneCricketeer Oct 13 '19 at 04:00
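
For completeness, here is a minimal sketch (Scala) of the approach described in the last comments: query Kafka for the current end offsets of every partition, use the offsets persisted by the previous run (or end offsets minus a lookback window on the first run) as the starting offsets, and persist this run's end offsets after processing. The topic, broker address, lookback value, and the readOffsetsFromStore/writeOffsetsToStore persistence helpers are placeholders for this sketch, not part of any Spark or Kafka API:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.SparkSession

val topic    = "input-topic"    // placeholder
val brokers  = "broker1:9092"   // placeholder
val lookback = 100000L          // placeholder: records per partition to rewind on the first run

// Hypothetical persistence helpers; in the real job these would read/write a small file on HDFS.
def readOffsetsFromStore(): Option[Map[Int, Long]] = None
def writeOffsetsToStore(offsets: Map[Int, Long]): Unit = ()

// 1. Ask Kafka for the current end offset of every partition of the topic.
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer   = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(topic, p.partition))
val endOffsets: Map[Int, Long] =
  consumer.endOffsets(partitions.asJava).asScala.map { case (tp, off) => tp.partition -> off.longValue }.toMap
consumer.close()

// 2. Starting offsets: what the previous run persisted, or end - lookback on the first run.
val startOffsets: Map[Int, Long] = readOffsetsFromStore().getOrElse(
  endOffsets.map { case (p, off) => p -> math.max(0L, off - lookback) })

// Build the per-partition offset JSON that the startingOffsets/endingOffsets options expect.
def toJson(offsets: Map[Int, Long]): String = {
  val parts = offsets.map { case (p, o) => "\"" + p + "\":" + o }.mkString(",")
  s"""{"$topic":{$parts}}"""
}

// 3. Batch read bounded by the two offset maps.
val spark = SparkSession.builder().appName("kafka-batch").getOrCreate()
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", toJson(startOffsets))
  .option("endingOffsets", toJson(endOffsets))
  .load()

// ... process df and publish to the output topic ...

// 4. Persist this run's end offsets so the next run starts where this one stopped.
writeOffsetsToStore(endOffsets)
```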

0 Answers