
I have a CSV parser script in Python that does some processing on a big CSV file. There are around 1 million rows, so the process takes some time.

import csv
import sys

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        ParserFunction(row)

Is there a way to multi-thread this loop to lower the execution time?

Thanks

vdobes
  • Depending on what exactly you are doing with the file content, you could divide your set of rows into smaller subsets and start separate threads to parse them, then join their results together. E.g. t1 = Thread(target=ParserFunction, args=(reader[0:1000], )), ... (a rough sketch of this idea follows below). – AracKnight Oct 06 '22 at 13:45
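A minimal sketch of that suggestion, assuming the rows can be parsed independently (the chunk size of 1000 is an arbitrary choice, and note this still spawns one thread per chunk):

import csv
import threading

def parse_chunk(rows):
    # Apply the same per-row logic as ParserFunction above to one subset
    for row in rows:
        ParserFunction(row)

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    rows = list(reader)  # materialize so the rows can be sliced into subsets

chunk_size = 1000  # assumed batch size
threads = []
for i in range(0, len(rows), chunk_size):
    t = threading.Thread(target=parse_chunk, args=(rows[i:i + chunk_size],))
    t.start()
    threads.append(t)
for t in threads:
    t.join()  # wait for every chunk before combining the results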

2 Answers


You can hand each row off to its own thread instead of making the main thread wait for the previous row to finish before moving on to the next one:

import csv
import sys
import threading

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        # Spawn a thread per row so the main thread doesn't block on it
        threading.Thread(target=ParserFunction, args=(row,)).start()

But the exact way of doing so depends on what logic you want to run on each row and whether that logic depends on other rows or not.
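If the rows are independent, a thread pool keeps the number of threads bounded instead of spawning one per row; here is a minimal sketch using concurrent.futures (the max_workers value of 8 is an assumption to tune for your workload):

import csv
from concurrent.futures import ThreadPoolExecutor

def ParserFunction(row):
    # Some logic with row
    pass

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    # map() submits every row, but only max_workers threads run at once
    with ThreadPoolExecutor(max_workers=8) as pool:
        pool.map(ParserFunction, reader)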

Bemwa Malak
  • This would most likely work, but keep in mind that this could result in a large number of launched threads, leading to problems of its own due to RAM limitations etc. Whether this is a good or a bad way to do it mainly depends on the size of the input file (see the semaphore sketch below). – AracKnight Oct 06 '22 at 13:52
  • I totally agree with you; it depends on the situation. – Bemwa Malak Oct 06 '22 at 13:56
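One way to act on that caveat without restructuring the loop is a semaphore that caps how many threads run at once; a sketch, assuming a limit of 500 (matching the answer below):

import csv
import threading

limit = threading.Semaphore(500)  # assumed cap on concurrently running threads

def parse_with_limit(row):
    try:
        ParserFunction(row)  # same per-row logic as in the answer above
    finally:
        limit.release()      # free a slot even if parsing raises

with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        limit.acquire()      # blocks while 500 rows are still being parsed
        threading.Thread(target=parse_with_limit, args=(row,)).start()
# A complete version would also join the threads still running at this point.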

Thanks @Bemwa Malak for opening my mind. Yeah, the file is quite big, around 400 MB with over 1 million rows. I'm using Python 3, so I had to adapt your idea a little.

Is this the right idea of limiting the number of threads?

# Threading
def runThreads():
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            # Cap the number of in-flight threads: once 500 are running,
            # wait for the whole batch to finish before starting new ones
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        # Wait for the last, partial batch
        for thread in threads:
            thread.join()

The complete code of my logic is something like this. From testing on a smaller file I got an execution time of around 60 seconds without threading and around 7 seconds with threading, so it is much faster now.

And I tested it on the original file and got an execution time of 45 minutes instead of 4 hours, so it's working.

from sqlalchemy import create_engine, text
import csv
import sys
import threading

# Database connection and a shared list that holds the matched rows
engine = create_engine('mssql+pyodbc://SECRET')
data = []

# Starter
def main():
    runThreads()

    # Save the matched rows to a new CSV file
    with open('output.csv', 'w', newline='', encoding='utf-8') as f2:
        writer = csv.writer(f2, delimiter=';')
        writer.writerows(data)

    # Log
    with open('log2', 'a') as f3:
        sys.stdout = f3  # Redirect standard output to the log file
        print("Version: 1")
        print("Matched: ", len(data))

# My Parser
def ParserFunction(row):
    # Bound parameters avoid quoting problems and SQL injection
    query = text(
        "SELECT (SELECT COUNT(*) FROM myTab WHERE myColumn = :val) + "
        "(SELECT COUNT(*) FROM myTab2 WHERE myColumn = :val) + "
        "(SELECT COUNT(*) FROM myTab3 WHERE myColumn = :val) + "
        "(SELECT COUNT(*) FROM myTab4 WHERE myColumn = :val2)"
    )

    with engine.connect() as con:
        rs = con.execute(query, {"val": row[5], "val2": row[1]})
        # We have a match: remember the row
        if rs.fetchone()[0] > 0:
            data.append(row)

# Threading
def runThreads():
    with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile, delimiter=';', quotechar='|')
        threads = []
        for row in reader:
            t = threading.Thread(target=ParserFunction, args=(row,))
            t.start()
            threads.append(t)
            # Cap the number of in-flight threads: once 500 are running,
            # wait for the whole batch to finish before starting new ones
            if len(threads) >= 500:
                for thread in threads:
                    thread.join()
                threads = []
        # Wait for the last, partial batch
        for thread in threads:
            thread.join()

main()
vdobes
  • You can check this answer here on Stack Overflow; it explains one of the right ways to do it in Python, the producer/consumer pattern: https://stackoverflow.com/a/19369877/14388717 – Bemwa Malak Oct 07 '22 at 08:30
  • You can check the Queue class in the Python documentation here: https://docs.python.org/2/library/queue.html (a minimal sketch of the pattern follows below). – Bemwa Malak Oct 07 '22 at 08:31
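A minimal sketch of that producer/consumer pattern, under assumptions about the worker count and queue bound (both the 8 workers and maxsize=1000 are arbitrary choices):

import csv
import queue
import threading

q = queue.Queue(maxsize=1000)  # assumed bound; applies back-pressure on the reader
NUM_WORKERS = 8                # assumed worker count

def worker():
    while True:
        row = q.get()
        if row is None:        # sentinel: no more rows are coming
            break
        ParserFunction(row)    # same per-row logic as in the answers above

workers = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()

# Producer: the main thread reads the CSV and feeds rows into the queue
with open('csvfeed.csv', newline='', encoding='utf-8') as csvfile:
    reader = csv.reader(csvfile, delimiter=';', quotechar='|')
    for row in reader:
        q.put(row)             # blocks while the queue is full

for _ in workers:
    q.put(None)                # one sentinel per worker
for w in workers:
    w.join()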