I'm building a web server with Django 1.11.5 that uses Celery 3.1.23 and RabbitMQ as the message-queue manager to send async tasks to a number of different daemon processes (long-running processes with an infinite loop).
How can I dynamically create a separate queue for each process, receive messages from that process's queue inside the daemon process, do something asynchronously, and then forward the result to another "aggregator queue" that collects and validates the results and sends a response to the user? (Please see the attached illustration.)
So far, I have connected the processes via multiprocessing.connection Client and Listener (server) objects, and started the processes with the Process object.
code - consumer:
from multiprocessing.connection import Listener
from multiprocessing import Process
def main_process_loop(path, port_id, auth_key):
    """Run the endless receive loop of one daemon process.

    On every iteration a listener is opened on ``('localhost', port_id)``,
    exactly one connection is accepted, one message is received, and both
    the connection and the listener are closed again before the message is
    handled.

    Args:
        path: Passed unchanged to ``ActionHandler``.
        port_id: TCP port to listen on; converted with ``int()``.
        auth_key: Authentication key for the listener; converted with
            ``bytes()``.

    This function never returns. Any exception raised inside an iteration
    (including the one raised for a ``None`` message) is logged and the
    loop continues with the next iteration.
    """
    import logging  # function-scope import keeps this block self-contained

    logger = logging.getLogger(__name__)

    # Initialize the action handler instance that handles the work stream:
    action_handler = ActionHandler(path)

    # NOTE(review): bytes(auth_key) works on a text key under Python 2 only;
    # Python 3 requires an explicit encoding -- confirm the interpreter.
    port, auth_key_bytes = int(port_id), bytes(auth_key)
    address = ('localhost', port)  # family is deduced to be 'AF_INET'

    # Initialize the infinite loop that will run the process:
    while True:
        try:
            listener = Listener(address, authkey=auth_key_bytes)
            try:
                conn = listener.accept()
                try:
                    input_values = conn.recv()
                finally:
                    # The peer sends a single message, so the connection is
                    # released as soon as that message has been read.
                    conn.close()
            finally:
                # Always release the port; otherwise the next iteration
                # could not bind the same address after a failure.
                listener.close()

            if input_values is None:
                raise Exception(ERR_MSG_INVALID_ARGV)

            # TODO: do something with input_values and action_handler
            # TODO: need to return success message to user
        except Exception:
            # TODO: need to return fail message to user
            logger.exception("request on port %s failed", port)
if __name__ == '__main__':
    # Start one worker process per PID_DICT entry. Each entry maps an auth
    # key to the port its worker listens on; the worker's path is derived
    # from the auth key via TEMPLATE_FORMAT.
    # The started processes are not collected or joined explicitly here.
    for key, port in PID_DICT.items():
        worker = Process(
            target=main_process_loop,
            args=(TEMPLATE_FORMAT.format(key), port, key),
        )
        worker.start()
code - celery task:
from multiprocessing.connection import Client
from celery import Celery
# Celery application named 'tasks'; its broker is the AMQP server at localhost:5672.
app = Celery('tasks', broker='amqp://localhost:5672//')
@app.task
def run_backend_processes(a_lst, b_lst, in_type, out_path, in_file_name):
    """Send one argv string to every backend process named in ``a_lst``.

    For each process name a connection is opened to that process's port on
    localhost, a single string of the form
    ``IN_TYPE=<in_type> IN_PATH=<file_path> B_LEVEL=<b_lst> OUT_PATH=<out_path>``
    is sent, and the connection is closed.

    Args:
        a_lst: Iterable of process names; each must be 'A', 'B', 'C' or 'D'.
        b_lst: Value rendered with ``str()`` into the ``B_LEVEL`` field.
        in_type: Value of the ``IN_TYPE`` field.
        out_path: Value of the ``OUT_PATH`` field and the directory part of
            ``IN_PATH``.
        in_file_name: File name; prefixed with ``<process>_`` to build
            ``IN_PATH``.

    Returns:
        The string ``'process succeed'`` once every message has been sent.

    Raises:
        KeyError: If a name in ``a_lst`` has no entry in the port table.
        Connection errors raised by ``Client`` or ``send`` propagate.
    """
    import os  # function-scope import: `os` is not among the visible imports

    # B_LEVEL is supplied as a format argument ({3}) instead of being
    # concatenated into the template, so braces inside str(b_lst) can no
    # longer be misread as replacement fields by .format().
    argv_format = r"IN_TYPE={0} IN_PATH={1} B_LEVEL={3} OUT_PATH={2}"

    # Port of each backend process; built once instead of once per iteration.
    ports = {
        'A': 6001,
        'B': 6002,
        'C': 6003,
        'D': 6004,
    }

    for process in a_lst:
        port = ports[process]
        file_path = os.path.join(out_path, process + "_" + in_file_name)
        argv_string = argv_format.format(
            in_type, file_path, out_path, str(b_lst))

        # NOTE(review): the original passed an undefined name `mxd_process`
        # here. The process name is assumed to be the auth key -- confirm
        # against the listener side.
        conn = Client(('localhost', port), authkey=bytes(process))
        try:
            conn.send(str(argv_string))
        finally:
            # Release the socket even when send() fails.
            conn.close()

    return 'process succeed'
The Django view is nothing special - it simply calls "run_backend_processes.delay".
Thank you, Yoav.
Q&A Tried: