
Task not serializable gives me a Serialization stack. I'm sure the stack tells me what the problem is, but I don't know how to read it. Can anyone help?

Here are the official Spark docs for UDFs.

My failing code:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object Godaddy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Simple Application")
      .config("spark.driver.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")
      .config("spark.executor.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")
      .getOrCreate()
    ...
    // I believe it's failing to serialize these udfs?
    val oneWord = udf((domain: String) => {
      if (words.contains(domain)) {
        true
      } else {
        false
      }
    })
    val twoWords = udf((domain: String) => {
      for (i <- 1 to words.size) {
        val head = domain.slice(0,i)
        val tail = domain.slice(i, domain.length)
        if (words.contains(head) && words.contains(tail)) {
          return true
        }
      }
      false
    })
    spark.udf.register("oneWord", oneWord)
    spark.udf.register("twoWords", twoWords)
    val oneWordCsv = onlyLetters
      .filter(oneWord(col("Domain")))
      .filter(col("tld") === "com")
      .filter(col("Backlinks").isNotNull)
      .withColumn("Backlinks", col("Backlinks").cast("int"))
      .filter(col("Backlinks") > 0)
      .sort(col("Backlinks").desc)

    val twoWordsCsv = onlyLetters
      .filter(twoWords(col("Domain")))
      .filter(col("tld") === "com")
      .filter(col("Backlinks").isNotNull)
      .withColumn("Backlinks", col("Backlinks").cast("int"))
      .filter(col("Backlinks") > 0)
      .sort(col("Backlinks").desc)

    oneWordCsv.coalesce(1)
      .write.mode(SaveMode.Overwrite).json("oneWord.json")
    twoWordsCsv.coalesce(1)
      .write.mode(SaveMode.Overwrite).json("twoWords.json")


    spark.stop()
  }
}

error:

22/11/18 01:22:31 ERROR FileFormatWriter: Aborting job 9c85a949-c16a-4ce0-b3db-04045fc54701.
org.apache.spark.SparkException: Task not serializable
        ...
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
        - object not serializable (class: java.lang.Object, value: java.lang.Object@757b8c47)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 2)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class langenderfer.Godaddy$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic langenderfer/Godaddy$.$anonfun$main$3$adapted:(Lscala/collection/immutable/Map;Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=2])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class langenderfer.Godaddy$$$Lambda$3114/0x0000000801321040, langenderfer.Godaddy$$$Lambda$3114/0x0000000801321040@4a9d0c6f)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 4)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lorg/apache/spark/sql/catalyst/expressions/ScalaUDF;Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=4])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3117/0x0000000801323840, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3117/0x0000000801323840@345f429)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(split(input[1, string, false], \., -1)[0]))
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 2)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$catalystConverter$3:(Lorg/apache/spark/sql/catalyst/expressions/ScalaUDF;Lscala/Function1;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3119/0x0000000801325040, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3119/0x0000000801325040@2c9542eb)
        - element of array (index: 1)
        - array (class [Lscala.Function1;, size 2)
        - element of array (index: 2)
        - array (class [Ljava.lang.Object;, size 8)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 3)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2225/0x0000000800fdf040, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2225/0x0000000800fdf040@661fe6c4)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
        ... 92 more
Exception in thread "main" org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:763)
        at langenderfer.Godaddy$.main(Godaddy.scala:66)
        at langenderfer.Godaddy.main(Godaddy.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2477)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:753)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
        at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
        at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:350)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:186)
        ... 44 more
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
        - object not serializable (class: java.lang.Object, value: java.lang.Object@757b8c47)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 2)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class langenderfer.Godaddy$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic langenderfer/Godaddy$.$anonfun$main$3$adapted:(Lscala/collection/immutable/Map;Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=2])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class langenderfer.Godaddy$$$Lambda$3114/0x0000000801321040, langenderfer.Godaddy$$$Lambda$3114/0x0000000801321040@4a9d0c6f)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 4)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lorg/apache/spark/sql/catalyst/expressions/ScalaUDF;Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=4])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3117/0x0000000801323840, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3117/0x0000000801323840@345f429)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(split(input[1, string, false], \., -1)[0]))
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 2)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$catalystConverter$3:(Lorg/apache/spark/sql/catalyst/expressions/ScalaUDF;Lscala/Function1;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3119/0x0000000801325040, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$3119/0x0000000801325040@2c9542eb)
        - element of array (index: 1)
        - array (class [Lscala.Function1;, size 2)
        - element of array (index: 2)
        - array (class [Ljava.lang.Object;, size 8)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 3)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2225/0x0000000800fdf040, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2225/0x0000000800fdf040@661fe6c4)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
        ... 92 more
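For what it's worth, the bottom of the trace (JavaSerializerInstance.serialize called from ClosureCleaner.ensureSerializable) is just Java serialization of the closure, so I think the same failure can be reproduced without Spark. Here is a minimal sketch of that check; the `words` map below is only a hypothetical stand-in for my real word list:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializabilityCheck {
  // Java-serialize a value the same way Spark's ClosureCleaner does, so a
  // NotSerializableException would surface on the driver without running a job.
  def checkSerializable(value: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(value)
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the real `words` collection.
    val words: Map[String, Boolean] = Map("example" -> true, "pizza" -> true)
    val oneWordFn = (domain: String) => words.contains(domain)
    // Passes for a plain immutable Map; throws java.io.NotSerializableException
    // if the closure captures something that cannot be serialized.
    checkSerializable(oneWordFn)
  }
}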


GitHub PR that added the serialization debugger

Useful stack trace info

Comments:

  • Hmm I'm wondering what that `words` object is that you're doing `words.contains()` and `words.size` on. For each serialized task you will need to ship over that entire object. Could that be the object that is not serializable? – Koedlt Nov 18 '22 at 10:17
  • [I think you are right](https://stackoverflow.com/questions/32900862/map-can-not-be-serializable-in-scala). Looks like it's a known scala bug. – Geoff Langenderfer Nov 18 '22 at 10:36
  • @GeoffLangenderfer It's not actually a bug; the answer at the link you provided only covers part of what is happening. Operations such as mapValues on a Map return a view over the map, and that view is not serializable. You can call .toMap on the view to make it a plain Map again. – AminMal Nov 18 '22 at 14:09
  • 1
    Also, if(words contains domain) true else false is redundant, words contains domain would be enough :) – AminMal Nov 18 '22 at 14:10
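If `words` really is a view produced by something like `mapValues`, a minimal sketch of the fix the comments above point at might look like this (`rawCounts` is a hypothetical stand-in for the real source data; the linked question suggests `.map(identity)`, and on a Scala 2.13 MapView `.toMap` works as well):

import org.apache.spark.sql.functions.udf

// Hypothetical stand-in for however `words` is really built.
val rawCounts: Map[String, Int] = Map("example" -> 1, "pizza" -> 2)

// mapValues does not build a new map; it returns a lazy, non-serializable
// wrapper over the original one.
val wordsView = rawCounts.mapValues(_ + 1)

// Materialize it into a plain immutable Map before capturing it in a UDF.
val words: Map[String, Int] = wordsView.map(identity).toMap

// With a plain Map captured, the UDF can also be simplified as AminMal notes:
val oneWord = udf((domain: String) => words.contains(domain))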

0 Answers