
My simplified problem

I have created a function that returns the average product after breaking up a list of numbers into 3 distinct lists.

For example:

Input array 'arr' = [1,2,3,4,5,6,7,8,9]

Example partition: [1,5,6],[2,3,9],[4,7,8]

Example objective: mean(1 x 5 x 6, 2 x 3 x 9, 4 x 7 x 8) = mean(30, 54, 224) = 102.67

My goal - make workers compete for the best solution and communicate

I am now trying to run this function in parallel (just 2 workers for now), so that every 10 seconds the workers share their best partition (the one with the highest objective) with each other and use it as the starting point for the next 10 seconds, and so on, so that the best result improves over time. This best result will be passed into the compute function as update_partition.

I am not sure of how to make workers communicate their results, so would appreciate some help on this.

As I am new to multiprocessing, I would also appreciate any advice at all to improve my solution - e.g. using a queue, manager, pool etc.
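
For concreteness, here is a rough sketch of the kind of sharing I have in mind (illustrative only, not part of my attempt below; the helper report_result and the stand-in objective/partition values are made-up names):

from multiprocessing import Manager, Process
import random

def report_result(best, lock, obj, part):
    # Publish our result if it beats the shared best, then read the best back
    with lock:
        if best['obj'] is None or obj > best['obj']:
            best['obj'] = obj
            best['partition'] = part
        return best['obj'], best['partition']

def worker(name, best, lock):
    obj = random.random()                     # stand-in for a computed objective
    part = [[1, 5, 6], [2, 3, 9], [4, 7, 8]]  # stand-in partition
    shared_obj, shared_part = report_result(best, lock, obj, part)
    print('%s continuing from objective %.3f with partition %s' % (name, shared_obj, shared_part))

if __name__ == '__main__':
    with Manager() as manager:
        best = manager.dict({'obj': None, 'partition': None})
        lock = manager.Lock()
        workers = [Process(target=worker, args=('Worker %d' % i, best, lock)) for i in (1, 2)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()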

My attempt - excluding communication

# Competing and communicating workers

from multiprocessing import Process
import random
import numpy as np
import sys

# Sub functions used in the compute function
def partition(arr, n):
    # Shuffle in place, then deal the elements round-robin into n groups
    random.shuffle(arr)
    return [np.array(arr[i::n]) for i in range(n)]

def average(partitionList):
    # Mean of the per-group products; also returns the partition itself
    return np.mean([np.prod(i) for i in partitionList]), partitionList

def swap(A,B,i,j):
    # Swap element i of group A with element j of group B, in place
    b_temp = B[j].copy()
    B[j] = A[i]
    A[i] = b_temp
    return A,B

# Main function - repeatedly swaps one element between two groups of the partition to try to maximise the objective
def compute(message,arr,r,update_partition = 'Default'):

    # Start from the shared best partition if one was passed in,
    # otherwise generate a fresh random partition
    if update_partition != 'Default':
        current_partition = update_partition
    else:
        current_partition = partition(arr, r)

    obj_prev = average(current_partition)[0]
    print('\n [%s] Initial objective: %.2f | Arrays: %s' % (message,obj_prev,current_partition))

    while True:
        for i in range(3):
            # Random positions within a group (all groups have the same size
            # when len(arr) is divisible by r)
            randPosOne = np.random.randint(len(current_partition[i]))
            randPosTwo = np.random.randint(len(current_partition[i]))

            if i != 2:
                swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
            else:
                swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)

            obj = average(current_partition)[0]

            if obj > obj_prev:
                obj_prev = obj
                store = average(current_partition)[1]
                print('\n [%s] Current objective: %.2f | Arrays: %s' % (message,obj,store))

            else:
                # Revert the swap, since it did not improve the objective
                if i != 2:
                    swap(current_partition[i],current_partition[i+1],randPosOne,randPosTwo)
                else:
                    swap(current_partition[i-2],current_partition[i],randPosOne,randPosTwo)
                    

if __name__ == '__main__':
    
    # This is just an arbitrary array of random numbers used as an input
    arr = random.sample(range(10, 50), 12)
    
    # This represents how many groups we would like to make out of the arr list
    r = 3 #int(sys.argv[1])
    
    first = Process(target=compute, args=("Worker 1", arr,r))
    first.start()
    second = Process(target=compute, args=("Worker 2", arr,r))
    second.start()


  • This might help. https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce – Abhishek Rai Nov 22 '20 at 08:48
  • Entering a domain of cooperating-, or partially-cooperating agents is by far not a goal to add a few SLOCs, the less to build on a built-in multiprocessing.Queue template. If indeed interested in going in this direction, perhaps review the Pyomo topologies available & propose library extension to add another layer of informal communication, above the Pyomo topologies, that would permit agents to adapt their behaviour independently of the working evolutionary-topology. Any tool fits that purpose. ZeroMQ / nanomsg are smart-enough, lightweight & reasonably fast / low-latency candidates for this – user3666197 Nov 22 '20 at 10:18
  • Thanks Abrar, that is helpful, I will try and explore how I can adapt the first solution there so that I have a results cache with the best outcomes from each worker – Jwem93 Nov 22 '20 at 22:50
  • Hi @user3666197. You are right, as my solution is not the best way to approach such a task. The main objective is for me to learn how workers could communicate their best results, irrespective of whether it is the best approach for this particular problem (which is contrived anyway) – Jwem93 Nov 22 '20 at 22:52

1 Answer


This is not necessarily going to satisfy you, because this solution is not about the multiple processes communicating with one another to solve the problem. But then I don't believe that the best approach to solving the problem requires that they do.

My first observation is that using a random shuffle to generate the partitions is less than ideal, since it will generate partitions that are essentially identical except for the order of the elements within each group, and thus give rise to the same products and mean. The code below generates distinct, lexically ordered partitions and uses a process pool of arbitrary size to compute the mean for each partition, so you can use as many processes as you want (up to the number of processors you have) to solve the problem. For an array of 9 elements there are only 280 possible ways of partitioning the elements into 3 tuples of 3 elements each (9! / (3!)^3 / 3! = 280). But this number grows rapidly as the number of elements increases: for an array of 12 elements (3 tuples of 4 elements each) the number of partitions becomes 5775. The tradeoff is that the function generate_tuples is more costly (due to the sorting it does) in its effort to eliminate redundant partitions.

The following code finds the partitioning that produces the maximum mean:

from itertools import permutations
import random
import multiprocessing
from statistics import mean
from math import prod

def generate_tuples(arr):
    slice_size = len(arr) // 3
    seen = set()
    cnt = 0
    for p in permutations(arr):
        # Sort within each group, then sort the groups themselves, so that
        # partitions differing only in ordering map to the same canonical tuple
        t = tuple(sorted([
            tuple(sorted(p[0:slice_size])),
            tuple(sorted(p[slice_size:slice_size * 2])),
            tuple(sorted(p[slice_size * 2:slice_size * 3])),
        ]))
        if t not in seen:
            yield t
            seen.add(t)
            cnt += 1
    print('Total partitions = ', cnt)



def compute(t):
    # Return the partition together with the mean of its per-group products
    return t, mean(prod(x) for x in t)


def main():
    with multiprocessing.Pool(6) as pool:
        arr = random.sample(range(10, 50), 12) # count should be divisible by 3
        print('arr =', arr)
        # chunksize should be approximately: size_of_iterable / (pool_size * 4):
        results = pool.imap(compute, generate_tuples(arr), chunksize=241)
        max_t = None
        max_mean = 0
        for t, m in results:
            if m > max_mean:
                max_mean = m
                max_t = t
        print(max_t, max_mean)


if __name__ == '__main__':
    main()

Prints:

arr = [25, 37, 38, 11, 44, 24, 36, 35, 26, 23, 49, 10]
Total partitions =  5775
((10, 11, 23, 24), (25, 26, 35, 36), (37, 38, 44, 49)) 1303685.3333333333

Update

The following might be useful information for when you attempt to use multiprocessing.

The first approach uses a managed shared list. The advantage of this managed list is that access is automatically serialized, so that, depending on the complexity of the operations being performed, processes accessing the list do not have to perform explicit locking. And rather than passing the shared list instance as an argument to your worker function(s), it is often more convenient to initialize each process once by assigning the shared list to a global when the process pool is created:

import multiprocessing
import random

def pool_initializer(the_list):
    # Runs once in each pool process; stash the shared list in a global
    global arr
    arr = the_list


def reverse():
    # Slice assignment modifies the shared list itself, rather than
    # rebinding the worker's global name to a new, local list
    arr[:] = arr[::-1]


if __name__ == '__main__':  # required for Windows
    with multiprocessing.Manager() as manager:
        arr = manager.list(random.sample(range(10, 50), 12))
        with multiprocessing.Pool(initializer=pool_initializer, initargs=(arr,)) as pool:
            pool.apply(reverse)
        print(arr)

The downside is that arr is actually a proxy to the actual shared memory, and so access can be slower than using the second option, which is a multiprocessing.Array. As long as no two processes attempt to modify the same element, you do not have to worry about locking. Otherwise, you will have to create a sharable Lock instance and serialize access to the array when necessary. See https://stackoverflow.com/questions/39122270/multiprocessing-shared-array.
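
Here is a bare-bones sketch of that second option (illustrative only; reverse_shared is a made-up example function, and the explicit lock is only needed because it touches every element):

import multiprocessing
import random

def reverse_shared(shared_arr, lock):
    # This modifies every element, so serialize access with the explicit lock
    with lock:
        values = shared_arr[:]        # copy the shared buffer out
        shared_arr[:] = values[::-1]  # write it back reversed

if __name__ == '__main__':
    # 'i' is the typecode for signed int; the buffer itself lives in shared memory
    shared_arr = multiprocessing.Array('i', random.sample(range(10, 50), 12))
    lock = multiprocessing.Lock()
    p = multiprocessing.Process(target=reverse_shared, args=(shared_arr, lock))
    p.start()
    p.join()
    print(shared_arr[:])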

  • Thanks for your help Booboo. The idea with the partitioning is that it is done once at the beginning to create different starting points for each worker, while the main part of the code that improves the solution is the swapping of elements between each group of the partition. This is a contrived example, so apologies if it is not the clearest. My main goal is to learn how workers can communicate to find the best solution to prevent them from going in a bad direction. – Jwem93 Nov 22 '20 at 22:48
  • I figured. As it turns out, I doubt that for this problem my solution even profits from using multiprocessing. That is, it probably runs just as fast by iterating the `generate_tuples` generator and then calling `compute` directly (or, better yet, inlining the function). – Booboo Nov 22 '20 at 23:48
  • I've added some info to the answer that might be useful. – Booboo Nov 23 '20 at 00:30
  • Thanks @Booboo, this is helpful. I was actually looking at something similar (i.e. regarding using a managed list) – Jwem93 Nov 24 '20 at 00:58
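
For completeness, the serial variant Booboo describes in the comment above might look like this (a sketch, not benchmarked; it assumes generate_tuples from the answer is defined in the same module):

from itertools import permutations
import random
from statistics import mean
from math import prod

# generate_tuples(arr) from the answer above is assumed to be defined here

def main_serial():
    arr = random.sample(range(10, 50), 12)
    max_t, max_mean = None, 0
    for t in generate_tuples(arr):
        m = mean(prod(x) for x in t)  # the compute function, inlined
        if m > max_mean:
            max_mean, max_t = m, t
    print(max_t, max_mean)

if __name__ == '__main__':
    main_serial()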