
I have a dictionary of folder names that I would like to process in parallel. Under each folder, there is an array of file names that I would like to process in series:

folder_file_dict = {
    folder_name: {
        file_names_key: [file_names_array]
    }
}

Ultimately, I will be creating a folder named folder_name that contains the files whose names are listed in folder_file_dict[folder_name][file_names_key]. I have a method like so:

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

Note the time_consuming_method() above, which takes a long time due to calls over a UDP port. I am also limited to using the UDP ports in the array above. Thus, I have to wait for time_consuming_method to complete on a UDP port before I can use that UDP port again. This means that I can only have len(udp_ports) threads running at a time.

Thus, I will ultimately create len(folder_file_dict.keys()) threads, with len(folder_file_dict.keys()) calls to process_files_in_series. I also have a MAX_THREAD count. I am trying to use the Queue and threading modules, but I am not sure what kind of design I need. How can I do this using Queues and Threads, and possibly Conditions as well? A solution that uses a thread pool may also be helpful.

NOTE

I am not trying to increase the read/write speed. I am trying to parallelize the calls to time_consuming_method under process_files_in_series. Creating these files is just part of the process, but not the rate-limiting step.

Also, I am looking for a solution that uses the Queue and threading modules, possibly with Conditions, or anything relevant to those modules. A thread pool solution may also be helpful. I cannot use processes, only threads.

I am also looking for a solution in Python 2.7.

modulitos
  • the simplest solution (codewise) is to use a thread pool e.g., `multiprocessing.dummy.Pool.map()`, here's a [code example](http://stackoverflow.com/a/14594205/4279). Why do you want to process files in parallel? If all filenames are on the same physical disk, the parallel processing might not improve the time performance (maybe the opposite). On the other hand, if the process is CPU-bound then you should use processes instead of threads (if `process_files_in_series()` doesn't release the GIL). – jfs Oct 29 '14 at 17:06
  • Processing each file, inside the `process_files_in_series` method, takes a long time. Thus I am trying to parallelize the calls on `process_files_in_series` by calling it once for each folder, in parallel. – modulitos Oct 29 '14 at 17:16
  • If your disk can read/write only 100MB/s then no amount of threads will make your code faster if it is already reading/writing at 100MB/s. – jfs Oct 29 '14 at 17:19
  • My post is corrected above. My writing speeds are not near the maximum because there is a long method that I need to execute within `process_files_in_series`. Sorry if it was misleading, but I am not trying to increase my read/write speed. – modulitos Oct 29 '14 at 17:27
  • Raspberry Pi's only have one core, don't they? You're not going to be able to do anything in parallel in that case. – dano Oct 29 '14 at 17:28
  • if you can use Queues and Threads then you can use a thread pool. The latter just uses the former to provide an easy to use interface. Here's [how threads, queues could be used directly](http://stackoverflow.com/a/9874484/4279). – jfs Oct 29 '14 at 17:33
  • @dano Thus I am trying to perform multi-threading, but not multi-processing. AFAIK, spinning up threads is all software. – modulitos Oct 29 '14 at 17:33
  • @J.F.Sebastian I believe you are correct about the use of a thread pool. However, at least for learning purposes, I am trying to use Queues and Threads/Conditions. Is it ridiculous to do so? – modulitos Oct 29 '14 at 17:37
  • @Lucas: no, it is useful to look below the abstraction level of the code you use i.e., if you use a thread pool then (once in a lifetime) you could implement your own (and throw it away and use the already written/tested/bug-fixed/optimized etc. thread pool implementations instead, such as `multiprocessing.dummy.Pool`, `concurrent.futures.ThreadPoolExecutor`). – jfs Oct 29 '14 at 17:39
  • If there is only one core then CPU-bound code (computations) might not run faster if you run it in parallel. Neither threads nor processes would help here. – jfs Oct 29 '14 at 17:43
  • Right, two threads with only one core is only going to help if one of the threads is spending a bunch of time doing blocking I/O, which isn't really the case here. reading the files from disk isn't going to block, it's going to constantly return data that Python needs to do something with, which requires the CPU. – dano Oct 29 '14 at 17:45
  • @J.F.Sebastian My rate-limiting method, i.e. `time_consuming_method` above, takes a long time due to calls over a UDP port. I don't believe that it is CPU-bound, and I would benefit from the parallelization. Thanks for the observation. – modulitos Oct 29 '14 at 18:03
  • @Lucas: [edit] your question and include that crucial info: *"`time_consuming_method()` above, takes a long time due to calls over a UDP port."* -- it implies that your code is I/O-bound and the bottleneck might be network I/O, not disk. What happens if you run `from multiprocessing.pool import ThreadPool`? – jfs Oct 29 '14 at 18:22
  • @J.F.Sebastian I just edited my answer. I also included some relevant information about the number of UDP ports available. I tried abstracting the UDP port limit info initially, but I think it may be crucial to the problem. Any suggestions would be great. I am also using Python 2.7 – modulitos Oct 29 '14 at 18:39
  • @J.F.Sebastian running `from multiprocessing.pool import ThreadPool` works for me. Does this mean that `ThreadPool` can provide a suitable answer? – modulitos Oct 29 '14 at 18:59
  • @Lucas: yes. `ThreadPool` and `dummy.Pool` is the same thing. See [my answer](http://stackoverflow.com/a/26638490/4279). – jfs Oct 29 '14 at 19:10

3 Answers


Using a thread pool:

#!/usr/bin/env python2
from multiprocessing.dummy import Pool, Queue # thread pool

folder_file_dict = {
    folder_name: {
        file_names_key: file_names_array
    }
}

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"
         ...

def mp_process(filenames):
    udp_port = free_udp_ports.get() # block until a free udp port is available
    args = filenames, udp_port
    try:
        return args, process_files_in_series(*args), None
    except Exception as e:
        return args, None, str(e)
    finally:
        free_udp_ports.put_nowait(udp_port)

free_udp_ports = Queue() # in general, use initializer to pass it to children
for port in udp_ports:
    free_udp_ports.put_nowait(port)
pool = Pool(number_of_concurrent_jobs)
for args, result, error in pool.imap_unordered(mp_process, get_files_arrays()):
    if error is not None:
        print args, error

I don't think you need to bind the number of threads to the number of udp ports if the processing time may differ for different filenames arrays; a job that can't get a port simply blocks on free_udp_ports.get() until another job returns one.

If I understand the structure of folder_file_dict correctly, then to generate the filenames arrays:

def get_files_arrays(folder_file_dict=folder_file_dict):
    for folder_name_dict in folder_file_dict.itervalues():
        for filenames_array in folder_name_dict.itervalues():
            yield filenames_array
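
For completeness, here is a minimal sketch of how the remaining placeholders might be filled in, following the question's setup; the stub time_consuming_method and the value of number_of_concurrent_jobs are assumptions for illustration only:

import time

udp_ports = [123, 456, 789]      # the ports listed in the question
number_of_concurrent_jobs = 4    # arbitrary; jobs beyond len(udp_ports)
                                 # simply block on free_udp_ports.get()

def time_consuming_method(file_name, udp_port):
    # stand-in for the real UDP call, only here to make the sketch runnable
    time.sleep(1)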
jfs
  • @J.F.Sebastian Thanks for the help, but how can I know which udp port is available? I think it would be best to switch ports only when the thread is finished... I may not have a way to detect which port is available otherwise. – modulitos Oct 29 '14 at 20:18
  • @Lucas: you could use a queue of free udp ports if there are no other methods to detect (such as to perform a non-blocking operation that succeeds if the port is available). Call `udp_port = queue.get()` to get port, call `queue.put_nowait(udp_port)` in a `finally` clause at the end of `mp_process` to free it. There is `multiprocessing.dummy.Queue` class. – jfs Oct 29 '14 at 20:24
  • @J.F.Sebastian You are awesome. This solution has been extremely helpful. Thank you! – modulitos Oct 30 '14 at 03:53

Use the multiprocessing.pool.ThreadPool. It handles queue / thread management for you and can be easily changed to do multiprocessing instead.

EDIT: Added example

Here's an example... multiple threads may end up using the same udp port. I'm not sure if that's a problem for you.

import itertools
import multiprocessing.pool

def process_files_in_series(file_names_array, udp_port):
    for file_name in file_names_array:
         time_consuming_method(file_name, udp_port)
         # create "file_name"

udp_ports = [123, 456, 789]

folder_file_dict = {
    folder_name: {
        file_names_key: [file_names_array]
    }
}

def main(folder_file_dict, udp_ports):
    # number of threads - here I'm limiting to the smaller of udp_ports,
    # file lists to process and a cap I arbitrarily set to 4
    num_threads = min(len(folder_file_dict), len(udp_ports), 4)
    # the pool
    pool = multiprocessing.pool.ThreadPool(num_threads)
    # build the lists of files to be processed. You may want to do other
    # things like join folder_name...
    file_arrays = [value['file_names_key'] for value in folder_file_dict.values()]
    # do the work
    # each work item is a (file_names_array, udp_port) tuple, so unpack it
    # before calling process_files_in_series
    pool.map(lambda args: process_files_in_series(*args),
             zip(file_arrays, itertools.cycle(udp_ports)))
    pool.close()
    pool.join()
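
A hypothetical entry point, reusing the placeholder names defined above, might look like:

if __name__ == '__main__':
    main(folder_file_dict, udp_ports)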
tdelaney
  • Can you outline a solution using the ThreadPool? (ideally with Python 2.7?). I have an issue where I am limited to only a few threads due to UDP port limitations (updated answer above) – modulitos Oct 29 '14 at 18:43
  • If multiple threads use the same UDP port, it effectively serializes them (only one thread may run at a time): *"Thus, I have to wait for time_consuming_method to complete on a UDP port before I can use that UDP port again"*. You could use `threading.local()` if you want to assign each thread its own udp port (see the sketch after these comments). Or write a function that picks the first available port [as I've suggested](http://stackoverflow.com/a/26638490/4279). – jfs Oct 29 '14 at 19:32
  • @J.F.Sebastian Great suggestion, I'll look into `threading.local()` as well. – modulitos Oct 30 '14 at 03:53
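
Below is a minimal sketch of the threading.local() idea from the comment above, assuming the process_files_in_series and get_files_arrays definitions shown elsewhere on this page; the wrapper name, the pool size, and the port list are illustrative, not part of the original discussion:

import threading
from multiprocessing.pool import ThreadPool
from Queue import Queue  # Python 2.7

thread_data = threading.local()
available_ports = Queue()
for port in [123, 456, 789]:       # the ports from the question
    available_ports.put(port)

def process_with_own_port(file_names_array):
    # the first job a worker thread runs claims a port for that thread;
    # later jobs on the same thread reuse it
    if not hasattr(thread_data, 'udp_port'):
        thread_data.udp_port = available_ports.get()
    return process_files_in_series(file_names_array, thread_data.udp_port)

pool = ThreadPool(3)               # no more threads than ports, so get() never blocks
pool.map(process_with_own_port, get_files_arrays())
pool.close()
pool.join()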

This is kind of a blueprint for how you could use multiprocessing.Process with JoinableQueues to deliver jobs to workers. You will still be bound by I/O, but with Process you do have true concurrency, which may prove to be useful, since threading may even be slower than a normal script processing the files.

(Be aware that this will also prevent you from doing anything else with your laptop if you dare to start too many processes at once :P).

I tried to explain the code as much as possible with comments.

import traceback

from multiprocessing import Process, JoinableQueue, cpu_count

# Number of CPUs on your PC
cpus = cpu_count()

# The Worker Function. Could also be modelled as a class
def Worker(q_jobs):
    while True:
        # Try / Catch / finally may be necessary for error-prone tasks since the processes 
        # may hang forever if the task_done() method is not called.
        try:
            # Get an item from the Queue
            item = q_jobs.get()

            # At this point the data should somehow be processed

        except:
            traceback.print_exc()
        finally:
            # Inform the queue that the task has been done.
            # Without this, q_jobs.join() below blocks forever
            # and the program never finishes.
            q_jobs.task_done()


# A joinable queue so the main process can wait for all jobs to finish
q_jobs = JoinableQueue()

# Create one worker process per CPU
for i in range(cpus):

    # target function and arguments;
    # note that args must be a tuple, so a single argument needs a
    # trailing ',' as in (q_jobs,), while e.g. (q_jobs, 'bla') does not
    p = Process(target=Worker,
                args=(q_jobs,)
                )
    p.daemon = True
    p.start()

# Fill the queue with jobs
q_jobs.put(['Do'])
q_jobs.put(['Something'])

# Block until every job has been marked done
q_jobs.join()
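
As a hypothetical adaptation of the Worker above to the question's UDP-port constraint (not part of the original answer), each job could be a file_names_array and the free ports could sit in a second queue shared between the workers; q_ports would be a plain multiprocessing.Queue created next to q_jobs and passed through args, and process_files_in_series is the function from the question:

def Worker(q_jobs, q_ports):
    while True:
        try:
            file_names_array = q_jobs.get()
            udp_port = q_ports.get()        # block until a UDP port is free
            try:
                process_files_in_series(file_names_array, udp_port)
            finally:
                q_ports.put(udp_port)       # hand the port back to the other workers
        except:
            traceback.print_exc()
        finally:
            q_jobs.task_done()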

Cheers

EDIT

I wrote this with Python 3 in mind. Removing the parentheses from the print function, i.e. using

print item

should make this work for 2.7.

Nima Mousavi
  • the code as written may hang forever if processing of any item may lead to an error. You shouldn't print to the same place from multiple threads/processes without synchronization. Both issues could be avoided if a [simpler pool-based code is used](http://stackoverflow.com/a/14594205/4279): notice that the code captures and reports exceptions and it prints in the main thread only. – jfs Oct 29 '14 at 18:26
  • Would a try block with a finally task_done not be enough to remedy this? I can't say I see your point just yet. This worked fine for me until now. – Nima Mousavi Oct 29 '14 at 18:28
  • Well, I shall edit this; the print won't fail though, since the messages will be buffered thread-safe, I guess :P. Therefore the flush, since the prints may not be displayed. Well, thanks for the update though. :) With this there shouldn't be any problem. – Nima Mousavi Oct 29 '14 at 18:41
  • reraising the exception may also lead to the eventual deadlock (as I understand the uncaught exceptions kill the thread). If you are reimplementing a thread pool; you need to care about such things (e.g., respawn the killed threads). btw. to reraise an exception inside the `except` block it is enough to write: `raise` (no parens, no argument). – jfs Oct 29 '14 at 19:15
  • Flushing the internal buffers helps only if you [print data that is less than `PIPE_BUF` in size](http://stackoverflow.com/questions/9743838/python-subprocess-in-parallel?#comment29770255_9745864). – jfs Oct 29 '14 at 19:26
  • Well yes! I thank you for your helpful comments. This is just an example with 2 strings/items in the queue for (at least on my PC) 4 processes (CPUs). Turns out this is not enough to fill the buffer. Furthermore, the prints should be replaced with something useful like the work to be done. And logging errors and stuff should be done, well... with logging. So those imports and prints should not be there in the code. If you could now please stop referencing yourself. Thank you very much. – Nima Mousavi Oct 29 '14 at 23:35
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/63900/discussion-between-nimi-and-j-f-sebastian). – Nima Mousavi Oct 30 '14 at 02:04
  • The code works. Exceptions should not prevent the process from stopping. The try catch block may not be necessary though. I like this version of multiprocessing best... exceptions would include working on shared memory. Guess I won't share next time since it did not seem to suit your tastes. – Nima Mousavi Oct 30 '14 at 02:27