
I have a list of 100 items, and I am processing it concurrently with a ThreadPoolExecutor. I could add a time delay inside the function being executed, but I would like code that automatically sleeps for X seconds after every 10 successful thread executions.

import time
from concurrent.futures import ThreadPoolExecutor

user_list = list(range(1, 101))  # [1, 2, 3, ..., 100]

def parse(user):
    return str(user) + "parsed!"

with ThreadPoolExecutor(max_workers=10) as exe:
    start = time.time()
    result = exe.map(parse, user_list)
    output = list(result)
    end = time.time()
    print('taken time', end - start)

I want to add a time delay after every 10 successful thread executions. I hope my question is clear; perhaps it can be solved with some scheduling.

Aditya Rajgor

2 Answers


The straightforward way is to just submit up to 10 jobs at a time, then sleep between each chunk:

import itertools
import time
from concurrent.futures import ThreadPoolExecutor


# See https://stackoverflow.com/a/8991553/51685
def chunker(n, iterable):
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


def parse(user):
    return f"{user} parsed!"


def main():
    user_list = list(range(100))
    with ThreadPoolExecutor(max_workers=10) as exe:
        for chunk in chunker(10, user_list):
            start = time.time()
            result = exe.map(parse, chunk)
            output = list(result)
            end = time.time()
            print(output, "taken time", end - start)
            time.sleep(1)


if __name__ == "__main__":
    main()

This prints, for example:

['0 parsed!', '1 parsed!', '2 parsed!', '3 parsed!', '4 parsed!', '5 parsed!', '6 parsed!', '7 parsed!', '8 parsed!', '9 parsed!'] taken time 0.0006809234619140625
['10 parsed!', '11 parsed!', '12 parsed!', '13 parsed!', '14 parsed!', '15 parsed!', '16 parsed!', '17 parsed!', '18 parsed!', '19 parsed!'] taken time 0.0008037090301513672
['20 parsed!', '21 parsed!', '22 parsed!', '23 parsed!', '24 parsed!', '25 parsed!', '26 parsed!', '27 parsed!', '28 parsed!', '29 parsed!'] taken time 0.0008540153503417969
...
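Note that this sleeps after every 10 submitted jobs, whether or not they succeeded. If only successful executions should count toward the 10, as the question asks, a minimal sketch (assuming a job "succeeds" when parse() does not raise) submits only as many jobs as are still needed to reach 10 successes and counts them with concurrent.futures.as_completed. run() and the occasionally failing parse() are hypothetical, for illustration only.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse(user):
    # Hypothetical failure, only here to show that failed calls are not counted.
    if user % 7 == 0:
        raise ValueError(f"cannot parse {user}")
    return f"{user} parsed!"


def run(users, batch_size=10, pause=1.0):
    """Sleep `pause` seconds after every `batch_size` successful parse() calls."""
    users = list(users)
    results, errors = {}, {}
    pauses = 0
    successes = 0  # successes since the last pause
    pos = 0
    with ThreadPoolExecutor(max_workers=batch_size) as exe:
        while pos < len(users):
            # Submit only as many jobs as are still needed to reach batch_size
            # successes, so the count can never overshoot.
            batch = users[pos:pos + batch_size - successes]
            pos += len(batch)
            futures = {exe.submit(parse, user): user for user in batch}
            for fut in as_completed(futures):
                user = futures[fut]
                try:
                    results[user] = fut.result()
                except Exception as exc:  # a failed job does not count
                    errors[user] = exc
                else:
                    successes += 1
            # Every job in this batch has finished, so no worker is busy during
            # the pause. Skip the pause if there is nothing left to do.
            if successes == batch_size and pos < len(users):
                time.sleep(pause)
                pauses += 1
                successes = 0
    return results, errors, pauses
```

For example, run(range(1, 101)) parses 86 users, records 14 ValueErrors (the multiples of 7) and pauses 8 times; the last 6 successes never reach another group of 10.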

EDIT for tqdm progress

To use tqdm with this approach so it gets updated on each parse step, you'll need something like the below (bits identical to the above replaced with ...).

(tqdm won't update the screen unless enough time has passed since the last time it did, hence the random sleep to represent work done.)

import random
import tqdm

# ... (remaining imports and chunker() as above)

def parse(user, prog):
    time.sleep(random.uniform(.1, 1.3))  # Do work here...
    prog.update()  # Step the progress bar.
    return f"{user} parsed!"


def main():
    # ...
    with ThreadPoolExecutor(max_workers=10) as exe, tqdm.tqdm(total=len(user_list)) as prog:
        for chunk in chunker(10, user_list):
            # ...
            result = exe.map(parse, chunk, [prog] * len(chunk))
            # ...
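If tqdm is not available, or you would rather not touch the progress bar from worker threads, a minimal stdlib-only sketch can count completed and successful jobs in the main thread with concurrent.futures.as_completed. run_with_progress() and its report callback are hypothetical names; with tqdm, the report(...) call is where prog.update() would go.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse(user):
    return f"{user} parsed!"


def run_with_progress(users, chunk_size=10, pause=1.0, report=print):
    """Report progress from the main thread as each job finishes."""
    users = list(users)
    done = succeeded = 0
    with ThreadPoolExecutor(max_workers=chunk_size) as exe:
        for start in range(0, len(users), chunk_size):
            chunk = users[start:start + chunk_size]
            futures = [exe.submit(parse, user) for user in chunk]
            # as_completed() yields each future as soon as it finishes, so the
            # counter advances one job at a time rather than one chunk at a time.
            for fut in as_completed(futures):
                done += 1
                if fut.exception() is None:  # the job did not raise
                    succeeded += 1
                report(f"{done}/{len(users)} done, {succeeded} succeeded")
            time.sleep(pause)  # same pause between chunks as above
    return succeeded
```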
AKX
  • Thanks for the answer. One small question: how can I track how many have succeeded? I have used the `tqdm` library; it works with each chunk, but it isn't interactive. Any help with that? @AKX – Aditya Rajgor Nov 22 '21 at 06:39
  • Sure – added an example on how to use tqdm with this. – AKX Nov 22 '21 at 06:51

I made a global variable that counts the number of times the parse() function has run; whenever that count is divisible by 10 (no_of_times_run % 10 == 0), the function sleeps for a certain amount of time.

Code attached for your reference.

import threading
import time
from concurrent.futures import ThreadPoolExecutor

user_list = list(range(1, 101))  # [1, 2, 3, ..., 100]

no_of_times_run = 0
counter_lock = threading.Lock()  # "+=" on a global is not atomic across threads

def parse(user):
    global no_of_times_run

    # Increment and read the counter under the lock so no two calls see the same value.
    with counter_lock:
        no_of_times_run += 1
        count = no_of_times_run
    if count % 10 == 0:
        time.sleep(1)  # Every 10th call sleeps for a certain time.

    return str(user) + "parsed!"

with ThreadPoolExecutor(max_workers=10) as exe:
    start = time.time()
    result = exe.map(parse, user_list)
    output = list(result)
    end = time.time()
    print('taken time', end - start)
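In this approach, time.sleep(1) only blocks the worker thread that made the 10th call; the other workers keep processing items, so the pool as a whole never stops. If every worker should pause, one option (an alternative technique, not this answer's own) is to sleep while holding a lock that every call must pass through. PauseEvery, tick(), gate and main() are hypothetical names; like the global counter, the sketch counts calls, not successes.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PauseEvery:
    """Hypothetical helper: every n-th call to tick() pauses all callers."""

    def __init__(self, n, pause):
        self.n = n
        self.pause = pause
        self.calls = 0
        self.pauses = 0
        self._lock = threading.Lock()

    def tick(self):
        with self._lock:
            self.calls += 1
            if self.calls % self.n == 0:
                self.pauses += 1
                # Sleeping while holding the lock means any other worker that
                # reaches tick() waits here until the pause is over.
                time.sleep(self.pause)


gate = PauseEvery(n=10, pause=1.0)


def parse(user):
    gate.tick()  # called before the work, so no new item starts during a pause
    return str(user) + "parsed!"


def main(users):
    with ThreadPoolExecutor(max_workers=10) as exe:
        return list(exe.map(parse, users))
```

Items that were already past tick() when a pause began still run to completion; only new items wait.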