
I have a program that lists and reads all the files in a directory and counts the total number of records present in the files concurrently.

When I run the code below, I get a list of worker process names with counts coming out in chunks, since the counting of records from multiple files is also running in parallel.

import multiprocessing as mp
import time
import os
path = '/home/vaibhav/Desktop/Input_python'

def process_line(f):
    print(mp.current_process())
    #print("process id = " , os.getpid(f))
    print(sum(1 for line in f))

for filename in os.listdir(path):
    print(filename)

    if __name__ == "__main__":

        with open('/home/vaibhav/Desktop/Input_python/'+ filename, "r+") as source_file:
            # chunk the work into batches

            p = mp.Pool()
            results = p.map(process_line, source_file)

start_time = time.time()
print("My program took", time.time() - start_time, "to run")

Current Output

<ForkProcess(ForkPoolWorker-54, started daemon)>
73
<ForkProcess(ForkPoolWorker-55, started daemon)>
<ForkProcess(ForkPoolWorker-56, started daemon)>
<ForkProcess(ForkPoolWorker-53, started daemon)>
73
1
<ForkProcess(ForkPoolWorker-53, started daemon)>
79
<ForkProcess(ForkPoolWorker-54, started daemon)>
<ForkProcess(ForkPoolWorker-56, started daemon)>
<ForkProcess(ForkPoolWorker-55, started daemon)>
79
77
77

Is there a way to get the total record count per file, like:

File1.Txt Total_Recordcount
...
Filen.txt  Total_Recordcount

UPDATE: I found the solution and have posted it as an answer below.

user7422128

2 Answers


Counting lines in a text file should not be CPU-bound, so it is not a good candidate for threading. You might want to use a thread pool for processing multiple independent files, but for a single file, here's a way to count lines which should be very fast:

import pandas as pd
data = pd.read_table(source_file, dtype='S1', header=None, usecols=[0])
count = len(data)

What this does is to parse the first character (S1) into a DataFrame, and then check the length. The parser is implemented in C, so there is no slow Python loop required. This should provide close to the best possible speed, limited only by your disk subsystem.

This sidesteps the original problem completely, because now you get a single count per file.
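As a rough sketch of that idea (the directory path comes from the question, and a plain Python line count stands in here; the pandas-based count above could be substituted), a thread pool can hand one whole file to each worker, so every file yields exactly one count:

import os
from multiprocessing.pool import ThreadPool

folder = '/home/vaibhav/Desktop/Input_python'

def countlines(fname):
    # plain Python line count; the pandas-based count above could be dropped in here
    with open(os.path.join(folder, fname)) as f:
        return fname, sum(1 for _ in f)

if __name__ == "__main__":
    # one task per file, so each file produces a single (name, count) pair
    with ThreadPool() as pool:
        for fname, count in pool.map(countlines, os.listdir(folder)):
            print(fname, count)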

John Zwinck
  • does that mean I cannot use the multiprocessing module to read multiple files concurrently in this case? – user7422128 Aug 19 '17 at 05:21
  • Your original code would only process one file at a time, correct? If you want to process multiple files at a time, you can do so by something like `pool.map(countlines, source_files)`, i.e. give one file at a time to each thread. Whether this makes the program faster or not is hard to predict--it depends on your particular computer. – John Zwinck Aug 19 '17 at 05:29
  • My program is already processing multiple files at a time and I'm using pool.map(). You can see in the output as well that there are multiple fork processes. – user7422128 Aug 19 '17 at 05:40
  • I got the solution and pasted it in the comment section. Anyways thanks for your help. – user7422128 Aug 19 '17 at 19:19

Earlier I was reading the files one at a time and spawning multiple processes for a single file, which is why I was getting record counts for chunks of a file.

But now I have changed my approach: I pass a list of files as an iterable to pool.map(), which spawns separate processes for the different files in the list and gives better results in terms of run time. I adapted the approach from a reference I found; below is the corrected code.

from multiprocessing import Pool
import os
import time

folder = '/home/vaibhav/Desktop/Input_python'

def file_wc(fname):
    # count the records (lines) in a single file
    with open(os.path.join(folder, fname)) as f:
        count = sum(1 for line in f)
    return (fname, count)

if __name__ == "__main__":
    start_time = time.time()
    fnames = os.listdir(folder)
    pool = Pool()
    # each worker handles one whole file, so every file gets a single count
    print(dict(pool.map(file_wc, fnames)))
    pool.close()
    pool.join()
    print("My program took", time.time() - start_time, "to run")
user7422128