5

I use this method to schedule a few dozen (fewer than a thousand) calls of do_it at different times in the future:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds

The runtime of each do_it call is very fast (much less than 0.1 ms) and non-blocking. I would like to avoid spawning hundreds of new threads for such a simple task.

How could I do this with only one additional thread for all do_it calls?

Is there a simple way to do this in Python using only the standard library, without any third-party library?

Will Da Silva
Basj
  • So you want to ensure you don't overload your system with too many [parallel] threads? This may be relevant for you: https://stackoverflow.com/questions/20886565/using-multiprocessing-process-with-a-maximum-number-of-simultaneous-processes – Adam Smooch Sep 17 '21 at 14:22
  • @AdamSmooch, no I don't want to use multiprocessing. On the contrary, I'd like to use a single thread dedicated to all these `do_it` calls. It's possible because each of these calls is non-blocking and nearly instantaneous. – Basj Sep 17 '21 at 14:25
  • So the pattern you're looking for is a single `worker` thread, and your code would publish `jobs` to the `worker's queue`, instead of spawning distinct threads. – Adam Smooch Sep 17 '21 at 14:26
  • @AdamSmooch Yes, how would you do this in a simple way in this context? Each `job` should be cancellable, and should happen at a specific timing, not just one after another. – Basj Sep 17 '21 at 14:32
  • In Python I found concurrent.futures library to be really simple and highly useful. Set the `max_workers` value to 1 and you'll have a single thread for all of the async do_it calls. https://docs.python.org/3/library/concurrent.futures.html – Gui LeFlea Sep 23 '21 at 20:36
  • @Basj is there anything in your question that the current answers have not covered? If so, could you please clarify. – Will Da Silva Sep 27 '21 at 19:38

5 Answers

6

As I understand it, you want a single worker thread that can process submitted tasks, not in the order they are submitted, but rather in some prioritized order. This seems like a job for the thread-safe queue.PriorityQueue.

from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
    priority: float  # the code below passes i * 0.010, which is a float
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()

This code assumes you want to run forever. If not, you can add a timeout to the q.get in thread_worker, and return when the queue.Empty exception is raised because the timeout expired. That way you'll be able to join the queue and the thread once all the jobs have been processed and the timeout has expired.
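A minimal sketch of that shutdown pattern, assuming a hypothetical idle_timeout parameter and a stand-in do_it that only records its argument (neither comes from the code above). It uses plain (priority, item) tuples to stay short:

```python
import queue
import threading

processed = []  # stand-in for the effect of do_it, used only for this demo


def do_it(x):
    processed.append(x)


def thread_worker(q, idle_timeout):
    """Process items in priority order; return once the queue has stayed
    empty for idle_timeout seconds."""
    while True:
        try:
            priority, item = q.get(timeout=idle_timeout)
        except queue.Empty:
            return  # nothing arrived in time, so let the thread finish
        do_it(item)
        q.task_done()


q = queue.PriorityQueue()
for i in reversed(range(5)):
    q.put((i * 0.010, i))  # submitted in reverse order on purpose

t = threading.Thread(target=thread_worker, args=(q, 0.2))
t.start()
q.join()  # blocks until every queued item has been processed
t.join()  # returns once the worker's idle timeout has expired
```

The idle timeout is a trade-off: a short value lets the thread exit quickly, but it must be longer than the longest gap you expect between submissions.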

If you want to wait until some specific time in the future to run the tasks, it gets a bit more complicated. Here's an approach that extends the above approach by sleeping in the worker thread until the specified time has arrived, but be aware that time.sleep is only as accurate as your OS allows it to be.

from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class TimedItem:
    when: datetime
    item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[TimedItem]):
    while True:
        when, item = astuple(q.get())
        sleep_time = (when - datetime.now()).total_seconds()
        if sleep_time > 0:
            sleep(sleep_time)
        do_it(item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    now = datetime.now()
    for i in range(20):
        q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
    wait_for_something_else()

To address this problem using only a single extra thread we have to sleep in that thread, so it's possible that new tasks with higher priority could come in while the worker is sleeping. In that case the worker would process that new high priority task after it's done with the current one. The above code assumes that scenario will not happen, which seems reasonable based on the problem description. If that might happen you can alter the sleep code to repeatedly poll if the task at the front of the priority queue has come due. The disadvantage with a polling approach like that is that it would be more CPU intensive.
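One way to avoid both the fixed sleep and busy polling is to wait on a threading.Condition with a timeout, so the worker wakes up early whenever a new task is submitted. The sketch below is an assumption about how this could look, not part of the code above: the Scheduler class and its call_later and stop methods are hypothetical names, and it keeps its own heap instead of using queue.PriorityQueue.

```python
import heapq
import itertools
import threading
import time


class Scheduler:
    """Hypothetical single-thread scheduler that runs callbacks at their due times."""

    def __init__(self):
        self._heap = []                    # entries: (due_time, sequence, func, args)
        self._cond = threading.Condition()
        self._counter = itertools.count()  # tie-breaker so funcs are never compared
        self._stopped = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def call_later(self, delay, func, *args):
        entry = (time.monotonic() + delay, next(self._counter), func, args)
        with self._cond:
            heapq.heappush(self._heap, entry)
            self._cond.notify()  # wake the worker so it re-checks the earliest task

    def stop(self):
        with self._cond:
            self._stopped = True
            self._cond.notify()
        self._thread.join()

    def _run(self):
        while True:
            with self._cond:
                while not self._stopped:
                    if self._heap:
                        remaining = self._heap[0][0] - time.monotonic()
                        if remaining <= 0:
                            break  # the earliest task is due
                        self._cond.wait(remaining)
                    else:
                        self._cond.wait()  # nothing scheduled yet
                if self._stopped:
                    return
                _, _, func, args = heapq.heappop(self._heap)
            func(*args)  # run outside the lock so callers can keep scheduling


results = []
s = Scheduler()
s.call_later(0.30, results.append, "late")
s.call_later(0.05, results.append, "early")  # arrives while the worker waits for "late"
time.sleep(0.6)
s.stop()
```

Because the wait is interrupted by notify, the task submitted second but due first still runs first, without the worker spinning on the CPU.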

Also, if you can guarantee that the relative order of the tasks won't change after they've been submitted to the worker, then you can replace the priority queue with a regular queue.Queue to simplify the code somewhat.

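These do_it tasks can be cancelled by removing them from the queue. Note that queue.PriorityQueue has no public method for removing an arbitrary item, so in practice this means marking the task as cancelled and having the worker skip it once it is dequeued.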
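Since queue.PriorityQueue offers no public way to remove an arbitrary item, one common workaround is to flag an entry and let the worker skip it when it comes off the queue. A minimal sketch, assuming a hypothetical cancelled field and a drain helper that stands in for the worker loop:

```python
from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Any


@dataclass(order=True)
class CancellableItem:
    priority: float
    item: Any = field(compare=False)
    cancelled: bool = field(default=False, compare=False)  # hypothetical flag


ran = []  # stand-in for the effect of do_it


def drain(q):
    """Process everything currently queued, skipping cancelled entries."""
    while not q.empty():
        entry = q.get()
        if not entry.cancelled:
            ran.append(entry.item)
        q.task_done()


q = PriorityQueue()
entries = [CancellableItem(priority=i * 0.010, item=i) for i in range(5)]
for entry in entries:
    q.put(entry)

entries[2].cancelled = True  # the entry stays queued until it is dequeued and skipped
drain(q)
```

The caller has to keep a reference to each submitted entry, much like the timers list in the question.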

The above code was tested with the following mock definitions:

def do_it(x):
    print(x)

def wait_for_something_else():
    sleep(5)

An alternative approach that uses no extra threads would be to use asyncio, as pointed out by smcjones. Here's an approach using asyncio that calls do_it at specific times in the future by using loop.call_later:

import asyncio


def do_it(x):
    print(x)


async def wait_for_something_else():
    await asyncio.sleep(5)


async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    while True:
        for i in range(20):
            loop.call_later(i * 0.010, do_it, i)
        await wait_for_something_else()

asyncio.run(main())

These do_it tasks can be cancelled using the handle returned by loop.call_later.
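For illustration, a short sketch of cancelling one scheduled call through its handle. The called list and the choice of which call to cancel are assumptions made for the demo:

```python
import asyncio

called = []  # stand-in for the effect of do_it


def do_it(x):
    called.append(x)


async def main():
    loop = asyncio.get_running_loop()
    handles = [loop.call_later(i * 0.010, do_it, i) for i in range(5)]
    handles[3].cancel()       # the cancelled callback will never run
    await asyncio.sleep(0.2)  # give the remaining callbacks time to fire


asyncio.run(main())
```

Keeping the handles in a list plays the same role as the timers list in the question.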

This approach will, however, require either switching over your program to use asyncio throughout, or running the asyncio event loop in a separate thread.
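A rough sketch of the second option, running the event loop in one dedicated thread while the main thread stays synchronous. It assumes a stand-in do_it and uses time.sleep in place of wait_for_something_else. Since loop.call_later is not thread-safe, the main thread hands it to the loop with call_soon_threadsafe:

```python
import asyncio
import threading
import time

called = []  # stand-in for the effect of do_it


def do_it(x):
    called.append(x)


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

for i in range(5):
    # Hop onto the loop's thread first, then schedule the delayed call there.
    loop.call_soon_threadsafe(loop.call_later, i * 0.010, do_it, i)

time.sleep(0.3)  # stand-in for wait_for_something_else()

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Each delay is measured from the moment the loop thread picks up the request, so it starts slightly after the call in the main thread.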

Will Da Silva
2

It sounds like you want something to be non-blocking and asynchronous, but also single-processed and single-threaded (one thread dedicated to do_it).

If this is the case, and especially if any networking is involved, so long as you're not actively doing serious I/O on your main thread, it is probably worthwhile using asyncio instead.

It's designed to handle non-blocking operations, and allows you to make all of your requests without waiting for a response.

Example:

import asyncio


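async def main():  # must be a coroutine function in order to use await
    while True:
        tasks = []
        for i in range(20):
            # do_it must be a coroutine function (async def) for create_task
            tasks.append(asyncio.create_task(do_it(i)))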
        await wait_for_something_else()
        for task in tasks:
            await task

asyncio.run(main())

Given that the blocking I/O takes seconds, you'll probably spend more time managing threads than you would save by running these other operations in a separate thread.

smcjones
  • The question specifies that they want to make "calls of do_it at different timings in the future", but this approach will call them as soon as it gets around to them, likely running them earlier than desired. – Will Da Silva Sep 29 '21 at 13:44
0

As you have said that in your code each series of 20 do_it calls starts when wait_for_something_else is finished, I would recommend calling the join method in each iteration of the while loop:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds
    for t in timers[-20:]:
        t.join()
Red
  • Every use of `threading.Timer` creates a thread, but the goal stated by the question is to avoid starting many threads for the `do_it` tasks. Specifically it asks "How could I do this with only one additional thread for all do_it calls?" – Will Da Silva Sep 24 '21 at 20:22
0

I don't have a ton of experience with threading in Python, so please go easy on me. The concurrent.futures library is a part of Python3 and it's dead simple. I'm providing an example for you so you can see how straightforward it is.

Concurrent.futures with exactly one thread for do_it() and concurrency:

import concurrent.futures
import time

def do_it(iteration):
    time.sleep(0.1)
    print('do it counter', iteration)

def wait_for_something_else():
    time.sleep(1)
    print('waiting for something else')

def single_thread():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        futures = (executor.submit(do_it, i) for i in range(20))
        for future in concurrent.futures.as_completed(futures):
            future.result()

def do_asap():
    wait_for_something_else()

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(single_thread), executor.submit(do_asap)] 
    for future in concurrent.futures.as_completed(futures):
        future.result()

On line 13, the option max_workers=1 limits the executor that runs do_it() to exactly one thread.

On line 22, both methods are submitted to the concurrent.futures thread pool executor. The code from lines 21-24 enables both methods to run in a thread pool and do_it runs on a single non-blocking thread.

The concurrent.futures doc describes how to control the number of threads. When max_workers is not specified, the outer executor that runs both functions defaults (since Python 3.8) to max_workers = min(32, os.cpu_count() + 4).

Gui LeFlea
  • This will block the main thread when you call `concurrent.futures.as_completed`, so `wait_for_something_else` won't run until all of the `do_it` tasks have finished. – Will Da Silva Sep 24 '21 at 20:20
  • My understanding is that the question author wants to run `wait_for_something_else` while all of the `do_it` calls are executing in the background at their set times. `concurrent.futures.as_completed` blocks until all of the `do_it` calls have finished, so `wait_for_something_else` won't be able to run while the `do_it` calls do. – Will Da Silva Sep 26 '21 at 02:01
  • That's a good explanation of the difference @WillDaSilva. Now I see that `concurrent.futures` makes it possible to control how many threads are assigned to the `do_it` process while it is running. `Concurrent.futures` does not enable the first method and the second method to run in *separate concurrent* threads. – Gui LeFlea Sep 27 '21 at 20:56
  • I revised my answer so that `do_it()` runs in a single non-blocking thread concurrent with `wait_for_something_else()` based on @WillDaSilva observation. Thanks! – Gui LeFlea Sep 28 '21 at 14:31
0

The do_it calls run in order and are cancellable:

Run all the do_it calls in one thread and sleep until each one's scheduled time (sleep may not be the best mechanism for this).

Use a variable, should_run_it, to check whether each do_it call should run, which makes the calls cancellable.

Is something like this what you're looking for?

import threading
import time

def do_it(i):
    print(f"[{i}] {time.time()}")

should_run_it = {i:True for i in range(20)}

def guard_do_it(i):
    if should_run_it[i]:
        do_it(i)

def run_do_it():
    for i in range(20):
        guard_do_it(i)
        time.sleep(0.010)

if __name__ == "__main__":
    t = threading.Timer(0.010, run_do_it)
    start = time.time()
    print(start)
    t.start()
    #should_run_it[5] = should_run_it[10] = should_run_it[15] = False # test
    t.join()
    end = time.time()
    print(end)
    print(end - start)
Alan