import numpy as np
import pandas as pd
from concurrent import futures

# Load the data
df = pd.read_csv('crsp_short.csv', low_memory=False)

def funk(date):
    ...
    # for each date in df.date.unique() do stuff which gives a sample
    # DataFrame as an output, then append it to the file

    sample.to_csv('crsp_full.csv', mode='a')

def evaluation(f_list):
    with futures.ProcessPoolExecutor() as pool:
        return pool.map(funk, f_list)

# list_s is a list of dates I want to calculate function funk for

evaluation(list_s)

I get a csv file as an output with some of the lines messed up, because Python is writing pieces from different workers at the same time. I guess I need to use queues, but I was not able to modify the code so that it worked. Any ideas how to do it? Otherwise it takes ages to get the results.

Anna Ignashkina
    You are correct, there are some concurrency issues going on here: writing to a csv from multiple workers is not atomic (specifically using `to_csv` here). Think of the `funk` method as a "publisher" rather than as a publisher + writer. Rearchitect to have an "app queue": have your worker function `funk` publish its "sample" to the queue, and have another thread "consume" that queue, pulling a sample from it each time one is published, with that thread being the only thing writing to the csv – Selecsosi Dec 27 '18 at 20:17
    ^ Not the greatest article, but a start: https://www.bogotobogo.com/python/Multithread/python_multithreading_Synchronization_Producer_Consumer_using_Queue.php – Selecsosi Dec 27 '18 at 20:18
    Also, one last comment: you don't need to do the stuff in that article about the "producer thread", since you are already doing that, but it is helpful to have a consistent design around producers and consumers – Selecsosi Dec 27 '18 at 20:19
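The single-writer architecture the comments describe can be sketched as below. This is not the poster's code: `worker`, `writer`, `run`, and the `queue_demo.csv` file name are illustrative stand-ins, with `worker` playing the role of `funk`. Workers only publish finished rows to a shared queue; one thread in the main process is the sole consumer that touches the file.

```python
import multiprocessing
import threading
from functools import partial

OUT_FILE = 'queue_demo.csv'   # illustrative output path
_SENTINEL = None              # signals the writer thread to stop

def worker(queue, x):
    # stand-in for funk(): publish the finished row instead of writing it
    queue.put(f"{x},{x * x}\n")

def writer(queue):
    # the single consumer: the only code that writes to the file
    with open(OUT_FILE, 'a') as f:
        while True:
            row = queue.get()
            if row is _SENTINEL:
                break
            f.write(row)

def run(items):
    manager = multiprocessing.Manager()
    queue = manager.Queue()   # proxy queue that worker processes can share
    t = threading.Thread(target=writer, args=(queue,))
    t.start()
    with multiprocessing.Pool(4) as pool:
        pool.map(partial(worker, queue), items)
    queue.put(_SENTINEL)      # all workers are done; tell the writer to finish
    t.join()

if __name__ == '__main__':
    run(range(10))
```

Because only the writer thread opens the file, rows can never interleave mid-line no matter how many worker processes are running.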

1 Answer


That solved the problem (`Pool` does the queuing for you):

Python: Writing to a single file with queue while using multiprocessing Pool

My version of the code that didn't mess up the output csv file:

import numpy as np
import pandas as pd
from multiprocessing import Pool

# Load the data
df = pd.read_csv('crsp_short.csv', low_memory=False)

def funk(date):
    ...
    # for each date in df.date.unique() do stuff which gives a sample
    # DataFrame as an output

    return sample

# list_s is a list of dates I want to calculate function funk for

def mp_handler():
    # 28 is the number of processes I want to run
    p = Pool(28)
    for result in p.imap(funk, list_s):
        # only the main process writes, so rows can't interleave
        result.to_csv('crsp_full.csv', mode='a')


if __name__=='__main__':
    mp_handler()
Anna Ignashkina