
I am looping through a set of large files and using multiprocessing for manipulation/writing. I create an iterable from my dataframe and pass it to multiprocessing's `map` function. Processing works fine for the smaller files, but when I hit the larger ones (~10 GB) I get the error:

    struct.error: 'i' format requires -2147483648 <= number <= 2147483647

the code:

    data = np.array_split(data, 10)
    with mp.Pool(processes=5, maxtasksperchild=1) as pool1:
        pool1.map(write_in_parallel, data)
        pool1.close()
        pool1.join()

Based on this answer I thought the problem was that the object I am passing to map is too large. So I tried splitting the dataframe into 1.5 GB chunks first and passing each one independently to map, but I am still getting the same error.
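Roughly what that second attempt looks like (a sketch only; `data` and `write_in_parallel` are the same objects as above, and the chunk count is approximate):

    import numpy as np
    import multiprocessing as mp

    # second attempt: ~1.5 GB pieces, one map call per piece --
    # each piece still has to be pickled and sent to a worker,
    # and the same struct.error comes back
    for piece in np.array_split(data, 7):
        with mp.Pool(processes=5, maxtasksperchild=1) as pool1:
            pool1.map(write_in_parallel, [piece])
            pool1.close()
            pool1.join()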

Full traceback:

    Traceback (most recent call last):
      File "_FNMA_LLP_dataprep_final.py", line 51, in <module>
        write_files()
      File "_FNMA_LLP_dataprep_final.py", line 29, in write_files
        '.txt')
      File "/DATAPREP/appl/FNMA_LLP/code/FNMA_LLP_functions.py", line 116, in write_dynamic_columns_fannie
        pool1.map(write_in_parallel, first)
      File "/opt/Python364/lib/python3.6/multiprocessing/pool.py", line 266, in map
        return self._map_async(func, iterable, mapstar, chunksize).get()
      File "/opt/Python364/lib/python3.6/multiprocessing/pool.py", line 644, in get
        raise self._value
      File "/opt/Python364/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
        put(task)
      File "/opt/Python364/lib/python3.6/multiprocessing/connection.py", line 206, in send
        self._send_bytes(_ForkingPickler.dumps(obj))
      File "/opt/Python364/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
        header = struct.pack("!i", n)
    struct.error: 'i' format requires -2147483648 <= number <= 2147483647

1 Answer


The answer you mentioned contains another important point: the data should be loaded by the child function, which in your case is `write_in_parallel`. The error occurs because `multiprocessing` pickles every argument and sends it to the worker with a 32-bit length header (the `struct.pack("!i", n)` call in your traceback), so a single task larger than about 2 GB cannot be transmitted; passing a file path instead of the dataframe avoids that entirely. I recommend altering your child function as follows:

    import pandas as pd

    def write_in_parallel(path):
        """ We'll make an assumption that your data chunk is stored in a csv file at `path` """

        data = pd.read_csv(path)
        ...  # process `data` here and return the result so the parent can concat it

Then your "Pool code" should look like this:

    with mp.Pool(processes=(mp.cpu_count() - 1)) as pool:
        chunks = pool.map(write_in_parallel, ('/path/to/your/data',))
    df = pd.concat(chunks)
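If you want more than one worker busy at a time, a variation on the same idea is to dump the big dataframe into several chunk files first and map over their paths, so only short strings ever cross the process boundary. A rough sketch (the `/tmp/chunks` location and the chunk count are placeholders, and it assumes `write_in_parallel` returns a processed dataframe as above):

    import os
    import multiprocessing as mp
    import numpy as np
    import pandas as pd

    # write the big dataframe out as smaller csv chunks once, up front,
    # so only tiny path strings get pickled and sent to the workers
    os.makedirs('/tmp/chunks', exist_ok=True)   # placeholder location
    paths = []
    for i, piece in enumerate(np.array_split(data, 10)):
        path = '/tmp/chunks/part_{}.csv'.format(i)
        piece.to_csv(path, index=False)
        paths.append(path)

    with mp.Pool(processes=(mp.cpu_count() - 1)) as pool:
        chunks = pool.map(write_in_parallel, paths)
    df = pd.concat(chunks)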

I hope that will help you.
