
I'm trying to reduce the processing time of reading a database with roughly 100,000 entries, but I need the entries formatted a specific way. To do this, I tried to use Python's `multiprocessing.Pool.map` function, which works perfectly except that I can't seem to get any form of queue reference to work across the worker processes.

I've been using information from *Filling a queue and managing multiprocessing in python* to guide me for using queues across multiple processes, and *Using a global variable with a thread* to guide me for using global variables across threads. I've gotten the software to work, but when I check the list/queue/dict/map length after running the processes, it always returns zero.

I've written a simple example to show what I mean. You have to run the script as a file; the pool's initializer function does not work from the interpreter.

from multiprocessing import Pool
from collections import deque

global_q = deque()

def my_init(q):
    global global_q
    global_q = q
    q.append("Hello world")


def map_fn(i):
    global global_q
    global_q.append(i)


if __name__ == "__main__":
    with Pool(3, my_init, (global_q,)) as pool:
        pool.map(map_fn, range(3))
    for p in range(len(global_q)):  # should print everything that was queued
        print(global_q.pop())

Theoretically, when I pass the queue object reference from the main process to the worker processes via the pool's initializer, and then bind each worker's global variable to that reference, every element I append to the queue from the map function should land in the original queue object. Long story short, everything should end up in the same queue, because all the references point to the same object in memory.

So, I expect:

Hello world
Hello world
Hello world
0
1
2

Of course, the 0, 1, 2 would be in arbitrary order, but what you'll actually see in the output is '', i.e. nothing at all.

How come nothing happens when I pass object references to the pool's initializer?

  • A `deque` isn't the same thing as a `Queue`, and multiple processes aren't the same thing as multiple threads — you know this, right? Processes each run in their own memory space, making it impossible for them to directly share global variables. It's possible to do so indirectly for _some_ types by using a [`multiprocessing.managers.SyncManager`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.managers.SyncManager) to create proxies that allow the documented types to be shared. Unfortunately `deque` isn't among them—but you could probably implement one yourself. – martineau Feb 04 '19 at 18:30
  • This is the concept I was talking about, or hoping to show, when talking about initializing global variables on different threads. I only care about doing this because I've seen that each process in Python has its own environment/interpreter, so theoretically I figured that if I pass a reference to the original object into the new interpreter, I should be able to point its global variables back at the old objects. At least, that's what I'm guessing. – iggy12345 Feb 04 '19 at 22:09
  • iggy12345: I can understand how you might have gotten that impression, but the fact is you can't pass references to other processes, because the object is in another memory space that they can't access. See the answer I just posted, which shows how to accomplish something like what you want using a multiprocessing `Manager`. – martineau Feb 04 '19 at 22:57
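
A minimal sketch of what these comments describe, assuming the standard `multiprocessing.Process` API and the default start method: the child process operates on its own copy of the deque (pickled, or duplicated at fork time), so its appends never reach the parent's object.

from collections import deque
from multiprocessing import Process

d = deque()

def child(q):
    q.append("from child")
    print("child sees:", len(q))    # child sees: 1

if __name__ == "__main__":
    p = Process(target=child, args=(d,))
    p.start()
    p.join()
    print("parent sees:", len(d))   # parent sees: 0 -- the append went to a copy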

2 Answers


Here's an example of how to share something between processes by extending the `multiprocessing.managers.BaseManager` class to support `deque`s.

There's a [Customized managers](https://docs.python.org/3/library/multiprocessing.html#customized-managers) section in the documentation about creating them.

import collections
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass

class DequeProxy(object):
    def __init__(self, *args):
        self.deque = collections.deque(*args)
    def __len__(self):
        return self.deque.__len__()
    def appendleft(self, x):
        self.deque.appendleft(x)
    def append(self, x):
        self.deque.append(x)
    def pop(self):
        return self.deque.pop()
    def popleft(self):
        return self.deque.popleft()

# Currently only exposes a subset of deque's methods.
DequeManager.register('DequeProxy', DequeProxy,
                      exposed=['__len__', 'append', 'appendleft',
                               'pop', 'popleft'])


process_shared_deque = None  # Global only within each process.

def my_init(q):
    """ Initialize module-level global. """
    global process_shared_deque
    process_shared_deque = q
    q.append("Hello world")


def map_fn(i):
    process_shared_deque.append(i)  # deques don't have a put() method.


if __name__ == "__main__":
    manager = DequeManager()
    manager.start()
    shared_deque = manager.DequeProxy()

    with Pool(3, my_init, (shared_deque,)) as pool:
        pool.map(map_fn, range(3))

    for p in range(len(shared_deque)):  # Show left-to-right contents.
        print(shared_deque.popleft())

Output:

Hello world
0
1
2
Hello world
Hello world
  • Is there any reason not to just make `DequeProxy` inherit from `deque`, i.e. `class DequeProxy(deque):`, and then use `DequeManager.register('DequeProxy', DequeProxy, exposed=dir(DequeProxy))`? I'm just curious. – iggy12345 Feb 05 '19 at 02:48
  • You cannot just use `dir` to register the methods; for some reason `register` does not support all methods. But this answer solved my problem and is some 10x faster than the other, thank you! – iggy12345 Feb 05 '19 at 04:02
  • iggy12345: Regarding your first comment, I don't know. I used what is shown in the documentation and the way proxy types are implemented in the `managers.SyncManager` class. As for using `dir`, if you look at the source file for [`multiprocessing.managers`](https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/managers.py) around line 103, there are a couple of utility functions you might find interesting: `all_methods()` and `public_methods()`. I considered using them in my answer, but neither did quite what I wanted—because, well, you know, "Simple is better than complex"... `;¬)` – martineau Feb 05 '19 at 05:19
  • I would like to see timings of both solutions showing this one to be 10x faster. Maybe it's time to write to the multiprocessing developers that their standard solutions fall short even for easy problems, and that the poor community needs to hack the lib in order to pass objects between processes and also resort to statements like `global`. – Martin Feb 05 '19 at 07:36
  • I will try and get you timings later today – iggy12345 Feb 05 '19 at 14:47
  • iggy12345: Seems to me like `dir()` _could_ be used to register the methods. How were you trying to use it? – martineau Feb 05 '19 at 20:32
  • Okay, I finished the timing; here is an image of the results and a link to the source code for it: [link](https://imgur.com/a/GRvPh7G). All in all, manually making the `Process`es took 10 seconds longer than using `Pool` and the `map` function. I apologize, it's not 10x faster; the motion of the progress bar in the second one fooled me. – iggy12345 Feb 05 '19 at 21:48
  • Yeah, a 12.7% improvement seems more believable. Multiprocessing, especially with shared data, adds a fair amount of overhead all by itself; sometimes using it is even slower than not using it. Multithreading entails its own overhead and may be faster—but not if the processing being done is compute-bound. – martineau Feb 05 '19 at 23:10
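
As a follow-up to the comment thread above, a hypothetical sketch of the `dir()`-based registration that was discussed: register `collections.deque` itself with the manager and expose its public callables plus `__len__`, filtering out non-callable attributes such as `maxlen` (presumably why a raw `dir()` list fails). The `SharedDeque` typeid is an invented name, not something from the answer above.

import collections
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    pass


# Expose deque's public callables plus __len__ so len() works on the proxy.
# Non-callable attributes such as maxlen are filtered out, since the proxy
# machinery would try to invoke them as methods.
DEQUE_METHODS = [name for name in dir(collections.deque)
                 if (not name.startswith('_')
                     and callable(getattr(collections.deque, name)))
                 or name == '__len__']

DequeManager.register('SharedDeque', collections.deque,
                      exposed=DEQUE_METHODS)

if __name__ == '__main__':
    with DequeManager() as manager:
        d = manager.SharedDeque()
        d.append(1)
        d.appendleft(0)
        print(len(d), d.popleft(), d.pop())  # 2 0 1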

You can't use a global variable for multiprocessing.

Pass a multiprocessing queue to the function instead:

from multiprocessing import Process, Queue

def worker(q):
    q.put("something")

if __name__ == "__main__":
    queue = Queue()
    Process(target=worker, args=(queue,)).start()
    print(queue.get())  # blocks until the worker puts something

Also, you are probably experiencing a case where the code looks all right, but since the pool creates separate processes, even the errors are separated; you therefore never see that the code isn't just silently doing nothing, it may actually be throwing an error.

The reason your output is '' is that nothing was appended to your q/global_q. And if something was appended, it went to some variable that may be called global_q, but it is a totally different object from the global_q in your main process.

Try putting print('Hello world') inside the function you want to multiprocess, and you will see for yourself that nothing is actually printed. That process simply runs outside of your main process, and the only way to exchange data with it is via multiprocessing queues: you put data in with queue.put('something') and read it back with something = queue.get().

Try to understand this code and you will do well:

import multiprocessing as mp

# This queue will be shared among all processes, but you need to pass it to
# each process as an argument. You CANNOT use it as a global variable: the
# worker functions run in totally different processes, and nothing in the
# parent can reach into them directly. A multiprocessing.Queue is one of the
# few objects that can be shared across all processes.
shared_queue = mp.Queue()


def channel(que,channel_num):
    que.put(channel_num)

if __name__ == '__main__':
    processes = [mp.Process(target=channel, args=(shared_queue, channel_num)) for channel_num in range(8)]

    for p in processes:
        p.start()


    for p in processes:  # wait for all processes to finish
        p.join()

    for i in range(8): # Get data from Queue. (you can get data out of it at any time actually)
        print(shared_queue.get())
  • A multiprocessing queue isn't quite the same as a deque, which is what the OP is actually trying to share. Also, you've got some extremely long one line comments in your code, which isn't very readable in my opinion. – martineau Feb 04 '19 at 08:25
  • @martineau Are you telling me I should advise him to pour gasoline into a car that needs diesel? There is absolutely no reason to use a deque. And I am not going to hack the multiprocessing library just so it fits `deque`, which, if you paid attention, he uses only because he doesn't really understand multiprocessing and threading. – Martin Feb 04 '19 at 09:14
  • I'm telling you the OP's question is about how to share a **`deque`**, not a `Queue`. Admittedly, the question's title is a bit misleading in that regard, but it's tagged "deque" and that is what is being used in the sample code it contains. – martineau Feb 04 '19 at 16:29
  • @martineau The question is not about deque but about passing objects across processes. You need to read between the lines. BTW, you can't use a deque in his case anyway. – Martin Feb 04 '19 at 17:14
  • @martineau The sample code also makes ridiculous use of the global statement. Do I have to fit his sample code to provide an answer? I wish him the best answer, while you are being pedantic. Why don't you post an answer too? – Martin Feb 04 '19 at 17:16
  • Martin: All right, I've posted [something](https://stackoverflow.com/a/54525452/355230) I think answers the question asked. – martineau Feb 04 '19 at 22:50
  • @martineau Why didn't you post it in the beginning? Don't forget the Zen of Python: "Simple is better than complex." I hope you will be able to provide a new answer to the OP when your example crashes at something more complicated, since deque is just not thread-safe and is not built for multiprocessing. – Martin Feb 04 '19 at 23:20
  • Martin: I didn't post an answer before now because—until recently—I felt the OP's question was somewhat confusing and wasn't sure I understood it, but now that's changed. BTW, `deque`s **are** thread-safe according to the [documentation](https://docs.python.org/3/library/collections.html#collections.deque) for them that says "Deques support thread-safe, memory efficient appends and pops from either side of the deque with approximately the same O(1) performance in either direction". – martineau Feb 04 '19 at 23:33
  • Also "Simple is better than complex" is worthless if simple isn't what you want/need. – martineau Feb 04 '19 at 23:43
  • While this code works, I'm trying to stay away from starting individual processes, and stick to using `pool.map` because of its efficiency for allocating threads, and more efficient resource utilization – iggy12345 Feb 05 '19 at 04:05
  • @martineau They are thread-safe in a single process, but not in multiprocessing. That's why you hardly ever see deque in multiprocessing, and that's probably why they are not even supported. iggy12345: It's interesting that you can't pull together code in multiprocessing, yet you know how many resources each multiprocessing method takes. And you still don't understand that `map` doesn't spawn threads but processes; threading and processing are different things. Well... I will leave this topic be. Next time I will be more careful with answers. – Martin Feb 05 '19 at 08:29
  • @iggy12345 Read this Stack Overflow topic on how fast and natural deque is in relation to Queue: https://stackoverflow.com/questions/27345332/process-vs-thread-with-regards-to-using-queue-deque-and-class-variable-for – Martin Feb 05 '19 at 08:32
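
For completeness, a sketch under stated assumptions: if, as the OP says above, you want to keep `pool.map`, a `Manager().Queue()` proxy can be handed to the pool's initializer. A plain `multiprocessing.Queue` can only be shared through inheritance and raises a `RuntimeError` when pickled into pool workers, whereas the manager proxy pickles cleanly. This combines the queue-based approach of this answer with the manager-based approach of the other one.

from multiprocessing import Pool, Manager

shared_q = None  # global only within each worker process

def my_init(q):
    global shared_q
    shared_q = q

def map_fn(i):
    shared_q.put(i)

if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue()  # a proxy, so it pickles cleanly into the workers
        with Pool(3, my_init, (q,)) as pool:
            pool.map(map_fn, range(3))
        while not q.empty():
            print(q.get())  # 0, 1, 2 in some order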