I have a scientific task with a large (~10 GB) data file. I need to do some processing on each line of this file separately and write the results to a different file. Since the processing is heavy and I have access to a cluster, I would like to do it in parallel. I use multiprocessing: I created one process to read the file, one process to write the output, and worker processes:
import multiprocessing
from multiprocessing import JoinableQueue

def GetData(filename, in_queue):
    with open(filename, "r") as lines:
        for line in lines:
            in_queue.put(line, block=True)

def WriteData(file, out_queue):
    while True:
        file.write(out_queue.get(block=True))
        out_queue.task_done()

def ProcessData(in_queue, out_queue):
    while True:
        line = in_queue.get(block=True)
        # processing
        in_queue.task_done()
And I set it up like this:
work = JoinableQueue(maxsize=250)
results = JoinableQueue(maxsize=250)

g = multiprocessing.Process(target=GetData, args=(FileName, work))
g.daemon = False
g.start()

w = multiprocessing.Process(target=WriteData, args=(f_out, results))
w.daemon = True
w.start()

for i in range(2):
    t = multiprocessing.Process(target=ProcessData, args=(work, results))
    t.daemon = True
    t.start()

work.join()
results.join()
But I get an error:
TypeError: cannot serialize '_io.TextIOWrapper' object
After some googling, I guess the problem is that an open file object cannot be pickled. But how do I sidestep it?
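My current (untested) guess is to pass the output file name into WriteData instead of the already opened file object, and open the file inside the writer process, roughly like this (out_filename is just a placeholder for whatever path I end up using):

def WriteData(out_filename, out_queue):
    # open the file inside the child process so no file object
    # has to be pickled and sent across the process boundary
    with open(out_filename, "w") as f_out:
        while True:
            f_out.write(out_queue.get(block=True))
            out_queue.task_done()

w = multiprocessing.Process(target=WriteData, args=(out_filename, results))

Is that the right way to do it, or is there a cleaner approach?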