
I run the following code:

import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.{Encoders, Row}
import org.scalatest.FlatSpec

case class Inner(i: Int)

case class Outer(in: Inner)

class MyTest extends FlatSpec with DatasetSuiteBase {
  behavior of "Engine"
  it should "work" in {
    import spark.implicits._
    val input = Seq("alice", "bob").toDF("name")
    val schema = Encoders.product[Outer].schema
    implicit val enc = Encoders.kryo[Row]
    val processed = input
      .map { row =>
        new GenericRowWithSchema(Array(Outer(Inner(row.getString(0).length))), schema): Row
      }
    processed.printSchema() //1
    processed.show //2
    val withSchema = spark.createDataFrame(processed.rdd, schema)
    withSchema.printSchema //3
    withSchema.show // 4: throws exception
  }
}

Result from 1

root
 |-- value: binary (nullable = true)

Result from 2

+--------------------+
|               value|
+--------------------+
|[01 00 6F 72 67 2...|
|[01 00 6F 72 67 2...|
+--------------------+

Result from 3

root
 |-- in: struct (nullable = true)
 |    |-- i: integer (nullable = false)

Result from 4 (throws an exception)

Outer is not a valid external type for schema of struct<i:int>

Does anyone know what is wrong here? Is it even possible in Spark?

Edit: reimplemented using only `RowEncoder` and plain `Row`s, but the same error still occurs:

  it should "find minimal example" in {
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import spark.implicits._
    val input = Seq("alice", "bob").toDF("name")
    val schema = Encoders.product[Outer].schema
    implicit val enc = RowEncoder(schema)
    val processed = input.map { row => Row(Outer(Inner(row.getString(0).length))) }
    processed.printSchema()
    processed.show
  }
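
As the comments below explain, this still fails for the same reason: the top-level `Row` wraps an `Outer` instance where the `RowEncoder` expects a nested `Row` for the struct field. A minimal corrected sketch of the Row-based variant (assuming Spark 2.x, where `RowEncoder(schema)` is available, and the same `Inner`/`Outer` case classes; the test name is illustrative):

  it should "map to nested Rows" in {
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import spark.implicits._
    val input = Seq("alice", "bob").toDF("name")
    val schema = Encoders.product[Outer].schema
    implicit val enc = RowEncoder(schema)
    // One Row per struct level: the outer Row carries the `in` field,
    // the nested Row carries the `i` field; no case classes anywhere.
    val processed = input.map { row => Row(Row(row.getString(0).length)) }
    processed.printSchema()
    processed.show()
  }
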
  • What are you trying to achieve with the code? – Ramesh Maharjan Apr 30 '18 at 14:42
  • Sorry, I cannot go into too much detail (NDA), but if `Inner` is a primitive type (`Int` / `Double` / `String`) this code works as expected; it does not work when `Inner` is a `case class`. – pcejrowski Apr 30 '18 at 14:47
  • The fundamental problem here is that you are trying to mix things which are not intended to be mixed. You either go with `Row` (and a schema or `RowEncoder`) or with typed objects and binary or (if no other option is applicable) generic (Kryo, Java) `Encoders`. You don't mix both. In all of that, `Encoders.kryo[Row]` is close to as bad as you can get here. NDA or not, unless you can provide the general motivation behind this code, you might not get the help you expect. – zero323 Apr 30 '18 at 14:48
  • Thanks @user6910411. Will try to fix it. – pcejrowski Apr 30 '18 at 14:50
  • Don't mention it. Overall it should be either `Rows` (I wouldn't bother with `GenericRowWithSchema`; we expect the interface, not a concrete implementation) all the way down, or `Products` all the way down, or, for things that cannot be encoded otherwise, an ugly BLOB. – zero323 Apr 30 '18 at 14:53
  • @user6910411 I reimplemented with only `RowEncoder` and `Row` (see the edited version above) but I'm still getting the same error. – pcejrowski Apr 30 '18 at 15:06
  • I guess I wasn't explicit enough. If there is `Row`, there is no place for `Inner` or `Outer`. If you want `Inner` or `Outer` there is no place for `Row`. In the latter case `input.map { row => Outer(Inner(row.getString(0).length)) }` and `spark.implicits._` is all you need. In the former `input.map { row => Row(Row(Row(row.getString(0).length))) }(rowEncoder)` where `rowEncoder` [matches schema](https://stackoverflow.com/q/39433419) (give or take one enclosing `Row`). – zero323 Apr 30 '18 at 15:09
  • Can you post the schema of the data that you are trying to process? The schema example above is incomplete. – Yayati Sule Jun 16 '20 at 09:18
  • @YayatiSule - thank you for looking at this, but I no longer have access to that code. AFAIR I solved the issue using zero323's last suggestion (sketched below). – pcejrowski Jun 16 '20 at 11:03
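
For reference, a minimal sketch of zero323's typed alternative ("`Products` all the way down"), assuming the same `Inner`/`Outer` case classes; mapping straight to the case class lets `spark.implicits._` derive the product encoder, so no explicit schema or encoder is needed (the test name is illustrative):

  it should "map to typed Products" in {
    import spark.implicits._
    val input = Seq("alice", "bob").toDF("name")
    // Dataset[Outer]: the encoder is derived from the case class,
    // and the resulting schema is struct<in: struct<i: int>>.
    val processed = input.map { row => Outer(Inner(row.getString(0).length)) }
    processed.printSchema()
    processed.show()
  }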

0 Answers