
I'm fairly new to multiprocessing and I have written the script below, but the worker methods are never getting called. I don't understand what I'm missing.

What I want to do is the following:

  1. call two different methods asynchronously.
  2. call one method before the other.

        # import all necessary modules
        import Queue
        import logging
        import multiprocessing
        import time, sys
        import signal
    
        debug = True
    
        def init_worker():
            signal.signal(signal.SIGINT, signal.SIG_IGN)
    
        research_name_id = {}
        ids = [55, 125, 428, 429, 430, 895, 572, 126, 833, 502, 404]
        # declare all the static variables
        num_threads = 2  # number of parallel threads
    
        minDelay = 3  # minimum delay 
        maxDelay = 7  # maximum delay 
    
        # declare an empty queue which will hold the publication ids
        queue = Queue.Queue(0)
    
    
        proxies = []
        #print (proxies)
    
        def split(a, n):
            """Function to split data evenly among threads"""
            k, m = len(a) / n, len(a) % n
            return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
                    for i in xrange(n))
        def run_worker(
                i,
                data,
                queue,
                research_name_id,
                proxies,
                debug,
                minDelay,
                maxDelay):
            """ Function to pull out all publication links from nist
            data - research ids pulled using a different script
            queue  -  add the publication urls to the list
            research_name_id - dictionary with research id as key and name as value
            proxies - scraped proxies
            """
            print 'getLinks', i
            for d in data:
                print d
                queue.put(d)
    
    
    
    
        def fun_worker(i, queue, proxies, debug, minDelay, maxDelay):
            print 'publicationData', i
            try:
                print queue.pop()
            except:
                pass
    
    
    
    
        def main():
            print "Initializing workers"
            pool = multiprocessing.Pool(num_threads, init_worker)
            distributed_ids = list(split(list(ids), num_threads))
            for i in range(num_threads):
                data_thread = distributed_ids[i]
                print data_thread
                pool.apply_async(run_worker, args=(i + 1,
                        data_thread,
                        queue,
                        research_name_id,
                        proxies,
                        debug,
                        minDelay,
                        maxDelay,
                    ))
    
                pool.apply_async(fun_worker,
                    args=(
                        i + 1,
                        queue,
                        proxies,
                        debug,
                        minDelay,
                        maxDelay,
                    ))
    
            try:
                print "Waiting 10 seconds"
                time.sleep(10)
    
            except KeyboardInterrupt:
                print "Caught KeyboardInterrupt, terminating workers"
                pool.terminate()
                pool.join()
    
            else:
                print "Quitting normally"
                pool.close()
                pool.join()
    
        if __name__ == "__main__":
            main()
    

The only output that I get is

      Initializing workers
      [55, 125, 428, 429, 430, 895]
      [572, 126, 833, 502, 404]
      Waiting 10 seconds
      Quitting normally

1 Answer


There are a couple of issues:

  1. You're not using multiprocessing.Queue: a plain Queue.Queue only works between threads inside a single process.
  2. If you want to share a queue with a subprocess via apply_async etc., you need to use a manager (see the sketch below).
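
Here's a rough sketch of the manager-based version, stripped down to just the queue handling (the ids and prints are placeholders). Note that apply_async itself never raises in the parent process: a task whose arguments can't be pickled (a plain Queue.Queue holds thread locks) fails silently unless you call .get() on the AsyncResult it returns, which is why your script only prints the startup messages.

    import multiprocessing
    import signal

    def init_worker():
        # ignore Ctrl+C in the workers so the parent can handle it
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def run_worker(i, data, queue):
        # producer: push every id onto the shared queue
        print 'getLinks', i
        for d in data:
            queue.put(d)

    def fun_worker(i, queue):
        # consumer: pull an id back off the shared queue
        print 'publicationData', i, queue.get()

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        queue = manager.Queue()  # proxy object that can be passed through apply_async
        pool = multiprocessing.Pool(2, init_worker)
        pool.apply_async(run_worker, args=(1, [55, 125, 428], queue))
        pool.apply_async(fun_worker, args=(1, queue))
        pool.close()
        pool.join()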

However, you should take a step back and ask yourself what you are trying to do. Is apply_async really the way to go? You have a list of items that you want to map over repeatedly, applying long-running, compute-intensive transformations (if they were just blocking on I/O, you might as well use threads). It seems to me that imap_unordered is actually what you want:

    pool = multiprocessing.Pool(num_threads, init_worker)
    links = pool.imap_unordered(run_worker1, ids)
    output = pool.imap_unordered(fun_worker1, links)

run_worker1 and fun_worker1 need to be modified to take a single argument. If you need to share other data, then you should pass it in the initializer instead of passing it to the subprocesses over and over again.
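
For example, something along these lines; the bodies of run_worker1 and fun_worker1 here are just stand-ins for the real scraping, and init_shared / example.org are made-up names rather than anything from your script:

    import multiprocessing
    import signal

    shared = {}  # filled in once per worker process by the initializer

    def init_shared(proxies, min_delay, max_delay):
        # runs once in every worker process instead of shipping the same
        # arguments along with each and every task
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        shared.update(proxies=proxies, min_delay=min_delay, max_delay=max_delay)

    def run_worker1(research_id):
        # one argument in, one result out (placeholder for the real link scraping)
        return 'http://example.org/publications/%d' % research_id

    def fun_worker1(link):
        # one argument in, one result out (placeholder for fetching the publication)
        return link.upper()

    if __name__ == '__main__':
        ids = [55, 125, 428, 429, 430, 895]
        pool = multiprocessing.Pool(2, init_shared, ([], 3, 7))
        links = pool.imap_unordered(run_worker1, ids)
        output = pool.imap_unordered(fun_worker1, links)
        for item in output:
            print item
        pool.close()
        pool.join()

The initializer runs once per worker process, so the proxies and delay bounds are set up a single time instead of being pickled and sent with every task.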

Torsten Marek
  • Thanks for your comment. I also want to start multiple processes. Is apply_async the right way to do that? I will read more on imap_unordered – nEO May 15 '16 at 06:29