1
for og_raw_file in de_core.file.rglob(raw_path_object.url):
        with de_core.file.open(og_raw_file, mode="rb") as raw_file, de_core.file.open(
            staging_destination_path + de_core.aws.s3.S3FilePath(raw_file.name).file_name, "wb"
        ) as stager_file, concurrent.futures.ThreadPoolExecutor() as executor:
            logger.info("Submitting file to thread to add metadata", raw_file=raw_file)
            executor.submit(
                <long_length_metadata_function_that_I_want_to_parallize>,
                raw_path_object,
                <...rest of arguments to function>            
            )

I want every file to be processed in a separate thread all at once and for the submit not to be blocking. What am I doing wrong? What happens is that each file is submitted one at a time but the next file isn't submitted until the previous one finishes... how do I parallelize this properly?

I would expect the "Submitting file to thread to add metadata" to appear quickly for every file at the beginning since the threads should be submitted and then forgot, but that's not what's happening.

Do I need to do something like this? Why?

future_mapping = {executor.submit(predicate, uri): uri for uri in uris}

    for future in concurrent.futures.as_completed(future_mapping):

The metadata function is basically adding columns to a parquet file. Is this not something I can use threads for given the Python gil?

Jwan622
  • 11,015
  • 21
  • 88
  • 181

1 Answers1

1

Start by reading the docs for Executor.shutdown(), which is called by magic with wait=True when the with block ends.

For the same reason, if you run this trivial program you'll see that you get no useful parallelism either:

def worker(i):
    from time import sleep
    print(f"working on {i}")
    sleep(2)

if __name__ == "__main__":
    from concurrent.futures import ThreadPoolExecutor
    for i in range(10):
        with ThreadPoolExecutor() as ex:
            ex.submit(worker, i)

An executor is intended to by used "many" times after it's created, not just once. You can use it just once, but you don't want to do that ;-)

To "repair" my toy program, swap the lines:

    with ThreadPoolExecutor() as ex:
        for i in range(10):
Tim Peters
  • 67,464
  • 13
  • 126
  • 132