Before anyone marks this as a duplicate: I have been looking at StackOverflow posts for days and haven't found a good or satisfying answer.
I have a program that at some point takes individual strings (along with many other arguments and objects), does some complicated processing on them, and spits one or more strings back. Because each string is processed separately, multiprocessing seems natural here, especially since I work on machines with over 100 cores.
The following is a minimal example. It works with up to 12 to 15 cores, but if I give it more cores than that, it hangs at p.join(). I know it is hanging at the join because I added debug prints before and after the join, and execution stops somewhere between the two print statements.
Minimal example:
import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

def return_string(queue):
    n_strings = [1, 2, 3, 4]
    alignments = []
    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for i in range(random.choice(n_strings)):
        alignment = ""
        for i in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)
    for a in alignments:
        queue.put(a)

def run_string_gen(cores):
    processes = []
    queue = mp.Queue()
    # running the target function 1000 times
    for i in range(1000):
        # print(i)
        process = mp.Process(target=return_string, args=(queue,))
        processes.append(process)
        if len(processes) == cores:
            counter = len(processes)
            for p in processes:
                p.start()
            for p in processes:
                p.join()  # <-- this is where it hangs with more than ~15 cores
            while queue.qsize() != 0:
                a = queue.get()
                # the original idea is that instead of print
                # I will be writing to a file that is already open
                print(a)
            processes = []
            queue = mp.Queue()
    # any leftover processes
    if processes:
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        while queue.qsize() != 0:
            a = queue.get()
            print(a)

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")
My suspicion is that the queue is getting full, but it's not that many strings: when I give it 20 cores it hangs, yet that is only about 20*4 = 80 strings (if the choice is always 4). Then again, each string is 1300 characters, so that is roughly 80 × 1300 ≈ 104 KB of data. Is that enough for the queue to get full?
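If the limit really is the underlying pipe buffer (I have read it is around 64 KiB on Linux, but that is an assumption on my part), then the hang should reproduce with a single worker that puts more data than that. Something like this sketch, which is hypothetical and not part of my real code:

import multiprocessing as mp

def worker(q):
    q.put("x" * 200_000)  # assumption: more than the pipe buffer can hold

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    p.join()  # would hang: the child cannot exit until its queued data is
              # flushed, and nothing is flushed until the parent calls q.get()
    print(len(q.get()))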
Assuming the queue is getting full, I am not sure at which point I should check and empty it. Doing it inside return_string seems like a bad idea, since the other processes share the queue and might be emptying or filling it at the same time. Do I use lock.acquire() and lock.release() then?
These strings will be written to a file in the end, so I could avoid the queue altogether and have each worker write its output directly. However, because starting a process means copying (pickling) its arguments, I cannot pass an _io.TextIOWrapper object (an already-open file to append to); instead I would have to open and close the file inside return_string, synchronizing with lock.acquire() and lock.release(). Repeatedly opening and closing the output file just to write to it seems wasteful, though.
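To be concrete, the file-plus-lock version I am imagining looks roughly like this (the out_path and lock arguments are my own hypothetical names, just to illustrate the idea):

import multiprocessing as mp
import random, string

letters = string.ascii_uppercase
align_len = 1300

def return_string(out_path, lock):
    alignments = []
    for _ in range(random.choice([1, 2, 3, 4])):
        alignments.append("".join(random.choice(letters) for _ in range(align_len)))
    # serialize the writers: open, append, close under the lock every single time
    with lock:
        with open(out_path, "a") as f:
            for a in alignments:
                f.write(a + "\n")

if __name__ == "__main__":
    lock = mp.Lock()
    processes = [mp.Process(target=return_string, args=("out.txt", lock))
                 for _ in range(20)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()  # no queue involved, so joining is safe

This works, but every call to return_string pays for an open() and a close(), which is exactly the waste I would like to avoid.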
Some of the suggested solutions out there:
1- De-queuing the queue before joining is one of the answers I found. However, I cannot anticipate how long each process will take, and adding a sleep after the p.start() loop and before the p.join() loop is bad (at least for my code): if the workers finish fast, I just end up waiting, and the whole point of this is speed. (My guess at a sleep-free version is the first sketch after this list.)
2- Adding some kind of sentinel value, e.g. None, to know that a worker has finished. I did not fully get this part: if I run the target function 10 times for 10 cores, I will have 10 sentinels, but the problem is that it is hanging and I cannot reach the code that empties the queue and checks for the sentinels. (My reading of this pattern is the second sketch below.)
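Regarding suggestion 1, my best guess at a sleep-free version is to drain in the parent while the workers are still alive, so the buffer behind the queue never fills (a sketch based on my minimal example; the 0.1-second timeout is an arbitrary choice of mine):

from queue import Empty  # mp.Queue.get raises the plain queue.Empty

def drain_then_join(processes, queue):
    for p in processes:
        p.start()
    results = []
    # keep pulling while any worker is alive, so no worker ever blocks
    # on exit waiting for its queued data to be flushed
    while any(p.is_alive() for p in processes) or not queue.empty():
        try:
            results.append(queue.get(timeout=0.1))
        except Empty:
            pass
    for p in processes:
        p.join()  # everything is flushed, so this should return promptly
    return results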
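And regarding suggestion 2, this is what I think the sentinel pattern is supposed to look like (again a hypothetical sketch, not my real code): each worker puts None last, and the parent drains the queue before joining, counting sentinels to know when all workers are done:

import multiprocessing as mp
import random, string

def return_string(queue):
    for _ in range(random.choice([1, 2, 3, 4])):
        queue.put("".join(random.choice(string.ascii_uppercase) for _ in range(1300)))
    queue.put(None)  # sentinel: this worker has produced everything it will

if __name__ == "__main__":
    queue = mp.Queue()
    processes = [mp.Process(target=return_string, args=(queue,)) for _ in range(20)]
    for p in processes:
        p.start()
    finished = 0
    # drain *before* joining; count sentinels instead of guessing when to stop
    while finished < len(processes):
        item = queue.get()
        if item is None:
            finished += 1
        else:
            print(item)  # or write to the already-open output file
    for p in processes:
        p.join()  # the queue is already drained, so this returns immediately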
Any suggestions or ideas on what to do here?