I need to process a huge file of around 30 GB containing hundreds of millions of rows. More precisely, I want to perform the following three steps:
1. Reading the file in chunks: given the size of the file, I don't have the memory to read it in one go;
2. Computing stuff on each chunk and aggregating it down to a more manageable size;
3. Concatenating the aggregated chunks into a final dataset containing the results of my analyses.
So far, I have coded two threads:
- One thread in charge of reading the file in chunks and storing them in a Queue (step 1);
- One thread in charge of performing the analyses (step 2) on the chunks.
Here is the spirit of my code so far with dummy data:
import queue
import threading
import concurrent.futures
import os
import random
import pandas as pd
import time

def process_chunk(df):
    # Aggregate a chunk down to one row per category
    return df.groupby(["Category"])["Value"].sum().reset_index(drop=False)

def producer(chunk_queue, event, full_path):
    print("Producer: Reading the file in chunks")
    reader = pd.read_table(full_path, sep=";", chunksize=10000,
                           names=["Row", "Category", "Value"])
    for index, chunk in enumerate(reader):
        print(f"Producer: Adding chunk #{index} to the queue")
        chunk_queue.put((index, chunk))
        time.sleep(0.2)  # Simulate reading being slower than processing

    print("Producer: Finished putting chunks")
    event.set()
    print("Producer: Event set")

def consumer(chunk_queue, event, result_list):
    # The consumer stops iff the queue is empty AND the event is set
    # <=> The consumer keeps going iff the queue is not empty OR the event is not set
    while not chunk_queue.empty() or not event.is_set():
        try:
            index, chunk = chunk_queue.get(timeout=1)
        except queue.Empty:
            continue
        print(f"Consumer: Retrieved chunk #{index}")
        print(f"Consumer: Queue size {chunk_queue.qsize()}")
        result_list.append(process_chunk(chunk))
        time.sleep(0.1)  # Simulate processing being faster than reading

    print("Consumer: Finished retrieving chunks")

if __name__ == "__main__":
    # Record the execution time
    start = time.perf_counter()

    # Generate a fake file in the current directory if necessary
    path = os.path.dirname(os.path.realpath(__file__))
    filename = "fake_file.txt"
    full_path = os.path.join(path, filename)
    if not os.path.exists(full_path):
        print("Main: Generate a dummy dataset")
        with open(full_path, "w", encoding="utf-8") as f:
            for i in range(100000):
                value = random.randint(1, 101)
                category = i % 2
                # Columns written in the same order as the names passed to read_table
                f.write(f"{i+1};{category};{value}\n")

    # Defining a bounded queue that stores the chunks read by the Producer
    chunk_queue = queue.Queue(maxsize=5)
    # Defining an event that the Producer sets when it is done
    event = threading.Event()
    # Defining a list storing the chunks processed by the Consumer
    result_list = []

    # Launch the Producer and Consumer threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(producer, chunk_queue, event, full_path),
            executor.submit(consumer, chunk_queue, event, result_list),
        ]
        # Re-raise any exception that happened inside the threads
        for future in concurrent.futures.as_completed(futures):
            future.result()

    # Display that the program is finished
    print("Main: Consumer & Producer have finished!")
    print(f"Main: Number of processed chunks = {len(result_list)}")
    print(f"Main: Execution time = {time.perf_counter()-start} seconds")
I know that each iteration of step 1 takes more time than each iteration of step 2, i.e. the Consumer will always be waiting for the Producer (hence the two sleeps in the dummy code above).
How can I speed up the process of reading my file in chunks (step 1)?
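For reference, here is a minimal way I can time the raw chunked read on its own (no threading, no sleeps, no processing), to see how much of the total cost is pure I/O. It assumes fake_file.txt is in the working directory and uses the same column names as above:

    import time
    import pandas as pd

    start = time.perf_counter()
    n_rows = 0
    # Iterate over the chunks without any processing to isolate the read cost
    for chunk in pd.read_table("fake_file.txt", sep=";", chunksize=10000,
                               names=["Row", "Category", "Value"]):
        n_rows += len(chunk)
    print(f"Read {n_rows} rows in {time.perf_counter()-start:.2f} seconds")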