0

I'd like to implement a CSP procedure in Python alternating between consuming items from a networked queue and doing health checks on the main execution fiber.

async def lifecycle_consumer(config: LifecycleConfig, pool: AsyncConnectionPool, redis: redis.Redis[Any]):
    await open_pg_pool(pool)

    async def check_pool():
        await pool.check()
        await anyio.sleep(10)

    consumer = Consumer(redis, config.consumer)

    # TODO: what I want
    # while True:
    #     await either(check_pool, bind(do_work, consumer.consume))

    # what I have
    async for job_id, job in consumer.consume():
        # "normal code"
        await do_work(job_id, job)

similar to what I do in Concurrent ML / F# here.

Maybe I start them in parallel and implement negative acknowledgements myself with events and/or cancellations? https://anyio.readthedocs.io/en/stable/synchronization.html#events

Henrik
  • 9,714
  • 5
  • 53
  • 87

0 Answers0