I'm experimenting with a service that downloads and resizes images, using threads for the downloads and processes for the resizing. I start the download threads (plus a manager thread that watches them), and as soon as an image is saved locally its path is added to a Queue. Once all images are downloaded, the manager thread puts a poison pill on the queue.
Meanwhile, the main thread watches the queue, takes each path off it as soon as it arrives, and submits an async resize task for it to a process pool.
At the end, when I try to join the pool, it sometimes hangs; it looks like a deadlock. It doesn't happen every time, but the more URLs there are in the IMG_URLS list, the more often it happens. When the deadlock does happen, the logs suggest that some processes were either never properly started or deadlocked immediately, because the "resizing {file}" message never appears for them.
```python
import logging
import multiprocessing as mp
import time
from queue import Queue
from threading import Thread


def resize_image(file):
    logging.info(f"resizing {file}")
    time.sleep(0.1)
    logging.info(f"done resizing {file}")


class Service(object):
    def __init__(self):
        self.img_queue = Queue()

    def download_image(self, url) -> None:
        logging.info(f"downloading image from URL {url}")
        time.sleep(1)
        file = f"local-{url}"
        self.img_queue.put(file)
        logging.info(f"image saved to {file}")

    def download_images(self, img_url_list: list):
        logging.info("beginning image downloads")
        threads = []
        for url in img_url_list:
            t = Thread(target=self.download_image, args=(url,))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        logging.info("all images downloaded")
        self.img_queue.put(None)

    def resize_images(self):
        logging.info("beginning image resizing")
        with mp.Pool() as p:
            while True:
                file = self.img_queue.get()
                if file is None:
                    logging.info("got SENTINEL")
                    break
                logging.info(f"got {file}")
                p.apply_async(func=resize_image, args=(file,))
            p.close()
            p.join()
        logging.info("all images resized")

    def run(self, img_url_list):
        logging.info("START service")
        dl_manager_thread = Thread(target=self.download_images, args=(img_url_list,))
        dl_manager_thread.start()
        self.resize_images()
        logging.info(f"END service")


if __name__ == "__main__":
    FORMAT = "[%(threadName)s, %(asctime)s, %(levelname)s] %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=FORMAT)
    IMG_URLS = list(range(8))
    service = Service()
    service.run(IMG_URLS)
```
When I run this with Python 3.8.5 (Ubuntu 20.04, Ryzen 2600), I get the following output:
```
[MainThread, 2020-11-30 19:58:01,257, INFO] START service
[Thread-1, 2020-11-30 19:58:01,257, INFO] beginning image downloads
[MainThread, 2020-11-30 19:58:01,257, INFO] beginning image resizing
[Thread-2, 2020-11-30 19:58:01,258, INFO] downloading image from URL 0
[Thread-3, 2020-11-30 19:58:01,258, INFO] downloading image from URL 1
[Thread-4, 2020-11-30 19:58:01,258, INFO] downloading image from URL 2
[Thread-5, 2020-11-30 19:58:01,259, INFO] downloading image from URL 3
[Thread-6, 2020-11-30 19:58:01,260, INFO] downloading image from URL 4
[Thread-7, 2020-11-30 19:58:01,260, INFO] downloading image from URL 5
[Thread-8, 2020-11-30 19:58:01,261, INFO] downloading image from URL 6
[Thread-9, 2020-11-30 19:58:01,262, INFO] downloading image from URL 7
[Thread-2, 2020-11-30 19:58:02,259, INFO] image saved to local-0
[MainThread, 2020-11-30 19:58:02,260, INFO] got local-0
[Thread-3, 2020-11-30 19:58:02,260, INFO] image saved to local-1
[Thread-4, 2020-11-30 19:58:02,260, INFO] image saved to local-2
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-1
[MainThread, 2020-11-30 19:58:02,261, INFO] resizing local-0
[Thread-5, 2020-11-30 19:58:02,261, INFO] image saved to local-3
[Thread-6, 2020-11-30 19:58:02,261, INFO] image saved to local-4
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-2
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-3
[MainThread, 2020-11-30 19:58:02,262, INFO] resizing local-1
[Thread-7, 2020-11-30 19:58:02,262, INFO] image saved to local-5
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-3
[Thread-8, 2020-11-30 19:58:02,263, INFO] image saved to local-6
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-6
[MainThread, 2020-11-30 19:58:02,264, INFO] resizing local-6
[Thread-9, 2020-11-30 19:58:02,264, INFO] image saved to local-7
[MainThread, 2020-11-30 19:58:02,265, INFO] got local-7
[Thread-1, 2020-11-30 19:58:02,265, INFO] all images downloaded
[MainThread, 2020-11-30 19:58:02,265, INFO] got SENTINEL
[MainThread, 2020-11-30 19:58:02,265, INFO] resizing local-7
[MainThread, 2020-11-30 19:58:02,362, INFO] done resizing local-0
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-1
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-3
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-4
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-5
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-6
[MainThread, 2020-11-30 19:58:02,366, INFO] done resizing local-7
```
Sometimes it hangs at exactly this point. Notice that the "resizing local-2" line is missing, so the worker that picked up that task either never really started or is waiting on something.
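One way to find out what the silent worker is actually waiting on is to arm `faulthandler.dump_traceback_later` in every pool worker through the pool's `initializer`. This is a debugging sketch, not part of the original service: `init_worker`, `resize_all`, and the 30-second timeout are hypothetical. If a worker is still alive when its timer fires, it prints the stack of each of its threads to stderr, so a stuck worker should reveal the call it is blocked in. Idle workers that are simply waiting for their next task will also dump once, so look for the trace that is not just sitting in the pool's task-queue read.

```python
import faulthandler
import logging
import multiprocessing as mp
import sys
import time


def init_worker(timeout):
    # Hypothetical helper, run once in each pool worker: if the worker is
    # still alive after `timeout` seconds, dump every thread's stack to the
    # real stderr (faulthandler needs a file with a usable fileno()).
    faulthandler.dump_traceback_later(timeout, exit=False, file=sys.__stderr__)


def resize_image(file):
    # Same stand-in task as in the question, but returning its input so the
    # caller can check that every task actually ran.
    logging.info(f"resizing {file}")
    time.sleep(0.1)
    logging.info(f"done resizing {file}")
    return file


def resize_all(files, timeout=30.0):
    # Explicit "fork" to match the default start method on Linux used above.
    ctx = mp.get_context("fork")
    with ctx.Pool(initializer=init_worker, initargs=(timeout,)) as p:
        results = [p.apply_async(resize_image, (f,)) for f in files]
        p.close()
        p.join()
        return [r.get() for r in results]


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(resize_all([f"local-{i}" for i in range(8)]))
```

The exact frames printed for a hung worker depend on the Python version, so treat the dump as a pointer to which lock or call to investigate rather than as a diagnosis by itself.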
If I change the pool to use the spawn start method instead of fork, it works fine. My guess is that in some cases the fork copies a lock in its acquired state and that this is the issue, but I don't clearly see where or why. This is the only change I made:
```python
with mp.get_context("spawn").Pool() as p:
```
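The "lock copied in an acquired state" guess can be reproduced in isolation. `mp.Pool()` forks its workers when it is constructed, and in the log above that happens at the same moment the download threads are logging. The minimal sketch below (the names `hold_lock`, `probe_in_child`, and `child_sees_lock_free` are hypothetical, and a plain `threading.Lock` stands in for whatever lock is really involved) has a background thread hold a lock while the main thread forks. In the child only the forking thread exists, so the copied lock stays locked with nobody left to release it.

```python
import multiprocessing as mp
import threading

# Stand-in for any lock another thread might hold at fork time.
lock = threading.Lock()


def hold_lock(acquired, release):
    # Background thread: take the lock, signal that we hold it, then keep
    # holding it until the main thread says we can let go.
    with lock:
        acquired.set()
        release.wait()


def probe_in_child(conn):
    # Runs in the forked child. The thread that owns `lock` was not copied,
    # so a non-blocking acquire fails; a blocking one would hang forever.
    conn.send(lock.acquire(blocking=False))
    conn.close()


def child_sees_lock_free():
    acquired, release = threading.Event(), threading.Event()
    holder = threading.Thread(target=hold_lock, args=(acquired, release))
    holder.start()
    acquired.wait()  # make sure the lock is held *before* forking

    ctx = mp.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    child = ctx.Process(target=probe_in_child, args=(child_conn,))
    child.start()
    result = parent_conn.recv()
    child.join()

    release.set()
    holder.join()
    return result


if __name__ == "__main__":
    print("child could acquire the lock:", child_sees_lock_free())
```

If one of the download threads happened to be inside `logging.info` (and therefore holding a lock used for writing the record) when a worker was forked, that worker's first log call could block in the same way, which would fit the missing "resizing local-2" line. That is a plausible candidate under these assumptions, not a confirmed cause.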
Any idea?