
What I'm trying to do is run prime-number decomposition for a list of numbers in several processes at once. I have a threaded version that works, but I can't seem to get it working with processes.

import math
from Queue import Queue
import multiprocessing

def primes2(n):
    primfac = []
    num = n
    d = 2
    while d * d <= n:
        while (n % d) == 0:
            primfac.append(d) # supposing you want multiple factors repeated
            n //= d
        d += 1
    if n > 1:
        primfac.append(n)
    myfile = open('processresults.txt', 'a')
    myfile.write(str(num) + ":" + str(primfac) + "\n")
    return primfac

def mp_factorizer(nums, nprocs):
    def worker(nums, out_q):
        """ The worker function, invoked in a process. 'nums' is a
            list of numbers to factor. The results are placed in
            a dictionary that's pushed to a queue.
        """
        outdict = {}
        for n in nums:
            outdict[n] = primes2(n)
        out_q.put(outdict)

    # Each process will get 'chunksize' nums and a queue to put its output
    # dict into
    out_q = Queue()
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []

    for i in range(nprocs):
        p = multiprocessing.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)],
                      out_q))
        procs.append(p)
        p.start()

    # Collect all results into a single result dict. We know how many dicts
    # with results to expect.
    resultdict = {}
    for i in range(nprocs):
        resultdict.update(out_q.get())

    # Wait for all worker processes to finish
    for p in procs:
        p.join()

    print resultdict

if __name__ == '__main__':

    mp_factorizer((400243534500, 100345345000, 600034522000, 9000045346435345000), 4)

I'm getting a pickle error shown below:

error image

Any help would be greatly appreciated :)

Ouroborus
Paul

2 Answers


You need to use multiprocessing.Queue instead of the regular Queue.

This is because a Process doesn't run in the same memory space as its parent, and some objects aren't picklable, such as a regular queue (Queue.Queue). To get around this, the multiprocessing library provides its own Queue class that is actually a proxy to a queue.

Also, you should extract def worker(...) out to module level, like any other function. That could be your main problem, because of how a process is forked at the OS level.

You can also use a multiprocessing.Manager.

Rafael Aguilar
  • I changed Queue() to multiprocessing.queue(), and now I'm getting an AttributeError: 'module' object has no attribute 'queue' – Paul Jan 10 '17 at 19:15
  • Please check the documentation and the answer carefully; you are using `multiprocessing.queue` without caps, but it's `multiprocessing.Queue` – Rafael Aguilar Jan 10 '17 at 19:18
  • Thank you! A combination of your solution and the one posted below by Tadhg fixed it. Thank you so much :) – Paul Jan 10 '17 at 19:21

Dynamically created functions cannot be pickled, and therefore cannot be used as the target of a Process. The function worker needs to be defined at global scope instead of inside the definition of mp_factorizer.
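Combining both answers, a corrected sketch of the question's code could look like this (worker moved to module scope, multiprocessing.Queue in place of Queue.Queue, and the file writing in primes2 dropped for brevity):

```python
import math
import multiprocessing

def primes2(n):
    # Trial-division factorization, as in the question.
    primfac = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            primfac.append(d)
            n //= d
        d += 1
    if n > 1:
        primfac.append(n)
    return primfac

def worker(nums, out_q):
    # Defined at module level so it can be pickled as a Process target.
    outdict = {}
    for n in nums:
        outdict[n] = primes2(n)
    out_q.put(outdict)

def mp_factorizer(nums, nprocs):
    out_q = multiprocessing.Queue()  # proxy queue, not Queue.Queue
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []
    for i in range(nprocs):
        p = multiprocessing.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)], out_q))
        procs.append(p)
        p.start()

    # Collect one result dict per process, then wait for them to finish.
    resultdict = {}
    for _ in range(nprocs):
        resultdict.update(out_q.get())
    for p in procs:
        p.join()
    return resultdict

if __name__ == '__main__':
    print(mp_factorizer([400243534500, 100345345000], 2))
```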

Tadhg McDonald-Jensen