I am attempting to add a column containing List[Annotation]
to a Spark DataFrame using the below code (I've reformatted everything so this can be reproduced by directly copying and pasting).
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

case class Annotation(
  field1: String,
  field2: String,
  field3: Int,
  field4: Float,
  field5: Int,
  field6: List[Mapping]
)

case class Mapping(
  fieldA: String,
  fieldB: String,
  fieldC: String,
  fieldD: String,
  fieldE: String
)
object StructTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Hand-built schema intended to mirror the Annotation case class
    val annotationStruct =
      StructType(
        Array(
          StructField("field1", StringType, nullable = true),
          StructField("field2", StringType, nullable = true),
          StructField("field3", IntegerType, nullable = false),
          StructField("field4", FloatType, nullable = false),
          StructField("field5", IntegerType, nullable = false),
          StructField(
            "field6",
            ArrayType(
              StructType(Array(
                StructField("fieldA", StringType, nullable = true),
                StructField("fieldB", StringType, nullable = true),
                StructField("fieldC", StringType, nullable = true),
                StructField("fieldD", StringType, nullable = true),
                StructField("fieldE", StringType, nullable = true)
              ))),
            nullable = true
          )
        )
      )

    val df = List(1).toDF
    val annotation = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))
    val schema = df.schema
    val newSchema = schema.add("annotations", ArrayType(annotationStruct), false)

    // Append the List[Annotation] as the new column's value on every row
    val rdd = df.rdd.map(x => Row.fromSeq(x.toSeq :+ List(annotation)))
    val newDF = spark.createDataFrame(rdd, newSchema)
    newDF.printSchema
    newDF.show
  }
}
However, I'm getting an error when running this code.
Caused by: java.lang.RuntimeException: Annotation is not a valid external type for schema of struct<field1:string,field2:string,field3:int,field4:float,field5:int,field6:array<struct<fieldA:string,fieldB:string,fieldC:string,fieldD:string,fieldE:string>>>
The schema I am passing in (ArrayType(annotationStruct)) appears to be of the incorrect form when creating a DataFrame using createDataFrame, but it seems to match the schema of a DataFrame that contains only List[Annotation].
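For reference, this is roughly how I am comparing the two schemas (a minimal sketch, run in the same session as the code above; the tuple layout and column names are only for illustration):

// Sketch: build an equivalent DataFrame straight from the case class via the
// product encoder, then compare its printed schema with ArrayType(annotationStruct).
import spark.implicits._

val annotation = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))
Seq((1, List(annotation))).toDF("value", "annotations").printSchema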
Edit: Example of modifying a DF schema in this fashion with a simple type instead of a case class.
val df = List(1).toDF
spark.createDataFrame(
  df.rdd.map(x => Row.fromSeq(x.toSeq :+ "moose")),
  df.schema.add("moose", StringType, false)
).show
+-----+-----+
|value|moose|
+-----+-----+
| 1|moose|
+-----+-----+
Edit 2: I've pared this down a bit more. Sadly, I don't have the option of creating a DataFrame directly from the case class, which is why I am trying to mirror it as a StructType using ScalaReflection. In this case, I am not altering a previous schema, just attempting to create a DataFrame from an RDD of Rows which contain lists of my case class. Spark had an issue in 1.6 that affected parsing arrays of structs which may be empty or null - I'm wondering if these are linked.
import org.apache.spark.sql.catalyst.ScalaReflection

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Derive the struct schema for Annotation via reflection instead of writing it by hand
val annotationSchema = ScalaReflection.schemaFor[Annotation].dataType.asInstanceOf[StructType]
val annotation = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))

val testRDD = spark.sparkContext.parallelize(List(List(annotation))).map(x => Row(x))
val testSchema = StructType(
  Array(StructField("annotations", ArrayType(annotationSchema), false))
)
spark.createDataFrame(testRDD, testSchema).show