
I have around 10 million entries that I want to process. Currently, I iterate over them one by one and run a subroutine on each, and when, say, 1000 entries are processed, I open a CSV file and save the results to it.

for num, entry in enumerate(some_iterator):
    function(entry)
    if num % 1000 == 0:
        # open the file and save the results

How can I do the saving part while utilizing a queue and threading? Right now, I put the 10 million entries in the queue and fire up threads to run the subroutine. It works, but I cannot wrap my head around the saving part.

put all entries in a queue
for i in range(number_of_threads):
    start a thread that runs the function
Louis

3 Answers


So, a couple of things. You will want each thread to write to a separate file, then merge the files at the end. A locking mechanism would also work, but depending on how much you write to the CSV, it can effectively force your application's performance back down to that of a single thread.
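Here is a minimal sketch of the per-thread-file approach. The filename pattern, the chunking of the input, and the doubling "subroutine" are all placeholders for your own code:

```python
import csv
import threading

def worker(thread_id, entries):
    # Each worker writes to its own file, so no lock is needed
    # while the threads are running.
    with open('results_{0}.csv'.format(thread_id), 'w', newline='') as f:
        writer = csv.writer(f)
        for entry in entries:
            # stand-in for your real subroutine
            writer.writerow([entry, entry * 2])

entries = list(range(100))
num_threads = 4
# split the entries round-robin across the threads
chunks = [entries[i::num_threads] for i in range(num_threads)]

threads = [threading.Thread(target=worker, args=(i, chunks[i]))
           for i in range(num_threads)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After all threads have joined, `results_0.csv` through `results_3.csv` together hold every entry, ready to be merged.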

A great tutorial to create pools and queues is located here:

https://www.metachris.com/2016/04/python-threadpool/

And:

Threading pool similar to the multiprocessing Pool?

At the end you will want to merge your files (if needed). It's best to do this at the OS level, but in Python you can do:

filenames = ['file1.txt', 'file2.txt', ...]
with open('path/to/output/file', 'w') as outfile:
    for fname in filenames:
        with open(fname) as infile:
            for line in infile:
                outfile.write(line)
eatmeimadanish

This assumes you have all of the other threading set up.

Where you initialize your threads, you need to create a thread lock object:

threadLock = threading.Lock()

Then, in the function where you are writing, you do something essentially like this:

for num, i in enumerate(some_iterator):
    function(i)
    if num % 1000 == 0:

        threadLock.acquire()
        # open the file in append mode
        # save
        # close the file
        threadLock.release()

The threadLock.acquire() call might need to go before the if statement.

Locking "closes the door" on the other threads for a certain section of code, or for access to a shared resource such as a file; the other threads have to wait their turn to go through the door while another thread is already inside.
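In practice it is safer to take the lock with a `with` statement, which releases it automatically even if the save raises an exception. A small sketch, with a shared list standing in for the shared CSV file:

```python
import threading

threadLock = threading.Lock()
results = []

def save(batch):
    # "with" acquires the lock on entry and releases it on exit,
    # even if an error occurs inside the block
    with threadLock:
        # in real code this would append rows to the shared CSV file
        results.append(list(batch))

threads = [threading.Thread(target=save, args=([i] * 3,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because every thread takes the same lock before touching `results`, the appends cannot interleave.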

SPYBUG96

Use the "secret sauce" of CPython threading -- Queues!

Writing to a file is inherently sequential, so you might as well put one single thread in charge of all the writing. Have all the worker threads push their results into a common output queue. Have the single writer thread read from this output queue and write to the csv every 1000 entries or when all the worker threads are done.

By doing it this way you avoid the headache of needing locks, or merging partial files afterwards.


Here is the basic structure I am suggesting. It creates 2500 entries, processes them with 5 threads, and outputs after every 10 results:

import queue
import threading
SENTINEL = None

def worker(in_queue, out_queue):
    for n in iter(in_queue.get, SENTINEL):
        # print('task called: {n}'.format(n=n))
        out_queue.put(n*2)

def write(out_queue, chunksize=10):
    results = []
    for n in iter(out_queue.get, SENTINEL):
        results.append(n)
        if len(results) >= chunksize:
            print(results)
            results = []
    if len(results):
        # SENTINEL signals the worker threads are done.
        # print the remainder of the results
        print(results)

in_queue = queue.Queue()
out_queue = queue.Queue()
num_threads = 5

N = 2500
for i in range(N):
    in_queue.put(i)

for i in range(num_threads):
    # add a SENTINEL to tell each worker to end
    in_queue.put(SENTINEL)

writer = threading.Thread(target=write, args=(out_queue,))
writer.start()
threads = [threading.Thread(target=worker, args=(in_queue, out_queue))
           for n in range(num_threads)]

for t in threads:
    t.start()

for t in threads:
    t.join()

# tell the writer to end
out_queue.put(SENTINEL)            
writer.join()

which prints

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
[20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
[40, 42, 44, 46, 48, 50, 52, 54, 56, 58]
...
[4940, 4942, 4944, 4946, 4948, 4950, 4952, 4954, 4956, 4958]
[4960, 4962, 4964, 4966, 4968, 4970, 4972, 4974, 4976, 4978]
[4980, 4982, 4984, 4986, 4988, 4990, 4992, 4994, 4996, 4998]

Note that the values printed may not appear in sorted order. It depends on the order in which the concurrent threads push results into out_queue.
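To actually write a CSV instead of printing, only the writer function needs to change. A hypothetical `write_csv` variant (the filename and the one-column rows are placeholders for your real output):

```python
import csv
import queue
import threading

SENTINEL = None

def write_csv(out_queue, path, chunksize=1000):
    # same structure as write() above, but each full chunk is
    # flushed to the CSV file instead of printed
    results = []
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        for n in iter(out_queue.get, SENTINEL):
            results.append([n])
            if len(results) >= chunksize:
                writer.writerows(results)
                results = []
        if results:
            # write whatever is left once the SENTINEL arrives
            writer.writerows(results)

out_queue = queue.Queue()
for i in range(25):
    out_queue.put(i)
out_queue.put(SENTINEL)

writer_thread = threading.Thread(target=write_csv,
                                 args=(out_queue, 'results.csv', 10))
writer_thread.start()
writer_thread.join()
```

The file ends up with one row per result, flushed in chunks of `chunksize`, with the final partial chunk written when the sentinel is seen.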

unutbu