
My understanding of the mechanics of how Spark distributes code to the nodes that run it is only cursory, and I am failing to get my code to run successfully inside Spark's mapPartitions API when I want to instantiate a class for each partition, with an argument.

The code below worked perfectly, up until I evolved the class MyWorkerClass to require an argument:

  val result : DataFrame =
    inputDF.as[Foo].mapPartitions(sparkIterator => {

      // (1) initialize heavy class instance once per partition
      val workerClassInstance = MyWorkerClass(bar)

      // (2) provide an iterator using a function from that class instance
      new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
    })

The code above worked perfectly well until I had (or chose) to add a constructor argument to my class MyWorkerClass. The passed argument arrives as null on the worker, instead of the real value of bar. Somehow the serialization of the argument fails to work as intended.
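For reference, the kind of change I suspect might be relevant ― copying bar into a plain local val right before the closure, so that the closure captures only the value rather than whatever enclosing object holds bar ― would look roughly like the sketch below (localBar is a name I made up here; nothing in this sketch is verified to fix the null):

  // sketch only: hoist bar into a local val so the closure captures just the value
  val localBar = bar

  val result =
    inputDF.as[Foo].mapPartitions(sparkIterator => {
      val workerClassInstance = MyWorkerClass(localBar)
      new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
    })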

How would you go about this?


Additional Thoughts/Comments

I'll avoid adding the bulky code of CloseableIteratorForSparkMapPartitions ― it merely provides a Spark-friendly iterator, and may well not be the most elegant implementation of one.

As I understand it, the constructor argument is not being passed correctly to the Spark worker because of how Spark captures state when serializing the closure it ships for execution on the worker. However, instantiating the class does seamlessly make the heavy-to-load assets it contains available to the function provided on the last line of the code above, and the class does appear to be instantiated once per partition ― which is a valid, if not the key, use case for using mapPartitions rather than map.

It's the passing of an argument to that instantiation that I am having trouble enabling or working around. In my case this argument is a value only known after the program has started running (even though it is invariant throughout a single execution of my job; it's actually a program argument). I do need to pass it along for the initialization of the class.
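If it helps, a broadcast-variable variant of the same code would presumably look roughly like the sketch below (assuming spark is my SparkSession; barBroadcast is a name I made up, and I haven't verified that this resolves the null):

  // sketch only: ship bar as a broadcast variable and read it on the executors
  val barBroadcast = spark.sparkContext.broadcast(bar)

  val result =
    inputDF.as[Foo].mapPartitions(sparkIterator => {
      val workerClassInstance = MyWorkerClass(barBroadcast.value)
      new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
    })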

I tried to tinker my way around this by providing a function which instantiates MyWorkerClass with its input argument, rather than instantiating it directly as above, but that did not solve matters.
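Roughly, that attempt looked like the sketch below (reconstructed from memory, so details may differ; makeWorker is just my own helper name):

  // sketch of the factory-function attempt; bar still arrived as null on the worker
  def makeWorker(b: Option[String]): MyWorkerClass = MyWorkerClass(b)

  // ... and inside mapPartitions, instead of constructing directly:
  // val workerClassInstance = makeWorker(bar)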

The root symptom of the problem is not an exception, but simply that the value of bar when MyWorkerClass is instantiated is null, instead of the actual value of bar, which is known in the scope enclosing the code snippet I included above!

* one related old Spark issue discussion here

  • `val workerClassInstance: MyWorkerClass(bar)` it is a valid initialisation? It shouldn't be `val workerClassInstance: MyWorkerClass = MyWorkerClass(bar)`? – abiratsis Apr 30 '20 at 16:06
  • Typo entered in sterilizing the code .... fixing it now. We can remove both our comments now. – matanster Apr 30 '20 at 16:08
  • well in the scope of `mapPartitions` Spark is aware of the constructor of `MyWorkerClass` but what about `bar`? Where do you declare `bar`? Also what is exactly the error that you get? – abiratsis Apr 30 '20 at 16:20
  • `MyWorkerClass` is a class imported into the source file, and `bar` is a value declared in the scope directly enveloping the code I included above. – matanster Apr 30 '20 at 16:22
  • so bar is not a custom class? – abiratsis Apr 30 '20 at 16:23
  • It's an `Option[String]` .... and instead of being passed along to the worker, the value `null` is assigned to the option's value during instantiation in the worker. I can try avoiding an `Option` in breaking ways to my clean program design, but would rather learn what's going on, to avoid similar issues just the same! – matanster Apr 30 '20 at 16:30
  • Is Spark confined to only serializing and shipping _static_ objects and values in its processing of `mapPartitions`? how would one transmit a value for the class initialization nonetheless? – matanster Apr 30 '20 at 16:35
  • I see, I think the problem is that Spark is not aware of the assignment that occurred during the driver code. For example you could try to assign a dummy value to bar before mapPartitions something like `bar = Some("wow")` or even broadcast it :) – abiratsis Apr 30 '20 at 16:38
  • Why would a dummy value solve anything? I assume that broadcasting might be involved in a solution, but these comments so far pretty much only seem to restate the original question ... – matanster Apr 30 '20 at 17:00
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/212877/discussion-between-matanster-and-alexandros-biratsis). – matanster Apr 30 '20 at 17:01
  • Spark does some cleaning of the closure before sending it to the executors. Perhaps that messes up the value of your Option[T] instance. – Hristo Iliev Apr 30 '20 at 17:07
  • Thanks but can't say I see a path forward emanating there ... – matanster Apr 30 '20 at 17:16
  • @matanster I was taking a look into the code responsible for cleaning up the closures [here](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala). In my understanding Spark will read only the fields/members of an object/class. The methods will be ignored, please take a look [here](https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou/22596875#22596875) for an extensive discussion about possible work arounds. – abiratsis May 24 '20 at 10:01
  • What I would try is to initialise the objects outside the closure and use almost always `val` instead of `var` (this is discussed in the first [link](https://spark-project.atlassian.net/browse/SPARK-729) that you already sent). Finally don't forget to inherit `Serializable` class. And of course if you come to any conclusion please let us know :) – abiratsis May 24 '20 at 10:01

0 Answers