6

I have a set of Avro-based Hive tables and I need to read data from them. Since Spark SQL uses the Hive SerDes to read the data from HDFS, it is much slower than reading HDFS directly, so I have used Databricks' Spark-Avro jar to read the Avro files from the underlying HDFS directory.

Everything works fine except when the table is empty. I have managed to get the schema from the Hive table's .avsc file using the following commands, but I am getting the error "No Avro files found".

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.avro.Schema

// Parse the Avro schema from the table's .avsc file on HDFS
val schemaFile = FileSystem.get(sc.hadoopConfiguration).open(new Path("hdfs://myfile.avsc"))
val schema = new Schema.Parser().parse(schemaFile)

// Fails with "No Avro files found" when the directory is empty
spark.read.format("com.databricks.spark.avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/myoutput.avro")
  .show()

Workaround:

I have placed an empty Avro file in that directory, and then the same code works fine.
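
For reference, a minimal sketch of that workaround, reusing the `schema`, `FileSystem`, and `Path` from the snippet above (the file name inside the table directory is just illustrative). Writing a schema-only Avro container file, rather than a zero-byte file, is the safe variant: the file carries the schema header but zero records, so spark-avro has something to read:

import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

// Create an Avro file containing only the schema header and no records
val writer = new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](schema))
writer.create(schema, FileSystem.get(sc.hadoopConfiguration).create(new Path("/tmp/myoutput.avro/empty.avro")))
writer.close()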

Are there any other ways to achieve the same, e.g. a conf setting or something similar?

Abu Shoeb
Vinay Kumar

4 Answers

9

You don't need to use emptyRDD. Here is what worked for me with PySpark 2.4:

empty_df = spark.createDataFrame([], schema) # spark is the Spark Session

If you already have a schema from another DataFrame, you can just do this:

schema = some_other_df.schema

If you don't, then manually create the schema of the empty DataFrame, for example:

from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType

# The boolean at the end of each StructField indicates whether the column is nullable
schema = StructType([StructField("col_1", StringType(), True),
                     StructField("col_2", DateType(), True),
                     StructField("col_3", StringType(), True),
                     StructField("col_4", IntegerType(), False)])

I hope this helps.

luvrock
    You should probably add that the data types need to be imported, e.g. `from pyspark.sql.types import StructType, StructField` and that the boolean at the end indicates whether the column is nullable https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.types.StructField – 00schneider Jun 05 '20 at 08:00
5

Similar to EmiCareOfCell44's answer, just a little more elegant and more "empty":

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val emptySchema = StructType(Seq())
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], emptySchema)
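
A quick check that the result is truly empty, with no columns and no rows:

emptyDF.printSchema()     // prints just "root"
println(emptyDF.count())  // 0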
Y.G.
4

To create an empty DataFrame:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val my_schema = StructType(Seq(
    StructField("field1", StringType, nullable = false),
    StructField("field2", StringType, nullable = false)
  ))

val empty: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], my_schema)
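
In the context of the question, you can then write this empty DataFrame back out in Avro format so that later reads of the directory succeed. A minimal sketch, assuming the same Databricks spark-avro package is on the classpath and reusing the output path from the question:

empty.write.format("com.databricks.spark.avro").mode("overwrite").save("/tmp/myoutput.avro")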

Maybe this helps.

Emiliano Martinez
1

Depending on your Spark version, you can use reflection. There is a private method in SchemaConverters that converts an Avro Schema to a StructType (not sure why it is private, to be honest; it would be really useful in other situations). Using Scala reflection, you should be able to do it in the following way:

import scala.reflect.runtime.{universe => ru}
import org.apache.avro.Schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val schemaStr =
  """{
    |  "type": "record",
    |  "namespace": "com.example",
    |  "name": "FullName",
    |  "fields": [
    |    { "name": "first", "type": "string" },
    |    { "name": "last", "type": "string" }
    |  ]
    |}""".stripMargin
val schema = new Schema.Parser().parse(schemaStr)

// Reflectively invoke the private SchemaConverters.toSqlType to turn the
// Avro schema into a Spark SQL StructType
val m = ru.runtimeMirror(getClass.getClassLoader)
val module = m.staticModule("com.databricks.spark.avro.SchemaConverters")
val im = m.reflectModule(module)
val method = im.symbol.info.decl(ru.TermName("toSqlType")).asMethod

val objMirror = m.reflect(im.instance)
val structure = objMirror.reflectMethod(method)(schema).asInstanceOf[com.databricks.spark.avro.SchemaConverters.SchemaType]
val sqlSchema = structure.dataType.asInstanceOf[StructType]
val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], sqlSchema)

empty.printSchema
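
Side note: in newer Spark versions (2.4+), where Avro support lives in the built-in spark-avro module, SchemaConverters is public and no reflection is needed. A sketch, assuming the org.apache.spark:spark-avro artifact is on the classpath and reusing the Avro `schema` parsed above:

import org.apache.spark.sql.avro.SchemaConverters

// toSqlType is public here; extract the StructType from the returned SchemaType
val sqlSchema2 = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
val empty2 = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], sqlSchema2)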
hlagos