
I have a folder containing about 5 million files and I have to read the content of each file so that I can form a dataframe. It takes a very long time to do that. Is there any way I can optimize the code below to speed up the process?

import os

count = 0
new_list = []
file_name = []
for root, dirs, files in os.walk('Folder_5M'):
    for file in files:
        count += 1
        file_name.append(file)
        with open(os.path.join(root, file), 'rb') as f:
            text = f.read()
            new_list.append(text)
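For context, once the two lists are filled, forming the dataframe might look like this minimal sketch (it assumes pandas and UTF-8 text, and is not part of the original code):

import pandas as pd

# Hypothetical sketch: one row per file, with the raw bytes decoded to text.
df = pd.DataFrame({'file_name': file_name,
                   'content': [b.decode('utf-8', errors='ignore') for b in new_list]})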
sudojarvis
  • That is an odd task. First question: can you just do it once and then save the resultant `new_list` in a pickle or other format (see the sketch after these comments)? If so, who cares how long it takes, just run it overnight 1 time and be done with it. – AirSquid Jun 25 '22 at 16:53
  • I'd suggest you to use some OS tool to form a single file and then read this file with python, it'll be significantly faster. – Olvin Roght Jun 25 '22 at 16:56
  • Do you really need the contents of all 5 million files in memory concurrently? – DarkKnight Jun 25 '22 at 17:05
  • Yes, I need it to do text preprocessing and perform LSA on it. – sudojarvis Jun 25 '22 at 17:19
  • @sudojarvis Why are you opening the files in binary mode if you're doing text processing? Why are you saving the filenames? – DarkKnight Jun 25 '22 at 17:23
  • First you need to profile your code to see what takes the most time! If the `append` takes most of the time, then count the number of files first and pre-allocate a list of `None` values of that length. If the reading takes the longest (which is probably the case), then multithreading is your answer, as it is an IO-bound process. – Ilya Jun 25 '22 at 17:45
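Following the caching suggestion in the comments, a minimal sketch (with a hypothetical cache file name) of running the slow scan once and reloading the result afterwards could look like:

import os
import pickle

CACHE = 'folder_5m_cache.pkl'  # hypothetical cache file name

if os.path.exists(CACHE):
    # Load previously saved results instead of re-reading millions of files.
    with open(CACHE, 'rb') as f:
        file_name, new_list = pickle.load(f)
else:
    file_name, new_list = [], []
    # ... run the slow os.walk/read loop from the question here ...
    with open(CACHE, 'wb') as f:
        pickle.dump((file_name, new_list), f)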

2 Answers


Here's an idea for how you could use multiprocessing for this.

Constructing a list of files resulting from os.walk is likely to be very fast. It's the processing of those files that's going to take time. With multiprocessing you can do a lot of that work in parallel.

Each process opens the given file, processes it and creates a dataframe. When all of the parallel processing has been carried out, you concatenate the returned dataframes. That last step is CPU intensive, and there's no way (that I can think of) to share that load.

from pandas import DataFrame, concat
from os import walk
from os.path import join, expanduser
from multiprocessing import Pool

HOME = expanduser('~')

def process(filename):
    try:
        with open(filename) as data:
            df = DataFrame()
            # analyse your data and populate the dataframe here
            return df
    except Exception:
        # On any read/parse failure, return an empty frame so the final concat still works.
        return DataFrame()

def main():
    with Pool() as pool:
        filenames = []
        for root, _, files in walk(join(HOME, 'Desktop')):
            for file in files:
                filenames.append(join(root, file))
        ar = pool.map_async(process, filenames)
        master = concat(ar.get())
        print(master)

if __name__ == '__main__':
    main()
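If the per-file work is small, it may also help to batch filenames per worker task. The sketch below is only a variant of main() that reuses process() from above, with a hypothetical chunksize you would need to tune:

from os import walk
from os.path import join, expanduser
from multiprocessing import Pool
from pandas import concat

HOME = expanduser('~')

def main():
    filenames = []
    for root, _, files in walk(join(HOME, 'Desktop')):
        for file in files:
            filenames.append(join(root, file))
    with Pool() as pool:
        # imap_unordered streams frames back as workers finish; chunksize
        # batches many small files per task to cut inter-process overhead.
        master = concat(pool.imap_unordered(process, filenames, chunksize=1000))
    print(master)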
DarkKnight

This is an IO-bound task, so multithreading is the tool for the job. In Python there are two ways to implement it: one uses a thread pool and the other uses asyncio, which works with an event loop. The event loop usually has better performance, but the challenge is to limit the number of threads executing at the same time. Fortunately, Andrei wrote a very good solution for this.

This code creates an event loop that reads the files in several threads. The parameter MAX_NUMBER_OF_THREADS defines how many reads may run at the same time. Try playing with this number for better performance, since the best value depends on the machine running it.

import os
import asyncio


def _read(file_path: str) -> str:
    with open(file_path, "r") as f:
        return f.read()


async def read_file(file_path: str) -> str:
    # Offload the blocking read to a worker thread (asyncio.to_thread needs
    # Python 3.9+) so several reads can overlap instead of blocking the loop.
    return await asyncio.to_thread(_read, file_path)


async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))


# Caps how many reads are awaited at once; tune for your machine and storage.
MAX_NUMBER_OF_THREADS = 100
file_name = []
file_path = []
for path, subdirs, files in os.walk("Folder_5M"):
    for name in files:
        file_path.append(os.path.join(path, name))
        file_name.append(name)
count = len(file_name)

tasks = [read_file(file) for file in file_path]
# contents holds each file's text, in the same order as file_path
contents = asyncio.run(gather_with_concurrency(MAX_NUMBER_OF_THREADS, *tasks))
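For comparison, the thread-pool route mentioned at the start of this answer can be sketched with concurrent.futures. This is only a minimal illustration, not the answer's original code, and the max_workers value is a guess to tune:

import os
from concurrent.futures import ThreadPoolExecutor


def read_file(file_path: str) -> str:
    with open(file_path, "r") as f:
        return f.read()


file_path = []
for path, subdirs, files in os.walk("Folder_5M"):
    for name in files:
        file_path.append(os.path.join(path, name))

# map() keeps results in the same order as file_path; max_workers is a
# starting point to tune for the machine and storage.
with ThreadPoolExecutor(max_workers=32) as executor:
    contents = list(executor.map(read_file, file_path))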
Ilya