
I am trying to write a script where an asset is to be manipulated twice, with the second manipulation building upon the first one. Assuming that the first one takes significantly longer than the second, I was thinking that the best way to do it would be to have multiple workers on the first manipulation and a single one on the second. I am having issues coding that, however, so I set up an example to demonstrate.

In this example, the asset is a list of strings. The first manipulation is calculating the hash values of the strings and the second one is summing the digits of the calculated hashes. The runtimes are artificially tweaked with time.sleep to create the described effect. To make the manipulations clear, here are the steps for a single string:

>>> s = 'foobar'
>>> h = hash(s)
>>> h
5857481616689290475
>>> n = sum(int(v) for v in str(abs(h)))
>>> n
101

I have managed to get it to work with one process handling each manipulation (see below), but I want to have multiple ones on the first.

import multiprocessing as mp
from time import sleep
from random import random


def hasher(q, l, words):
    for word in words:
        l.acquire()
        h = hash(word)
        print('hash of {} is {}'.format(word, h))
        l.release()
        sleep(1.0 * random())
        q.put(h)
    q.put('END')


def summer(q, l):
    while True:
        data = q.get()
        sleep(0.1 * random())
        if data == 'END':
            break
        else:
            l.acquire()
            print('sum of {} is {}'.format(data, sum(int(x) for x in str(abs(data)))))
            l.release()


if __name__ == '__main__':
    queue = mp.Queue()
    lock = mp.Lock()
    words = ['fwiimaafqa', 'nuwivfmgdc', 'foymwgcbut', 'sefmayofio', 'crbgzpihpa',
             'xsioddsfyw', 'zbefmckkyi', 'vkxymewyvt', 'ryrvrfkjqf', 'zobdvstxfh']

    bots = []
    for _ in range(1):
        bot = mp.Process(target=hasher, args=(queue, lock, words))
        bot.start()
        bots.append(bot)


    bot2 = mp.Process(target=summer, args=(queue, lock))
    bot2.start()

    for bot in bots:
        bot.join()
    bot2.join()

Can someone help me start more hashers in parallel?

P.S. I also tried with Pool, but there I could not get the summer to work in parallel.

Ma0
  • Can you clarify what your *specific problem* is that you need help with? You already start process(es) in a loop, so what issue do you have starting 2, 3, 4, ... instead of 1? Is your issue chunking up `words` so that each process works only on a subset? Do you have technical issues, i.e. is there an error with multiple processes? – MisterMiyagi Dec 30 '20 at 12:20
  • When I change `for _ in range(1):` to a bigger value so that more processes are spawned, the `list` is not chunked as desired. Instead, all spawned processes go through the entire list. – Ma0 Dec 30 '20 at 12:26
  • The `hasher` function should work on a ***single*** word, not the whole list... The looping on `words` should be done in the main process, not in the spawned ones. You should handle this in main and pass each worker a word. This would be much easier with a `Pool` – Tomerikoo Dec 30 '20 at 12:37
  • @Tomerikoo these are also my thoughts and I tried them out to the best of my ability but could not get it to work as intended. If you manage to crack it, please share your code in an answer. – Ma0 Dec 30 '20 at 12:44
  • Does this answer your question? [Filling a queue and managing multiprocessing in python](https://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python) – Tomerikoo Dec 30 '20 at 13:31

4 Answers


If you intend to do it linearly, meaning first hash'em all and then sum'em all, then a Pool will reduce and simplify much of your code. For starters, there is no need for queues and locks. Using the map method, you get back a list of results, so you can create a chain of hashing and summing:

from multiprocessing import Pool
from time import sleep

def hasher(word):
    h = hash(word)
    print('hash of {} is {}'.format(word, h))
    return h

def summer(h):
    s = sum(int(x) for x in str(abs(h)))
    print('sum of {} is {}'.format(h, s))
    return s

if __name__ == '__main__':
    words = ['fwiimaafqa', 'nuwivfmgdc', 'foymwgcbut', 'sefmayofio', 'crbgzpihpa',
             'xsioddsfyw', 'zbefmckkyi', 'vkxymewyvt', 'ryrvrfkjqf', 'zobdvstxfh']

    with Pool() as pool:
        hashes = pool.map(hasher, words)
        sums = pool.map(summer, hashes)
  • You can customize the number of processes in the Pool with the processes argument, which defaults to cpu_count().

  • The map divides the input list into chunks, which can be controlled using the chunksize argument. Note that this is an approximation and I would leave it to the Pool to do what it sees as best (a short sketch of both parameters follows below).
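
For example, a minimal, self-contained sketch that sets both parameters explicitly (the hasher and summer here are stripped-down stand-ins for the ones above):

from multiprocessing import Pool

def hasher(word):
    return hash(word)

def summer(h):
    return sum(int(x) for x in str(abs(h)))

if __name__ == '__main__':
    words = ['fwiimaafqa', 'nuwivfmgdc', 'foymwgcbut', 'sefmayofio']
    # processes=4: use 4 worker processes instead of the default cpu_count()
    # chunksize=2: hand each worker two items of the input at a time
    with Pool(processes=4) as pool:
        hashes = pool.map(hasher, words, chunksize=2)
        sums = pool.map(summer, hashes, chunksize=2)
    print(sums)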

Tomerikoo
  • the original code is not linear. Running and looking at the print statements reveals that the hashing and summing are done in parallel (ofc the summing operates on the hashed values but not everything is hashed before the summing begins). – Ma0 Dec 30 '20 at 13:12
  • @Ma0 well in that case isn't it possible to just combine the functions? Meaning, the `Pool` will work in parallel on each word, hashing it and summing it – Tomerikoo Dec 30 '20 at 13:16
  • Oh I believe I misunderstood the question... In that case it seems the other answer should be fine for your needs :) – Tomerikoo Dec 30 '20 at 13:19
  • With my approach, hashing and summing are performed in parallel, but somehow I cannot get more than one hasher. With the pool approach, you can have multiple hashers and/or summers in parallel, but the two groups are serial to one another. I want the combination of the two: multiple hashers running concurrently and in parallel to the summers. I just cannot seem to be able to get this to work. – Ma0 Dec 30 '20 at 13:20
  • Yes, I understood that now... You have several hashers hashing into the queue and one summer constantly checking if there is something to work on. That is not what my solution is offering, unfortunately; as you noticed, it will first hash'em all and then sum'em all... I will leave my answer in case it might be useful to someone coming here with similar needs that this solves... – Tomerikoo Dec 30 '20 at 13:24

This solution uses a process pool but overlaps the summer processing with the hasher processing, so that as soon as a hash becomes available it is submitted to a summer process for summation rather than waiting for all of the hashing to be completed. I have modified the summer worker to return the hash argument and sum result back to the main process for printing.

import multiprocessing as mp
from time import sleep
from random import random


def hasher(word):
    h = hash(word)
    print('hash of {} is {}'.format(word, h))
    sleep(1.0 * random())
    return h


def summer(h):
    sleep(0.1 * random())
    s = sum(int(x) for x in str(abs(h)))
    return h, s



def compute_chunksize(number_of_jobs, pool_size):
    # Mimic the heuristic Pool.map uses by default: split the work into
    # roughly 4 chunks per worker process, rounding up.
    chunksize, remainder = divmod(number_of_jobs, pool_size * 4)
    if remainder != 0:
        chunksize += 1
    return chunksize


if __name__ == '__main__':
    N_CPU = mp.cpu_count()
    words = ['fwiimaafqa', 'nuwivfmgdc', 'foymwgcbut', 'sefmayofio', 'crbgzpihpa',
        'xsioddsfyw', 'zbefmckkyi', 'vkxymewyvt', 'ryrvrfkjqf', 'zobdvstxfh']
    with mp.Pool(N_CPU) as pool:
        chunksize = compute_chunksize(len(words), N_CPU)
        results = [pool.apply_async(summer, (h,)) for h in pool.imap(hasher, words, chunksize)]
        for result in results:
            h, s = result.get()
            print('sum of {} is {}'.format(h, s))

Prints:

hash of fwiimaafqa is 2628677472122653395
hash of nuwivfmgdc is 9063591039750236764
hash of foymwgcbut is -6974737365015421151
hash of sefmayofio is 5524859369098572411
hash of crbgzpihpa is -3068552401416065138
hash of xsioddsfyw is 1721137744256960524
hash of zbefmckkyi is -3131190058570978937
hash of vkxymewyvt is -3144743710838890191
hash of ryrvrfkjqf is -5704469091812886058
hash of zobdvstxfh is 5337176127259296686
sum of 2628677472122653395 is 87
sum of 9063591039750236764 is 85
sum of -6974737365015421151 is 77
sum of 5524859369098572411 is 93
sum of -3068552401416065138 is 68
sum of 1721137744256960524 is 76
sum of -3131190058570978937 is 86
sum of -3144743710838890191 is 81
sum of -5704469091812886058 is 91
sum of 5337176127259296686 is 95

Explanation

The call to imap takes an iterable (in this case, words) and submits each item of the iterable to the pool as a job. It returns an iterator that, when iterated, yields the next result. imap_unordered is the same except that the results are returned in arbitrary order, not necessarily in the order of submission.

Since the hash values are being returned one by one as we iterate over the call to imap, we then use the apply_async method to submit the jobs that compute the hash sums. This is a non-blocking job submission request that submits jobs one by one and returns an AsyncResult instance whose blocking get method can be called to get the result of the job. So we are essentially using a list comprehension to submit all the summer jobs and create a list of AsyncResult objects, from which we get the results in submission order. We could also have specified with apply_async a callback function that would be passed the result as soon as it becomes available.
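
For illustration, a rough sketch of that callback variant, reusing hasher, summer, words, N_CPU and chunksize from the code above; print_sum is just an illustrative helper name:

def print_sum(result):
    # Called in the parent process as each summer job finishes; output
    # may therefore appear out of submission order.
    h, s = result
    print('sum of {} is {}'.format(h, s))

# inside the same `if __name__ == '__main__':` block as above
with mp.Pool(N_CPU) as pool:
    async_results = [pool.apply_async(summer, (h,), callback=print_sum)
                     for h in pool.imap(hasher, words, chunksize)]
    for r in async_results:
        r.wait()  # block until every summer job has completed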

Booboo
  • Nice! I liked the use of `imap` to make it non-blocking and pass the result immediately – Tomerikoo Dec 30 '20 at 13:41
  • @Tomerikoo Of course, one might be processor-limited anyway. – Booboo Dec 30 '20 at 13:44
  • I've modified the code to use a list comprehension rather than a generator expression, which may not be as memory efficient if you have a large word list but will queue the hash jobs more efficiently. – Booboo Dec 30 '20 at 13:58
  • thanks a lot for the answer Booboo. I do have to admit however that I am having a difficult time understanding how the hashing and summing are parallel here, but I guess I need to do some reading on `apply_async`. – Ma0 Dec 30 '20 at 14:31
  • I have added an explanation to the answer. – Booboo Dec 30 '20 at 14:52

Expanding on the idea of chunking the asset by @Wuerfelfreak, here is my take:

import multiprocessing as mp
from time import sleep
from random import random
from os import getpid
from collections import defaultdict


def chop(iterable, n):
    chunks = defaultdict(list)
    for i, item in enumerate(iterable):
        chunks[(i % n)].append(item)
    return chunks.values()
    

def hasher(q, l, words):
    for word in words:
        if word is not None:
            h = hash(word)
            pid = getpid()
            l.acquire()
            # lock is only to prevent interlaced printing; there should be no computations done here
            print('hash of {} is {} (performed by {})'.format(word, h, pid))
            l.release()
            sleep(1.0 * random())
            q.put(h)


def summer(q, l):
    while True:
        data = q.get()
        if data == 'END': # signal to terminate
            return
        sleep(0.1 * random())
        s = sum(int(x) for x in str(abs(data)))
        pid = getpid()
        # lock is only to prevent interlaced printing; there should be no computations done here
        l.acquire()
        print('sum of {} is {} (performed by {})'.format(data, s, pid))
        l.release()


if __name__ == '__main__':
    queue = mp.Queue()
    lock = mp.Lock()
    words = ['fwiimaafqa', 'nuwivfmgdc', 'foymwgcbut', 'sefmayofio', 'crbgzpihpa',
             'xsioddsfyw', 'zbefmckkyi', 'vkxymewyvt', 'ryrvrfkjqf', 'zobdvstxfh']

    bots = []
    n = 3
    for chunk in chop(words, n):
        bot = mp.Process(target=hasher, args=(queue, lock, chunk))
        bot.start()
        bots.append(bot)

    bot2 = mp.Process(target=summer, args=(queue, lock))
    bot2.start()

    for bot in bots:
        bot.join()
    queue.put('END')  # signal bot2 to terminate
    bot2.join()

This fulfills all requirements, since there are multiple hashers working in parallel and the summing does not wait for all of the hashing to conclude. This is also evident from the print statements:

hash of fwiimaafqa is 873046927385382714 (performed by 22484)
hash of crbgzpihpa is 1605709352436415502 (performed by 4068)
hash of ryrvrfkjqf is -1887526129446447819 (performed by 16488)
hash of zobdvstxfh is 1051323868469477168 (performed by 16488)
sum of -1887526129446447819 is 96 (performed by 21424)
hash of xsioddsfyw is -2508948279346433147 (performed by 4068)
hash of nuwivfmgdc is 2212724786813710734 (performed by 22484)
sum of 873046927385382714 is 87 (performed by 21424)
sum of 1605709352436415502 is 68 (performed by 21424)
sum of 1051323868469477168 is 89 (performed by 21424)
hash of zbefmckkyi is -1045565728264456870 (performed by 4068)
hash of vkxymewyvt is -2286350865213111794 (performed by 4068)
sum of -2508948279346433147 is 89 (performed by 21424)
sum of -1045565728264456870 is 85 (performed by 21424)
sum of -2286350865213111794 is 74 (performed by 21424)
hash of foymwgcbut is -7897655598357124743 (performed by 22484)
sum of 2212724786813710734 is 75 (performed by 21424)
hash of sefmayofio is -8074280815242974134 (performed by 22484)
sum of -7897655598357124743 is 105 (performed by 21424)
sum of -8074280815242974134 is 79 (performed by 21424)

Note: The use of the lock is only to prevent the interleaving of printed output from multiple processes printing in parallel and is not required if you are not printing from these worker functions.
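
For reference, chop distributes the items round-robin, so with n = 3 the word list above is split into:

>>> list(chop(words, 3))
[['fwiimaafqa', 'sefmayofio', 'zbefmckkyi', 'zobdvstxfh'],
 ['nuwivfmgdc', 'crbgzpihpa', 'vkxymewyvt'],
 ['foymwgcbut', 'xsioddsfyw', 'ryrvrfkjqf']]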

Ma0
  • It seems that you are using a lock just so that the printing is orderly. If that is the case 1. You might wish to make that clear so that in production code it wouldn't necessarily be used and 2. You should remove the call to `hash` from this serialization or else you are defeating the whole purpose of multiprocessing. – Booboo Dec 30 '20 at 15:01
  • The lock is indeed only for the printing there. As for your 2. point, could you elaborate? – Ma0 Dec 30 '20 at 15:04
  • You are calling `hash` only after you are acquiring the lock. That seems wrong. – Booboo Dec 30 '20 at 15:06
  • You could also simplify termination processing for `summer` by having the main process join all the `hasher` processes and then put a single 'END' item on the queue. Then there is no need to pass `n` to `summer` -- it is always looking for a single `END`. – Booboo Dec 30 '20 at 15:10
bots = []
n = 4
for _id in range(n):
    bot = mp.Process(target=hasher, args=(queue, lock, words, n, _id))
    bot.start()
    bots.append(bot)

and

def hasher(q, l, words, n, i):
    for word in words[i::n]: # so that every process only works on every n-th word
        ...
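
For completeness, here is one way these snippets could slot into the question's original script (a sketch; termination is handled the way Booboo suggested above, with the main process putting a single 'END' on the queue only after all the hashers have been joined):

import multiprocessing as mp
from time import sleep
from random import random


def hasher(q, l, words, n, i):
    for word in words[i::n]:  # each process works only on every n-th word
        h = hash(word)
        l.acquire()  # lock only guards the print against interleaving
        print('hash of {} is {}'.format(word, h))
        l.release()
        sleep(1.0 * random())
        q.put(h)


def summer(q, l):
    while True:
        data = q.get()
        if data == 'END':
            break
        sleep(0.1 * random())
        l.acquire()
        print('sum of {} is {}'.format(data, sum(int(x) for x in str(abs(data)))))
        l.release()


if __name__ == '__main__':
    queue = mp.Queue()
    lock = mp.Lock()
    words = ['fwiimaafqa', 'nuwivfmgdc', 'foymwgcbut', 'sefmayofio', 'crbgzpihpa',
             'xsioddsfyw', 'zbefmckkyi', 'vkxymewyvt', 'ryrvrfkjqf', 'zobdvstxfh']

    bots = []
    n = 4
    for _id in range(n):
        bot = mp.Process(target=hasher, args=(queue, lock, words, n, _id))
        bot.start()
        bots.append(bot)

    bot2 = mp.Process(target=summer, args=(queue, lock))
    bot2.start()

    for bot in bots:
        bot.join()
    queue.put('END')  # all hashing is done; signal summer to stop
    bot2.join()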
wuerfelfreak