
I'm trying to create a Dataset with some geo data using Spark and Esri. If Foo has only the Point field, it works, but if I add any other field besides the Point, I get an ArrayIndexOutOfBoundsException.

import com.esri.core.geometry.Point
import org.apache.spark.sql.{Encoder, Encoders, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  case class Foo(position: Point, name: String)

  object MyEncoders {
    implicit def PointEncoder: Encoder[Point] = Encoders.kryo[Point]

    implicit def FooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("app").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import MyEncoders.{FooEncoder, PointEncoder}
    import sqlContext.implicits._
    Seq(new Foo(new Point(0, 0), "bar")).toDS.show
  }
}

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.spark.sql.execution.Queryable$$anonfun$formatString$1$$anonfun$apply$2.apply(Queryable.scala:71)
    at org.apache.spark.sql.execution.Queryable$$anonfun$formatString$1$$anonfun$apply$2.apply(Queryable.scala:70)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.sql.execution.Queryable$$anonfun$formatString$1.apply(Queryable.scala:70)
    at org.apache.spark.sql.execution.Queryable$$anonfun$formatString$1.apply(Queryable.scala:69)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.sql.execution.Queryable$class.formatString(Queryable.scala:69)
    at org.apache.spark.sql.Dataset.formatString(Dataset.scala:65)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:263)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:230)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:193)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:201)
    at Main$.main(Main.scala:24)
    at Main.main(Main.scala)

Mehraban

1 Answer


Kryo creates the encoder for complex data types based on Spark SQL data types. So check the schema that the kryo encoder actually produces:

val enc: Encoder[Foo] = Encoders.kryo[Foo]
println(enc.schema)  // StructType(StructField(value,BinaryType,true))
val numCols = enc.schema.fieldNames.length  // 1

So your Dataset has a single column, and its type is binary. The strange part is that Spark nevertheless attempts to show the Dataset with more than one column, which is where the error occurs. Upgrading to Spark 2.0.0 fixes this crash.
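As a sanity check, here is a minimal sketch (assuming Spark 2.0.0 and a SparkSession, reusing the Foo case class and Point from the question) showing that the kryo encoder still packs the whole object into one binary column; show() no longer crashes, but the output is not very readable:

import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().master("local").appName("app").getOrCreate()
// The kryo encoder serializes the entire Foo into a single BinaryType column.
val ds = spark.createDataset(Seq(Foo(new Point(0, 0), "bar")))(Encoders.kryo[Foo])
ds.printSchema()           // root
                           //  |-- value: binary (nullable = true)
ds.show(truncate = false)  // one "value" column of serialized bytes, not position/name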

Even on Spark 2.0.0 you still have a problem with the column data types. A manually written schema may work, if you can express a StructType for the esri Point class:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(
   Seq(
     StructField("point", StructType(...), true),  // a StructType for Point still has to be filled in here
     StructField("name", StringType, true)
   )
)

val rdd = sc.parallelize(Seq(Row(new Point(0, 0), "bar")))

sqlContext.createDataFrame(rdd, schema)  // in Spark 2.0 a DataFrame is already a Dataset[Row]
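For reference, one hedged way to fill in that StructType(...): Spark cannot reflect over an arbitrary esri Point object, so this sketch (reusing the imports above) extracts the coordinates via Point.getX/getY and uses assumed field names "x" and "y", which are not part of the original answer:

// Assumed struct layout for a Point: two double coordinates.
val pointType = StructType(Seq(
  StructField("x", DoubleType, nullable = false),
  StructField("y", DoubleType, nullable = false)))

val fullSchema = StructType(Seq(
  StructField("point", pointType, nullable = true),
  StructField("name", StringType, nullable = true)))

// Convert the Point to a nested Row by hand, since Spark cannot do it for us.
val p = new Point(0, 0)
val rows = sc.parallelize(Seq(Row(Row(p.getX, p.getY), "bar")))

sqlContext.createDataFrame(rows, fullSchema).show()  // point shown as a (x, y) struct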
Milad Khajavi