
I'm looking at an example of using a pandas UDF in Spark (PySpark). The link to this snippet: HERE.

In this example, a model that was previously fit with scikit-learn (on the driver node) scores new data that is distributed across the cluster.

My question about pandas UDFs on Spark is whether any library (e.g. scikit-learn) or object (the "model") used in the UDF automatically gets passed to the worker nodes. It seems so from this example.

If one does need to supply the library and object, how is that done?

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# define a schema for the result set: the user ID and the model prediction
schema = StructType([StructField('user_id', LongType(), True),
                     StructField('prediction', DoubleType(), True)])

# define the Pandas UDF
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_pd):
    # run the model on the partitioned data set
    ids = sample_pd['user_id']
    x_train = sample_pd.drop(['label', 'user_id', 'partition_id'], axis=1)
    pred = model.predict_proba(x_train)
    return pd.DataFrame({'user_id': ids, 'prediction': pred[:, 1]})

# partition the data and run the UDF
results = spark_df.groupby('partition_id').apply(apply_model)
display(results)
B_Miner

1 Answer


If a user-defined function references objects outside of its scope, Spark will automatically try to pickle those objects and ship them with each parallel task.

Multiple Spark tasks can run in parallel on one machine, each on a separate core.

Check the documentation on shared variables:

These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.

There is also an option to broadcast the variable to each node, so that it exists only once per executor JVM.

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
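
A minimal sketch of the broadcast route, reusing the question's `schema`, fitted `model`, `spark` session and `spark_df` (the other names are illustrative): the model is wrapped in a broadcast variable on the driver and unpacked with `.value` inside the UDF, so it is shipped and cached once per executor rather than once per task.

# broadcast the fitted model from the driver
bc_model = spark.sparkContext.broadcast(model)

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model_broadcast(sample_pd):
    # read the model out of the broadcast variable on the executor
    x_train = sample_pd.drop(['label', 'user_id', 'partition_id'], axis=1)
    pred = bc_model.value.predict_proba(x_train)
    return pd.DataFrame({'user_id': sample_pd['user_id'],
                         'prediction': pred[:, 1]})

results = spark_df.groupby('partition_id').apply(apply_model_broadcast)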

dre-hh
  • Great! How about libraries though? Are they somehow packaged up for the worker nodes? – B_Miner Dec 21 '19 at 02:49
  • 1
  • Python packages either need to be installed on each worker, or they can be submitted with the `spark-submit --py-files` argument. https://stackoverflow.com/questions/36461054/i-cant-seem-to-get-py-files-on-spark-to-work It can be done with the default tools as described there, but there are also `venv-pack`, `conda-pack` and `pex`. – dre-hh Dec 21 '19 at 09:44
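
For the same idea inside Python itself, a hedged sketch (the archive name `deps.zip` is hypothetical, and this only covers pure-Python dependencies; compiled libraries such as scikit-learn still need to be installed on each worker or shipped as a packed environment with the tools above):

# programmatic equivalent of --py-files: ships the archive to every executor
# and puts it on that executor's PYTHONPATH
spark.sparkContext.addPyFile('deps.zip')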