3

I'm trying to create a Dataset from an RDD y.

Its type is: y: RDD[(MyObj1, scala.Iterable[MyObj2])]

So I explicitly created an encoder:

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
  e2: Encoder[A2]
): Encoder[(A1, A2)] = Encoders.tuple[A1, A2](e1, e2)

// Create Dataset
val z = spark.createDataset(y)(tuple2[MyObj1, Iterable[MyObj2]])

This code compiles without errors, but when I run it I get this error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for scala.Iterable[org.bean.input.MyObj2]
- field (class: "scala.collection.Iterable", name: "_2")
- root class: "scala.Tuple2"
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:625)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:619)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:607)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:607)
        at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:438)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
        at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
        at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:233)
        at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:33)

Some details about my objects (MyObj1 and MyObj2):
- MyObj1:

case class MyObj1(
  id: String,
  `type`: String // `type` is a reserved word in Scala, so it needs backticks
)

- MyObj2:

trait MyObj2 {
  val o_state:Option[String]

  val n_state:Option[String]

  val ch_inf: MyObj1

  val state_updated:MyObj3
}

Any help, please?

zero323
G.Saleh
  • Have you imported `spark.implicits._` (where `spark` is the name of your `SparkSession`)? I think that will give you access to an encoder for tuples as well. You may also need to supply encoders for `MyObj1` and `MyObj2`. – hoyland Feb 16 '18 at 12:17
  • @hoyland Probably did, otherwise it would be a compilation error, I guess. – tmucha Feb 16 '18 at 12:30
  • Yes, I imported `spark.implicits._`. – G.Saleh Feb 16 '18 at 13:07

2 Answers

2

Spark doesn't provide an Encoder for Iterable, so unless you want to use Encoders.kryo or Encoders.javaSerialization, this won't work.

The closest subtype of Iterable for which Spark provides an Encoder is Seq, so that is probably what you should use here. Otherwise, refer to "How to store custom objects in Dataset?"
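If the Iterable has to stay, one way to follow the Kryo route is to build the tuple encoder by hand, passing a Kryo encoder for the second component. The sketch below is only an illustration under stated assumptions: MyObj1 is simplified (its second field is renamed to kind), MyObj2Impl is a hypothetical implementation of the trait, and the session runs with a local master. With this approach the Iterable column is stored as opaque binary, so it cannot be queried with column expressions.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Simplified stand-ins for the question's types (hypothetical fields and names)
case class MyObj1(id: String, kind: String)
trait MyObj2 extends Serializable { def o_state: Option[String] }
case class MyObj2Impl(o_state: Option[String]) extends MyObj2

// Assumption: a local session, only for the sake of the example
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("iterable-kryo-sketch")
  .getOrCreate()

val y: RDD[(MyObj1, Iterable[MyObj2])] = spark.sparkContext.parallelize(Seq(
  (MyObj1("a", "t1"), Iterable[MyObj2](MyObj2Impl(Some("x")), MyObj2Impl(None)))
))

// Regular product encoder for the case class, Kryo encoder for the Iterable
val keyEncoder: Encoder[MyObj1] = Encoders.product[MyObj1]
val valuesEncoder: Encoder[Iterable[MyObj2]] = Encoders.kryo[Iterable[MyObj2]]

// Combine the two explicitly instead of relying on implicit resolution
val pairEncoder: Encoder[(MyObj1, Iterable[MyObj2])] =
  Encoders.tuple(keyEncoder, valuesEncoder)

val z: Dataset[(MyObj1, Iterable[MyObj2])] = spark.createDataset(y)(pairEncoder)
z.printSchema() // _2 is a binary column holding the Kryo-serialized Iterable
```

Passing the encoder explicitly avoids any ambiguity with the encoders brought in by spark.implicits._.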

Alper t. Turker
  • Yes, I tried the Kryo encoder: `implicit def chargingStatEncoder[A](implicit cs: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](cs)`. But my Dataset is `z: Dataset[(MyObj1, scala.Iterable[MyObj2])]`. How can I implement it? – G.Saleh Feb 16 '18 at 13:11
1

Try changing the declaration to val y: RDD[(MyObj1, Seq[MyObj2])] and it should work. I checked it with my own classes:

case class Key(key: String) {}
case class Value(value: Int) {}

For:

val y: RDD[(Key, Seq[Value])] = sc.parallelize(Map(
  Key("A") -> List(Value(1), Value(2)),
  Key("B") -> List(Value(3), Value(4), Value(5))
).toSeq)

val z = sparkSession.createDataset(y)
z.show()

I got:

+---+---------------+
| _1|             _2|
+---+---------------+
|[A]|     [[1], [2]]|
|[B]|[[3], [4], [5]]|
+---+---------------+

If I change it to Iterable, I get the same exception you got.
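If the RDD already exists with Iterable values, it does not have to be rebuilt: the values can be converted with mapValues before creating the Dataset. A minimal sketch, reusing the Key and Value classes from above and assuming a local session:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

case class Key(key: String)
case class Value(value: Int)

// Assumption: a local session, only for the sake of the example
val sparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("iterable-to-seq-sketch")
  .getOrCreate()
import sparkSession.implicits._

// An RDD whose values are typed as Iterable, as in the question
val withIterable: RDD[(Key, Iterable[Value])] = sparkSession.sparkContext.parallelize(Seq(
  (Key("A"), Iterable(Value(1), Value(2))),
  (Key("B"), Iterable(Value(3), Value(4), Value(5)))
))

// Convert each Iterable to a Seq so the built-in encoders apply
val y: RDD[(Key, Seq[Value])] = withIterable.mapValues(_.toSeq)

val z = sparkSession.createDataset(y)
z.show()
```

This only helps when the element type itself has an encoder, as the Value case class does here.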

tmucha
  • When I use Seq I get another error, because MyObj2 contains **other objects** (*MyObj1* and *MyObj3*). – G.Saleh Feb 16 '18 at 15:39