
Objective

I am processing some files and then uploading them to the cloud. I want an async producer and consumer: the producer copies data from a database and compresses files, while the consumer uploads the files to the cloud. Tasks should be produced as soon as they are available but consumed one at a time (I am trying to avoid sending too many files to S3 at once, because AWS throttles me).

What I have tried to do

I have successfully set up a queue with a producer and a consumer; however, my producer seems to produce only one item at a time. Here is the code I have tried (please let me know if I should add any more code):

import asyncio
import concurrent.futures
import logging
from asyncio import Queue

import aiobotocore
import asyncpg
import uvloop
from asyncpg.pool import Pool

from aws_async import upload_to_s3
from helpers import gather_with_concurrency, chunk_query, build_query, parquet_data
from pg_async import pg_copy
from resources.config import ARGS, PG_CONFIG, AWS_S3_CONFIG

# max_workers defaults to min(32, os.cpu_count() + 4) threads (Python 3.8+)
EXECUTOR = concurrent.futures.ThreadPoolExecutor()


async def producer(query: str, pool: Pool, q: Queue):
    # I expected the iterations of this loop to run concurrently, but they run one by one.
    for qr in query:
        # Process query and place (asynchronously) into queue
        query_data = await pg_copy(pool, qr)
        await q.put(query_data)


async def consume(queue, client, bucket):
    while True:
        item = await queue.get()
        if item is None:
            break
        await upload_to_s3(client, bucket, item)
        queue.task_done()


async def main():
    # Define parameters based on parsed args
    queries = build_query(ARGS.query, bool(ARGS.chunks), chunk_query, chunks=ARGS.chunks, chunk_col=ARGS.chunk_col)

    # Define pool
    pool = await asyncpg.create_pool(**PG_CONFIG)

    # Define Aiobotocore async client
    async with aiobotocore.get_session().create_client(**AWS_S3_CONFIG) as client:
        # Prepare tasks from queries
        queue = asyncio.Queue()
        consumer = asyncio.create_task(consume(queue, client, bucket=ARGS.bucket))
        await gather_with_concurrency(ARGS.parallelism, producer(queries, pool, queue))
        # wait for the remaining tasks to be processed
        await queue.join()
        consumer.cancel()


uvloop.install()
asyncio.run(main())

Thanks to this answer, my gather_with_concurrency (which I use to limit how many tasks run at the same time) looks like this:

async def gather_with_concurrency(n: int, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))
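
To make the throttling behaviour concrete, here is a minimal, self-contained sketch of what the helper does when it is given several coroutines versus only one. The names `demo` and `fake_job` are hypothetical, and `asyncio.sleep` is only a stand-in for real I/O; the sketch just records the peak number of jobs in flight.

```python
import asyncio


async def gather_with_concurrency(n: int, *coros):
    # Same helper as above: at most n coroutines are awaited at once.
    semaphore = asyncio.Semaphore(n)

    async def sem_task(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(sem_task(c) for c in coros))


async def demo(limit: int, jobs: int) -> int:
    """Run `jobs` fake jobs under `limit`; return the peak number in flight."""
    running = 0
    peak = 0

    async def fake_job(i: int) -> int:
        # Hypothetical job: tracks how many jobs overlap in time.
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for real I/O
        running -= 1
        return i

    await gather_with_concurrency(limit, *(fake_job(i) for i in range(jobs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(demo(limit=3, jobs=10)))  # many coroutines, capped by the limit
    print(asyncio.run(demo(limit=3, jobs=1)))   # one coroutine has nothing to overlap with
```

With a single coroutine passed in, the limit has nothing to act on, which matches what I see with my single `producer(...)` call.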

What I am trying to achieve

Currently the producer produces items one by one. I want my producer to process all tasks concurrently and place each result into the queue as soon as it is available (while still keeping the throttle that I have created), with my consumer still consuming one task at a time.

Happy to add any further information if needed = )

Edit: Thanks to dirn for helping me clarify my question earlier

alt-f4
  • Similar to before, you only have one producer. You’ll need to pass more into `gather_with_concurrency`. – dirn Feb 28 '21 at 13:15
  • I believe you completely misunderstood the intended usage of `gather_with_concurrency`. It doesn't _provide_ concurrency, it _limits_ concurrency to the specified amount. It is designed for when you have a bunch of tasks to do, and just calling `gather(*jobs)` would try to run them all at once, so you need to throttle them. If you use the producer-consumer pattern, you get concurrency by creating multiple producers, or multiple consumers, or both. In that case you don't need to limit the concurrency because it's naturally limited by producer/consumer count. – user4815162342 Feb 28 '21 at 19:38
  • In other words, you need to change `await gather_with_concurrency(ARGS.parallelism, producer(queries, pool, queue))` to something like `await asyncio.gather(*[producer(query, pool, queue) for query in queries])`. – user4815162342 Feb 28 '21 at 19:40
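
The suggestion in the comments above could be sketched roughly as follows: one producer coroutine per query, a semaphore to keep the throttle, and a single consumer. This is only an illustration under stated assumptions, not the asker's actual code. `fake_pg_copy`, the `uploaded` list, and the `run` function are hypothetical stand-ins for `pg_copy`, `upload_to_s3`, and `main`, and the sleeps simulate I/O.

```python
import asyncio


async def fake_pg_copy(query: str) -> str:
    # Hypothetical stand-in for pg_copy(pool, query).
    await asyncio.sleep(0.01)
    return f"data for {query}"


async def producer(query: str, queue: asyncio.Queue, semaphore: asyncio.Semaphore) -> None:
    # One producer per query; the semaphore caps how many copies run at once.
    async with semaphore:
        data = await fake_pg_copy(query)
    await queue.put(data)


async def consume(queue: asyncio.Queue, uploaded: list) -> None:
    # Single consumer: items are handled strictly one at a time.
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)  # hypothetical stand-in for upload_to_s3
            uploaded.append(item)
        finally:
            queue.task_done()


async def run(queries: list, parallelism: int) -> list:
    queue = asyncio.Queue()
    semaphore = asyncio.Semaphore(parallelism)
    uploaded = []
    consumer = asyncio.create_task(consume(queue, uploaded))
    # All producers are scheduled together; the semaphore throttles them.
    await asyncio.gather(*(producer(q, queue, semaphore) for q in queries))
    await queue.join()  # wait until every queued item has been consumed
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return uploaded


if __name__ == "__main__":
    print(asyncio.run(run([f"q{i}" for i in range(5)], parallelism=2)))
```

Because results are queued as each query finishes, the order of `uploaded` is not guaranteed to match the order of `queries`.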
