I'm sorry but i need to ask a question again. I hope this one is not duplicated. I edited the last one, but I think nobody saw the edited version. This is a short example of the problem:
val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()
val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)
val new_df = //Add hundred of new columns
//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)
The error:
error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
What I want to do is, to modify each row. In this case I know, that there is only 1 column and I could handle it like Encoder error while trying to map dataframe row to updated row. But how can I solve the Problem if I've hundred of columns? I want to remove some rows if they do not satisfy a condition. At the moment I use:
val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)
But i dont think, that this is the best solution. I also run in a StackoverflowError:
Exception in thread "main" java.lang.StackOverflowError
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
TY for help :)