
I am trying to use the approach from this question for my file processing: Python multiprocessing safely writing to a file
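
For reference, the core of the accepted answer there is a Manager queue drained by a single listener process that owns the output file. Roughly (my simplified paraphrase; fn is a placeholder filename and the worker body is a stand-in):

import multiprocessing as mp

def listener(q, fn):
    '''Sole owner of the output file: drains the queue and writes.'''
    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(str(m) + '\n')
            f.flush()

def worker(arg, q):
    res = 'processed %s' % arg  # stand-in for the real per-job work
    q.put(res)
    return res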

This is my modification of the code:

import multiprocessing as mp
import pysam

# args (providing .bam, .bed and .threads) comes from argparse and is not shown.

def listener(q):
    '''Listens for messages on the queue and writes them to the output file.'''
    # out_bam is a pysam.AlignmentFile opened for writing elsewhere (not shown).
    while True:
        reads = q.get()
        if reads == 'kill':
            break
        for read in reads:
            out_bam.write(read)
        out_bam.flush()
    out_bam.close()

def fetch_reads(line, q):
    parts = line.rstrip('\n').split('\t')
    print(parts)
    start, end = int(parts[1]) - 1, int(parts[2]) - 1
    # Open (and close) the input BAM inside the worker; see the note below the code.
    in_bam = pysam.AlignmentFile(args.bam, mode='rb')
    fetched = in_bam.fetch(parts[0], start, end)
    reads = [read for read in fetched
             if (read.cigarstring and start <= read.pos < end
                 and 'S' not in read.cigarstring)]
    in_bam.close()
    q.put(reads)
    return reads

# must use a Manager queue here: a plain mp.Queue can't be handed to pool workers
manager = mp.Manager()
q = manager.Queue()
threads = int(args.threads) if args.threads else 1
pool = mp.Pool(threads + 1)  # one extra process for the listener

# put the listener to work first
watcher = pool.apply_async(listener, (q,))

with open(args.bed, 'r') as bed:
    jobs = []
    cnt = 0
    for line in bed:
        # fire off the read-fetching jobs (capped at 10,000 BED lines for now)
        job = pool.apply_async(fetch_reads, (line, q))
        jobs.append(job)
        cnt += 1
        if cnt > 10000:
            break

# collect results from the workers through the pool result queue
for job in jobs:
    job.get()
    print('get')

# now we are done; kill the listener
q.put('kill')
pool.close()

The difference is that I am opening and closing the input BAM inside the worker function, since otherwise I get unusual errors from bgzip.
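
To make that concrete, here is the contrast as a stripped-down sketch ('example.bam' is a placeholder path; the with-form is equivalent to the explicit open/close I use above):

import pysam

# What I had first: one handle opened in the parent and implicitly
# shared with the forked workers; this is what produced the bgzip errors.
shared_bam = pysam.AlignmentFile('example.bam', mode='rb')

def fetch_shared(region):
    return list(shared_bam.fetch(*region))  # every worker reads via the parent's handle

# What I do now: every call opens and closes its own handle.
def fetch_per_call(region):
    with pysam.AlignmentFile('example.bam', mode='rb') as bam:
        return list(bam.fetch(*region))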

At first, the output of print(parts) and print('get') is interleaved (more or less); then 'get' appears less and less often. Eventually the code hangs and nothing more is printed: all the parts are printed, but 'get' simply stops appearing. The output file remains zero bytes.

Can anyone lend a hand? Cheers!
