I'm trying to create a DStream from a Kafka server and then apply some transformations to that stream. I have included a check for an empty stream (if(!rdd.partitions.isEmpty)); however, even when no events are being published to the Kafka topic, the else branch is never reached.
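import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc, kafkaParams, topics and parser are defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
  if (!rdd.partitions.isEmpty) {
    // keep only the message value of each (key, value) pair and parse it
    val message = rdd.map(_._2).collect().toList.map(parser)
    val first = message(0)
  } else println("empty stream...")
}
// start the context once, outside foreachRDD
ssc.start()
ssc.awaitTermination()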
Is there an alternative statement I should use to check whether the stream is empty when using KafkaUtils.createDirectStream rather than createStream?
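
For reference, here is a minimal sketch of the distinction that may be involved. It does not use Kafka at all: it assumes Spark 1.3 or later (where RDD.isEmpty is available) and a local SparkContext, and the object name EmptyRddCheck and its check method are hypothetical names made up for this illustration. It shows that an RDD can have partitions and still contain no records, which is likely what a direct stream batch looks like when nothing was published, since the direct approach typically creates one RDD partition per Kafka partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper for illustration only; not part of the original code.
object EmptyRddCheck {

  // Returns (rdd.partitions.isEmpty, rdd.isEmpty()) for an RDD
  // that has 4 partitions but no records.
  def check(): (Boolean, Boolean) = {
    val conf = new SparkConf().setAppName("EmptyRddCheck").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Explicitly ask for 4 partitions even though there is no data
      val rdd = sc.parallelize(Seq.empty[String], 4)
      (rdd.partitions.isEmpty, rdd.isEmpty())
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (noPartitions, noRecords) = check()
    println(s"partitions.isEmpty = $noPartitions") // false: 4 partitions exist
    println(s"rdd.isEmpty()      = $noRecords")    // true: no records in them
  }
}
```

If the same holds for the RDDs produced by createDirectStream, then replacing if(!rdd.partitions.isEmpty) with if(!rdd.isEmpty()) inside foreachRDD would be one candidate to try. This is an assumption to verify against the Spark version in use.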