
I have an RDD of type Row, i.e. RDD[Row], and an Avro schema object. I need to create a DataFrame with this information.

I need to convert the Avro schema object into a StructType in order to create the DataFrame.

Can you please help?

Dushyant Singh

6 Answers


com.databricks.spark.avro has a class to help you with this:

 // Convert the generated Avro class's schema into a Spark SQL StructType.
 StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();

Please go through this specific example: http://bytepadding.com/big-data/spark/read-write-parquet-files-using-spark/
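
To close the loop on the original question: once you have the StructType, spark.createDataFrame accepts an RDD[Row] plus that schema. A minimal Scala sketch, assuming a SparkSession named spark, an existing rowRdd: RDD[Row], and an org.apache.avro.Schema value avroSchema (all three names are placeholders):

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Convert the Avro schema to a Spark SQL StructType ...
val structType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

// ... then build the DataFrame directly from the existing RDD[Row].
val df = spark.createDataFrame(rowRdd, structType)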

KrazyGautam

In PySpark 2.4.7 my solution is to create an empty DataFrame with the Avro schema and then take the StructType object from this empty DataFrame.

# Read the Avro schema definition from an .avsc file.
with open('/path/to/some.avsc', 'r') as avro_file:
    avro_scheme = avro_file.read()

# load() with no path yields an empty DataFrame whose schema
# is derived from the supplied Avro schema.
df = spark\
    .read\
    .format("avro")\
    .option("avroSchema", avro_scheme)\
    .load()

struct_type = df.schema
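
For reference, the same empty-DataFrame trick ported to Scala; a sketch assuming a SparkSession named spark and the spark-avro package on the classpath (the .avsc path is a placeholder):

import scala.io.Source

// Read the Avro schema definition; load() with no path then yields
// an empty DataFrame whose schema is derived from that Avro schema.
val avroScheme = Source.fromFile("/path/to/some.avsc").mkString

val structType = spark.read
  .format("avro")
  .option("avroSchema", avroScheme)
  .load()
  .schema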

Wisnia

The answer from Wisnia works, but FYI, another solution my coworkers and I came up with is the following:

import json

from pyspark.sql.types import StructType

avro_schema = "..."

# Parse the Avro schema and convert it to a Spark SQL type via the JVM gateway.
java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
)

# Round-trip the Java struct type through JSON to get a Python StructType.
java_struct_schema = java_schema_type.dataType()
struct_json_schema = java_struct_schema.json()
json_schema_obj = json.loads(struct_json_schema)
schema = StructType.fromJson(json_schema_obj)
chas

Updated as of 2020-05-31

Use the below if you're on Scala 2.12 with a newer Spark version.

sbt:

scalaVersion := "2.12.11"
val sparkVersion = "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion

Scala:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// avroSchema is an org.apache.avro.Schema; convert it to a Spark SQL StructType.
val schemaType = SchemaConverters
  .toSqlType(avroSchema)
  .dataType
  .asInstanceOf[StructType]
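
If you only have the schema as an .avsc file rather than as an org.apache.avro.Schema object, here is a small sketch of obtaining avroSchema with Avro's own parser (the file path is a placeholder):

import java.io.File

import org.apache.avro.Schema

// An .avsc file is plain JSON; Avro's parser turns it into a Schema object.
val avroSchema: Schema = new Schema.Parser().parse(new File("/path/to/some.avsc"))
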
Leif Wickland
moon

Databricks provides Avro-related utilities in the spark-avro package; use the below dependency in sbt:

"com.databricks" % "spark-avro_2.11" % "3.2.0"

Code:

val sqlSchema = SchemaConverters.toSqlType(avroSchema)

Before version 3.2.0, toSqlType was a private method, so if you are using a version older than 3.2, either copy the complete method into your own util class or upgrade to the latest version.

Sagar balai

Any example of doing the same in PySpark? The below code works for me, but there should be an easier way to do this:

# pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4

import json

import requests

from pyspark.sql.types import StructType

# Fetch the latest Avro schema for the subject from the schema registry.
schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
schema_requests = requests.get(url=schema_registry_url)

# Parse the schema on the JVM and convert it to a Spark SQL type.
spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text)
)

# Same JSON round-trip as in the answer above to get a Python StructType.
struct_schema = StructType.fromJson(json.loads(spark_type.dataType().json()))
cchantep
Sathya