4

I need to count word frequency of a 3GB gzipped plain text file of English sentences, which is about 30 GB when unzipped.

I have a single thread script with collections.Counter and gzip.open, it takes hours to finish.

Since reading a file line by line is much faster than split and counting, I am thinking about a producer-consumer flow with a file reader to produce lines and several consumers to do the split and counting, and in the end, merge the Counters to get the word occurrence.

However, I cannot find an example for ProcessPoolExecutor to send a queue to Executor, they just map single item from a list. There are only single threaded examples for asyncio.Queue.

  • It is a huge file, so I cannot read the whole file and get the list before counting, thus I cannot use concurrent.futures.Executor.map. But all examples I read use a fixed list as start.

  • The time to splitting and counting one sentence is comparable to fork a process, so I have to make each consumer process lives longer. I do not think the map can merge Counters, so I cannot use chunksize>1. Thus I have to give the consumer a queue and make them keep counting until the whole file is finished. But most examples only send one item to consumer and use chunksize=1000 to reduce fork times.

Would you write an example for me ?

I hope the code is backward compatible with Python 3.5.3, since PyPy is faster.


My real case is for a more specific file format:

chr1    10011   141     0       157     4       41      50
chr1    10012   146     1       158     4       42      51
chr1    10013   150     0       163     4       43      53
chr1    10014   164     3       167     4       44      54

I need to count each histogram for single columns form column 3 to 8. So I take word frequencies as an easier example.

My code is:

#!/usr/bin/env pypy3
import sys

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = inStat(inDepthFile,verbose)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])   # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
    tsvout.close()
    pass

def inStat(inDepthFile,verbose):
    import gzip
    import csv
    from collections import Counter
    # Looking up things in global scope takes longer then looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    with gzip.open(inDepthFile, 'rt') as tsvin:
        tsvin = csv.DictReader(tsvin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
        try:
            for row in tsvin:
                RecordCnt += 1
                for k in SamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    cDepthCnt[k][theValue] += 1  # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
                #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
        print('[!]Lines Read:[{}], MaxDepth is [{}].'.format(RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1

csv.DictReader takes most time.

cProfile


My problem is, although gzip reader is fast, csv reader is fast, I need count billions of lines. And csv reader is sure being SLOWER than gzip reader.

So, I need to spread lines to different worker processes of csv reader and do downstream counting separately. It is convenient to use a queue between one producer and many consumers.

Since I am using Python, not C, is there some abstracted wrapper for multiprocessing and queue ? Is this possible to use ProcessPoolExecutor with the Queue class ?

Galaxy
  • 1,862
  • 1
  • 17
  • 25
  • 1
    I know you lightly touched on how you're doing this but can you include the code you're using currenty? – Jab Mar 09 '19 at 01:09
  • I wonder if you might not get better performance by just using a shell pipeline? See [Command-line Tools can be 235x Faster than your Hadoop Cluster](https://adamdrake.com/command-line-tools-can-be-235x-faster-than-your-hadoop-cluster.html). This problem sounds like a great fit for `xargs` and `uniq -c`, with maybe some `awk` scripting to glue it all together. – Daniel Pryden Mar 09 '19 at 01:12
  • Have you looked into using `io.BufferedReader`? As explained in [Reading & Writing GZIP Files Faster in Python](https://gist.github.com/theJollySin/6eeda4a44db830a35365503178f88788) – Jab Mar 09 '19 at 01:14
  • You could treat the gzipped file as a giant random-access list of lines without reading the whole thing into memory using something similar to what's being done in this [answer](https://stackoverflow.com/a/54620098/355230) only with a `mmap` instead of a temporary file (I have an un-posted version which does this). The memory-map could then be passed to multiple concurrent subprocesses along with a starting line number and line count. Each subprocess could count the words in the section assigned to it and pass back a dictionary when finished. These dictionaries could be all merged together. – martineau Mar 09 '19 at 02:09
  • I included my code now. – Galaxy Mar 09 '19 at 03:34
  • My problem is counting being slower than reading. So no need to speed up reading. – Galaxy Mar 09 '19 at 03:43
  • Ok, after reading your edits, I would also like to know what the data looks like, I did some tests on my own before seeing the edits [you can see here, but forewarning it's a little hairy](https://repl.it/repls/LazyBadSpof), and I can say that my attempts at multithreading using zlib were fastest although just using zlib was faster than the others without threading. **But** Now seeing your code I'd like to see a sample of the data as well. There could be some optimizations to what you're doing like maybe using the `json` library instead of `csv` and a couple other things. – Jab Mar 09 '19 at 06:08
  • the data file is a gziped text file with 3 billion lines of "chr1 10011 141 0 157 4 41 50", numbers are splitted by tab. – Galaxy Mar 09 '19 at 06:35

5 Answers5

1

A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.

LoMaPh
  • 1,476
  • 2
  • 20
  • 33
0

I've never tested this code, but should work.

The first thing is to check the number of lines

f =('myfile.txt')
def file_len(f):
    with open(f) as f:
        for i, l in enumerate(f):
            pass
    return i + 1
num_lines = file_len(f)

split the data in n partitions

n = threads (8 for example)
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]

And now start the jobs:

from multiprocessing import Process
import linecache
jobs = []

for part in range(len(parts)):
    p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
    jobs.append(p)
    p.start()

for p in jobs:
    p.join()

An example of the function

def function_here(your_file_name, line_number, split_size):

    for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))

Still, you will need to check the number of lines before doing any operation

kaihami
  • 815
  • 7
  • 18
  • I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor? – Galaxy Mar 09 '19 at 04:58
0

The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter. Finally merge the counters.

from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os

NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10


def slice_huge_file():
    cnt = 0
    with open(INPUT_FILE) as f:
        while True:
            next_n_lines = list(islice(f, NUM_OF_LINES))
            cnt += 1
            if not next_n_lines:
                break
            with open('sub_huge_{}.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)


def count_file_words(input_file):
    with open(input_file, 'r') as f:
        return Counter([w.strip() for w in f.readlines()])


if __name__ == '__main__':
    slice_huge_file()
    pool = Pool(POOL_SIZE)
    sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
    results = pool.map(count_file_words, sub_files)
    final_counter = Counter()
    for counter in results:
        final_counter += counter
    print(final_counter)
balderman
  • 22,927
  • 7
  • 34
  • 52
0

just some pseudocode:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback


WORKER_POOL_SIZE = 10  # you should set this as the number of your processes
QUEUE_SIZE = 100       # 10 times to your pool size is good enough


def main():
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)

        # init worker pool
        executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
        workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]

        # start producer
        run_producer(q)

        # wait to done
        for f in workers_pool:
            try:
                f.result()
            except Exception:
                traceback.print_exc()


def run_producer(q):
    try:
        with open("your file path") as fp:
            for line in fp:
                q.put(line)
    except Exception:
        traceback.print_exc()
    finally:
        q.put(None)



def worker(i, q):
    while 1:
        line = q.get()
        if line is None:
            print(f'worker {i} is done')
            q.put(None)
            return

        # do something with this line
        # ...
Laisky
  • 55
  • 5
0

I have learned the multiprocessing lib on weekend.

The stop on Ctrl+C and write current result function is still not working.

The main function is fine now.

#!/usr/bin/env pypy3
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = CallStat(inDepthFile)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])   # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        #print( '{}\t{}'.format(depth,'\t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )
        #print( '{}\t{}'.format(depth,'\t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
        #pass
    #print('#MaxDepth={}'.format(MaxDepth),file=tsvout)
    tsvout.close()
    pass

def CallStat(inDepthFile):
    import gzip
    import itertools
    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    #lines_queue = Queue()
    manager = Manager()
    lines_queue = manager.Queue()
    stater_pool = Pool(Nworkers)
    TASKS = itertools.repeat((lines_queue,SamplesList),Nworkers)
    #ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]
    #MapResult = stater_pool.map_async(iStator,TASKS,1)
    AsyncResult = stater_pool.imap_unordered(iStator,TASKS,1)
    try:
        with gzip.open(inDepthFile, 'rt') as tsvfin:
            while True:
                lines = tsvfin.readlines(ChunkSize)
                lines_queue.put(lines)
                if not lines:
                    for i in range(Nworkers):
                        lines_queue.put(b'\n\n')
                    break
    except KeyboardInterrupt:
        print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
        for i in range(Nworkers):
            lines_queue.put(b'\n\n')
        pass
    #for results in ApplyResult:
        #(iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()
    #for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():
    for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in AsyncResult:
        RecordCnt += iRecordCnt
        if iMaxDepth > MaxDepth:
            MaxDepth = iMaxDepth
        for k in SamplesList:
            cDepthCnt[k].update(icDepthCnt[k])
            cDepthStat[k][0] += icDepthStat[k][0]
            cDepthStat[k][1] += icDepthStat[k][1]
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

#def iStator(inQueue,inSamplesList):
def iStator(args):
    (inQueue,inSamplesList) = args
    import csv
    # Looking up things in global scope takes longer then looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in inSamplesList}
    cDepthStat = {key:[0,0] for key in inSamplesList} # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    for lines in iter(inQueue.get, b'\n\n'):
        try:
            tsvin = csv.DictReader(lines, delimiter='\t', fieldnames=('ChrID','Pos')+inSamplesList )
            for row in tsvin:
                #print(', '.join(row[col] for col in inSamplesList))
                RecordCnt += 1
                for k in inSamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    #DepthCnt[k][theValue] += 1  # PyPy3:30.54 ns, Python3:22.23 ns
                    #yDepthCnt[theValue][k] += 1 # PyPy3:30.47 ns, Python3:21.50 ns
                    cDepthCnt[k][theValue] += 1  # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
                #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
        #print('[!]{} Lines Read:[{}], MaxDepth is [{}].'.format(current_process().name,RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1
Galaxy
  • 1,862
  • 1
  • 17
  • 25