0

I am using Set() to do rate limiting feature in my program. But there are some weird behaviours.

  1. there are some residue in task_set which are not suppose to happen. I have added add_done_callback that the task will remove itself from task_set after it's done.

  2. KeyError are sometimes thrown. But I have no ideas why, the task in task_set should not be removed more than 1 time

I have tried different approach (semaphore, queue...) but none of them worked as expected... I will be appreciated if anyone can share some code snippet on how to implement rate limiting in async programming

import asyncio


def main():
    async def task(acc_id):
        print(f"acc_id {acc_id} received the task")
        await asyncio.sleep(5)
        print(f"acc_id {acc_id} finished the task")

    async def loop_tasks():
        task_set = set()

        for acc_id in range(1000000):
            # pause when reached the limit
            while len(task_set) > 10:
                await asyncio.sleep(0)

            t = event_loop.create_task(task(acc_id))
            task_set.add(t)
            t.add_done_callback(lambda _: task_set.remove(t))

    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(loop_tasks())


main()
zevcc
  • 25
  • 4

1 Answers1

2

I looked at the program output exception and found that t was the same value.

A review of the documentation shows that when callback is called, the Future object is its only argument.

After making the following changes, the program runs normally.

t.add_done_callback(lambda current_task: task_set.remove(current_task))
pppig
  • 1,215
  • 1
  • 6
  • 12