I have an RDD of type Row, i.e. RDD[Row], and an Avro schema object. I need to create a DataFrame with this info.
I need to convert the Avro schema object into a StructType for creating the DataFrame.
Can you please help?
com.databricks.spark.avro has a class to help you with this:
StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();
Please go through this specific example: http://bytepadding.com/big-data/spark/read-write-parquet-files-using-spark/
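For reference, here is a minimal end-to-end sketch in Scala using that same class; avroSchemaJson, rowRdd, and sqlContext are placeholders for your own schema string, RDD[Row], and context:

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// Parse the Avro schema (e.g. the contents of an .avsc file)
val avroSchema: Schema = new Schema.Parser().parse(avroSchemaJson)

// Convert it to a Spark SQL StructType
val structType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

// Build the DataFrame from the existing RDD[Row]
val df = sqlContext.createDataFrame(rowRdd, structType)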
In PySpark 2.4.7 my solution is to create an empty DataFrame with the Avro schema and then take the StructType object from this empty DataFrame.
with open('/path/to/some.avsc', 'r') as avro_file:
    avro_scheme = avro_file.read()

# No path is given to load(), so the result is an empty DataFrame
# that only carries the Avro schema
df = spark \
    .read \
    .format("avro") \
    .option("avroSchema", avro_scheme) \
    .load()

struct_type = df.schema
The answer from Wisnia works, but FYI another solution my coworkers and I came up with was the following:
import json
from pyspark.sql.types import StructType

avro_schema = "..."  # your Avro schema as a JSON string

# Parse the Avro schema on the JVM side and convert it to a Spark SQL type
java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
)
java_struct_schema = java_schema_type.dataType()

# Round-trip through JSON to rebuild the schema as a Python StructType
struct_json_schema = java_struct_schema.json()
json_schema_obj = json.loads(struct_json_schema)
schema = StructType.fromJson(json_schema_obj)
Updated as of 2020-05-31
Use the following if you're on Scala 2.12 with a newer Spark version.
sbt:
scalaVersion := "2.12.11"
val sparkVersion = "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

val schemaType = SchemaConverters
  .toSqlType(avroSchema)
  .dataType
  .asInstanceOf[StructType]
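From there, building the DataFrame the question asks about is one more call; rowRdd below is a placeholder for your own RDD[Row]:

// spark is the active SparkSession; rowRdd is your existing RDD[Row]
val df = spark.createDataFrame(rowRdd, schemaType)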
Databricks provides Avro-related utilities in the spark-avro package; use the dependency below in sbt:
"com.databricks" % "spark-avro_2.11" % "3.2.0"
Code:
val sqlSchema = SchemaConverters.toSqlType(avroSchema)
Before version 3.2.0, 'toSqlType' was a private method, so if you are using a version older than 3.2, copy the complete method into your own util class; otherwise, upgrade to the latest version.
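Note that toSqlType returns a SchemaType wrapper rather than a StructType directly, so you still need to extract and cast its dataType. A short sketch, with rowRdd and sqlContext as placeholders for your own RDD[Row] and context:

import org.apache.spark.sql.types.StructType

// Unwrap the converted schema and cast it to StructType
val structType = sqlSchema.dataType.asInstanceOf[StructType]

// Then create the DataFrame from the RDD[Row]
val df = sqlContext.createDataFrame(rowRdd, structType)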
Any example for doing the same in PySpark? The code below works for me, but there should be some easier way to do this:
# pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4
import json
import requests
from pyspark.sql.types import StructType

# Fetch the latest Avro schema for the subject from the schema registry
schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
schema_requests = requests.get(url=schema_registry_url)

# Parse the schema on the JVM side and convert it to a Spark SQL type
spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text)
)

# Round-trip through JSON to get a Python StructType (same trick as the answer above)
struct_type = StructType.fromJson(json.loads(spark_type.dataType().json()))