
In Python 3.6, I am running multiple processes in parallel, where each process pings a URL and returns a Pandas DataFrame. I want to keep the (2+) processes running continually, and I have created a minimal representative example below.

My questions are:

1) My understanding is that since I have different functions, I cannot use Pool.map_async() and its variants. Is that right? The only examples of these I have seen repeat the same function, like in this answer.

2) What is the best practice for making this setup run perpetually? In my code below, I use a while loop, which I suspect is not suited for this purpose.

3) Is the way I am using Process and Manager optimal? I use multiprocessing.Manager.dict() as the shared dictionary to return the results from the processes. I saw in a comment on this answer that using a Queue here would make sense, but the Queue object has no `.dict()` method, so I am not sure how that would work.

I would be grateful for any improvements and suggestions with example code.

import numpy as np
import pandas as pd
import multiprocessing
import time

def worker1(name, t, seed, return_dict):
    '''worker function'''
    print(str(name) + ' is here.')
    time.sleep(t)
    np.random.seed(seed)
    df = pd.DataFrame(np.random.randint(0, 1000, 8).reshape(2, 4), columns=list('ABCD'))
    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

def worker2(name, t, seed, return_dict):
    '''worker function'''
    print(str(name) + ' is here.')
    np.random.seed(seed)
    time.sleep(t)
    df = pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))
    return_dict[name] = [df.columns.tolist()] + df.values.tolist()

if __name__ == '__main__':
    t=1
    while True:

        start_time = time.time()
        manager = multiprocessing.Manager()
        parallel_dict = manager.dict()
        seed = np.random.randint(0, 1000, 1)  # send seed to worker to return a diff df
        jobs = []
        p1 = multiprocessing.Process(target=worker1, args=('name1', t, seed, parallel_dict))
        p2 = multiprocessing.Process(target=worker2, args=('name2', t, seed+1, parallel_dict))
        jobs.append(p1)
        jobs.append(p2)
        p1.start()
        p2.start()
        for proc in jobs:
            proc.join()
        parallel_end_time = time.time() - start_time
        #print(parallel_dict)
        df1 = pd.DataFrame(parallel_dict['name1'][1:], columns=parallel_dict['name1'][0])
        df2 = pd.DataFrame(parallel_dict['name2'][1:], columns=parallel_dict['name2'][0])
        merged_df = pd.concat([df1, df2], axis=0)
        print(merged_df)
Zhubarb

1 Answer

Answer 1 (map on multiple functions)

You're technically right. With map, map_async and other variations, you should use a single function.

But this constraint can be bypassed by implementing a dispatcher and passing the function to execute as part of the arguments:

def dispatcher(args):
    # call the first element (the function) with the rest of the tuple as its arguments
    return args[0](*args[1:])

So, a minimal working example:

import multiprocessing as mp

def function_1(v):
    print("hi %s"%v)
    return 1
    
def function_2(v):
    print("by %s"%v)
    return 2

def dispatcher(args):
    return args[0](*args[1:])

with mp.Pool(2) as p:
    tasks = [
        (function_1, "A"),
        (function_2, "B")
    ]
    r = p.map_async(dispatcher, tasks)
    r.wait()
    results = r.get()
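
Here results will be [1, 2]: map_async preserves the order of the input iterable, so each return value lines up with its task.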

Answer 2 (Scheduling)

I would remove the while loop from the script and schedule a cron job (on GNU/Linux) or a scheduled task (on Windows) so that the OS is responsible for its execution.

On Linux you can run crontab -e and add the following line to make the script run every 5 minutes.

*/5 * * * * python /path/to/script.py
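
On Windows, a roughly equivalent approach (sketched here with a placeholder task name) is to register a scheduled task with the built-in schtasks command so the script runs every 5 minutes:

schtasks /Create /SC MINUTE /MO 5 /TN "run_script" /TR "python C:\path\to\script.py"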

Answer 3 (Shared Dictionary)

Yes and no.

To my knowledge, using a Manager is the best way to share data such as collections. For arrays and primitive types (int, float, etc.) there are Value and Array, which are faster.
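
As a minimal sketch of those two (the names and values below are purely illustrative), a Value holds a single typed scalar and an Array a fixed-size typed buffer, both living in shared memory:

import multiprocessing as mp

def increment(counter, buf):
    with counter.get_lock():  # a Value created this way carries its own lock
        counter.value += 1
    for i in range(len(buf)):
        buf[i] += 1

if __name__ == '__main__':
    counter = mp.Value('i', 0)      # shared integer
    buf = mp.Array('d', [0.0] * 4)  # shared array of doubles
    p = mp.Process(target=increment, args=(counter, buf))
    p.start()
    p.join()
    print(counter.value, buf[:])    # 1 [1.0, 1.0, 1.0, 1.0]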

As stated in the documentation:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

But you only need to return a DataFrame, so the shared dictionary is not needed.

Cleaned Code

Using all the previous ideas, the code can be rewritten as:

map version

import numpy as np
import pandas as pd
from time import sleep
import multiprocessing as mp

def worker1(t, seed):
    print('worker1 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0, 1000, 8).reshape(2, 4), columns=list('ABCD'))

def worker2(t, seed):
    print('worker2 is here.')
    sleep(t)
    np.random.seed(seed)
    return pd.DataFrame(np.random.randint(0, 1000, 12).reshape(3, 4), columns=list('ABCD'))

def dispatcher(args):
    return args[0](*args[1:])

def task_generator(sleep_time=1):
    seed = np.random.randint(0, 1000, 1)
    yield worker1, sleep_time, seed
    yield worker2, sleep_time, seed + 1

with mp.Pool(2) as p:
    results = p.map(dispatcher, task_generator())
    merged = pd.concat(results, axis=0)
    print(merged)
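
As in the original question, if this is run on Windows (which spawns rather than forks worker processes), the Pool block should be wrapped in an if __name__ == '__main__': guard.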

If the concatenation of the DataFrames is the bottleneck, an approach with imap might be preferable.

imap version

with mp.Pool(2) as p:
    merged = pd.DataFrame()
    for result in p.imap_unordered(dispatcher, task_generator()):
        merged = pd.concat([merged,result], axis=0)
    print(merged)

The main difference is that in the map case, the program first waits for all the tasks to end, and then concatenates all the DataFrames.

In the imap_unordered case, as soon as a task ends, its DataFrame is concatenated to the current results.
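
Note also that imap_unordered yields results in completion order rather than submission order, so the row order of merged can vary between runs; plain imap keeps the input order while still yielding each result as soon as it (and everything before it) is ready.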

Tommaso Fontana
  • Thank you very much. Would I not have to use `map_async()` instead of `map` in order for `worker1` and `worker2` to run in parallel? Is there a reason you are using `map()`? – Zhubarb Feb 01 '20 at 19:53
  • No problem! map distributes the tasks to all the processes in the Pool, so it is parallel! The difference is that map is blocking while map_async is non-blocking. With map you have to wait until all the tasks have finished, while with map_async you get a result object and then have to call result.wait() and result.get(). – Tommaso Fontana Feb 02 '20 at 16:05