
Below is my code. I am trying to iterate through each row of the resulting DataFrame:

val df: DataFrame = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", true) // Use first line of all files as header
  .option("delimiter", TILDE)
  .option("inferSchema", "true") // Automatically infer data types
  .load(fileName._2)

val accGrpCountsIds: DataFrame = df.groupBy("accgrpid").count()
LOGGER.info(s"DataFrame Count - ${accGrpCountsIds.count()}")
accGrpCountsIds.show(3)

//switch based on file names and update the model.
accGrpCountsIds.foreach(accGrpRow => {
  val accGrpId = accGrpRow.getLong(0)
  val rowCount = accGrpRow.getInt(1)
})

When I try to iterate through the DataFrame above using foreach, I get a Task not serializable error. How can I do this?

Shaido
user3897533

1 Answer


Do you have anything else in your foreach that you didn't share, or is that all you do and it doesn't work?

accGrpCountsIds.foreach(accGrpRow => {
  val accGrpId = accGrpRow.getLong(0)
  val rowCount = accGrpRow.getLong(1) // the count column is a LongType, so getInt(1) would fail
})

Also, you may find this useful: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
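If all you need is to loop over the grouped counts on the driver, you can collect() them first, as the asker ended up doing. Below is a minimal, self-contained sketch of that pattern; the toy DataFrame, column name "accgrpid", and the local-mode SparkSession are stand-ins for the CSV load in the question (which uses the older sqlContext / spark-csv API):

```scala
import org.apache.spark.sql.SparkSession

object CollectExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collect-demo")
      .getOrCreate()
    import spark.implicits._

    // Toy frame standing in for the CSV load in the question.
    val df = Seq((1L, "a"), (1L, "b"), (2L, "c")).toDF("accgrpid", "payload")

    // collect() returns an Array[Row] on the driver, so the loop below
    // runs locally and no closure has to be serialized to executors.
    val counts = df.groupBy("accgrpid").count().collect()

    counts.foreach { row =>
      val accGrpId = row.getLong(0)
      val rowCount = row.getLong(1) // count() produces a LongType column
      println(s"accgrpid=$accGrpId rows=$rowCount")
    }

    spark.stop()
  }
}
```

Because the loop happens after collect(), any non-serializable objects (loggers, models) referenced inside it never leave the driver, which is why this avoids the Task not serializable error.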

  • Thanks, this fixed the problem. I am using foreach like above - val accGrpCountsIds = df.groupBy("accgrpid").count().collect() – user3897533 Jun 26 '19 at 17:43
  • That's good news! Just one thing: .collect() will collect (who would've thought!) all the data onto the driver, which, depending on the amount of data you're dealing with, may cause an OOM exception. – Skycaptain Jun 27 '19 at 07:41
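If the grouped result is too large to collect() safely, Dataset.toLocalIterator() is a middle ground: it streams one partition at a time to the driver instead of materializing everything at once. A minimal sketch (the toy DataFrame and local-mode SparkSession are illustrative, not from the question):

```scala
import org.apache.spark.sql.SparkSession

object IterateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("iterator-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1L, "a"), (1L, "b"), (2L, "c")).toDF("accgrpid", "payload")
    val counts = df.groupBy("accgrpid").count()

    // toLocalIterator() returns a java.util.Iterator[Row]; only one
    // partition's rows are held on the driver at a time, bounding
    // driver memory compared to collect().
    val it = counts.toLocalIterator()
    while (it.hasNext) {
      val row = it.next()
      println(s"accgrpid=${row.getLong(0)} rows=${row.getLong(1)}")
    }

    spark.stop()
  }
}
```

Note that toLocalIterator() still triggers a job per consumed partition, so it trades memory for extra scheduling overhead.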