Objective
I am processing some files and then uploading them to the cloud. My objective is to have an async producer and consumer: the producer copies data from a database and compresses files, while the consumer uploads files to the cloud. I want to produce tasks as soon as they are available, but consume them only one at a time (I am trying to avoid sending too many files to S3 at once, since AWS throttles me).
What I have tried to do
I have successfully created a queue with a producer and a consumer; however, my producer seems to produce one item at a time. Here is the code I have tried (please let me know if I can add any further code):
import asyncio
import concurrent.futures
import logging
from asyncio import Queue

import aiobotocore
import asyncpg
import uvloop
from asyncpg.pool import Pool

from aws_async import upload_to_s3
from helpers import gather_with_concurrency, chunk_query, build_query, parquet_data
from pg_async import pg_copy
from resources.config import ARGS, PG_CONFIG, AWS_S3_CONFIG

# Defaults to min(32, os.cpu_count() + 4) worker threads
EXECUTOR = concurrent.futures.ThreadPoolExecutor()


async def producer(queries, pool: Pool, q: Queue):
    # I expect this whole loop to run concurrently, but it processes one query at a time..
    for qr in queries:
        # Process query and place (asynchronously) into queue
        query_data = await pg_copy(pool, qr)
        await q.put(query_data)


async def consume(queue: Queue, client, bucket: str):
    while True:
        item = await queue.get()
        if item is None:
            break
        await upload_to_s3(client, bucket, item)
        queue.task_done()


async def main():
    # Define parameters based on parsed args
    queries = build_query(ARGS.query, bool(ARGS.chunks), chunk_query, chunks=ARGS.chunks, chunk_col=ARGS.chunk_col)
    # Define pool
    pool = await asyncpg.create_pool(**PG_CONFIG)
    # Define aiobotocore async client
    async with aiobotocore.get_session().create_client(**AWS_S3_CONFIG) as client:
        # Prepare tasks from queries
        queue = asyncio.Queue()
        consumer = asyncio.create_task(consume(queue, client, bucket=ARGS.bucket))
        await gather_with_concurrency(ARGS.parallelism, producer(queries, pool, queue))
        # Wait for the remaining items to be processed
        await queue.join()
        consumer.cancel()


uvloop.install()
asyncio.run(main())
Thanks to this answer, my gather_with_concurrency (which I use to throttle the number of tasks running at a time) looks like:
async def gather_with_concurrency(n: int, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))
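To illustrate what I mean by throttling, here is a minimal, runnable sketch of how I understand this helper to behave (fake_task is a hypothetical stand-in for pg_copy, just to make the timing visible): ten one-second tasks with n=3 finish in roughly four seconds, because at most three run at once.

import asyncio

async def fake_task(i: int):
    # Hypothetical stand-in for pg_copy: each "query" takes one second
    await asyncio.sleep(1)
    return i

async def demo():
    # Ten awaitables with at most 3 in flight -> roughly 4 seconds total
    results = await gather_with_concurrency(3, *(fake_task(i) for i in range(10)))
    print(results)  # [0, 1, 2, ..., 9]

asyncio.run(demo())

Note that the throttling only applies across the awaitables passed in; a single coroutine argument is just awaited on its own.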
What I am trying to achieve
The current behavior is that the producer produces items one by one. I want the producer to process all queries concurrently and place each result onto the queue as soon as it becomes available (while still keeping the throttle that I have created), but with my consumer consuming one item at a time.
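In other words, something like this sketch is what I am after (produce_one is a hypothetical helper name I made up; pg_copy, gather_with_concurrency, ARGS, Pool, and Queue are as above):

async def produce_one(pool: Pool, qr, q: Queue):
    # Copy one query's data and enqueue it as soon as it is ready
    query_data = await pg_copy(pool, qr)
    await q.put(query_data)

async def producer(queries, pool: Pool, q: Queue):
    # One coroutine per query; gather_with_concurrency caps how many
    # pg_copy calls run at once, while the single consumer still
    # uploads items one by one
    await gather_with_concurrency(
        ARGS.parallelism,
        *(produce_one(pool, qr, q) for qr in queries),
    )

The consumer would stay unchanged, so uploads would still happen one at a time.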
Happy to add any further information if needed = )
Edit: Thanks to dirn for helping me clarify my question earlier