1

I am looking for a Python library that can manage a task queue, run tasks in parallel (on one or several computers), allow a task to generate other tasks in the queue, and work on both UNIX and Windows.

I have read some documentation about Celery, RQ, SCOOP and multiprocessing for the task-manager part, and Redis, RabbitMQ and ZMQ for the message-broker part, but I don't really know which is the best option.

mrnl
  • 51
  • 4
  • I'd use Redis; it can be used from Python, bash, Perl and PHP: https://stackoverflow.com/a/22220082/2836621 – Mark Setchell Apr 25 '19 at 08:51
  • I'd suggest using Celery with Redis broker wrapped up in Docker containers. While using Docker in Swarm mode, you'll be able to spawn your Celery workers across multiple servers in no time. There are also many tools to manage running Celery tasks such as [Flower](https://flower.readthedocs.io/en/latest/screenshots.html). – constt Apr 26 '19 at 04:55
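
Expanding on the comment above: a minimal sketch of what a Celery app with a Redis broker could look like (the broker URL and the task body are placeholder assumptions, not something from the question):

# tasks.py - minimal Celery sketch; assumes Redis is reachable at localhost:6379
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_run(run_id):
    # Placeholder work; .delay() enqueues another task from inside a task,
    # which covers the "a task can generate other tasks" requirement
    if run_id < 10:
        process_run.delay(run_id + 1)

Workers can then be started on any number of machines with `celery -A tasks worker`, all pointing at the same broker.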

1 Answer

1

Consider the Python multiprocessing library.

This allows many multiprocessing options, such as running multiple processes as a pool of workers using a queue of work. It runs on one server, but you could implement a connector which executes the work on another server (e.g. via SSH and remotely running a Python executable).
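
As a rough illustration of that idea (purely a sketch: the hostname, remote script path and passwordless SSH setup are assumptions, not part of this answer), such a connector could shell out to ssh and run the task remotely:

import subprocess

def execute_run_remotely(host, run_id):
    # Run the task on another machine over SSH and wait for it to finish.
    # Assumes passwordless SSH access to `host` and that run_task.py exists there.
    result = subprocess.run(
        ["ssh", host, "python3", "/opt/myapp/run_task.py", str(run_id)],
        capture_output=True, text=True, timeout=3600,
    )
    if result.returncode != 0:
        raise RuntimeError("Remote run {0} failed: {1}".format(run_id, result.stderr))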

Otherwise I'm not aware of a Python library that can work cross-server and cross-platform. You might need a containerised application - something like Kubernetes.

Below is some sample code I wrote which adds "task ids" representing runnable tasks to a Queue. These can then be executed in parallel by a Pool of workers.

import time
from multiprocessing import Queue, Pool
from queue import Empty  # on Python 2 this is: from Queue import Empty

# For writing to logs when using multiprocessing
import logging
from multiprocessing_logging import install_mp_handler


class RuntimeHelper:
    """
    Wrapper around your "runtime" which can execute runs and is persistent within a worker process.
    """
    def __init__(self):
        # Implement your own code here
        # Do some initialisation such as creating DB connections etc
        # Will be done once per worker when the worker starts
        pass

    def execute_run(self, run_id):
        # Implement your own code here to actually do the Run/Task.
        # In this case we just sleep for 30 secs instead of doing any real work
        time.sleep(30)
        pass


def worker(run_id_queue):
    """
    This function will be executed once by a Pool of Processes using multiprocessing.Pool
    :param run_id_queue: the process-safe Queue of run_ids to use
    :return:
    """
    helper = RuntimeHelper()
    # Iterate runs until death
    logging.info("Starting")
    while True:
        try:
            run_id = run_id_queue.get_nowait()
            # A run_id=None is a signal to this process to die
            # An empty queue means: don't die, the queue is just empty for now and more work could be added soon
            if run_id is not None:
                logging.info("run_id={0}".format(run_id))
                helper.execute_run(run_id)
            else:
                logging.info("Kill signal received")
                return True
        except Empty:
            # Wait X seconds before checking for new work
            time.sleep(15)


if __name__ == '__main__':
    num_processes = 10
    check_interval_seconds = 15
    max_runtime_seconds = 60*15

    # ==========================================
    # INITIALISATION
    # ==========================================
    logging.basicConfig(level=logging.INFO)  # Configure logging so the logging.info calls are visible
    install_mp_handler()  # Must be called before the Pool is created

    queue = Queue()
    pool = Pool(num_processes, worker, (queue,))
    # don't forget the comma here  ^

    # ==========================================
    # LOOP
    # ==========================================

    logging.info('Starting to do work')

    # Naive wait-loop implementation
    max_iterations = max_runtime_seconds // check_interval_seconds  # integer division so range() gets an int
    for i in range(max_iterations):
        # Add work
        ready_runs = []  # <Your code to get some runs>
        for ready_run in ready_runs:
            queue.put(ready_run.id)
        # Sleep while some of the runs are busy
        logging.info('Main thread sleeping {0} of {1}'.format(i, max_iterations))
        time.sleep(check_interval_seconds)

    # Empty the queue of work and send the kill signal (run_id = None)
    logging.info('Finishing up')
    while True:
        try:
            run_id = queue.get_nowait()
        except Empty:
            break
    for i in range(num_processes):
        queue.put(None)
    logging.info('Waiting for subprocesses')

    # Wait for the pool finish what it is busy with
    pool.close()
    pool.join()
    logging.info('Done')
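
To cover the "a task can generate other tasks" requirement from the question: because the Queue is shared, a run can enqueue follow-up runs itself. Below is a hypothetical variation of the worker above, assuming execute_run is changed to return the ids of any child runs (that return value is my assumption, not part of the original code):

def worker_with_spawning(run_id_queue):
    """Variant of worker() where a run can put follow-up runs onto the same queue."""
    helper = RuntimeHelper()
    while True:
        try:
            run_id = run_id_queue.get_nowait()
            if run_id is None:
                return True
            # Hypothetical: execute_run returns the ids of any child runs it wants scheduled
            child_run_ids = helper.execute_run(run_id) or []
            for child_run_id in child_run_ids:
                run_id_queue.put(child_run_id)
        except Empty:
            time.sleep(15)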
BjornO
  • 851
  • 8
  • 16