
I am using the Spark Streaming Scala code below to consume real-time Kafka messages from a producer topic. The issue is that sometimes my job fails due to server connectivity or some other reason, and because the auto-commit property is set to true in my code, some messages are lost and never stored in my database.

So I just want to know whether there is any way to pull old Kafka messages from a specific offset number. I tried setting "auto.offset.reset" to earliest or latest, but it fetches only new messages that have not yet been committed.

For example, say my current offset is 1060 and auto.offset.reset is set to earliest; when I restart my job, it starts reading messages from 1061. But if in some case I want to read old Kafka messages starting from offset 1020, is there any property I can use to start consuming from a specific offset number?

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.kafka.common.serialization.StringDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializer


val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

val topic = "test123"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "schema.registry.url" -> "http://abc.test.com:8089",
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)


val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  PreferConsistent,
  Subscribe[String, Object](Array(topic), kafkaParams)
)

stream.print()

ssc.start()
ssc.awaitTermination()

1 Answer


From Spark Streaming, you can't. You'd need to use the kafka-consumer-groups CLI to commit offsets specific to your group id, or manually construct a KafkaConsumer instance and invoke commitSync before starting the Spark context.

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Use the same group.id as the streaming job so the committed offsets apply to it
val c = new KafkaConsumer[String, Object](...)
val toCommit: java.util.Map[TopicPartition, OffsetAndMetadata] = ...
c.commitSync(toCommit) // But don't do this every run of your app

ssc.start()
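
For illustration, the toCommit map could be built from the offset in the question (topic test123, offset 1020); the partition number 0 here is an assumption:

import java.util.Collections
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical example: rewind the group to offset 1020 on partition 0 of test123.
// The committed offset is the position of the next record the group will read.
val toCommit: java.util.Map[TopicPartition, OffsetAndMetadata] =
  Collections.singletonMap(new TopicPartition("test123", 0), new OffsetAndMetadata(1020L))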

Alternatively, Structured Streaming does offer startingOffsets config.
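
A minimal sketch of that option, assuming a SparkSession named spark and the topic/partition/offset from the question (test123, partition 0, offset 1020):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("NetworkWordCount").getOrCreate()

// startingOffsets accepts "earliest", "latest", or a JSON map of topic -> partition -> offset
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test123")
  .option("startingOffsets", """{"test123":{"0":1020}}""")
  .load()
// key and value come back as binary columns and still need to be deserialized downstream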

auto.offset.reset only applies to group.ids that have no existing committed offsets.

OneCricketeer
  • Could you please share more details about kafka-consumer-groups. Are there any properties for this in Scala Spark Streaming? – abhishek chechani Dec 18 '22 at 10:11
  • I have tried to add the startingOffsets property, but when I execute my job it says this property is not configured – abhishek chechani Dec 18 '22 at 10:15
  • The first is a shell script installed by Kafka, nothing to do with Spark. As answered, you'd use a KafkaConsumer class instead if you want to use code. Secondly, I said startingOffsets is **only for Spark Structured Streaming**, which you're not using – OneCricketeer Dec 18 '22 at 14:54
  • Thanks for the response. As I have to use Spark Streaming with Scala code only in my project, is there any other way or property we can use for manual offset commits, or to restart the job from an older offset number in case of failure, so we will not lose data? – abhishek chechani Dec 18 '22 at 15:36
  • 1) Spark "Streaming" vs "Structured Streaming" has no significant difference; both can do the same thing, so if you want to use `startingOffsets`, then use that instead. 2) `KafkaConsumer.seek` can still be called in Scala, regardless of using Spark – OneCricketeer Dec 19 '22 at 02:47
  • I am able to read the old Kafka messages with the startingOffsets parameter using Structured Streaming. But the topic key and value come in binary format, so how can we map the value with an Avro deserializer using the schema registry? – abhishek chechani Dec 19 '22 at 17:34
  • Glad to hear. Feel free to add your code in another answer, or accept this one using the checkmark next to the post – OneCricketeer Dec 19 '22 at 17:35
  • Could you please help with the deserializer part as well? My topic values come in binary format, so how can we convert each record value (JSON format) with an Avro deserializer using the schema registry? – abhishek chechani Dec 19 '22 at 17:40
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/250534/discussion-between-abhishek-chechani-and-onecricketeer). – abhishek chechani Dec 19 '22 at 17:44
  • That has [already been answered](https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry) – OneCricketeer Dec 19 '22 at 17:44
  • From the producer side they are sending some messages that are not in correct Avro format, which is why I am getting this error: (org.apache.kafka.common.errors.SerializationException: Unknown magic byte). But if I want to skip those messages in my Spark Streaming Scala code, is there any property or way to filter them out? – abhishek chechani Dec 22 '22 at 06:01
  • Sure. Use the filter method. Open a new post for new issues, and accept this one using the checkmark next to the post if it addressed your initial question – OneCricketeer Dec 22 '22 at 18:19