
I need to process a huge file of around 30GB containing hundreds of millions of rows. More precisely, I want to perform the following three steps:

  1. Reading the file by chunks: given the size of the file, I don't have the memory to read the file in one go;

  2. Computing stuff on the chunks before aggregating each of them to a more manageable size;

  3. Concatenating the aggregated chunks into a final dataset containing the results of my analyses (sketched just below).
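
To make step 3 concrete, here is a rough sketch of the final concatenation I have in mind, written as a hypothetical helper (concatenate_results is just an illustrative name). It assumes each element of result_list is the small DataFrame returned by process_chunk() in the code below, so the duplicate categories coming from different chunks still need one last aggregation:

import pandas as pd

def concatenate_results(result_list):
    # Step 3 sketch: merge the per-chunk partial sums into a single result.
    # Each element of result_list has the columns ["Category", "Value"]
    # produced by process_chunk(); the final groupby merges the duplicate
    # categories coming from different chunks.
    return (
        pd.concat(result_list, ignore_index=True)
          .groupby("Category", as_index=False)["Value"]
          .sum()
    )

I would simply call concatenate_results(result_list) after the executor block, once all chunks have been processed.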

So far, I have coded two threads:

  • One thread in charge of reading the file by chunks and storing the chunks in a Queue (step 1);
  • One thread in charge of performing the analyses (step 2) on the chunks;

Here is the spirit of my code so far with dummy data:

import queue
import threading
import concurrent.futures
import os
import random
import pandas as pd
import time

def process_chunk(df):
    return df.groupby(["Category"])["Value"].sum().reset_index(drop=False)

def producer(chunk_queue, event):
    # Note: the queue instance is called chunk_queue so that it does not shadow
    # the queue module (the consumer needs queue.Empty below).
    print("Producer: Reading the file by chunks")
    reader = pd.read_table(full_path, sep=";", chunksize=10000, names=["Row","Category","Value"])
    for index, chunk in enumerate(reader):
        print(f"Producer: Adding chunk #{index} to the queue")
        chunk_queue.put((index, chunk))
        time.sleep(0.2)
    print("Producer: Finished putting chunks")
    event.set()
    print("Producer: Event set")

def consumer(chunk_queue, event, result_list):
    # The consumer stops iff the queue is empty AND the event is set
    # <=> The consumer keeps going iff the queue is not empty OR the event is not set
    while not chunk_queue.empty() or not event.is_set():
        try:
            index, chunk = chunk_queue.get(timeout=1)
        except queue.Empty:
            continue
        print(f"Consumer: Retrieved chunk #{index}")
        print(f"Consumer: Queue size {chunk_queue.qsize()}")
        result_list.append(process_chunk(chunk))
        time.sleep(0.1)
    print("Consumer: Finished retrieving chunks")

if __name__=="__main__":
    # Record the execution time
    start = time.perf_counter()

    # Generate a fake file in the current directory if necessary
    path = os.path.dirname(os.path.realpath(__file__))
    filename = "fake_file.txt"
    full_path = os.path.join(path, filename)
    if not os.path.exists(full_path):
        print("Main: Generate a dummy dataset")
        with open(full_path, "w", encoding="utf-8") as f:
            for i in range(100000):
                value = random.randint(1,101)
                category = i%2
                f.write(f"{i+1};{value};{category}\n")

    # Defining a queue that will store the chunks of the file read by the Producer
    chunk_queue = queue.Queue(maxsize=5)

    # Defining an event that will be set by the Producer when it is done
    event = threading.Event()

    # Defining a list storing the chunks processed by the Consumer
    result_list = list()

    # Launch the threads Producer and Consumer
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, chunk_queue, event)
        executor.submit(consumer, chunk_queue, event, result_list)

    # Display that the program is finished
    print("Main: Consumer & Producer have finished!")
    print(f"Main: Number of processed chunks = {len(result_list)}")
    print(f"Main: Execution time = {time.perf_counter()-start} seconds")

I know that each iteration of step 1 takes more time than each iteration of step 2, i.e. the Consumer will always be waiting for the Producer.
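
For reference, a quick way to check this is to time the two steps separately in a single loop, along these lines (just a minimal sketch, assuming fake_file.txt from the script above sits in the current working directory):

import time
import pandas as pd

# Rough comparison of step 1 (reading a chunk) vs step 2 (processing it),
# using the same read_table arguments and groupby as in the script above.
read_time = process_time = 0.0
reader = pd.read_table("fake_file.txt", sep=";", chunksize=10000,
                       names=["Row", "Category", "Value"])
while True:
    t0 = time.perf_counter()
    try:
        chunk = next(reader)  # step 1: read one chunk
    except StopIteration:
        break
    t1 = time.perf_counter()
    chunk.groupby(["Category"])["Value"].sum().reset_index(drop=False)  # step 2
    t2 = time.perf_counter()
    read_time += t1 - t0
    process_time += t2 - t1
print(f"Total read time: {read_time:.3f}s, total processing time: {process_time:.3f}s")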

How can I speed up the process of reading my file by chunks (step 1)?

SteeveL
  • Have you looked into Pyspark or Dask? – Umar.H Feb 20 '20 at 16:24
  • Does this answer your question? [Fastest way to parse large CSV files in Pandas](https://stackoverflow.com/questions/25508510/fastest-way-to-parse-large-csv-files-in-pandas) – filbranden Feb 20 '20 at 16:34
  • Are you sure using two separate threads really makes it faster? Just try calling `process_chunk()` on the same loop that reads them and check whether that's actually slower than what you're doing... Also consider increasing the chunk size to maybe 100k or 1M, that might help... – filbranden Feb 20 '20 at 16:36

0 Answers