
I have four queues, each with multiple processes/threads, that are interdependent in the following way:

  1. Queue 1 reads a file from disk and copies it to RAM
  2. Queue 2 takes the file in RAM and performs an operation on it
  3. Queue 3 takes the result of Queue 2 and performs a separate operation on it
  4. Queue 4 writes the end result back to disk

I would like these 4 queues to operate in parallel as much as possible, with the caveat that Queue 2 has to wait for Queue 1 to place at least one item on it (and similarly Queue 2 has to place items on Queue 3, and Queue 3 on Queue 4).

What is the best way in Python to go about implementing this (both for the queue and for the thread/process implementation)?

Will Queue 2 and Queue 3 block each other due to the GIL if I use threads? I have read that I/O and compute can still happen in parallel, so I am OK even if only Queues 1, 2, and 4 work in parallel and Queue 3 runs sequentially with Queue 2.
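Roughly what I have in mind, as a minimal sketch using `threading` and `queue.Queue` (the stage operations and the input data here are just placeholders for my real ones):

```python
import queue
import threading

SENTINEL = object()  # marks "no more work" for the next stage

def stage(in_q, out_q, work):
    """Pull items from in_q, apply work, and push results onto out_q."""
    while True:
        item = in_q.get()            # blocks until the upstream stage provides an item
        if item is SENTINEL:
            if out_q is not None:
                out_q.put(SENTINEL)  # propagate shutdown downstream
            break
        result = work(item)
        if out_q is not None:
            out_q.put(result)

# q1 feeds stage 2, q2 feeds stage 3, q3 feeds stage 4
q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()

threads = [
    threading.Thread(target=stage, args=(q1, q2, lambda d: d.upper())),  # placeholder for operation 2
    threading.Thread(target=stage, args=(q2, q3, lambda d: d[::-1])),    # placeholder for operation 3
    threading.Thread(target=stage, args=(q3, None, print)),              # placeholder for "write to disk"
]
for t in threads:
    t.start()

# Stage 1: in reality this would read files from disk into RAM
for chunk in (b"file-one", b"file-two", b"file-three"):
    q1.put(chunk)
q1.put(SENTINEL)

for t in threads:
    t.join()
```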

user3487187
  • My use case is kind of like the "Multiple queues data mining websites" one: https://www.ibm.com/developerworks/aix/library/au-threadingpython/ I am not sure how they make sure that DatamineThread always has an item from the ThreadUrl queue available, though. Maybe it is not an issue there, but what if there were cases where it is an issue? – user3487187 Dec 18 '14 at 00:04

1 Answer


Is there any particular reason you actually need each of those 4 steps to be separate threads/processes? Personally I'd just implement all 4 steps in one function/callable class, and then use multiprocessing.Pool's map to invoke that function in parallel over the filenames of interest.

A simpler example of this sort of pattern (just reading and processing) is discussed in this Q&A. As the answer there notes, if it appears to bottleneck on I/O rather than processing, just create more processes in the pool.
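Something along these lines, as a minimal sketch (the two processing steps and the filenames are placeholders for whatever your real operations and inputs are):

```python
import multiprocessing

def process_file(filename):
    with open(filename, 'rb') as f:
        data = f.read()                    # step 1: disk -> RAM
    intermediate = data.upper()            # step 2: first operation (placeholder)
    result = intermediate[::-1]            # step 3: second operation (placeholder)
    with open(filename + '.out', 'wb') as f:
        f.write(result)                    # step 4: RAM -> disk
    return filename

if __name__ == '__main__':
    filenames = ['a.dat', 'b.dat', 'c.dat']  # whatever files you are processing
    pool = multiprocessing.Pool()            # defaults to one worker per CPU core
    pool.map(process_file, filenames)        # each file goes through all 4 steps in one worker
    pool.close()
    pool.join()
```

Each worker handles a whole file end to end, so the parallelism comes from having several files in flight at once and there is nothing to hand between queues.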

timday
  • Thanks for that pointer. I need to iterate on the files in batches. During Queue 3 there is an operation which aggregates across the batches in a non-trivial manner, which requires some shared memory access and needs to be done sequentially. – user3487187 Dec 16 '14 at 23:20
  • IMHO multiprocessing is the way to go with Python; all the GIL worries evaporate, and if you end up needing to serialize too much between processes you can shift to memory mapped interprocess sharing instead. – timday Dec 17 '14 at 22:00
  • "memory mapped interprocess sharing instead": is there a reference for this? I looked at the example [here](http://www.reddit.com/r/Python/comments/j3qjb/parformatlabpool_replacement/) but it seems to restrict me to just one shared global variable. If I wanted to have many shared globals (say process 1-10 shared var a, proc 11-20 var b, etc..) is there a way to do that? Thanks! – user3487187 Dec 18 '14 at 06:47
  • For my purposes, numpy's memory mapping http://docs.scipy.org/doc/numpy/reference/generated/numpy.memmap.html has always worked well for sharing data which would be inefficient to pickle (see the sketch after these comments). If you have more complicated stuff than arrays, http://blog.schmichael.com/2011/05/15/sharing-python-data-between-processes-using-mmap/ shows a more "hand rolled" approach. Note that if you just end up reinventing pickle via shared memory you might as well just use multiprocessing's own IPC; shared memory pays off when the shared data is fairly constant. – timday Dec 18 '14 at 13:41
  • Thanks. Is there an example of using numpy.memmap for sharing data with the function called in pool.map? Can I send the memmap array as an argument? Or are you suggesting loading the file as a memmap array in the called function? – user3487187 Dec 18 '14 at 22:48
  • I ended up using shmarray from [here](https://bitbucket.org/cleemesser/numpy-sharedmem/src) – user3487187 Jan 02 '15 at 22:09
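For reference, a minimal sketch of the numpy.memmap idea mentioned above (the file name, array shape, and the sum operation are made-up placeholders): each worker re-opens the same backing file instead of receiving the array through pickle.

```python
import multiprocessing
import numpy as np

FILENAME = 'shared_input.dat'            # hypothetical backing file
SHAPE, DTYPE = (1000, 1000), np.float64  # made-up size for illustration

def process_rows(row_range):
    # Re-open the memmap in the worker; the OS shares the file pages,
    # so the array itself is never pickled between processes.
    data = np.memmap(FILENAME, dtype=DTYPE, mode='r', shape=SHAPE)
    start, stop = row_range
    return data[start:stop].sum()        # placeholder operation

if __name__ == '__main__':
    # Create and fill the backing file once in the parent process.
    data = np.memmap(FILENAME, dtype=DTYPE, mode='w+', shape=SHAPE)
    data[:] = 1.0
    data.flush()

    pool = multiprocessing.Pool()
    row_ranges = [(i, i + 250) for i in range(0, 1000, 250)]
    print(pool.map(process_rows, row_ranges))
    pool.close()
    pool.join()
```

Every process sees the same data read-only; for several independent shared arrays you would simply use several backing files (or offsets into one).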