I am new to multiprocessing in Python and am trying to update a shared dictionary in parallel using the `apply_async` method. However, when I submit the class method that does this (`fill_contig_matrix`) through `apply_async`, it does not seem to do anything: even the print statement at the beginning of the method never prints (I am not sure whether this is normal behavior). How do I use multiprocessing to update the dictionary? I've removed some helper methods to shorten the code:
import numpy as np
import multiprocessing as mp
from multiprocessing import Manager
import os
class FeatureMatrix:
    """Per-contig matrices built from a SAM header and filled from a pileup.

    The contig names and lengths are read from the ``@SQ`` lines of
    ``sam_file``; the pileup rows in ``pileup_file`` are grouped into one
    task per run of rows sharing the same first field, so that each task can
    be handed to a worker process.

    NOTE: ``initialize_matrices`` (called by ``contig_matrix``) is one of the
    helper methods omitted from this listing.
    """

    # Pair of integer indices for every pileup symbol. Not referenced by the
    # methods shown here (presumably used by the omitted update code).
    index_dict = {'A': (0, 10), 'T': (1, 11), 'G': (2, 12), 'C': (3, 13),
                  'a': (4, 14), 't': (5, 15), 'g': (6, 16),
                  'c': (7, 17), '*': (8, 18), '#': (9, 19)}

    def __init__(self, pileup_file='calls_to_draft_pileup_test_2.txt',
                 sam_file='calls_to_draft_sorted_test.sam'):
        """Store the input paths and build the shared contig dictionary.

        Args:
            pileup_file: Path of the tab-separated pileup file.
            sam_file: Path of the SAM file whose header lists the contigs.
        """
        self.pileup_file = pileup_file
        self.sam_file = sam_file
        self.score_count = 0
        # One fresh two-element zero list per symbol, in index_dict's key order.
        self.average_score_dict = {symbol: [0, 0]
                                   for symbol in self.index_dict}
        self.contig_dict = self.contig_matrix()

    def contig_matrix(self):
        """Create one entry per contig declared in the SAM header.

        For every line of ``self.sam_file`` containing ``@SQ``, the second
        tab-separated field (minus ``SN:``) is taken as the contig name and
        the third (minus ``LN:``) as its length. Reading stops at the first
        line containing ``@PG``.

        Returns:
            A ``Manager().dict()`` proxy mapping each contig name to the
            value returned by ``self.initialize_matrices(length)``.
        """
        contig_dict = {}
        with open(self.sam_file, 'r', encoding='utf-8') as file:
            for line in file:
                if '@SQ' in line:
                    fields = line.strip().split('\t')
                    contig = fields[1].replace('SN:', '')
                    length = int(fields[2].replace('LN:', ''))
                    contig_dict[contig] = self.initialize_matrices(length)
                if '@PG' in line:
                    break
        # Start the manager only after the header was parsed successfully, so
        # a failed read does not leave a manager process behind.
        manager = Manager()
        shared_contig_dict = manager.dict()
        shared_contig_dict.update(contig_dict)
        return shared_contig_dict

    def file_task_generator(self):
        """Yield the pileup rows grouped into one task per contig run.

        Each line of ``self.pileup_file`` is stripped and split on tabs.
        Consecutive rows with the same first field are collected into one
        list, which is yielded as soon as a row with a different first field
        is read (and once more at end of file for the last group).

        Yields:
            Non-empty lists of rows, each row being a list of strings.
        """
        with open(self.pileup_file, 'r', encoding='utf-8') as file:
            line_list = []
            for line in file:
                row = line.strip().split('\t')
                if not line_list or row[0] == line_list[-1][0]:
                    line_list.append(row)
                else:
                    task = line_list
                    line_list = [row]
                    yield task
            if line_list:
                yield line_list

    def fill_contig_matrix(self, contig_list):
        """Update the matrices of one contig and save them to an ``.npz`` file.

        Args:
            contig_list: Rows of a single task as yielded by
                ``file_task_generator``; all rows share the same first field.
        """
        print(f"Process {os.getpid()} is processing a task.")
        if not contig_list:
            return
        # All rows of a task share their first field; it is used as the key
        # into contig_dict. NOTE(review): assumes the first pileup column
        # matches the SN names read from the SAM header -- confirm.
        contig = contig_list[0][0]
        # Fetch the entry once: every lookup on a Manager dict proxy returns
        # a copy, so mutating ``self.contig_dict[contig][...]`` in place would
        # change only that temporary copy.
        matrices = self.contig_dict[contig]
        for line in contig_list:
            # Update dictionary... (update logic omitted from this listing;
            # it should modify ``matrices``)
            pass
        # Re-assign the entry so the update reaches the shared dictionary.
        self.contig_dict[contig] = matrices
        file_name = f"array_{contig}.npz"
        np.savez(file_name, matrices[0], matrices[1])
def main():
    """Process every pileup task in a pool of worker processes.

    One task per contig run is submitted with ``apply_async``; the function
    then waits for all of them and re-raises the first worker exception, if
    any.
    """
    matrix = FeatureMatrix()
    num_cpus = mp.cpu_count()
    with mp.Pool(processes=num_cpus) as pool:
        # ``args`` must be a tuple: ``(contig_list)`` is just ``contig_list``,
        # which apply_async would unpack into one positional argument per
        # row, making the call fail with a TypeError inside the worker.
        pending = [
            pool.apply_async(matrix.fill_contig_matrix, args=(contig_list,))
            for contig_list in matrix.file_task_generator()
        ]
        pool.close()
        # Exceptions raised in a worker are stored on the AsyncResult and
        # only surface when get() is called; without this they pass silently.
        for result in pending:
            result.get()
        pool.join()


# The guard is required for multiprocessing: worker processes may import this
# module, and must not start a pool of their own when they do.
if __name__ == '__main__':
    main()