1

I have been trying to understand Python threading better, and I got stuck when it comes to queues.

My idea is to read a CSV file (let's say 1000 rows) and process the information in each row, but with threads. I would like a fixed number x of threads running simultaneously: if I set it to 5, then only 5 threads should be running at any time.

Once one of the 5 threads is finished, it should immediately pick up a new line from the CSV (and stop if there is nothing more to read).

What I have done so far is:

import sys
import csv
import threading
import queue


totalThreadAtTime = 5

def threadingTest(row):
    print(row.get('Sales Start Date'))


def main():
    with open('test.csv') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:

            threading.Thread(
                target=threadingTest,
                args=(row,)
            ).start()


if __name__ == '__main__':
    main()

Right now it starts one thread for every line in the CSV, and I want to limit it to only 5 threads running at the same time. Once one is finished, a new one should start.

How can I do that?

And please, if there is anything I have missed, let me know! :)

EDIT:

CSV:

Home Furnishing Business No.,Product Range Area No.,Product Area No.,No.,Description,Unit Price Including VAT,045 Sellable Stock,022 Sellable Stock,Sales Method,Range Code,Sales Start Date,End Date Sales,Range Status,Replenishment Code
07,071,0711,10290396,ME rnfrcd vent top rl 60 galvanised AP CN,8.00,"1,000.",949.,F,K,6/1/2015,,Released,10
07,073,0731,379172,FO N drwr low 80x60 white AP,38.00,"1,000.",963.,F,K,2/1/2019,,Released,10
07,073,0731,80379173,FO N drwr med 40x60 white AP,30.00,"1,000.",964.,F,K,2/1/2019,,Released,10
07,073,0731,40379170,FO N drwr low 40x60 white AP,26.00,"1,000.",966.,F,K,2/1/2019,,Released,10
07,073,0731,20379171,FO N drwr low 60x60 white AP,32.00,"1,000.",967.,F,K,2/1/2019,,Released,10
07,073,0731,60379174,FO N drwr med 60x60 white AP,36.00,"1,000.",967.,F,K,2/1/2019,,Released,10
10,101,1015,70420173,SUNNEBY cord set 1.8 m dark yellow textile,9.90,"1,665.",983.,M,K,8/1/2019,,Released,10
02,021,0211,10444351,GLASSVIK gls dr 60x64 drk rbr/clear glass AP,25.00,663.,996.,S,K,4/1/2020,,Released,10
02,021,0211,50444387,SELSVIKEN door/drawer front 60x38 hi-gl drk rbr AP,10.00,666.,999.,S,K,4/1/2020,,Released,10
09,093,0935,90311229,KURA NN  bed tent pink AP,30.00,666.,999.,S,K,8/1/2015,,Released,10
12,121,1211,80459221,GUNRID air purify crtn 1 pair 145x250 lgrey AP,49.90,666.,999.,M,K,4/1/2020,,Released,10
16,163,1633,451832,VANLIGEN vase 18 grey AP,14.90,666.,999.,M,K,4/1/2020,,Released,10
18,181,1813,70261230,BRADA laptop support 42x31 pink AP CN,9.90,666.,999.,M,K,10/1/2013,,Released,10
07,075,0752,10247181,HALLVIKEN in sin 1 bwl 56x50 blk quartz comp AP CN,350.00,"1,000.",999.,F,K,2/1/2014,,Released,10
10,102,1023,10390701,FOTO NN pend lmp 38 aluminium,29.90,"1,666.",999.,M,K,4/1/2018,,Released,10
10,104,1042,50426166,LILLHULT USB type C t USB crd 1.5 m AP,7.90,"1,666.",999.,M,K,10/1/2018,,Released,10
06,061,0611,20392276,GO high cabinet 40x32x192 Kasjon light grey AP,295.00,"1,000.","1,000.",F,K,2/1/2018,,Released,10
06,062,0621,60381285,TISKEN soap dish w suction cup white AP,6.90,"1,000.","1,000.",M,K,2/1/2019,,Released,10
11,113,1131,20432574,OTTSJON hand towel 40x70 white/blue AP,5.90,"1,665.","1,000.",M,K,4/1/2019,,Released,10
11,111,1112,10412595,VARBRACKA qc/2pwc 150x200/50x80 beige/white AP,29.90,"1,666.","1,000.",M,K,10/1/2018,,Released,10
11,111,1112,60412606,VARBRACKA qc/4pwc 200x200/50x80 beige/white AP,39.90,"1,666.","1,000.",M,K,10/1/2018,,Released,10
06,061,0611,30387646,GO wash-stnd w 2 drws 80x47x58 Kasjon lgrey AP,325.00,"2,000.","1,000.",F,K,2/1/2018,,Released,10
02,021,0211,30363990,SINDVIK gls dr 60x38 light grey/clear glass AP,25.00,"1,666.","1,001.",S,K,4/1/2017,,Released,10
11,111,1112,40412607,VARBRACKA qc/4pwc 240x220/50x80 beige/white AP,49.90,"1,666.","1,002.",M,K,10/1/2018,,Released,10
12,121,1211,343404,SPARVORT sheer crtn 1 pair 145x250 white AP,39.90,"1,666.","1,002.",M,K,2/1/2017,,Released,10

def main():

    pool = ThreadPool(processes=5)  # argument name is inherited from process pool, a bit confusing

    def process_row(row):
        print(row)
        # pass  # do something

    # file handler can be directly iterated instead
    # then, you'll get a line instead of a parsed CSV row
    reader = csv.reader(open('test.csv'))

    # pool.imap yields results in input order; pool.imap_unordered can be faster but doesn't guarantee order
    pool.imap(process_row, reader)

if __name__ == '__main__':
    main()
PythonNewbie
  • Does this answer your question? [python multithreading wait till all threads finished](https://stackoverflow.com/questions/11968689/python-multithreading-wait-till-all-threads-finished) – azro Mar 16 '20 at 19:41
  • Save each Thread in an array, then iterate the array and call join on each – azro Mar 16 '20 at 19:41
  • @azro - Are you thinking about this one: https://stackoverflow.com/a/11968881/13019246 ? – PythonNewbie Mar 16 '20 at 19:42
  • Re, "Once one is finished then start a new one." Start a new what? A new thread? That would be wasteful. Creating and destroying threads is expensive. If you want a limit of five threads, then you should create no more than five threads _total_. Each of the five threads then should operate on many rows of data. – Solomon Slow Mar 16 '20 at 20:27
  • I believe what Marat wrote below is what I was looking for :) – PythonNewbie Mar 16 '20 at 20:30
  • Yes. Using a thread pool is a good way to avoid creating many threads when you have many small _tasks_ that you want to perform in-parallel/in-the-background. – Solomon Slow Mar 16 '20 at 20:31
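
The idea from the comments above (create five threads in total and let each one handle many rows) can be sketched by hand with `queue.Queue`. This is only a rough illustration, not code from the thread: `NUM_WORKERS`, `SENTINEL`, `worker` and `process_csv` are made-up names, and an in-memory sample stands in for `test.csv`.

```python
import csv
import io
import queue
import threading

NUM_WORKERS = 5   # hypothetical limit, mirroring totalThreadAtTime in the question
SENTINEL = None   # marker that tells a worker there is no more work


def worker(tasks, results):
    """Take rows off the queue until the sentinel arrives."""
    while True:
        row = tasks.get()
        if row is SENTINEL:
            break
        # Stand-in for the real per-row work.
        results.append((threading.current_thread().name, row["Sales Start Date"]))


def process_csv(csvfile, num_workers=NUM_WORKERS):
    tasks = queue.Queue()
    results = []  # list.append is thread-safe in CPython
    threads = [
        threading.Thread(target=worker, args=(tasks, results), name=f"worker-{i}")
        for i in range(num_workers)
    ]
    for th in threads:
        th.start()
    for row in csv.DictReader(csvfile):
        tasks.put(row)
    for _ in threads:
        tasks.put(SENTINEL)  # one sentinel per worker so every thread stops
    for th in threads:
        th.join()
    return results


if __name__ == "__main__":
    sample = io.StringIO(
        "No.,Sales Start Date\n"
        "10290396,6/1/2015\n"
        "379172,2/1/2019\n"
        "80379173,2/1/2019\n"
    )
    for name, date in process_csv(sample):
        print(name, date)
```

No more than `num_workers` threads ever exist here, and each one picks up the next row as soon as it finishes the previous one. A ready-made thread pool does the same bookkeeping for you.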

2 Answers

2

The join method waits for a thread to finish; just call it on each thread that you start:

def main():
    threads = []
    with open('test.csv') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:    
            th = threading.Thread(target=threadingTest, args=(row,))
            threads.append(th)
            th.start()

    for th in threads:
        th.join()
azro
  • But instead of waiting for one thread to end, is it possible to run a pool of 5, meaning that the number of running threads is always 5? If you get what I mean? – PythonNewbie Mar 16 '20 at 19:45
  • @ProtractorNewbie Look at multiprocessing.pool and map (from pool) https://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments . This is a different question, and yes, it is a better idea than a thread per line – azro Mar 16 '20 at 19:46
  • Oh, what kind of question am I looking for in that case? I thought it was possible with threading – PythonNewbie Mar 16 '20 at 19:49
1

There is an existing implementation of a ThreadPool included in multiprocessing. Here is an example of how to use it:

import csv
from multiprocessing.pool import ThreadPool

# argument name is inherited from process pool, and is a bit confusing
# will use <number of CPUs> if omitted
pool = ThreadPool(processes=max_threads)

def process_row(row):
    pass  # do something

# file handler can be directly iterated instead
# then, you'll get a line instead of a parsed CSV row
reader = csv.reader(open(filename))

# pool.imap yields results in input order; pool.imap_unordered can be faster but doesn't guarantee order
pool.imap(process_row, reader)

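UPD: pool.imap returns a lazy iterator and does not wait for the work to finish. In an interactive console the pool's threads keep running in the background, so the output still shows up, but a standalone script can reach its end and exit before the rows are processed. The iterator must be consumed explicitly to make the script wait. Fix: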

result = list(pool.imap(process_row, reader))
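
For completeness, here is how the pieces above might fit into one standalone script. Treat it as a sketch under assumptions: `MAX_THREADS`, `run` and the in-memory sample data are illustrative names and values, and `csv.DictReader` is used so that columns can be looked up by header as in the question.

```python
import csv
import io
from multiprocessing.pool import ThreadPool

MAX_THREADS = 5  # assumed limit, taken from the question


def process_row(row):
    # Placeholder work: return one column of the parsed row.
    return row["Sales Start Date"]


def run(csvfile, max_threads=MAX_THREADS):
    reader = csv.DictReader(csvfile)
    # Leaving the with-block terminates the pool, so the results are
    # consumed inside it; list() blocks until every row is processed.
    with ThreadPool(processes=max_threads) as pool:
        return list(pool.imap(process_row, reader))


if __name__ == "__main__":
    sample = io.StringIO(
        "No.,Sales Start Date\n"
        "10290396,6/1/2015\n"
        "379172,2/1/2019\n"
    )
    print(run(sample))
```

With a real file, `run` could be called inside `with open('test.csv') as csvfile:` so the file stays open while the pool reads from it.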
Marat
  • Hmm, I'm not sure I understood how it works. Are we supposed to read something inside process_row? Right now, if I do print(row) inside process_row, it doesn't print anything. – PythonNewbie Mar 16 '20 at 20:01
  • @ProtractorNewbie There isn't anything else to add, just put your code inside `process_row`. Reading and parsing of CSV is done by `csv.reader`. I just tried it on a small file with a single `print` statement and it works – Marat Mar 16 '20 at 20:04
  • That is odd. I tried `print(row)` inside `def process_row(row):` but it doesn't print anything, just empty output. :O – PythonNewbie Mar 16 '20 at 20:08
  • @ProtractorNewbie indeed, it is odd. the only two variables here are max_threads and filename. Can you share a test file and number of threads used? – Marat Mar 16 '20 at 20:12
  • @ProtractorNewbie just tried it, this exact code works on my machine. What is the environment? – Marat Mar 16 '20 at 20:16
  • Windows 10, Python 3.7.4. – PythonNewbie Mar 16 '20 at 20:18
  • @ProtractorNewbie It is not the environment. Please see the update – Marat Mar 16 '20 at 20:22
  • Ohh, there we go! I see the fix now. A question though: is there a way to actually see how this works? For example, how do I know it is running only 5 at the same time? I guess by having a sleep in process_row? – PythonNewbie Mar 16 '20 at 20:23
  • @ProtractorNewbie `ThreadPool` takes care of this. Internally it creates a queue and instantiates the specified number of threads to consume from the queue. There is no way it can launch more threads than requested – Marat Mar 16 '20 at 20:29
  • Awesome! It was exactly what I was looking for. And to kill the "thread", I guess I use sys.exit()? – PythonNewbie Mar 16 '20 at 20:36
  • Awesome! I appreciate it! It was exactly what I was looking for! – PythonNewbie Mar 16 '20 at 20:38
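
One way to check the "only 5 at the same time" claim from the comments, following the sleep idea raised there, is to count how many calls are in flight at once. This is a rough sketch and not part of the original answer: `measure_peak`, `delay` and the `state` dictionary are illustrative names.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

MAX_THREADS = 5  # assumed limit, as in the question


def measure_peak(rows, max_threads=MAX_THREADS, delay=0.05):
    """Process rows in a ThreadPool and return the highest number of
    process_row calls that were running at the same moment."""
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def process_row(row):
        with lock:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
        time.sleep(delay)  # simulate slow work so the calls overlap
        with lock:
            state["active"] -= 1
        return row

    with ThreadPool(processes=max_threads) as pool:
        list(pool.imap(process_row, rows))  # wait for every row
    return state["peak"]


if __name__ == "__main__":
    print("peak concurrency:", measure_peak(range(20)))
```

The reported peak should never exceed `max_threads`, however many rows are fed in.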