
We are trying to evaluate whether multiprocessing actually provides a benefit within the Spark framework, specifically with PySpark. The current setup runs on an EMR cluster with a single master/slave node.

As a baseline, our standalone script works fine when processing, for example, a single day's transaction file. We want to run this same script for multiple days in parallel. The expectation is that if a single day's data takes 5 minutes to process, then two days' worth of data run in parallel should complete in roughly 5 to 7 minutes, not 10.
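For context, this is a simplified sketch of the approach we are attempting; the bucket names, paths, and dates are illustrative, not our actual code:

```python
# Simplified sketch of our current approach (paths and names illustrative).
# A single SparkSession is created in the parent process, and each day's
# file is then handed to a separate multiprocessing.Process.
from multiprocessing import Process

from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

spark = SparkSession.builder.appName("daily-matching").getOrCreate()

def process_daily_track_file(day):
    df = spark.read.parquet("s3://bucket/tracks/%s/" % day)
    (df.groupBy("isrc_cd", "upc_cd")
       .agg(max_("product_no").alias("product_no"))
       .write.parquet("s3://bucket/matched/%s/" % day))

if __name__ == "__main__":
    procs = [Process(target=process_daily_track_file, args=(d,))
             for d in ("2018-09-01", "2018-09-02")]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```

Each child process forks with a reference to the parent's SparkSession, and the groupBy call inside the child is where the failure below occurs.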

However, when we run this, we hit multiple issues, with errors thrown for DataFrame operations such as groupBy:

```
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/hadoop/./script/spotify/spt_gras_matching_mp.py", line 27, in process_daily_track_file
    isrc_upc_prod_no_df = a_gras_data_df.groupBy("isrc_cd", "upc_cd").agg(max("product_no")).withColumnRenamed("max(product_no)", "product_no")
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1268, in groupBy
    jgd = self._jdf.groupBy(self._jcols(*cols))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 998, in _jcols
    return self._jseq(cols, _to_java_column)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 985, in _jseq
    return _to_seq(self.sql_ctx._sc, cols, converter)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/column.py", line 66, in _to_seq
    cols = [converter(c) for c in cols]
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/column.py", line 48, in _to_java_column
    jcol = _create_column_from_name(col)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/column.py", line 41, in _create_column_from_name
    return sc._jvm.functions.col(name)
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1559, in __getattr__
    raise Py4JError("{0} does not exist in the JVM".format(name))
Py4JError: functions does not exist in the JVM
```

Before trying to resolve the above error, our more basic question is: does parallelizing on the developer's side provide any benefit at all? Or is what we are attempting redundant, because Spark already parallelizes the work for us, e.g. in a single multi-day job as sketched below?
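For concreteness, this is what we understand the "let Spark do it" alternative to look like: a single job reads all days at once and relies on Spark's own task parallelism. Again, the file format and paths are illustrative:

```python
# Alternative sketch: one job over all days, relying on Spark's built-in
# task parallelism instead of driver-side multiprocessing
# (file format and paths illustrative).
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

spark = SparkSession.builder.appName("multi-day-matching").getOrCreate()

days = ["2018-09-01", "2018-09-02"]
paths = ["s3://bucket/tracks/%s/" % d for d in days]

# Reading several paths into one DataFrame lets Spark schedule all the
# resulting partitions across the available cores in a single job.
# If per-day results are needed, a day/date column can be added to the
# groupBy keys.
df = spark.read.parquet(*paths)

result = (df.groupBy("isrc_cd", "upc_cd")
            .agg(max_("product_no").alias("product_no")))
result.write.parquet("s3://bucket/matched/")
```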

Any suggestions would be highly appreciated.

  • `multiprocessing` - on the driver not, because [you cannot use it with gateway](https://stackoverflow.com/q/38048068), as you've already learned; on the executors probably not, because it makes more sense to increase the number of executors. `multithreading` - on the driver maybe (for async submission in interactive mode), on the executors maybe, depending on the actual code. However, if you have completely independent jobs you're almost always better off parameterizing the app and submitting each part separately. – zero323 Sep 17 '18 at 22:51
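Following up on the comment above, the thread-based variant might look something like the sketch below. Threads share the parent's Py4J gateway, so the forking problem behind the `Py4JError` does not arise (paths and names are again illustrative, not tested code):

```python
# Sketch of the "multithreading on the driver" idea from the comment:
# threads share the parent's SparkContext and Py4J gateway, so jobs can
# be submitted concurrently, unlike forked processes (names illustrative).
from threading import Thread

from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

spark = SparkSession.builder.appName("daily-matching-threads").getOrCreate()

def process_daily_track_file(day):
    df = spark.read.parquet("s3://bucket/tracks/%s/" % day)
    (df.groupBy("isrc_cd", "upc_cd")
       .agg(max_("product_no").alias("product_no"))
       .write.parquet("s3://bucket/matched/%s/" % day))

days = ["2018-09-01", "2018-09-02"]
threads = [Thread(target=process_daily_track_file, args=(d,)) for d in days]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Whether the two jobs actually overlap then depends on the job scheduler; the FAIR scheduler (`spark.scheduler.mode=FAIR`) may need to be enabled so that one day's job does not simply queue behind the other.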

0 Answers