I need to count the word frequencies in a 3 GB gzipped plain-text file of English sentences, which is about 30 GB when unzipped. I have a single-threaded script using collections.Counter and gzip.open, and it takes hours to finish.
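The single-threaded version is essentially just the following (a minimal sketch of the word-counting case; 'sentences.txt.gz' is a placeholder name):

import gzip
from collections import Counter

def count_words(path):
    cnt = Counter()
    with gzip.open(path, 'rt') as f:      # decompress on the fly, text mode
        for line in f:
            cnt.update(line.split())      # split on whitespace and count
    return cnt

word_cnt = count_words('sentences.txt.gz')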
Since reading the file line by line is much faster than splitting and counting, I am thinking about a producer-consumer flow: a file reader produces lines, several consumers do the splitting and counting, and at the end the per-consumer Counters are merged to get the word occurrences.
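The merging step itself is not the problem, since Counters can simply be updated with one another (partial_counters below is just a hypothetical list of the per-consumer results):

from collections import Counter

total = Counter()
for partial in partial_counters:   # one Counter per consumer process
    total.update(partial)          # in-place merge of the partial counts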
However, I cannot find an example where a queue is sent to a ProcessPoolExecutor worker: the examples just map single items from a list, and the examples I found for asyncio.Queue are all single-threaded.
It is a huge file, so I cannot read the whole thing into a list before counting, which rules out the usual concurrent.futures.Executor.map pattern, yet every example I have read starts from a fixed list. The time to split and count one sentence is comparable to the time to fork a process, so each consumer process has to live longer than a single task. I do not think map can merge Counters inside a worker, so I cannot just use chunksize > 1. Thus I have to give each consumer a queue and make it keep counting until the whole file is finished, whereas most examples only send one item to a consumer and use chunksize=1000 to reduce the number of forks.
Would you write an example for me? I hope the code is backward compatible with Python 3.5.3, since that is what PyPy3, which is faster, currently implements.
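To make the question concrete, this is roughly the flow I have in mind, written with bare multiprocessing instead of an executor (only a sketch: the worker count, the SENTINEL marker and the batching of 1000 lines are assumptions of mine, and I do not know whether this is the right structure):

import gzip
import multiprocessing as mp
from collections import Counter

NWORKERS = 4        # assumed number of consumer processes
SENTINEL = None     # marker telling a consumer there is no more work

def consumer(q, result_q):
    cnt = Counter()
    while True:
        batch = q.get()
        if batch is SENTINEL:
            break
        for line in batch:
            cnt.update(line.split())
    result_q.put(cnt)               # send the partial Counter back once, at the end

def producer(path, q):
    batch = []
    with gzip.open(path, 'rt') as f:
        for line in f:
            batch.append(line)
            if len(batch) >= 1000:  # batch lines to reduce queue overhead
                q.put(batch)
                batch = []
    if batch:
        q.put(batch)
    for _ in range(NWORKERS):
        q.put(SENTINEL)             # one stop marker per consumer

if __name__ == '__main__':
    q = mp.Queue(maxsize=32)
    result_q = mp.Queue()
    workers = [mp.Process(target=consumer, args=(q, result_q)) for _ in range(NWORKERS)]
    for w in workers:
        w.start()
    producer('sentences.txt.gz', q)
    total = Counter()
    for _ in range(NWORKERS):
        total.update(result_q.get())
    for w in workers:
        w.join()

I could probably make something like this work, but I would rather use a higher-level wrapper if one exists, which is what the questions at the end are about.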
My real case is for a more specific file format:
chr1 10011 141 0 157 4 41 50
chr1 10012 146 1 158 4 42 51
chr1 10013 150 0 163 4 43 53
chr1 10014 164 3 167 4 44 54
I need to count a histogram for each single column, from column 3 to column 8, so I took word frequencies as an easier example.
My code is:
#!/usr/bin/env pypy3
import sys

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except (IndexError, ValueError):
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = inStat(inDepthFile,verbose)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt  # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt  # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])  # SD = sqrt(E(X^2)-E(X)^2)
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print('#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        print('{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
    tsvout.close()

def inStat(inDepthFile,verbose):
    import gzip
    import csv
    from collections import Counter
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList}  # [sum(x), sum(x^2), E(X), E(X^2), SD]
    RecordCnt = 0
    MaxDepth = 0
    with gzip.open(inDepthFile, 'rt') as tsvin:
        tsvin = csv.DictReader(tsvin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList)
        try:
            for row in tsvin:
                RecordCnt += 1
                for k in SamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    cDepthCnt[k][theValue] += 1  # PyPy3: 29.82 ns, Python3: 30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
            #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
    print('[!]Lines Read:[{}], MaxDepth is [{}].'.format(RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1
csv.DictReader takes most of the time. My problem is that, although the gzip reader is fast and the csv reader is fast, I need to count billions of lines, and the csv reader is certainly slower than the gzip reader. So I need to spread lines across different worker processes that each run the csv reading and do the downstream counting separately; a queue between one producer and many consumers is the convenient way to do that.
Since I am using Python, not C, is there some abstracted wrapper around multiprocessing and queues? Is it possible to use ProcessPoolExecutor with the Queue class?
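For example, I imagine something along the following lines, but I do not know whether it is correct or efficient (a sketch with names I made up; I use multiprocessing.Manager().Queue() here only because I read that a plain multiprocessing.Queue cannot be passed as an argument to pool workers):

import gzip
import multiprocessing
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def worker(q):
    cnt = Counter()
    while True:
        batch = q.get()
        if batch is None:                    # None as the assumed end-of-work marker
            break
        for line in batch:
            cnt.update(line.split())
    return cnt

if __name__ == '__main__':
    nworkers = 4
    with multiprocessing.Manager() as manager:
        q = manager.Queue(maxsize=32)        # proxy queue, can be sent to pool workers
        with ProcessPoolExecutor(max_workers=nworkers) as pool:
            futures = [pool.submit(worker, q) for _ in range(nworkers)]
            batch = []
            with gzip.open('sentences.txt.gz', 'rt') as f:
                for line in f:
                    batch.append(line)
                    if len(batch) >= 1000:
                        q.put(batch)
                        batch = []
            if batch:
                q.put(batch)
            for _ in range(nworkers):
                q.put(None)                  # one stop marker per worker
            total = Counter()
            for fut in futures:
                total.update(fut.result())   # merge the partial Counters

Is this the intended way, or is there something simpler built into concurrent.futures?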