Here is an example:
```python
import threading
import time

MAX_NUMBER = 57  # assumed to be inclusive
JOB_SIZE = 10

indexes = tuple(range(0, MAX_NUMBER + 1, JOB_SIZE)) + (MAX_NUMBER + 1,)
jobs_spans = tuple(zip(indexes, indexes[1:]))  # cf https://stackoverflow.com/a/21303286/11384184
print(jobs_spans)
# ((0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, 58))

jobs_left = list(jobs_spans)  # list.pop() is atomic in CPython thanks to the GIL


def process_user(user_id: int) -> None:
    sleep_duration = ((user_id // JOB_SIZE) % 3) * 0.4 + 0.1  # just to add some variance to each job
    time.sleep(sleep_duration)


def process_users() -> None:  # returns normally once no job is left
    while True:
        try:
            job = jobs_left.pop()
        except IndexError:
            break  # no job left
        else:
            print(f"{threading.current_thread().name!r} processing users from {job[0]} to {job[1]} (exclusive) ...")
            for user_id in range(job[0], job[1]):
                process_user(user_id)
                print(f"user {user_id} processed")
    print(f"{threading.current_thread().name!r} finished")


if __name__ == "__main__":
    thread1 = threading.Thread(target=process_users)
    thread2 = threading.Thread(target=process_users)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
```
I started by computing the spans that the jobs will cover, using only the number of users and the size of each job.
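The span computation can be pulled out into a small helper to check it on its own. This is a minimal sketch; the name `compute_job_spans` is hypothetical and not part of the example above. It produces half-open `(start, end)` pairs covering every id from 0 to `max_number` inclusive:

```python
from typing import Tuple


def compute_job_spans(max_number: int, job_size: int) -> Tuple[Tuple[int, int], ...]:
    """Hypothetical helper: split ids 0..max_number (inclusive) into [start, end) spans."""
    # Boundaries: every multiple of job_size, then one past the last id.
    bounds = list(range(0, max_number + 1, job_size)) + [max_number + 1]
    # Pair each boundary with the next one to get consecutive spans.
    return tuple(zip(bounds, bounds[1:]))


print(compute_job_spans(57, 10))
# ((0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, 58))
```

The last span is simply shorter when the number of users is not a multiple of the job size.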
I use these spans to define the queue of jobs left. It is actually a plain list that the threads pop jobs from (from the end, so the last span is processed first).
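If you would rather not rely on the GIL making `list.pop()` atomic, the standard library's `queue.Queue` is explicitly designed for sharing work between threads. A minimal sketch, assuming the same spans as above; `make_job_queue`, `worker` and the `done` list are hypothetical names used only for illustration:

```python
import queue
import threading
from typing import List, Tuple


def make_job_queue(spans) -> "queue.Queue[Tuple[int, int]]":
    # Hypothetical helper: load every span into a thread-safe FIFO queue.
    jobs: "queue.Queue[Tuple[int, int]]" = queue.Queue()
    for span in spans:
        jobs.put(span)
    return jobs


def worker(jobs, done: List[int], lock: threading.Lock) -> None:
    while True:
        try:
            start, end = jobs.get_nowait()  # raises queue.Empty when nothing is left
        except queue.Empty:
            break
        for user_id in range(start, end):
            with lock:  # guard the shared results list
                done.append(user_id)


spans = ((0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, 58))
jobs = make_job_queue(spans)
done: List[int] = []
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(jobs, done, lock)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(done) == list(range(58)))  # True
```

Unlike `list.pop()`, `Queue.get_nowait()` hands out jobs in the order they were added.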
I have two different functions:
- one to process a user given its id, which has nothing to do with threading: I could use it the exact same way in a completely sequential program
- one to handle the threading. It is the `target` of the threads, i.e. the code each thread executes once it is started. It is an infinite loop which tries to get a new job until there are none left.
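Because `process_user` knows nothing about threads, the same per-user code can be driven sequentially or by the standard library's `concurrent.futures.ThreadPoolExecutor` instead of a hand-written worker loop. This is a swapped-in alternative, not what the example above does; `process_job` and the `processed` list are hypothetical, and `process_user` here only records the id instead of sleeping:

```python
import concurrent.futures
import threading
from typing import List, Tuple

processed: List[int] = []
processed_lock = threading.Lock()


def process_user(user_id: int) -> None:
    # Hypothetical stand-in for the real per-user work: just record the id.
    with processed_lock:
        processed.append(user_id)


def process_job(span: Tuple[int, int]) -> Tuple[int, int]:
    # Hypothetical: process every user of one span, knowing nothing about threads.
    start, end = span
    for user_id in range(start, end):
        process_user(user_id)
    return span


spans = [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, 58)]

# Sequential use: no threads involved at all.
for span in spans:
    process_job(span)
sequential = sorted(processed)
processed.clear()

# Threaded use: the executor's two worker threads pull the jobs for us.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    finished = list(executor.map(process_job, spans))  # results come back in input order

print(finished == spans, sorted(processed) == sequential)  # True True
```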
I `join` each thread to wait for its completion before the script exits.
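The same pattern extends to any number of threads by keeping them in a list: start them all first, then join them all, so they actually run concurrently. A minimal sketch, assuming a hypothetical `NUM_THREADS` setting and a `results` list that replaces the real processing:

```python
import threading
from typing import List

NUM_THREADS = 4  # hypothetical; the example above uses 2

jobs_left = [(0, 10), (10, 20), (20, 30), (30, 40), (40, 50), (50, 58)]
results: List[str] = []


def process_users() -> None:
    while True:
        try:
            start, end = jobs_left.pop()
        except IndexError:
            break  # no job left
        # Stand-in for processing users start..end-1.
        results.append(f"{threading.current_thread().name}: {start}-{end}")


threads = [threading.Thread(target=process_users, name=f"worker-{i}") for i in range(NUM_THREADS)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()  # block until this thread has finished
print(len(results))  # 6
```

Joining inside the start loop would instead wait for each thread before starting the next one, making the program sequential again.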