
I am trying to measure the performance impact of having to copy a DataFrame from Scala to Python and back in a large pipeline. For that purpose I have created this rather artificial transformer:

from pyspark.ml.pipeline import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import keyword_only
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import random

class RandomColAdderTransformer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, bogusarg=None):
        super(RandomColAdderTransformer, self).__init__()
        self.bogusarg = None
        self._setDefault(bogusarg=set())
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        cur_col = self.getInputCol()
        def randGet(col): # UDF crashes with no arguments
            a = col*random.random() # Ensure we are reading and copying to python space 
            return a            # It runs only once?

        sparktype = FloatType()
        return dataset.withColumn("randFloat", udf(randGet, sparktype)(cur_col))

The goal of this transformer is to ensure that some numbers are generated from Python: it reads a value from the DataFrame, multiplies it in Python, and then appends the result as a new column, so that the next stage of the pipeline has to copy the data back.

However I am seeing some weirdness. When testing my code, the same random number is generated for every row:

df = sqlContext.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
myTestTransformer = RandomColAdderTransformer()
myTestTransformer.setInputCol("x3")
transformedDF = myTestTransformer.transform(df)
transformedDF.show()

+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+

And then consecutive invocations of transformedDF.show() actually change the values!?

transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [3]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0|  2.9191132|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [4]: transformedDF.show()
+---+---+-----+-----------+
| x1| x2|   x3|  randFloat|
+---+---+-----+-----------+
|  1|  a| 23.0| 0.95878977|
|  3|  B|-23.0|-0.95878977|
+---+---+-----+-----------+


In [5]: transformedDF.show()
+---+---+-----+----------+
| x1| x2|   x3| randFloat|
+---+---+-----+----------+
|  1|  a| 23.0| 16.033003|
|  3|  B|-23.0|-2.9191132|
+---+---+-----+----------+

Is this behavior expected? Does .show() actually trigger the computation? AFAIK I am using a single node; surely the tasks would run in a single thread, so they would be sharing the random seed? I know a built-in PySpark RNG exists, but it is not suitable for my purpose, as it wouldn't actually be generating the data from Python space.

  • What kind of answer do you expect here? Explanation what is going on or a full working replacement for a given code? – zero323 Jun 21 '16 at 15:04
  • Explanation for what's going on. AFAIK the code does what I want it to do (a value from Python space gets multiplied by a value from a `dataframe`, and then you append the column to the `dataframe` to process it further). Correct me if I am wrong; I am trying to understand what's happening and make sure it will force Py4J to copy data to the JVM and back. – XapaJIaMnu Jun 21 '16 at 15:09
  • You could use identity (`lambda x: x`) and it would work just fine as well if your only goal is moving data. – zero323 Jun 21 '16 at 15:37
  • @zero323 I wasn't sure if that would actually do anything as I think evaluation is happening on demand, but thank you for the explanation. – XapaJIaMnu Jun 21 '16 at 18:21
  • If it is required for downstream processing it will be executed independent of version. In 1.x it is computed even if it is not used at all. In general you should consider not only data movement cost. There is significant impact on GC as well. Not to mention that Python UDFs are particularly bad for execution plan. – zero323 Jun 21 '16 at 18:24
  • If the solution provided answers your question, please accept it to close the issue ! – eliasah Jun 22 '16 at 05:52
  • @zero323 this is precisely the worst case scenario that I want to benchmark: UDF in python space. – XapaJIaMnu Jun 22 '16 at 08:48
  • @eliasah I have yet to try it. I will accept the answer when I am done with it. – XapaJIaMnu Jun 22 '16 at 08:50

1 Answer


Well, "expected" is rather relative here, but it is not something that cannot be explained. In particular, the state of the RNG is inherited from the parent process. You can easily prove that by running the following simple snippet in local mode:

import random

def roll_and_get_state(*args):
    # Advance the inherited RNG once and report its resulting state
    random.random()
    return [random.getstate()]

# 10 empty partitions, each evaluated in its own task
states = sc.parallelize([], 10).mapPartitions(roll_and_get_state).collect()
len(set(states))
## 1

As you can see, each partition is using its own RNG, but they all have the same state.

In general ensuring correct Python RNG behavior in Spark without a serious performance penalty, especially if you need reproducible results, is rather tricky.

One possible approach is to instantiate a separate Random instance per partition, with a seed generated from cryptographically safe random data (os.urandom).
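A minimal sketch of that idea, reusing the example DataFrame from the question (the helper name add_rand_partition is mine, not something from PySpark):

import os
import random

from pyspark.sql import Row

def add_rand_partition(rows):
    # One Random instance per partition, seeded from os.urandom,
    # so the state inherited from the parent process does not matter.
    rng = random.Random(os.urandom(16))
    for row in rows:
        d = row.asDict()
        d["randFloat"] = d["x3"] * rng.random()
        yield Row(**d)

transformedDF = df.rdd.mapPartitions(add_rand_partition).toDF()

This keeps the per-row work in Python space, like the UDF version, but gives every partition an independent seed.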

If you need reproducible results you can generate RNG seeds based on global state and partition data. Unfortunately this information is not easily accessible at runtime from Python (ignoring special cases like mapPartitionsWithIndex).
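For the reproducible case, a rough sketch using mapPartitionsWithIndex; GLOBAL_SEED and add_rand_indexed are assumed names, and the result is only reproducible as long as the input partitioning itself is deterministic:

import random

from pyspark.sql import Row

GLOBAL_SEED = 42  # assumed application-level constant

def add_rand_indexed(index, rows):
    # Derive a deterministic per-partition seed from the global seed
    # and the partition index passed in by mapPartitionsWithIndex.
    rng = random.Random(hash((GLOBAL_SEED, index)))
    for row in rows:
        d = row.asDict()
        d["randFloat"] = d["x3"] * rng.random()
        yield Row(**d)

transformedDF = df.rdd.mapPartitionsWithIndex(add_rand_indexed).toDF()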

Since partition-level operations are not always applicable (as in the case of a UDF), you can achieve a similar result by using a singleton module or the Borg pattern to initialize the RNG for each executor.
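For the UDF case, one rough sketch of the singleton idea; rng_singleton is a hypothetical module that would have to be shipped to the executors (e.g. with --py-files):

# rng_singleton.py -- hypothetical helper module shipped to the executors
import os
import random

_rng = None

def get_rng():
    # Lazily create one Random per Python worker process, seeded from
    # os.urandom so workers forked from the same parent diverge.
    global _rng
    if _rng is None:
        _rng = random.Random(os.urandom(16))
    return _rng

# Drop-in replacement for randGet in the question's _transform:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

import rng_singleton

def randGet(col):
    # The per-row multiplication still happens in Python space,
    # but the RNG is initialized once per worker process.
    return col * rng_singleton.get_rng().random()

# dataset.withColumn("randFloat", udf(randGet, FloatType())(cur_col))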


  • Thank you for the detailed answer. – XapaJIaMnu Jun 21 '16 at 18:21
  • You're welcome. If it helpful please don't forget to upvote / accept :) – zero323 Jun 21 '16 at 18:34
  • Can you also tell me why the `show()` calls later produce different results sometimes and sometimes the same? Isn't the dataframe supposed to be transformed already? – XapaJIaMnu Jun 21 '16 at 20:17
  • If the data is not cached, then everything is computed from scratch every time you execute an action. Also, as far as I remember, show returns an arbitrary set of rows. – zero323 Jun 21 '16 at 21:15
  • In this case I have only 2 rows. I would understand it being recomputed every time, but why would I sometimes get different random numbers? – XapaJIaMnu Jun 22 '16 at 08:51
  • I won't give an answer for that. I cannot reproduce this. You'll have to look at the interpreter lifecycle, I guess, but this stuff is not really documented and has changed significantly over time. It will also differ between configurations and modes. – zero323 Jun 22 '16 at 19:45
  • Maybe I don't understand what I should look for. Do you use local mode or any specific cluster manager? – zero323 Jun 23 '16 at 13:44
  • I use some in-house version of Spark 2.10. I can't say more, but it's not running on a cluster, just on a local machine (1 node). – XapaJIaMnu Jun 23 '16 at 15:05