
I'm building a web server with Django 1.11.5 that uses Celery 3.1.23 and RabbitMQ as the message broker, to send async tasks to a number of daemon processes (long-running processes with an infinite loop).

How can I dynamically create a queue for each process, receive messages from that queue inside the daemon process, do something asynchronously, and then forward the result to another "aggregator queue" that collects and validates the results and sends a response back to the user? (Please see the attached illustration of the architecture.)
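To make the idea concrete, here is a rough sketch of the kind of routing I have in mind, using Celery's own queues (the queue names, task names, and the aggregator task below are placeholders I made up, not working code from my project):

# Rough sketch only: queue names, task names, and the aggregator task
# are placeholders, not code from my project.
from kombu import Queue
from celery import Celery

app = Celery('tasks', broker='amqp://localhost:5672//')

# One queue per daemon process, plus one queue for the aggregator:
PROCESS_NAMES = ['A', 'B', 'C', 'D']
app.conf.CELERY_QUEUES = tuple(
    Queue(name, routing_key=name) for name in PROCESS_NAMES + ['aggregator']
)

@app.task
def process_task(argv_string):
    # Placeholder work; each daemon's worker would consume only its own
    # queue (celery worker -Q A), so this runs inside that process.
    result = argv_string.upper()
    # Forward the result to the aggregator queue:
    aggregate_results.apply_async(args=[result], queue='aggregator')

@app.task
def aggregate_results(result):
    # Placeholder: collect & validate results, then respond to the user.
    return result

# Send a task to a specific process's queue at call time:
# process_task.apply_async(args=['IN_TYPE=csv ...'], queue='A')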

So far, I have connected the processes via multiprocessing.connection's Client and Listener objects, and spawned the processes with the Process object.

code - consumer:

from multiprocessing.connection import Listener
from multiprocessing import Process

def main_process_loop(path, port_id, auth_key):

    # Initialize the ActionHandler instance to handle the work stream:
    action_handler = ActionHandler(path)


    # Initialize the infinite loop that will run the process:

    pid, auth_key_bytes = int(port_id), bytes(auth_key)
    address = ('localhost', pid)  # family is deduced to be 'AF_INET'

    while True:

        try:
            listener = Listener(address, authkey=auth_key_bytes)
            conn = listener.accept()
            input_values = conn.recv()
            listener.close()

            if input_values is None:
                raise Exception(ERR_MSG_INVALID_ARGV)
            else:
                # do something with input_values and action_handler
                pass

            # need to return a success message to the user here

        except Exception as err:
            # need to return a fail message to the user here
            pass


if __name__ == '__main__':
    # worker_processes = []
    for auth_key, port_id in PID_DICT.items():
        path = TEMPLATE_FORMAT.format(auth_key)
        p = Process(target=main_process_loop, args=(path, port_id, auth_key))
        # worker_processes.append(p)
        p.start()
    # for p in worker_processes:
    #     p.join()
    # print "all processes have been initiated"
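For the "return success / fail message" comments above, I assume the daemon can reply to the caller over the same connection before closing it. A rough sketch of that step (serve_one_request and handle are placeholder names, not real code from my project):

# Rough sketch only: reply over the same connection. The Client side
# would call conn.recv() right after its conn.send() to read the reply.
def serve_one_request(listener, handle):
    conn = listener.accept()
    input_values = conn.recv()
    try:
        result = handle(input_values)  # placeholder for the real work
        conn.send({'status': 'ok', 'result': result})
    except Exception as err:
        conn.send({'status': 'error', 'message': str(err)})
    finally:
        conn.close()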

code - celery task:

import os

from multiprocessing.connection import Client
from celery import Celery

app = Celery('tasks', broker='amqp://localhost:5672//')

@app.task
def run_backend_processes(a_lst, b_lst, in_type, out_path, in_file_name):

    ARGV_FORMAT = r"IN_TYPE={0} IN_PATH={1} B_LEVEL=" + str(b_lst) + " OUT_PATH={2}"

    for process in a_lst:


        # Map each process name to the port its Listener is bound on:
        pid = {
            'A': 6001,
            'B': 6002,
            'C': 6003,
            'D': 6004,
        }[process]

        file_path = os.path.join(out_path, process + "_" + in_file_name)
        argv_string = ARGV_FORMAT.format(in_type, file_path, out_path)

        # The authkey must match the one the daemon's Listener was started with:
        address = ('localhost', int(pid))
        conn = Client(address, authkey=bytes(process))
        conn.send(str(argv_string))
        conn.close()
    return 'process succeed'

The Django view is nothing special: it just calls run_backend_processes.delay(...).
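Roughly like this (the argument values are illustrative, and I'm assuming the task module is named tasks.py):

# Rough sketch of the view; argument values are illustrative.
from django.http import JsonResponse
from tasks import run_backend_processes

def start_backend(request):
    run_backend_processes.delay(
        a_lst=['A', 'B'],
        b_lst=[1, 2],
        in_type='csv',
        out_path='/tmp/out',
        in_file_name='data.csv',
    )
    return JsonResponse({'status': 'queued'})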

Thank you, Yoav.

Q&A threads I've already tried:

  1. Celery parallel distributed task with multiprocessing

  2. Can a celery worker/server accept tasks from a non celery producer?

Comments:
  • If you're doing your own multiprocessing, why are you using celery? – 2ps Jan 21 '18 at 16:58
  • Because I need to send the tasks to the long running processes asynchronously and with queues and routing. – Yoav Abadi Jan 22 '18 at 09:32
  • As @2ps suggested, I'm now using pika lib, and it's much better for my architecture. Should I close / delete this question? – Yoav Abadi Jan 24 '18 at 14:25

0 Answers