
Does anyone have a working example of the DataFrame mapPartitions function?

Please note: I'm not looking for RDD examples.

Update:

The example posted by Masterbuilder is theoretically fine, but in practice it has an issue. Try it with a stream of structured data such as JSON:

val df = spark.read.json("/user/cloudera/json")
val newDF = df.mapPartitions(
  iterator => {
    // transform each element, then return an iterator over the results
    val result = iterator.map(data => { /* do some work with data */ }).toList
    result.iterator
  }
).toDF() // convert back to a DataFrame

It ends with this error:

<console>:28: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

Is there a way to get this working? What's wrong with the code above?

ozw1z5rd

2 Answers


You need an encoder. If your final DataFrame has the same schema as the input DataFrame, then it's as easy as:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

// an implicit Encoder[Row] built from the input schema
implicit val encoder = RowEncoder(df.schema)
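
Putting that together with the question's code, here is a minimal sketch of the same-schema case (assuming Spark 2.x and a SparkSession named spark; the identity transform is just a placeholder for real per-row work):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val df = spark.read.json("/user/cloudera/json")

// Without an implicit Encoder[Row] in scope, mapPartitions cannot
// serialize the rows it produces, which is the error in the question.
implicit val encoder = RowEncoder(df.schema)

val newDF = df.mapPartitions { iterator =>
  iterator.map { row =>
    // do some work with each Row here; returning it unchanged
    // keeps the output schema equal to df.schema
    row
  }
}.toDF()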

If not, you need to "redefine" the schema and create your own encoder from it.
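
For the changed-schema case, a minimal sketch (the output fields id and label are hypothetical, chosen only for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Describe the rows the lambda will emit, then build the encoder from that
val newSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("label", StringType)
))
implicit val newEncoder = RowEncoder(newSchema)

val reshaped = df.mapPartitions { iterator =>
  iterator.map(row => Row(0L, "placeholder")) // must line up with newSchema
}.toDF()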

For more info on the encoder issue, refer to the question Encoder error while trying to map dataframe row to updated row.

Iraj Hedayati
import sqlContext.implicits._

val newDF = df.mapPartitions(
  iterator => {
    // transform each element, then return an iterator over the results
    val result = iterator.map(data => { /* do some work with data */ }).toList
    result.iterator
  }
).toDF() // convert back to a DataFrame
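
Note: as the question's update shows, this fails for a plain DataFrame because sqlContext.implicits._ only supplies encoders for primitive and Product types, not for Row. It does compile if the lambda maps each row to a case class; a minimal sketch, where Person and its fields are hypothetical:

case class Person(name: String, age: Long)

import sqlContext.implicits._ // provides Encoder[Person], a Product type

val newDF = df.mapPartitions { iterator =>
  // extract the needed fields from each Row into the case class
  iterator.map(row => Person(row.getAs[String]("name"), row.getAs[Long]("age")))
}.toDF()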
Masterbuilder