I am writing a Spark job where the dataset needs to be pretty flexible; it is defined as Dataset[Map[String, java.io.Serializable]].

Now the problem shows up: the Spark runtime complains with No Encoder found for java.io.Serializable. I've tried Kryo serde, but it still shows the same error message (a sketch of roughly what I tried is at the end of this post).
The reason I have to use this weird Dataset type is that I have flexible fields per row, and the map looks like:
Map(
  "a" -> 1,
  "b" -> "bbb",
  "c" -> 0.1,
  ...
)
Is there any way in Spark to handle this flexible dataset type?
EDIT: here is a self-contained example anyone can try.
import org.apache.spark.sql.{Dataset, SparkSession}

object SerdeTest extends App {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()

  import sparkSession.implicits._

  // Build one Record per input number; each record's map has a different
  // number of entries, which is why the value type has to stay so generic.
  val ret: Dataset[Record] = sparkSession.sparkContext.parallelize(0 to 10)
    .map { t =>
      val row = (0 to t).map(
        i => i -> i.asInstanceOf[Integer]
      ).toMap
      Record(map = row)
    }
    .toDS() // this is where the encoder for Record is required

  val repartitioned = ret.repartition(10)
  repartitioned.collect.foreach(println)
}

case class Record(
  map: Map[Int, java.io.Serializable]
)
The above code gives the encoder error:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.io.Serializable
- map value class: "java.io.Serializable"
- field (class: "scala.collection.immutable.Map", name: "map")
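For reference, this is roughly what the Kryo attempt looked like. The exact code I used may have differed slightly (the object name here is just for illustration), but the idea was to register a Kryo encoder for the map type, and it ended with the same error:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object KryoAttempt extends App {
  val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate()

  // Register a Kryo-backed encoder for the map type, hoping the derived
  // encoder for Record would use it for the `map` field.
  implicit val mapEncoder: Encoder[Map[Int, java.io.Serializable]] =
    Encoders.kryo[Map[Int, java.io.Serializable]]

  import sparkSession.implicits._

  // Reuses the Record case class from the example above. toDS() still goes
  // through the derived case-class encoder, which does not consult the
  // implicit map encoder for nested fields, so the same
  // "No Encoder found for java.io.Serializable" exception is thrown.
  val ds = sparkSession.sparkContext
    .parallelize(Seq(Record(Map(0 -> 0.asInstanceOf[Integer]))))
    .toDS()

  ds.collect.foreach(println)
}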