The usecase is the following:
I have a large dataframe, with a 'user_id' column in it (every user_id can appear in many rows). I have a list of users my_users which I need to analyse.
Groupby, filter and aggregate could be a good idea, but the available aggregation functions included in pyspark did not fit my needs. In the pyspark ver, user defined aggregation functions are still not fully supported and I decided to leave it for now..
Instead, I simply iterate the my_users list, filter each user in the dataframe, and analyse. In order to optimize this procedure, I decided to use python multiprocessing pool, for each user in my_users
The function that does the analysis (and passed to the pool) takes two arguments: the user_id, and a path to the main dataframe, on which I perform all the computations (PARQUET format). In the method, I load the dataframe, and work on it (DataFrame can't be passed as an argument itself)
I get all sorts of weird errors, on some of the processes (different in each run), that look like:
- PythonUtils does not exist in the JVM (when reading the 'parquet' dataframe)
- KeyError: 'c' not found (also, when reading the 'parquet' dataframe. What is 'c' anyway??)
When I run it without any multiprocessing, everything runs smooth, but slow..
Any ideas where these errors are coming from?
I'll put some code sample just to make things clearer:
PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant
# ....
def users_worker(df_path, user_id):
df = spark.read.parquet(df_path) # The problem is here!
## the analysis of user_id in df is here
def user_worker_wrapper(args):
users_worker(*args)
def analyse():
# ...
users_worker_args = [(df_path, user_id) for user_id in my_users]
users_pool = Pool(processes=len(my_users))
users_pool.map(users_worker_wrapper, users_worker_args)
users_pool.close()
users_pool.join()