Say you have this (the approach to encoding a custom type is taken from this thread):

// assume we are handling a custom (non-case-class) type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"), new MyObj(2, "b"), new MyObj(3, "c")))

When I do a ds.show, I get:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

I understand that this is because the Kryo encoder stores each object as a single binary column. But how can I display the decoded content, like this?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

UPDATE1

Displaying the content is not the biggest issue. More importantly, the binary encoding causes problems when processing the dataset. Consider this example:

// continuing from the code above
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"), new MyObj(6, "b"), new MyObj(5, "c")))

ds.joinWith(ds2, ds("i") === ds2("i"), "inner")
// this gives a runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value);

Does this mean that a kryo-encoded type cannot conveniently be used in operations like joinWith?

How do we process a custom type in a Dataset, then?
If we cannot process it after it has been encoded, what is the point of this kryo encoding solution for custom types?

(The solution provided by @jacek below is good to know for case class types, but it still cannot decode a custom type.)

jack

1 Answer

The following worked for me, but it feels like using a high-level API to do low-level (deserialization) work.

This is not to say it should be done this way, only that it is possible.

I don't know why ds.show does not deserialize the bytes back into the object they came from. It is just this way.

One major difference between your class definition and mine is the case keyword, which lets me use the following trick. Again, I have no idea exactly why that makes it possible.

scala> println(spark.version)
3.0.1

// Note the case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo encoder stores each object as bytes in a single binary column
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
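// this begs for a generic UDF
import java.nio.ByteBuffer
import org.apache.spark.sql.functions.udf
val deserMyObj = udf { value: Array[Byte] =>
  // create a SerializerInstance and decode the Kryo bytes back into MyObj
  ks.newInstance().deserialize[MyObj](ByteBuffer.wrap(value))
}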

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
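
For the plain (non-case) class from the question, another route seems possible: typed operations such as map hand the function an already-deserialized object, so the fields can be projected into types that do have built-in encoders. The sketch below is only an illustration under assumptions and has not been checked against every Spark version: the object name KryoProjection, the local SparkSession settings, and the helper names cols, keyed1, keyed2 and keyedEncoder are all made up for this example. For the join, each object is paired with its key so that joinWith has a real column to compare.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical standalone program; all names here are illustrative.
object KryoProjection {
  // plain class, as in the question (no case keyword)
  class MyObj(val i: Int, val j: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kryo-projection")
      .getOrCreate()
    import spark.implicits._

    implicit val myObjEncoder: Encoder[MyObj] = Encoders.kryo[MyObj]

    val ds  = spark.createDataset(Seq(new MyObj(1, "a"), new MyObj(2, "b"), new MyObj(3, "c")))
    val ds2 = spark.createDataset(Seq(new MyObj(2, "a"), new MyObj(6, "b"), new MyObj(5, "c")))

    // map receives deserialized MyObj values; the tuple result uses the
    // built-in product encoder, so the columns are _1 and _2.
    val cols = ds.map(o => (o.i, o.j))
    cols.show()

    // Keep the object but expose the join key as its own column.
    // The encoder is passed explicitly to avoid relying on implicit resolution.
    val keyedEncoder: Encoder[(Int, MyObj)] = Encoders.tuple(Encoders.scalaInt, myObjEncoder)
    val keyed1 = ds.map(o => (o.i, o))(keyedEncoder)
    val keyed2 = ds2.map(o => (o.i, o))(keyedEncoder)

    // Join on the key column, then project the matched pairs to plain fields.
    val joined = keyed1.joinWith(keyed2, keyed1("_1") === keyed2("_1"), "inner")
    joined.map { case ((_, left), (_, right)) => (left.i, left.j, right.j) }.show()

    spark.stop()
  }
}
```

With the sample data, only i = 2 appears in both datasets, so the join should keep a single pair. The trade-off is that the Kryo column stays opaque to Spark SQL: only the fields copied out into the tuple can be used in column expressions.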
Jacek Laskowski
  • Thanks for the answer. Does it mean that each time we display a dataset (of a custom type, e.g. defined by `case class`), or write it to disk, we have to do the `ks.newInstance.deserialize` first? That leaves me quite confused about how encoders are meant to be used in `Dataset`. – jack Oct 03 '20 at 20:14
  • Not sure I understood you correctly, but since you've used `kryo` things have changed a bit. Stick to `case class` and `import spark.implicits._` and you should be just fine. In other words, why have you considered `Encoders.kryo`? I rarely see its use. – Jacek Laskowski Oct 03 '20 at 20:20
  • For the type that is not within [the list](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala) of predefined encoders, [it seems](https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset) we have to use `kryo`, no? If we use it, `ds` is in binary format rather than tabular, which prevents further transformation and data processing (imagine you want to do a simple `joinWith` on the column `i` with another `ds`, you have to do `ks.newInstance.deserialize` first, which is really not convenient). – jack Oct 03 '20 at 21:33
  • For unsupported types you have to convert them to the ones that are supported. No need for kryo. – Jacek Laskowski Oct 04 '20 at 08:33
  • In fact, when I pasted your code into spark-shell, I got this error (Spark 3.0.1) at the **`.asInstanceOf[MyObj]`** step: `java.lang.UnsupportedOperationException: Schema for type MyObj is not supported at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743) at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:69)` – jack Oct 09 '20 at 09:33
  • @jack You may have missed this `// Note that case keyword` in my code, didn't you? – Jacek Laskowski Oct 09 '20 at 09:45
  • That is true! But then it is not the solution I'm looking for, because I wouldn't encode a `case class` object using `kryo`; with `import spark.implicits._` I already get the benefits of Spark's default encoders. I'm looking for a way to decode/deserialize a custom type encoded by `kryo`. – jack Oct 09 '20 at 11:38
  • I cannot imagine how I can work on the encoded custom objects if I don't deserialize them. Or, put differently, what's the point of using `kryo` if I cannot decode the data back? – jack Oct 09 '20 at 11:50