8

My script loops through each line of an input file and performs some actions using the string in each line. Since the tasks performed on each line are independent of each other, I decided to split the work into threads so that the script doesn't have to wait for one task to complete before continuing with the loop. The code is given below.


import sys
import threading

threads = []

def myFunction(line, param):
    # Does something with line and param:
    # sends multiple HTTP requests, parses the responses and produces outputs.
    # Returns nothing.
    ...

param = sys.argv[1]
with open(targets, "r") as listfile:  # targets holds the path to the input file
    for line in listfile:
        print("Starting a thread for:", line)
        t = threading.Thread(target=myFunction, args=(line, param))
        threads.append(t)
        t.start()

I realized that this was a bad idea once the number of lines in the input file grew large. With this code, there would be as many threads as there are lines. I researched a bit and figured that queues would be the way to go.

I want to understand the optimal way of using queues for this scenario, and whether there are any alternatives I could use.

hax
  • If the tasks performed on the lines don't involve any I/O, threading seems meaningless here. See https://stackoverflow.com/q/20939299/12291742 – Ekrem Dinçel Sep 29 '20 at 16:20
  • The task inside myFunction involves multiple HTTP requests and parsing of the response. From the post you linked, my understanding is that I would not see any meaningful performance increase with threading even if that is the case. Did I get that right? – hax Sep 29 '20 at 16:28
  • Read this "Python threading is great for creating a responsive GUI, or for handling multiple short web requests where I/O is the bottleneck more than the Python code.". – hax Sep 29 '20 at 16:32
  • So for my use case, threading would work as it isn't computationally intensive. Most of the delay is because of HTTP request and response delays. – hax Sep 29 '20 at 16:33
  • Yeah, you are doing right then. I already said it is meaningless if tasks do not include any I/O. If you want to limit the number of running threads, I think one of the alternatives is using [ThreadPoolExecutor](https://docs.python.org/dev/library/concurrent.futures.html#threadpoolexecutor) – Ekrem Dinçel Sep 29 '20 at 16:58
  • Thanks. I will explore that option. – hax Sep 29 '20 at 17:48
  • You say your function _sends multiple HTTP requests and parse the response and produce outputs_, but returns nothing. What do you mean by "produce outputs"? `print`ing from multiple threads can result in unintended, interleaved output. – GordonAitchJay Oct 03 '20 at 13:15
  • @GordonAitchJay Each thread will write to a separate file, but the function returns no value. – hax Oct 05 '20 at 16:06
  • Sweet, nothing to worry about then. – GordonAitchJay Oct 07 '20 at 05:52

4 Answers

7

To get around this problem, you can use the concept of thread pools: you define a fixed number of threads/workers, for example 5 workers, and whenever a thread finishes executing, another submitted task (a future) takes its place automatically.

Example:

import concurrent.futures

def myFunction(line, param):
    print("Done with :", line, param)

param = "param_example"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    with open("targets", "r") as listfile:
        for line in listfile:
            print("Starting a thread for: ", line)
            futures.append(executor.submit(myFunction, line=line, param=param))

    # Wait for the threads to finish and optionally use their results:
    for future in concurrent.futures.as_completed(futures):
        print(future.result())  # an exception raised in the worker should be handled here!
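
The comment above flags that exceptions should be handled: future.result() re-raises any exception that was raised inside the worker. As a hedged sketch, the final loop of the example could look like this with that handling added:

    # Wait for the threads to finish; result() re-raises worker exceptions.
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except Exception as exc:
            print("A worker raised an exception:", exc)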
Pixel_teK
5

Queues are one way to do it. The way to use them is to put function parameters on a queue, and use threads to get them and do the processing.

The queue size doesn't matter much in this case because reading the next line is fast. In other cases, a more optimized solution is to set the queue size to at least twice the number of threads. That way, if all threads finish processing an item from the queue at the same time, they will all have the next item in the queue ready to be processed.

To avoid complicating the code, the threads can be made daemonic so that they don't stop the program from finishing after the processing is done. They will be terminated when the main process finishes.

The alternative is to put a special sentinel item on the queue (like None) for each thread, make each thread exit after getting it from the queue, and then join the threads (a sketch of this appears after the queue example below).

For the examples below, the number of worker threads is set using the workers variable.

Here is an example of a solution using a queue.

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)
def work():
    while True:
        myFunction(*queue.get())
        queue.task_done()

for _ in range(workers):
    Thread(target=work, daemon=True).start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))
queue.join()
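
For completeness, here is a hedged sketch of the sentinel alternative mentioned above, with no daemon threads (it reuses the same workers, targets, param and myFunction as the other examples):

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)

def work():
    while True:
        item = queue.get()
        if item is None:  # sentinel: tells this thread to exit
            break
        myFunction(*item)

threads = [Thread(target=work) for _ in range(workers)]
for t in threads:
    t.start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))

for _ in range(workers):
    queue.put(None)  # one sentinel per worker thread
for t in threads:
    t.join()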

A simpler solution might be using ThreadPoolExecutor. It is especially simple in this case because the function being called doesn't return anything that needs to be used in the main thread.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=workers) as executor:
    with open(targets, 'r') as listfile:
        for line in listfile:
            executor.submit(myFunction, line, param)

Also, if it's not a problem to have all lines stored in memory, there is a solution which doesn't use anything other than threads. The work is split in such a way that the threads read some lines from a list and ignore other lines. A simple example with two threads is where one thread reads odd lines and the other reads even lines.

from threading import Thread

with open(targets, 'r') as listfile:
    lines = listfile.readlines()

def work_split(n):
    for line in lines[n::workers]:
        myFunction(line, param)

threads = []
for n in range(workers):
    t = Thread(target=work_split, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

I have done a quick benchmark and the Queue is slightly faster than the ThreadPoolExecutor, but the solution with the split work is faster than both.

GitFront
0

From the code you have posted, using threads makes no sense here. That is because there are no I/O operations, so the threads execute one after another rather than in parallel. The GIL (Global Interpreter Lock) is never released by a thread in this case, so the application only appears to be multithreaded; in reality the interpreter uses a single CPU core for the program and runs one thread at a time. You therefore gain no advantage from threads; on the contrary, you can see a performance degradation in this scenario due to context switching and the thread initialization overhead when a thread starts.

The only way to get better performance in this scenario, if it is applicable to your case, is a multiprocess program. But pay attention to the number of processes you start, and remember that every process has its own interpreter.
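
If the work really were CPU-bound, a minimal sketch of that multiprocess approach using concurrent.futures.ProcessPoolExecutor could look like the following (myFunction, targets and param are taken from the question and assumed to be defined):

import os
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    # One worker process per CPU core; each process has its own interpreter and GIL.
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        with open(targets, "r") as listfile:
            for line in listfile:
                executor.submit(myFunction, line, param)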

Zig Razor
0

It was a good answer by GitFront. This answer just adds one more option using the multiprocessing package. Whether to use concurrent.futures or multiprocessing depends on your particular requirements. multiprocessing has comparatively more options, but for the given question the results should be nearly identical in the simplest case.

from multiprocessing import cpu_count, Pool

PROCESSES = cpu_count()  # Warning: uses all cores

def pool_method(listfile, param):
    with Pool(processes=PROCESSES) as p:
        checker = [p.apply_async(myFunction, (line, param)) for line in listfile]
        for task in checker:
            task.wait()  # use task.get() instead if myFunction returned a value

...

There are various other methods besides apply_async, but this should work well for your needs.
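
For example, a hedged sketch using Pool.starmap, which blocks until every call has finished and returns the results in order (again reusing myFunction and param from the question):

from multiprocessing import cpu_count, Pool

def pool_starmap(listfile, param):
    # starmap unpacks each (line, param) tuple into myFunction(line, param)
    # and waits for all of the calls to finish.
    with Pool(processes=cpu_count()) as p:
        return p.starmap(myFunction, [(line, param) for line in listfile])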

nnarenraju