-1

Moving from Spark 1.6 to Spark 2.2* has brought the error “error: Unable to find encoder for type stored in a 'Dataset'. Primitive types (Int, String, etc)” when trying to apply a method to a dataset returned from querying a parquet table. I have oversimplified my code to demonstrate the same error. The code queries a parquet file to return the following datatype: 'org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]' I apply a function to extract a string and integer , returning a string. Returning the following datatype: Array[String] Next, I need to perform extensive manipulations requiring a separate function. In this test function, I try to append a string producing the same error as my detailed example. I have tried some encoder examples and use of the ‘case’ but have not come up with a workable solution. Any suggestions/ examples would be appreciated

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x" 
(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

To distill the problem further, i believe this scenario (though I have not tried all possible solutions to) can be simplified further to the following code:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }
  • 1
    Your function `dd()` is returning Unit for `d2.map{s=> dd(s) }` – koiralo Mar 26 '20 at 11:23
  • To simplify further I am trying to map a function to the output of my query (dataset). This style of code worked perfect under spark 1.6. I had tried a number of the techniques mentioned in the following post: https://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row including use of 'case' and 'getAs' to no avail. – John Daniels Mar 27 '20 at 14:52
  • 1
    hello koiralo: you are right i fixed by being more explicit by def f(s: String): String = in my function definition to resolve, though the key was trying test.rdd.map. – John Daniels Mar 27 '20 at 21:14

2 Answers2

0

I have a solution at least to my simplified Problem (below). I will be testing more....

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 | val r = s + "hi"
 | return r
 | }
f: (s: String)String

scala> var test2 = test.rdd.map{ s => f(s) }
test2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:43

scala> test2.take(1)
res9: Array[String] = Array(just some wordshi)
  • current attempts do not scale to my application, resulting in a related error: org.apache.spark.SparkException: Task not serializable. If I take a subset of the data and perform an explicit toDF() it will work, but not with a complete data set – John Daniels Apr 30 '20 at 15:27
0

The first solution does not work on my initial (production) data set, rather producing the error "org.apache.spark.SparkException: Task not serializable" (interestingly though both stored as the same data type (org.apache.spark.sql.Dataset[String] = [value: string]) which I believe to be related. I included yet another solution to my test data set that eliminates the initial Encoder error and as shown actually works on my toy problem, does not ramp to a production data set. A bit confused as to exactly why my application is sidelined in the movement from 1.6 to 2.3 version spark as I didn't have to make any special accommodations to my application for years and have run it successfully for calculations that most likely count in the trillions. Other explorations have included wrapping my method as Serializable, explorations of the @transient keyword, leveraging the "org.apache.spark.serializer.KryoSerializer", writing my methods as functions and changing all vars to 'vals' (following related posts on 'stack').

scala>  import spark.implicits._
import spark.implicits._

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 |   val r = s + "hi"
 |   return r
 |   }
 f: (s: String)String

 scala> var d2 =  test.map{s => f(s)}(Encoders.STRING)
 d2: org.apache.spark.sql.Dataset[String] = [value: string]

 scala> d2.take(1)
 res0: Array[String] = Array(just some wordshi)

scala>