Trying to read from a Kafka source. I want to extract the timestamp from each received message to use in Spark Structured Streaming. Kafka version 0.10.0.0, Spark Streaming version 2.0.1.
- Could you show a snippet of your current code? – vanekjar Nov 14 '16 at 23:33
- @vanekjar val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topicA").load() – shivali Nov 15 '16 at 12:07
2 Answers
import spark.implicits._ // required for the $"column" syntax

spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "your.server.com:9092")
  .option("subscribe", "your-topic")
  .load()
  .select($"timestamp", $"value")
Field "timestamp" is what you are looking for. Type - java.sql.Timestamp. Make sure that you are connecting to 0.10 Kafka server. There is no timestamp in earlier versions. Full list of fields described here - http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

– Gorini4
I'd suggest a couple of things:
Suppose you create a stream via the latest Kafka streaming API (Kafka 0.10).
E.g. you use the dependency:
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"
Then you create a stream according to the docs for that integration:
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id" -> "spark-streaming-test",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val sparkConf = new SparkConf()
// suppose you have a 60-second batch window
val ssc = new StreamingContext(sparkConf, Seconds(60))
ssc.checkpoint("checkpoint")

val topics = Array("topicA") // topic(s) to subscribe to
val stream = KafkaUtils.createDirectStream(
  ssc,
  PreferConsistent,
  Subscribe[String, Array[Byte]](topics, kafkaParams))
Your stream will be a DStream of ConsumerRecord[String, Array[Byte]], and you can get the timestamp, key, and value as simply as:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
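As a sketch of how that mapped stream could then be consumed (the printing and formatting here are illustrative, not part of the original answer):

stream
  .map(record => (record.timestamp(), record.key(), new String(record.value())))
  .foreachRDD { rdd =>
    // record.timestamp() is epoch milliseconds; wrap it for readable output
    rdd.take(10).foreach { case (ts, key, value) =>
      println(s"ts=${new java.sql.Timestamp(ts)} key=$key value=$value")
    }
  }

ssc.start()
ssc.awaitTermination()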
Hope that helps.

– Vlad Vlaskin
- How do you convert the above stream.map { record => (record.timestamp(), record.key(), record.value()) } to a DataFrame? I am new to Spark and I want to include the timestamp from Kafka as well while converting to a DataFrame. Can you please tell me how to do it? – BigD Feb 08 '19 at 16:09
- @BigD You could do something like: stream.foreachRDD { (rdd: RDD[ConsumerRecord[String, Array[Byte]]], time: Time) => rdd.map(//some mapping).toDF } – Vlad Vlaskin Feb 09 '19 at 17:12
- If I do rdd.map(//some mapping).toDF I am converting only the values part to a DataFrame. I want to include the timestamp as well while converting to a DataFrame. – BigD Feb 09 '19 at 18:41
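For reference, a minimal sketch of what the suggestion in the comments could look like, assuming a SparkSession named spark is in scope; the column names here are illustrative, not from the thread:

import java.sql.Timestamp
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD

stream.foreachRDD { (rdd: RDD[ConsumerRecord[String, Array[Byte]]]) =>
  import spark.implicits._
  // keep the Kafka timestamp alongside key and value when building the DataFrame
  val df = rdd
    .map(r => (new Timestamp(r.timestamp()), r.key(), new String(r.value())))
    .toDF("timestamp", "key", "value")
  df.show()
}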