0

I'm sorry but i need to ask a question again. I hope this one is not duplicated. I edited the last one, but I think nobody saw the edited version. This is a short example of the problem:

val spark = SparkSession
.builder()
.appName("test")
.getOrCreate()

val field = StructField("1", BooleanType, false)
val schema = StructType(field::Nil)
val rowRDD = spark.sparkContext.parallelize(Array(Row(true),Row(false)))
val df = spark.createDataFrame(rowRDD, schema)

val new_df = //Add hundred of new columns

//here is the error
val df_2 = new_df.flatMap(row => if(test(row)) row::Nil else Nil)

The error:

error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

What I want to do is, to modify each row. In this case I know, that there is only 1 column and I could handle it like Encoder error while trying to map dataframe row to updated row. But how can I solve the Problem if I've hundred of columns? I want to remove some rows if they do not satisfy a condition. At the moment I use:

val df_2 = new_df.rdd.flatMap(row => if(test(row)) row::Nil else Nil)

But i dont think, that this is the best solution. I also run in a StackoverflowError:

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

TY for help :)

Community
  • 1
  • 1
Vitali D.
  • 149
  • 2
  • 14
  • I want to use DF because i need the schema. Is there a map-similar function for DF? I want to delete or extend a Row in a DF for some condition. – Vitali D. Dec 06 '16 at 14:39

1 Answers1

0

The withColumn() option of adding new column will work on entire data set. And if you have more columns, it makes things even worse. You can use Spark SQL and have a query in SQL style to add new columns. This will need more sql skills than just spark. And with 100 columns, may be the maintenance would be tough.

You can follow another approach.

You can convert an rdd into dataframe. Then use map on the data frame and process each row as you wish. Inside map method,

a. you can gather new values based on the calculations

b. Add these new column values to main rdd as below

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

Here row, is the reference of row in map method

c. Create new schema as below

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))

d. Add to the old schema

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. Create new dataframe with new columns

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)
Ramzy
  • 6,948
  • 6
  • 18
  • 30
  • Thnx for your answer, but how can I use map() on DataFrame with a lot of columns? I'm getting the error above. All my columns are booleans. – Vitali D. Dec 05 '16 at 18:03
  • You question referred to adding new and more columns to existing dataframe. So the above steps would help – Ramzy Dec 05 '16 at 19:34
  • The question was, how to use map() with hundred of columns without getting the error? – Vitali D. Dec 05 '16 at 21:27