0

I can't use pandas.DataFrame.itertuples inside a process pool.

Take a look at this code:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, Iterator

class Data:
    """Echo whatever the constructor receives; used to inspect pool calls."""

    def __init__(self, *args, **kwargs) -> None:
        # A single print call keeps both output lines together under concurrency.
        print(f'args: {args}\nkwargs: {kwargs}\n', end='')

    @classmethod
    def from_iterrows(cls, *args, **kwargs) -> 'Data':
        """Build from one ``DataFrame.iterrows()`` item passed as the last positional arg."""
        *leading, (index, series) = args
        return cls(*leading, **kwargs, params=series.to_dict())

    @classmethod
    def from_itertuples(cls, *args, **kwargs) -> 'Data':
        """Build from one ``DataFrame.itertuples()`` row passed as the last positional arg."""
        *leading, row = args
        return cls(*leading, **kwargs, params=row._asdict())

def process_pool(function: Callable, iterable: Iterable, workers: int, /, *args, **kwargs) -> Iterator[Any]:
    """Map ``partial(function, *args, **kwargs)`` over *iterable* in a process pool.

    The results are materialized while the executor is still alive: a lazy
    ``executor.map`` iterator returned out of the ``with`` block silently
    drops worker/pickling exceptions whenever the caller never consumes it
    (the "print nothing" symptom below).  Returning an iterator over a fully
    realized list keeps the advertised ``Iterator[Any]`` interface while
    guaranteeing that errors surface here.
    """
    with ProcessPoolExecutor(workers) as executor:
        return iter(list(executor.map(partial(function, *args, **kwargs), iterable)))

def thread_pool(function: Callable, iterable: Iterable, workers: int, /, *args, **kwargs) -> Iterator[Any]:
    """Map *function* (pre-bound with *args*/*kwargs*) over *iterable* using worker threads."""
    bound = partial(function, *args, **kwargs)
    with ThreadPoolExecutor(workers) as pool:
        return pool.map(bound, iterable)

# NOTE(review): this snippet assumes `import pandas as pd` — the import is
# not shown in the code above; confirm it exists in the full script.
df = pd.DataFrame(data={'id': [1, 2],
                    'param1': [11, 111],
                    'param2': [22, 222],
                    'param3': [33, 333]})
print(df)

#    id  param1  param2  param3
# 0   1      11      22      33
# 1   2     111     222     333

from_iterrows using thread_pool:

thread_pool(Data.from_iterrows, df.iterrows(), 2, 100, 200, 300, a=10, b=20, c=30)

# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}

from_iterrows using process_pool:

process_pool(Data.from_iterrows, df.iterrows(), 2, 100, 200, 300, a=10, b=20, c=30)

# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}

from_itertuples using thread_pool:

thread_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)

# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'Index': 0, 'id': 1, 'param1': 11, 'param2': 22, 'param3': 33}}
# args: (100, 200, 300)
# kwargs: {'a': 10, 'b': 20, 'c': 30, 'params': {'Index': 1, 'id': 2, 'param1': 111, 'param2': 222, 'param3': 333}}

from_itertuples using process_pool:

process_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)

# print nothing...

Can someone explain to me why this is happening?

Idan Hazan
  • 116
  • 9

1 Answer

0

The problem can be demonstrated in the following program:

import pandas as pd
from multiprocessing import Process

def worker(p):
    # The body is irrelevant: any failure happens while pickling the argument
    # to ship it to the child process, before worker() ever runs.
    pass

# Required for Windows:
if __name__ == '__main__':
    df = pd.DataFrame(data={'id': [1, 2],
                        'param1': [11, 111],
                        'param2': [22, 222],
                        'param3': [33, 333]})

    # Compare itertuples(name=None), which yields plain tuples, with the
    # default name='Pandas', which yields a dynamically created namedtuple
    # class that (per the traceback below) cannot be pickled.
    for name in [None, 'Pandas']:
        print('Using name:', 'None' if name is None else name)
        it = df.itertuples(name=name)
        print(type(it))
        # convert to a list:
        l = list(it)
        print(type(l[0]), l[0])
        # Sending l[0] to a child process forces it through the pickler.
        p = Process(target=worker, args=(l[0],))
        p.start()
        p.join()
        print('Process successfully ended.')
        print()

Prints:

Using name: None
<class 'zip'>
<class 'tuple'> (0, 1, 11, 22, 33)
Process successfully ended.

Using name: Pandas
<class 'map'>
<class 'pandas.core.frame.Pandas'> Pandas(Index=0, id=1, param1=11, param2=22, param3=33)
Traceback (most recent call last):
  File "C:\Ron\test\test.py", line 22, in <module>
    p.start()
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed

C:\Ron\test>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\spawn.py", line 102, in spawn_main
    source_process = _winapi.OpenProcess(
OSError: [WinError 87] The parameter is incorrect

So depending on whether name=None is specified or not for df.itertuples (the default is name='Pandas'), the class that gets generated varies. The key line in the above exception is:

_pickle.PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed

Now as to why the exception is not printed when using concurrent.futures.ProcessPoolExecutor, the most likely reason is that executor.map returns a lazy iterator and the exception is only raised once the results are actually consumed — which the question's code never does. If you switch to using multiprocessing.pool.Pool, you will certainly get the pickle error message.

You can get this to work, more or less, by specifying the name=None argument for df.itertuples. But then you will not be able to use the _asdict method on the resulting named_tuple variable as it will now just be of type tuple:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, Iterator
import pandas as pd

class Data:
    """Echoes constructor arguments; stand-in for real per-row processing."""

    def __init__(self, *args, **kwargs) -> None:
        print(f'args: {args}\nkwargs: {kwargs}\n', end='')

    @classmethod
    def from_iterrows(cls, *args, **kwargs) -> 'Data':
        # The last positional argument is one (index, Series) pair from iterrows().
        *args, (index, series) = args
        return cls(*args, **kwargs, params=series.to_dict())

    @classmethod
    def from_itertuples(cls, *args, **kwargs) -> 'Data':
        # With itertuples(name=None) the rows are plain tuples, which have no
        # _asdict(); the row is therefore unpacked but deliberately unused here.
        *args, named_tuple = args
        return cls(*args, **kwargs)

def process_pool(function: Callable, iterable: Iterable, workers: int, *args, **kwargs) -> Iterator[Any]:
    """Map ``partial(function, *args, **kwargs)`` over *iterable* in a process pool.

    The results are materialized while the executor is still alive: a lazy
    ``executor.map`` iterator returned out of the ``with`` block silently
    drops worker/pickling exceptions whenever the caller never consumes it.
    Returning an iterator over a fully realized list keeps the advertised
    ``Iterator[Any]`` interface while guaranteeing that errors surface here.
    """
    with ProcessPoolExecutor(workers) as executor:
        return iter(list(executor.map(partial(function, *args, **kwargs), iterable)))

def thread_pool(function: Callable, iterable: Iterable, workers: int, *args, **kwargs) -> Iterator[Any]:
    """Thread-pool variant: map the pre-bound callable over *iterable*."""
    call = partial(function, *args, **kwargs)
    with ThreadPoolExecutor(workers) as executor:
        return executor.map(call, iterable)

# Required for Windows:
if __name__ == '__main__':

    df = pd.DataFrame(data={'id': [1, 2],
                        'param1': [11, 111],
                        'param2': [22, 222],
                        'param3': [33, 333]})
    print(df)

    # Threads share one interpreter: no pickling, so the default named rows work.
    print(list(thread_pool(Data.from_itertuples, df.itertuples(), 2, 100, 200, 300, a=10, b=20, c=30)))
    # Processes must pickle each row; name=None yields plain, picklable tuples.
    print(list(process_pool(Data.from_itertuples, df.itertuples(name=None), 2, 100, 200, 300, a=10, b=20, c=30)))

Prints:

   id  param1  param2  param3
0   1      11      22      33
1   2     111     222     333
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
[<__main__.Data object at 0x0000026CFC34A6D0>, <__main__.Data object at 0x0000026CFC34A0A0>]
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
args: (100, 200, 300)
kwargs: {'a': 10, 'b': 20, 'c': 30}
[<__main__.Data object at 0x0000026CFC3746D0>, <__main__.Data object at 0x0000026CFC374550>]

This post may be of interest, but it is not very helpful here.

Booboo
  • 38,656
  • 3
  • 37
  • 60