I have a scientific task with a large (~10 GB) data file. I need to do some processing on each line of this file separately and write the results to another file. Since the processing is heavy and I have access to a cluster, I would like to do it in parallel. Using multiprocessing, I created a process to read the file, a process to write the results, and worker processes:

def GetData(filename, in_queue):
    # Reader: feed the file to the workers line by line
    with open(filename, "r") as lines:
        for line in lines:
            in_queue.put(line, block=True)

def WriteData(file, out_queue):
    # Writer: drain results and write them to the output file
    while True:
        file.write(out_queue.get(block=True))
        out_queue.task_done()

def ProcessData(in_queue, out_queue):
    # Worker: process one line at a time
    while True:
        line = in_queue.get(block=True)
        # processing
        in_queue.task_done()

And I set it up like this:

work = JoinableQueue(maxsize=250)
results = JoinableQueue(maxsize=250)

g = multiprocessing.Process(target=GetData, args=(FileName, work))
g.daemon = False
g.start()

w = multiprocessing.Process(target=WriteData, args=(f_out, results))
w.daemon = True
w.start()

for i in range(2):
    t = multiprocessing.Process(target=ProcessData, args=(work, results))
    t.daemon = True
    t.start()

work.join()
results.join()

But I get an error:

TypeError: cannot serialize '_io.TextIOWrapper' object

After some googling, I suspect the problem is that an open file object cannot be pickled. But how do I sidestep it?

  • Multiprocessing is not going to help you with I/O. Reading one file with several processes will instead result in a lot of seeks and slow everything down. You should read in one process and do the processing in multiple processes. There are also systems like Celery for distributed processing of tasks. – Klaus D. Mar 14 '18 at 13:49
  • As Klaus wrote, you are doing this wrong. Read only once and parallelise the processing; that is where you can expect a performance boost from multiprocessing. Just out of curiosity I ran your code and it works fine without exceptions. Which line causes the exception? And I assume your FileName is just a string with a path to your source file, not a file object. – Hannu Mar 14 '18 at 13:56
  • I made one process that reads the file and several processes that do the processing. The exception is raised on the line g.start(). I run this on Windows, if that matters. And yes, FileName is just a string. – Sergei Dulikov Mar 14 '18 at 14:38
  • I edited the question to make it clearer. After I removed the GetData function and copied its body into main, after the initialization of all processes, I keep getting the same error, but now at the line w.start(). – Sergei Dulikov Mar 14 '18 at 14:58
  • This could be a Windows related issue. Is your code snippet perhaps part of a larger program? https://stackoverflow.com/questions/26249442/can-i-use-multiprocessing-pool-in-a-method-of-a-class and https://stackoverflow.com/questions/30081961/multiprocessing-works-in-ubuntu-doesnt-in-windows – Hannu Mar 14 '18 at 15:30
  • It's basically the whole program; I left out only some constants, the imports, and the processing itself. I also have if __name__ == "__main__": before the main body of the program. – Sergei Dulikov Mar 14 '18 at 16:27
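The error the comments circle around can be reproduced in isolation: on Windows the spawn start method pickles every argument passed to `multiprocessing.Process`, and an open file handle is not picklable. A minimal sketch (the file name `demo.txt` is a placeholder):

```python
import pickle

# This is what spawn effectively does to the arguments of Process():
# it pickles them so they can be sent to the freshly started interpreter.
f = open("demo.txt", "w")
try:
    pickle.dumps(f)
    print("pickled OK")
except TypeError as err:
    # On current Pythons the message reads along the lines of
    # "cannot pickle '_io.TextIOWrapper' object".
    print(err)
finally:
    f.close()
```

This is why passing f_out to the writer process fails on Windows, while passing a plain string like FileName does not.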

0 Answers