I'd like to implement a CSP procedure in Python alternating between consuming items from a networked queue and doing health checks on the main execution fiber.
async def lifecycle_consumer(config: LifecycleConfig, pool: AsyncConnectionPool, redis: redis.Redis[Any]):
await open_pg_pool(pool)
async def check_pool():
await pool.check()
await anyio.sleep(10)
consumer = Consumer(redis, config.consumer)
# TODO: what I want
# while True:
# await either(check_pool, bind(do_work, consumer.consume))
# what I have
async for job_id, job in consumer.consume():
# "normal code"
await do_work(job_id, job)
similar to what I do in Concurrent ML / F# here.
Maybe I start them in parallel and implement negative acknowledgements myself with events and/or cancellations? https://anyio.readthedocs.io/en/stable/synchronization.html#events