
I found KafkaUtils.createRDD(...), which reads a Kafka topic into an RDD for batch processing. But my Kafka topic has Avro values, and this method takes (amongst others) the arguments

Class<V> valueClass
Class<VD> valueDecoderClass

where

VD extends kafka.serializer.Decoder<V>
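
For context, the full Java overload in the spark-streaming-kafka-0-8 integration looks roughly like this (reconstructed from the docs, so treat it as a sketch rather than verbatim javadoc):

public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>> JavaPairRDD<K, V> createRDD(
        JavaSparkContext jsc,
        Class<K> keyClass,
        Class<V> valueClass,
        Class<KD> keyDecoderClass,
        Class<VD> valueDecoderClass,
        Map<String, String> kafkaParams,
        OffsetRange[] offsetRanges)

The bound VD extends Decoder<V> is what causes the mismatch below.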

Say my Avro class is AvroType. The Confluent KafkaAvroDecoder has the signature

KafkaAvroDecoder extends AbstractKafkaAvroDeserializer implements Decoder<Object>

so it cannot be used as the valueDecoderClass for the valueClass AvroType. It could only be used if my value class were Object.
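
The one call that does type-check is to declare the value class as Object and cast afterwards. A minimal, untested sketch (the broker and registry addresses, topic, offsets, and the jsc JavaSparkContext are placeholders):

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker:9092");
kafkaParams.put("schema.registry.url", "http://schema-registry:8081");
// Without this the decoder returns GenericData.Record and the cast below fails;
// with it, Confluent's decoder instantiates the generated specific-record class.
kafkaParams.put("specific.avro.reader", "true");

OffsetRange[] offsetRanges = { OffsetRange.create("my-topic", 0, 0L, 100L) };

// Both K and V collapse to Object so that KafkaAvroDecoder satisfies the bounds.
JavaPairRDD<Object, Object> raw = KafkaUtils.createRDD(
        jsc,
        Object.class, Object.class,
        KafkaAvroDecoder.class, KafkaAvroDecoder.class,
        kafkaParams, offsetRanges);

JavaRDD<AvroType> values = raw.values().map(o -> (AvroType) o);

Whether that final cast can succeed is exactly the doubt raised in the comments below: it only works if the decoder actually produced AvroType instances, hence the specific.avro.reader setting.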

How should I proceed? Creating my own

KafkaAvroDecoder<T> extends AbstractKafkaAvroDeserializer implements Decoder<T>

does not seem ideal at first glance, as the deserialize() method of AbstractKafkaAvroDeserializer does a lot of heavy lifting. I'm not sure whether I would need to rewrite that method every time we changed the Avro schema.
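
For what it's worth, a sketch of that wrapper (the class names TypedKafkaAvroDecoder and AvroTypeDecoder are hypothetical; the constructor mirrors what Confluent's own KafkaAvroDecoder does). The heavy lifting stays in the inherited deserialize(), so nothing schema-specific would need rewriting; only the cast target changes:

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

public class TypedKafkaAvroDecoder<T> extends AbstractKafkaAvroDeserializer
        implements Decoder<T> {

    public TypedKafkaAvroDecoder(VerifiableProperties props) {
        // Same configuration step as Confluent's KafkaAvroDecoder.
        configure(new KafkaAvroDeserializerConfig(props.props()));
    }

    @Override
    @SuppressWarnings("unchecked")
    public T fromBytes(byte[] bytes) {
        // All the schema-registry work happens in the inherited deserialize().
        return (T) deserialize(bytes);
    }
}

Because createRDD takes a Class token, and a generic class only yields a raw TypedKafkaAvroDecoder.class, a thin non-generic subclass per value type would still be needed:

public class AvroTypeDecoder extends TypedKafkaAvroDecoder<AvroType> {
    public AvroTypeDecoder(VerifiableProperties props) {
        super(props);
    }
}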

Maybe KafkaUtils.createRDD(...) is the wrong direction to take and there is a better way? (And yes, I really do need RDDs.) Can it be done?

  • But your Value is a type of Object. All Java classes are – OneCricketeer Sep 17 '19 at 09:37
  • In any case, why not use Structured Streaming, then use foreachBatch to get RDDs? https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry and unless you're still stuck with Spark 1.x, you can use DataFrames for batch processing https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries – OneCricketeer Sep 17 '19 at 09:43
  • 1. AvroType is an Object, true, but can I then convert that Object to AvroType? For example, given RDD<Object> rdd, can I do RDD<AvroType> rddAvro = rdd.map(o -> (AvroType) o);? Will information be lost? 2. The batch job will just run for 10 minutes per hour. Significantly cheaper 3. Getting a RDD via dataset or dataframe is an acceptable answer here. A hard-typed dataset would be preferable as then the work of converting to the AvroType has been done. Dataframe is fine I guess, but don't want to have to rewrite the conversion each time the schema changes. – sil Sep 17 '19 at 11:30
  • Actually, I made a mistake with answer '1' above (and it is too late to edit it). It should be as follows: 1. AvroType is an Object, true, but V = AvroType and VD 'must' extend Decoder<V>. Not Decoder<W> where V extends W. I could set V = Object, but then I would end up with RDD<Object> rdd. Given this rdd, can I convert it to RDD<AvroType> avroRdd = rdd.map(o -> (AvroType) o);? It seems to me that this is not possible as AvroType never existed as a Java object, so conversion in this manner (or any other) will fail – sil Sep 17 '19 at 11:44
  • You're correct there's type erasure... "Dataframe is fine I guess, but don't want to have to rewrite the conversion each time the schema changes." -- I have yet to find a way to not require changing consumer code when the schema changes... If you want to query a new field, then you need to update the schema. But using Avro and the Registry, you can stay on schema version 1 as long as it's backwards compatible with new schemas. – OneCricketeer Sep 17 '19 at 12:40
  • @cricket_007 since you're the Confluent guy, maybe you can advise... I'm using the from_avro method to convert byte[] values to the type of my schema according to this https://sparkbyexamples.com/spark/spark-streaming-consume-and-produce-kafka-messages-in-avro-format/#consume-avro but I get ArrayIndexOutOfBoundsException (https://github.com/AbsaOSS/ABRiS/issues/14). One potential cause is "The payload is being produced by a Confluent-compliant producer - which includes the id of the schema to the message - and consumed by a standard Avro parser." Are you familiar with this? If so what do I do? – sil Sep 20 '19 at 07:16
  • The abris library assumes Confluent Wire format, which cannot be directly parsed using standard Avro BinaryDecoder methods (see the wire-format sketch after these comments). https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format I've not used Spark Streaming to handle either type, but I know Structured Streaming works based on the first link I sent – OneCricketeer Sep 20 '19 at 12:55
  • Also, the Abris library has its own `from_confluent_avro` function that you may want to try – OneCricketeer Sep 20 '19 at 12:57
  • is there any solution that works specifically for batch processing, given that I don't want to use streaming? I have managed to get abris to work for batch with a simple avro format, but this fails on a more complex schema. Given that on their github wiki they claim batch is not supported (I just discovered this), I don't want to proceed with it as a solution – sil Sep 24 '19 at 05:34
  • In my second comment, the second link is about batch processing... You need to provide the start and end offsets, and I'm curious how complex your Avro schemas are because the solution in the first link I sent seems to work with fields that are nested like 5 layers down in my schemas – OneCricketeer Sep 24 '19 at 05:37
  • Thanks for the help on this. My issue with the batch processing link is that it does not explain how to get a hard typed Dataset from confluent schema-registry encoded avro. That's the essence of the problem I am having here. I have zero issue loading batches of bytes from Kafka. I could convert the values to String (which I don't want), but the data is still encoded. The first link was when you proposed that I use structured streaming which I don't want to do. If there are elements of it that can be used for a batch solution then that's great, I will go back and look at it. – sil Sep 24 '19 at 06:16
  • OK, I've just seen your answer (the second answer) to the structured streaming stack overflow question. I was looking at the first answer. At first glance your answer seems similar to this: https://medium.com/@ivan9miller/making-sense-of-avro-kafka-schema-registry-and-spark-streaming-bbb09eb5039c which I tried and didn't work, but there are some differences which I think could get around the problems I was having. – sil Sep 24 '19 at 06:23
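
Following up on the batch and wire-format points in the comments: a minimal, untested sketch, assuming Spark 3.x with the spark-sql-kafka-0-10 and spark-avro artifacts on the classpath (topic, offsets, and the reader schema are placeholders):

import static org.apache.spark.sql.avro.functions.from_avro;
import static org.apache.spark.sql.functions.expr;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("avro-batch").getOrCreate();

// Batch (not streaming) read of a bounded offset range from Kafka.
Dataset<Row> batch = spark.read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("subscribe", "my-topic")
        .option("startingOffsets", "earliest") // or explicit per-partition JSON
        .option("endingOffsets", "latest")
        .load();

// Confluent wire format: byte 0 is a magic byte, bytes 1-4 the schema id,
// and the Avro body only starts at byte 5 -- which is why a plain Avro
// parser on `value` blows up. SQL substring() is 1-based, so skip to position 6.
String readerSchemaJson = "..."; // the Avro schema, fetched from the registry
Dataset<Row> decoded = batch.select(
        from_avro(expr("substring(value, 6)"), readerSchemaJson).as("record"));

An RDD then falls out via decoded.javaRDD(). The caveat is the hard-coded reader schema: if producers have registered several schema versions, the schema id embedded in each record has to be resolved against the registry per record, which is what ABRiS's from_confluent_avro and the UDF in the linked structured-streaming answer do.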

0 Answers