0

I am new to multiprocessing in python and am trying to update a shared dictionary in parallel using the apply_async method. But, when I call the class method that does this (fill_contig_matrix) with the apply_async method, it does not seem to do anything. The print statement in the beginning of the method doesn't even print out (not sure if this is normal behavior). How do I use multiprocessing to update the dictionary? I've removed some helper methods to shorten the code:

import numpy as np
import multiprocessing as mp
from multiprocessing import Manager
import os


class FeatureMatrix:
    index_dict = {'A': (0, 10), 'T': (1, 11), 'G': (2, 12), 'C': (3, 13),
                  'a': (4, 14), 't': (5, 15), 'g': (6, 16),
                  'c': (7, 17), '*': (8, 18), '#': (9, 19)}

    def __init__(self, pileup_file='calls_to_draft_pileup_test_2.txt',
                 sam_file='calls_to_draft_sorted_test.sam'):
        self.pileup_file = pileup_file
        self.sam_file = sam_file
        self.score_count = 0
        self.average_score_dict = {'A': [0, 0], 'T': [0, 0], 'G': [0, 0],
                                   'C': [0, 0], 'a': [0, 0], 't': [0, 0],
                                   'g': [0, 0], 'c': [0, 0], '*': [0, 0],
                                   '#': [0, 0]}
        self.contig_dict = self.contig_matrix()

    def contig_matrix(self):
        contig_dict = {}
        m = Manager()
        shared_contig_dict = m.dict()
        with open(self.sam_file, 'r', encoding='utf-8') as file:
            for line in file:
                if '@SQ' in line:
                    line = line.strip()
                    line_list = line.split('\t')
                    contig = line_list[1].replace('SN:', '')
                    length = int(line_list[2].replace('LN:', ''))
                    contig_dict[contig] = self.initialize_matrices(length)
                if '@PG' in line:
                    break
        shared_contig_dict.update(contig_dict)
        return shared_contig_dict
    
    def file_task_generator(self):
        """
        """
        with open(self.pileup_file, 'r', encoding='utf-8') as file:
            line_list = []
            for line in file:
                line = line.strip()
                line = line.split('\t')
                if len(line_list) == 0 or line[0] == line_list[-1][0]:
                    line_list.append(line)
                else:
                    task = line_list
                    line_list = []
                    line_list.append(line)
                    yield task
            if line_list:
                task = line_list
                yield task

    def fill_contig_matrix(self, contig_list):
        """
        """
        print(f"Process {os.getpid()} is processing a task.")
        for line in contig_list:
            # Update dictionary...

        file_name = f"array_{contig}.npz"
        np.savez(file_name, self.contig_dict[contig][0], self.contig_dict[contig][1]))


def main():
    matrix = FeatureMatrix()
    num_cpus = mp.cpu_count()
    pool = mp.Pool(processes=num_cpus)

    for contig_list in matrix.file_task_generator():
        pool.apply_async(matrix.fill_contig_matrix, args=(contig_list))

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()
  • Which OS do you use? – Michael Butscher Jul 22 '23 at 14:06
  • 1
    You want `args = (contig_list,)`. This is a common problem in Python. You think you're making a list of size 1, and it's really just parentheses. – Frank Yellin Jul 22 '23 at 15:57
  • Please trim your code to make it easier to find your problem. Follow these guidelines to create a [minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example). – Community Jul 22 '23 at 16:57

0 Answers0