
I have a requirement in streaming where I have to convert a GenericRecord to a DataFrame so that I can use explode and other features available on DataFrames. So first, I am looking at how to convert a GenericRecord to a DataFrame.

I have checked the URL below, which helps with converting the record to a DataFrame, but I am not able to understand how to add the SchemaConverterUtils class to the Avro object.

How to convert RDD[GenericRecord] to dataframe in scala?

It gives me a read-only file when I try to edit it. I am new to Scala/Java. Could you please help me understand how to do this?

Thanks

1 Answer

Regarding that post: the spark-avro library has been deprecated by Databricks and donated to Spark.
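As a side note, since that donation Spark 2.4+ ships its own from_avro in the built-in spark-avro module. A minimal sketch, assuming Spark 3.x, that the spark-avro artifact matching your Spark version is on the classpath, and a hypothetical DataFrame "bytesDF" holding the Avro payload as a binary column "value":

import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// "bytesDF" and its "value" column are assumptions for illustration
val decoded = bytesDF.select(from_avro(col("value"), stringSchema) as "data")

The rest of this answer uses ABRiS instead.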

The ABRiS library provides a UDF to convert a column of Array[Byte] to a column of a complex type, and from there to a DataFrame.
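If part of the confusion is how to get such classes into your project: you do not edit the library's files, you add the library as a dependency of your build. A minimal sketch for sbt (the version shown is an assumption; check Maven Central for the release matching your Scala/Spark versions):

// build.sbt -- version is illustrative, not a recommendation
libraryDependencies += "za.co.absa" %% "abris" % "3.2.1"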

In your case, you first need a couple of transformations.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.examples.data.generation.AvroDataUtils

val spark: SparkSession = SparkSession
    .builder().master("local[*]").getOrCreate()
// read data into an RDD of GenericRecord called "genericRecordRdd"
// have your schema in string format in a variable called "stringSchema"
import spark.implicits._
val domainDF: DataFrame = genericRecordRdd
        .map(AvroDataUtils.recordToBytes)                        // GenericRecord -> Array[Byte]
        .toDF("value")                                           // one binary column named "value"
        .select(from_avro(col("value"), stringSchema) as "data") // decode the Avro bytes into a struct
        .select("data.*")                                        // flatten the struct into top-level columns

AvroDataUtils.recordToBytes is part of the ABRiS library; it converts a GenericRecord object to an Array[Byte]. You then create a DataFrame with only one column, called "value". At that point, you are ready to use the from_avro UDF. Following the documentation on the website, you do have other options, but from your description I thought this would be the closest.
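Once domainDF is flattened, everything in the DataFrame API, including explode, works as usual. A minimal sketch, assuming the decoded schema contains an array column, here hypothetically named "items":

import org.apache.spark.sql.functions.{col, explode}

// "items" is a hypothetical array column in your decoded schema
val explodedDF = domainDF.select(explode(col("items")) as "item")
explodedDF.show()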
