
I would like to concurrently run a simple function that writes the output of a process into a .txt file and then stores it to DBFS (Databricks File System). In my example I use both the ThreadPoolExecutor and ProcessPoolExecutor classes: the ThreadPoolExecutor runs successfully, while the ProcessPoolExecutor raises a PicklingError. I would like to run my function with both classes. How can I resolve the PicklingError?
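For context, here is a minimal, Spark-free sketch of the difference as I understand it (the function names are made up for illustration): ProcessPoolExecutor has to pickle the callable and every argument to ship them to its worker processes, while ThreadPoolExecutor shares memory with the main thread and pickles nothing.

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):  # module-level function: picklable by reference
    return x * x

def main():
    def local_square(x):  # nested function: cannot be pickled
        return x * x

    with ThreadPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(local_square, range(4))))  # works: no pickling involved

    with ProcessPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(square, range(4))))  # works: importable, module-level
        # ex.map(local_square, range(4)) here would fail to pickle

if __name__ == "__main__":
    main()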

Please find below the code I run to replicate the issue:

If you run it locally and not in a Databricks cluster:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

sc = spark.sparkContext

Create the Spark DataFrame and the arguments

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types
from itertools import cycle
from datetime import datetime, timedelta
import time
import os
import pandas as pd

date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)

pandas_df = pd.DataFrame({  'id' : ['001', '001', '001', '001', '001', '002', '002', '002', '002', '002'],
                            'PoweredOn':[0, 0, 0, 1, 0, 0, 0, 1, 0, 0]
                        })
spark_df=spark.createDataFrame(pandas_df)

device_ids=list(pandas_df['id'].unique())
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)

Approach 1 | Using ThreadPoolExecutor class - Works perfectly

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

if __name__ == "__main__":
    
    #main function
    def testing_function_map(iterables_tuple):

        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset=iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        filtered_dataset.groupBy('PoweredOn').count()  # note: the result of this aggregation is unused

        # orderBy('PoweredOn') makes the row order deterministic: row 0 is PoweredOn=0, row 1 is PoweredOn=1
        message_list=filtered_dataset.groupBy('PoweredOn', 'id').count().orderBy('PoweredOn').collect()

        filename='message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])

        with open(os.path.join(os.getcwd(),filename), 'w') as file:
            file.writelines("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.writelines("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")

        print("{0}: FINSIH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), len(device_ids)))
    
    #wait function
    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)
    
    executor = ThreadPoolExecutor(max_workers=2)
#     executor = ProcessPoolExecutor(max_workers=2)

    tasks=[*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]
    
    list(executor.map(wait_on_device, tasks))

Approach 2 | Using ProcessPoolExecutor class - Generates a PicklingError for the wait_on_device() function

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

if __name__ == "__main__":

    def testing_function_map(iterables_tuple):

        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset=iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        filtered_dataset.groupBy('PoweredOn').count()  # note: the result of this aggregation is unused

        # orderBy('PoweredOn') makes the row order deterministic: row 0 is PoweredOn=0, row 1 is PoweredOn=1
        message_list=filtered_dataset.groupBy('PoweredOn', 'id').count().orderBy('PoweredOn').collect()

        filename='message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])

        with open(os.path.join(os.getcwd(),filename), 'w') as file:
            file.writelines("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.writelines("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")

        print("{0}: FINSIH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), len(device_ids)))

    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)
    
#     executor = ThreadPoolExecutor(max_workers=2)
    executor = ProcessPoolExecutor(max_workers=2)

    tasks=[*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]
    
    list(executor.map(wait_on_device, tasks))

With the ProcessPoolExecutor class I get a PicklingError:

[screenshot: PicklingError traceback]

In general, when testing this application of the ProcessPoolExecutor, it keeps giving me a PicklingError on the function wait_on_device().
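To narrow down which object actually fails, a quick check one can run in the same session (reusing wait_on_device and tasks from the code above) is to pickle the callable and each element of one task tuple directly:

import pickle

# Try pickling the worker function and each element of one task tuple;
# whichever raises is what ProcessPoolExecutor cannot ship to a worker process.
candidates = [("wait_on_device", wait_on_device)] + list(
    zip(["device_id", "location", "total", "timestamp", "spark_df"], tasks[0]))
for name, obj in candidates:
    try:
        pickle.dumps(obj)
        print(name, "-> pickles fine")
    except Exception as exc:
        print(name, "-> FAILS:", exc)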

How can I resolve the pickling error? I have searched for various approaches, like making the main function globally accessible through a class, or registering a custom reducer via import copyreg as copy_reg, although none of them resolved my problem, probably because I don't apply them correctly.

My approach so far
As presented here by @Steven Bethard:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types

def _pickle_method(method):
    func_name = method.__func__.__name__  # Python 3: im_func became __func__
    obj = method.__self__                 # Python 3: im_self became __self__
    cls = method.__self__.__class__       # Python 3: im_class no longer exists
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
    
if __name__ == "__main__":

    # The rest of my code already presented above

But the PicklingError still exists.
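As far as I can tell, the copyreg recipe above only registers a reducer for bound methods (types.MethodType), while here a plain function and a Spark DataFrame fail to pickle, so it never kicks in. The restructuring I am now considering instead (untested on Databricks; write_device_report is a hypothetical name) moves the worker to module level and ships only plain Python data to the child processes, since a PySpark DataFrame holds references to the JVM-backed SparkSession/SparkContext, which as far as I know cannot be pickled:

from concurrent.futures import ProcessPoolExecutor
from itertools import cycle
import os

# Module-level worker: picklable by reference; it receives only plain
# Python data, so no Spark objects cross the process boundary.
def write_device_report(args):
    device_id, loc, total, timestamp, counts = args  # counts: {0: n_off, 1: n_on}
    print("{0}/{1}: writing report for asset id {2}".format(loc, total, device_id))
    filename = 'message_{0}_{1}.txt'.format(device_id, timestamp)
    with open(os.path.join(os.getcwd(), filename), 'w') as file:
        file.write("Number of Powered on devices for asset id {0}: {1} & ".format(device_id, counts.get(1, 0)))
        file.write("Number of Powered off devices for asset id {0}: {1}".format(device_id, counts.get(0, 0)))
    return filename

if __name__ == "__main__":
    # Run the Spark aggregation once on the driver and collect it to pandas;
    # spark_df, device_ids, location, devices_total_number and timestamp_snap
    # come from the setup code above.
    counts_pdf = spark_df.groupBy('id', 'PoweredOn').count().toPandas()
    per_device = {dev: dict(zip(grp['PoweredOn'], grp['count']))
                  for dev, grp in counts_pdf.groupby('id')}

    tasks = [*zip(device_ids, location, cycle([str(devices_total_number)]),
                  cycle([timestamp_snap]),
                  [per_device[d] for d in device_ids])]

    with ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(write_device_report, tasks)))

(In a Jupyter notebook on spawn-based platforms like Windows or macOS, the worker function may additionally need to live in an importable .py file for the child processes to find it.)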

[UPDATE] The above PicklingError is generated when I run the code on Databricks. Running the same code locally on my machine in a Jupyter Notebook, I got the following error with the ProcessPoolExecutor only:

[screenshots: error tracebacks from the local Jupyter run]

Other related questions I have searched, though I couldn't apply their solutions:
Related question 1
Related question 2
Related question 3
Related question 4

  • @tdelaney I would really appreciate your input on this question! – NikSp Jun 17 '20 at 13:34
  • 1
    Thread executor does not need any pickling 'cos all the variables are shared, right? So let's look at the process executor. If you could provide MRE (https://stackoverflow.com/help/minimal-reproducible-example) I'd debug it locally may be. Currently it gets too many dependencies like dbutils and more... It feels like the problem is in the pickling your zipped arguments. Try exclude them one after one, find the problematic(s) and serialize them manually. – LiMar Jun 17 '20 at 14:05
  • @LiMar The pickling is only needed for the ProcessPoolExecutor. As you can see, ThreadPoolExecutor runs successfully, but ProcessPoolExecutor doesn't, due to the PicklingError. If you copy-paste my code above you will replicate my issue. Why do you get dependency errors, since I use Python packages? It would help if you have Spark locally or somewhere in the cloud, since I use Spark code also. – NikSp Jun 17 '20 at 14:13
  • @LiMar One of my arguments is a Spark DataFrame and I guess there might be a problem when pickling a Spark DataFrame....I will check it out – NikSp Jun 17 '20 at 14:20
  • 1
    Code above won't run because dbutils is undefined... But this is not where the code fails so... Try excluding the arguments or provide MRE. – LiMar Jun 17 '20 at 14:25
  • @LiMar sorry for the dbutils....It's a Databricks File System command to save files there...let me change this to a local file command – NikSp Jun 17 '20 at 14:26
  • same regarding spark – LiMar Jun 17 '20 at 14:28
  • @LiMar check my update on the code example...I composed it locally – NikSp Jun 17 '20 at 14:53
  • @LiMar please write in the comments if the Spark implementation bothers you, so I can change it to a pandas implementation. I am using a Spark df because this is my real-case scenario. – NikSp Jun 17 '20 at 15:19
  • @LiMar I really need your input on this because for literally 3 days now I cannot find a solution on how to make the ProcessPoolExecutor run....ThreadPoolExecutor runs fine but I want the ProcessPool to run – NikSp Jun 17 '20 at 17:22
  • @LiMar as you can see from my errors, the function wait_on_device(), which wraps the main function testing_function_map(), is not picklable...which I don't know why. I followed your recommendation to check one argument at a time, but the function wait_on_device() still doesn't pickle. – NikSp Jun 18 '20 at 06:25
  • Unfortunately I'm neither familiar with Spark nor possess its environment. Cannot help with it, sorry. – LiMar Jun 18 '20 at 10:57
  • @LiMar Ok, ty anyway for being willing to help :) – NikSp Jun 18 '20 at 11:04

0 Answers