I found KafkaUtils.createRDD(...)
for reading an RDD from a Kafka topic for batch processing. However, my Kafka topic has Avro values, and this method takes (among others) the arguments
Class<V> valueClass
Class<VD> valueDecoderClass
where
VD extends kafka.serializer.Decoder<V>
Say my Avro class is AvroType. Confluent's KafkaAvroDecoder has the signature
KafkaAvroDecoder extends AbstractKafkaAvroDeserializer implements Decoder<Object>
so it cannot be used as the valueDecoderClass for the valueClass AvroType. It could only be used if my value class were Object.
How should I proceed? Writing my own
KafkaAvroDecoder<T> extends AbstractKafkaAvroDeserializer implements Decoder<T>
does not seem ideal at first glance, because the deserialize()
method of AbstractKafkaAvroDeserializer does a lot of heavy lifting. I am also not sure whether I would have to rewrite that method every time the Avro schema changes.
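One way to avoid reimplementing deserialize() would be a thin generic adapter that delegates to an existing Decoder<Object> and only narrows the result type. Nothing in such an adapter depends on the schema, so it should not need changes when the schema evolves. The sketch below is self-contained, so Decoder is a local stand-in for kafka.serializer.Decoder (which declares T fromBytes(byte[] bytes)), and UntypedDecoder is a hypothetical stand-in for KafkaAvroDecoder. TypedDecoder and StringValueDecoder are likewise hypothetical names. In a real job, the concrete subclass would presumably wrap new KafkaAvroDecoder(props) and, as far as I know, needs a public constructor taking kafka.utils.VerifiableProperties, because Spark's Kafka 0.8 integration instantiates decoders reflectively through that constructor.

```java
import java.nio.charset.StandardCharsets;

public class TypedDecoderSketch {

    // Local stand-in for kafka.serializer.Decoder<T>.
    interface Decoder<T> {
        T fromBytes(byte[] bytes);
    }

    // Hypothetical stand-in for KafkaAvroDecoder: it implements Decoder<Object>,
    // so its static return type is Object even though the runtime value is concrete.
    static class UntypedDecoder implements Decoder<Object> {
        @Override
        public Object fromBytes(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Generic adapter: reuses the untyped decoder's logic and only narrows the type.
    static class TypedDecoder<T> implements Decoder<T> {
        private final Decoder<Object> inner;
        private final Class<T> type;

        TypedDecoder(Decoder<Object> inner, Class<T> type) {
            this.inner = inner;
            this.type = type;
        }

        @Override
        public T fromBytes(byte[] bytes) {
            // Class.cast throws ClassCastException if the decoded value has the wrong type.
            return type.cast(inner.fromBytes(bytes));
        }
    }

    // Concrete subclass with a fixed type argument, so a plain class literal
    // (StringValueDecoder.class) exists to pass where Class<VD> is expected.
    static class StringValueDecoder extends TypedDecoder<String> {
        StringValueDecoder() {
            super(new UntypedDecoder(), String.class);
        }
    }

    public static void main(String[] args) {
        Decoder<String> decoder = new StringValueDecoder();
        System.out.println(decoder.fromBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The concrete subclass matters because Java has no class literal for a parameterized type such as TypedDecoder<AvroType>; an AvroTypeDecoder extends TypedDecoder<AvroType> would give createRDD a Class<VD> whose VD extends Decoder<AvroType>.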
Or is KafkaUtils.createRDD(...)
the wrong direction, and is there a better way? I do need RDDs, though. Can this be done?
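A simpler alternative that stays with createRDD might be to accept the Object type: pass Object.class as valueClass together with KafkaAvroDecoder.class (which then satisfies VD extends Decoder<V> with V = Object), and narrow each value to AvroType in a later map step, for example something like mapValues(AvroType.class::cast) on the resulting JavaPairRDD. This assumes the decoder actually returns instances of the generated AvroType class; I believe that requires setting specific.avro.reader=true in the Kafka parameters, and otherwise it returns GenericRecord. The self-contained sketch below uses a List as a stand-in for the RDD, and castAll is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CastDownstreamSketch {

    // Narrows values that were decoded with a static type of Object.
    // The List stands in for the values of a JavaPairRDD<K, Object>.
    static <T> List<T> castAll(List<Object> decoded, Class<T> type) {
        List<T> out = new ArrayList<>(decoded.size());
        for (Object value : decoded) {
            // Fails fast with ClassCastException if a value is not of the expected type.
            out.add(type.cast(value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> decoded = Arrays.<Object>asList("a", "b");
        List<String> typed = castAll(decoded, String.class);
        System.out.println(typed);
    }
}
```

The trade-off is that the type check moves from compile time to run time, but no custom decoder class is needed.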