
I am attempting to add a column containing a List[Annotation] to a Spark DataFrame using the code below (I've reformatted everything so it can be reproduced by copying and pasting directly).

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

case class Annotation(
  field1: String,
  field2: String,
  field3: Int,
  field4: Float,
  field5: Int,
  field6: List[Mapping]
)

case class Mapping(
  fieldA: String,
  fieldB: String,
  fieldC: String,
  fieldD: String,
  fieldE: String
)

object StructTest {
  def main(args: Array[String]): Unit = {
    val spark               = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    val annotationStruct =
      StructType(
        Array(
          StructField("field1", StringType, nullable = true),
          StructField("field2", StringType, nullable = true),
          StructField("field3", IntegerType, nullable = false),
          StructField("field4", FloatType, nullable = false),
          StructField("field5", IntegerType, nullable = false),
          StructField(
            "field6",
            ArrayType(
              StructType(Array(
                StructField("fieldA", StringType, nullable = true),
                StructField("fieldB", StringType, nullable = true),
                StructField("fieldC", StringType, nullable = true),
                StructField("fieldD", StringType, nullable = true),
                StructField("fieldE", StringType, nullable = true)
              ))),
            nullable = true
          )
        )
      )

    val df = List(1).toDF
    val annotation = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))
    val schema = df.schema
    val newSchema = schema.add("annotations", ArrayType(annotationStruct), false)
    val rdd = df.rdd.map(x => Row.fromSeq(x.toSeq :+ List(annotation)))
    val newDF = spark.createDataFrame(rdd, newSchema)
    newDF.printSchema
    newDF.show
  }
}

However, I'm getting an error when running this code.

Caused by: java.lang.RuntimeException: Annotation is not a valid external type for schema of struct<field1:string,field2:string,field3:int,field4:float,field5:int,field6:array<struct<fieldA:string,fieldB:string,fieldC:string,fieldD:string,fieldE:string>>>

The schema I am passing in (ArrayType(annotationStruct)) appears to be of the wrong form for createDataFrame, yet it seems to match the schema Spark infers for DataFrames that contain only List[Annotation].
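
A minimal sketch of what a consistent Row-based representation might look like, mirroring the advice in the comments below (the annotationToRow helper is hypothetical, not part of my original code):

// Hypothetical helper: convert each case class instance into a Row so the
// external representation matches the Row-based schema.
def annotationToRow(a: Annotation): Row =
  Row(a.field1, a.field2, a.field3, a.field4, a.field5,
    a.field6.map(m => Row(m.fieldA, m.fieldB, m.fieldC, m.fieldD, m.fieldE)))

val fixedRdd = df.rdd.map(x => Row.fromSeq(x.toSeq :+ List(annotation).map(annotationToRow)))
spark.createDataFrame(fixedRdd, newSchema).show(false)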

Edit: Example of modifying a DF schema in this fashion with a simple type instead of a case class.

val df = List(1).toDF
spark.createDataFrame(df.rdd.map(x => Row.fromSeq(x.toSeq :+ "moose")), df.schema.add("moose", StringType, false)).show
+-----+-----+
|value|moose|
+-----+-----+
|    1|moose|
+-----+-----+

Edit 2: I've pared this down a bit more. Sadly, I don't have the option of creating a DataFrame directly from the case class, which is why I am trying to mirror it as a StructType using ScalaReflection. In this case, I am not altering a previous schema, just attempting to create a DataFrame from an RDD of Rows which contain lists of my case class. Spark 1.6 had an issue parsing arrays of structs which may be empty or null - I'm wondering if these are linked.

import org.apache.spark.sql.catalyst.ScalaReflection

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// mirror the case class as a StructType via reflection
val annotationSchema = ScalaReflection.schemaFor[Annotation].dataType.asInstanceOf[StructType]
val annotation       = Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))
val testRDD          = spark.sparkContext.parallelize(List(List(annotation))).map(x => Row(x))
val testSchema = StructType(
  Array(StructField("annotations", ArrayType(annotationSchema), false))
)
spark.createDataFrame(testRDD, testSchema).show
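
The same representation mismatch would seem to apply here; a sketch of the fix, inlining the Row conversion (assuming the variables defined above):

// Sketch: convert each case class instance to a Row before createDataFrame,
// so the external representation matches the Row-based testSchema.
val fixedTestRDD = spark.sparkContext
  .parallelize(List(List(annotation)))
  .map(xs => Row(xs.map(a =>
    Row(a.field1, a.field2, a.field3, a.field4, a.field5,
      a.field6.map(m => Row(m.fieldA, m.fieldB, m.fieldC, m.fieldD, m.fieldE))))))
spark.createDataFrame(fixedTestRDD, testSchema).show(false)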
mongolol
  • Most likely not related to schema. `seq(notesInd).toString` looks like an obvious culprit. Unfortunately lack of [mcve] / [reproducible example](https://stackoverflow.com/q/48427185/6910411) ⇒ voting to close. – zero323 Feb 09 '18 at 00:17
  • Hmm, tried it without generating any annotations, instead just creating an instance of the `Annotation` class and appending it to a DataFrame in a similar fashion. Received the same error, indicating that it's likely not anything inside the map/mappartitions, but the schema itself. – mongolol Feb 09 '18 at 00:36
  • Reformatted everything so it can be reproduced without needing any additional code. – mongolol Feb 09 '18 at 01:13
  • This is a completely different problem, and not related to the previous one (nor resolving the previous one). You cannot mix external and internal representations. If you want to use `Row` all `struct` values have to be `Row`. Otherwise - all values have to be represented with external types (`Product` types for `struct`). – zero323 Feb 09 '18 at 02:30
  • Unfortunately, this is the exact issue I was running into before. Before this, I had a `generateAnnotations` function which yielded `List[Annotation]`. I then tried to append this to the original DataFrame by taking its schema and modifying it with `annotationStruct`. I've verified separately that `seq(notesInd).toString` is not causing the issue, so I removed any distracting code and just left an example of the core of the issue I'm hitting. Can you expand on what you mean by mixing representations? It's certainly possible to create a DataFrame from an RDD of type `List[Annotation]`. – mongolol Feb 09 '18 at 02:40
  • Where I'm confused is that modifying a schema as shown in the edit works just fine. But then again, that example may not be mixing up any representations, which @user6910411 mentioned may be the problem. – mongolol Feb 09 '18 at 02:52
  • [The first revision](https://stackoverflow.com/revisions/48696839/1) filed with `java.lang.ArrayIndexOutOfBoundsException` - once again this has nothing to do with schema (ditto). If you resolve the current one (just use consistent representation) the previous one is likely to go back. – zero323 Feb 09 '18 at 03:00
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/164799/discussion-between-mongolol-and-user6910411). – mongolol Feb 09 '18 at 03:02
  • Right, but that exception was being caused by the usage of `++` vs `:+`. Effectively, I was appending an unknown number of items to a sequence instead of a single list. I know it's not kosher to change things as much as I did - both formats were intended to address adding a complex column to a DataFrame, and the first had an irrelevant error. – mongolol Feb 09 '18 at 03:14

1 Answer


If you are concerned with adding a complex column to an existing DataFrame, then the following solution should work for you.

import spark.implicits._

val df = List(1).toDF
// parallelize a matching RDD of annotations and zip it with the original rows
val annotation = spark.sparkContext.parallelize(
  List(Annotation("1", "2", 1, .5f, 1, List(Mapping("a", "b", "c", "d", "e")))))
val newDF = df.rdd.zip(annotation).map(x => Merged(x._1.get(0).asInstanceOf[Int], x._2)).toDF
newDF.printSchema
newDF.show(false)

which should give you

root
 |-- value: integer (nullable = false)
 |-- annotations: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- field2: string (nullable = true)
 |    |-- field3: integer (nullable = false)
 |    |-- field4: float (nullable = false)
 |    |-- field5: integer (nullable = false)
 |    |-- field6: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- fieldA: string (nullable = true)
 |    |    |    |-- fieldB: string (nullable = true)
 |    |    |    |-- fieldC: string (nullable = true)
 |    |    |    |-- fieldD: string (nullable = true)
 |    |    |    |-- fieldE: string (nullable = true)

+-----+---------------------------------------+
|value|annotations                            |
+-----+---------------------------------------+
|1    |[1,2,1,0.5,1,WrappedArray([a,b,c,d,e])]|
+-----+---------------------------------------+

The case classes used are the same as yours, with an additional Merged case class created:

case class Merged(value : Int, annotations: Annotation)
case class Annotation(field1: String, field2: String, field3: Int, field4: Float, field5: Int, field6: List[Mapping])
case class Mapping(fieldA: String, fieldB: String, fieldC: String, fieldD: String, fieldE: String)

When case classes are used, we don't need to define the schema explicitly, and the column names are derived differently (from the case class fields) than when using sqlContext.createDataFrame with an explicit schema.
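
For illustration, a minimal sketch of the schema being derived automatically (assuming the same spark session and the case classes above):

// With case classes, Spark derives the schema from the fields via implicits;
// no StructType has to be written by hand.
import spark.implicits._
val direct = Seq(Merged(1, Annotation("1", "2", 1, .5f, 1,
  List(Mapping("a", "b", "c", "d", "e"))))).toDF
direct.printSchema // prints the same schema as shown above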

Ramesh Maharjan
  • Accepting this as it addresses the problem stated above - thanks Ramesh. I should point out that the scenario outlined in Edit 2 still doesn't work, but this may be an issue with Spark itself. – mongolol Feb 14 '18 at 19:01
  • The issue, I guess, is that Annotation is an object created by a case class with its schema defined, and when used in createDataFrame it cannot be cast to a StructType. Thanks for accepting – Ramesh Maharjan Feb 15 '18 at 01:20