7

Samples records in the data file (SAM file):

M01383  0  chr4  66439384  255  31M  *  0  0  AAGAGGA GFAFHGD  MD:Z:31 NM:i:0
M01382  0  chr1  241995435  255 31M  *  0  0  ATCCAAG AFHTTAG  MD:Z:31 NM:i:0
......
  • The data files are line-by-line based
  • The size of the data files are varies from 1G - 5G.

I need to go through the record in the data file line by line, get a particular value (e.g. 4th value, 66439384) from each line, and pass this value to another function for processing. Then some results counter will be updated.

the basic workflow is like this:

# global variable, counters will be updated in search function according to the value passed. 
counter_a = 0    
counter_b = 0
counter_c = 0

open textfile:
    for line in textfile:
        value = line.split()[3]
        search_function(value)    # this function takes abit long time to process

def search_function (value):
    some conditions checking:
        update the counter_a or counter_b or counter_c

With single process code and about 1.5G data file, it took about 20 hours to run through all the records in one data file. I need much faster code because there are more than 30 of this kind data file.

I was thinking to process the data file in N chunks in parallel, and each chunk will perform above workflow and update the global variable (counter_a, counter_b, counter_c) simultaneously. But I don't know how to achieve this in code, or wether this will work.

I have access to a server machine with: 24 processors and around 40G RAM.

Anyone could help with this? Thanks very much.

Xiangwu
  • 715
  • 3
  • 13
  • 25

4 Answers4

4

The simplest approach would probably be to do all 30 files at once with your existing code -- would still take all day, but you'd have all the files done at once. (ie, "9 babies in 9 months" is easy, "1 baby in 1 month" is hard)

If you really want to get a single file done faster, it will depend on how your counters actually update. If almost all the work is just in analysing value you can offload that using the multiprocessing module:

import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print counter_a, counter_b, counter_c

That will allow 50 slowfuncs to run in parallel, so instead of taking 1000s (=100k*0.01s), it takes 20s (100k/50)*0.01s to complete. If you can restructure your function into "slowfunc" and "add_to_counter" like the above, you should be able to get a factor of 24 speedup.

Anthony Towns
  • 2,864
  • 1
  • 19
  • 23
  • hi, thanks for you helps. same again, i didn't fully understand this code, but will look into the details later. thanks. – Xiangwu Feb 24 '15 at 00:51
  • hi there, i made the code working with restructured checking function. just wondering about `pool = multiprocessing.Pool(50)`, how does 50 is defined? thanks. – Xiangwu Feb 26 '15 at 17:50
  • 1
    50 is the number of jobs to try to do at once. If you don't specify it, it should default to the number of cores your machine has, which is probably what you want. – Anthony Towns Mar 04 '15 at 10:53
1

Read one file at a time, use all CPUs to run search_function():

#!/usr/bin/env python
from multiprocessing import Array, Pool

def init(counters_): # called for each child process
    global counters
    counters = counters_

def search_function (value): # assume it is CPU-intensive task
    some conditions checking:
        update the counter_a or counter_b or counter_c
        counter[0] += 1 # counter 'a'
        counter[1] += 1 # counter 'b'
    return value, result, error

if __name__ == '__main__':
    counters = Array('i', [0]*3)
    pool = Pool(initializer=init, initargs=[counters])
    values = (line.split()[3] for line in textfile)
    for value, result, error in pool.imap_unordered(search_function, values,
                                                    chunksize=1000):
        if error is not None:
            print('value: {value}, error: {error}'.format(**vars()))
    pool.close()
    pool.join()
    print(list(counters))

Make sure (for example, by writing wrappers) that exceptions do not escape next(values), search_function().

jfs
  • 399,953
  • 195
  • 994
  • 1,670
  • hi sorry, i cannot fix this problem `counters = counter_ NameError: global name 'counter_' is not defined`, and don't understand what it means. – Xiangwu Feb 23 '15 at 15:19
  • Hi, I modified your code with my own search function, now it is running. Hope that finished soon. Thanks. – Xiangwu Feb 24 '15 at 00:47
  • would `chunksize=1000` means chop the array of values every 1000 as a chunk to process in parallel? – Xiangwu Feb 26 '15 at 17:05
  • @Xiangwu: `search_function()` *always* receives a single value whatever `chunksize` is. `chunksize` controls how different `values` are distributed between child processes: it is more efficient to have `chunksize > 1` as a rule. – jfs Feb 26 '15 at 17:11
1

This solution works on a set of files.

For each file, it divides it into a specified number of line-aligned chunks, solves each chunk in parallel, then combines the results.

It streams each chunk from disk; this is somewhat slower, but does not consume nearly so much memory. We depend on disk cache and buffered reads to prevent head thrashing.

Usage is like

python script.py -n 16 sam1.txt sam2.txt sam3.txt

and script.py is

import argparse
from io import SEEK_END 
import multiprocessing as mp

#
# Worker process
#
def summarize(fname, start, stop):
    """
    Process file[start:stop]

    start and stop both point to first char of a line (or EOF)
    """
    a = 0
    b = 0
    c = 0

    with open(fname, newline='') as inf:
        # jump to start position
        pos = start
        inf.seek(pos)

        for line in inf:
            value = int(line.split(4)[3])

            # *** START EDIT HERE ***
            #

            # update a, b, c based on value

            #
            # *** END EDIT HERE ***

            pos += len(line)
            if pos >= stop:
                break

    return a, b, c

def main(num_workers, sam_files):
    print("{} workers".format(num_workers))
    pool = mp.Pool(processes=num_workers)

    # for each input file
    for fname in sam_files:
        print("Dividing {}".format(fname))
        # decide how to divide up the file
        with open(fname) as inf:
            # get file length
            inf.seek(0, SEEK_END)
            f_len = inf.tell()
            # find break-points
            starts = [0]
            for n in range(1, num_workers):
                # jump to approximate break-point
                inf.seek(n * f_len // num_workers)
                # find start of next full line
                inf.readline()
                # store offset
                starts.append(inf.tell())

        # do it!
        stops = starts[1:] + [f_len]
        start_stops =  zip(starts, stops)
        print("Solving {}".format(fname))
        results = [pool.apply(summarize, args=(fname, start, stop)) for start,stop in start_stops]

        # collect results
        results = [sum(col) for col in zip(*results)]
        print(results)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Parallel text processor')
    parser.add_argument('--num_workers', '-n', default=8, type=int)
    parser.add_argument('sam_files', nargs='+')
    args = parser.parse_args()
    main(args.num_workers, args.sam_files)
    main(args.num_workers, args.sam_files)
Hugh Bothwell
  • 55,315
  • 8
  • 84
  • 99
  • thanks for you helps. Unfortunately, I didn't understand this code so I didn't now how to modify it with my own function. – Xiangwu Feb 24 '15 at 00:49
  • 1
    If you call it from the command-line, the `if __name__ == "__main__":` section runs; this parses the arguments and calls `main`. `main` decides how to divide the text file up and runs a bunch of copies of `summarize` in parallel. All you need to change is the per-value action in `summarize` (the bit that starts with `if value < 65000`). Update `a`, `b`, and `c` appropriately and return them. – Hugh Bothwell Feb 24 '15 at 01:36
  • I used this code with python 2.7, but I could not get parallel processing. I should change something in python 2.7? I already changed " with open(fname, newline='') as inf:" to " with open(fname, "rb") as inf:". Thank you. – Mohamad Kouhi Moghadam Jun 20 '16 at 08:19
0

What you don't want to do is hand files to invidual CPUs. If that's the case, the file open/reads will likely cause the heads to bounce randomly all over the disk, because the files are likely to be all over the disk.

Instead, break each file into chunks and process the chunks.

Open the file with one CPU. Read in the whole thing into an array Text. You want to do this is one massive read to prevent the heads from thrashing around the disk, under the assumption that your file(s) are placed on the disk in relatively large sequential chunks.

Divide its size in bytes by N, giving a (global) value K, the approximate number of bytes each CPU should process. Fork N threads, and hand each thread i its index i, and a copied handle for each file.

Each thread i starts a thread-local scan pointer p into Text as offset i*K. It scans the text, incrementing p and ignores the text until a newline is found. At this point, it starts processing lines (increment p as it scans the lines). Tt stops after processing a line, when its index into the Text file is greater than (i+1)*K.

If the amount of work per line is about equal, your N cores will all finish about the same time.

(If you have more than one file, you can then start the next one).

If you know that the file sizes are smaller than memory, you might arrange the file reads to be pipelined, e.g., while the current file is being processed, a file-read thread is reading the next file.

Ira Baxter
  • 93,541
  • 22
  • 172
  • 341