
I have the following problem: I run Celery with many workers. During Celery startup I create a few subprocesses:

proc = subprocess.Popen("program", stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)

I need these subprocesses to be started once and then used repeatedly by the Celery workers, so I save them to a multiprocessing.Manager().dict() - something like a pool:

pool = multiprocessing.Manager().dict()
pool[proc_id] = proc

All subprocesses are accessible from the Celery workers, but they don't work - I found out that the pipes break the moment a subprocess is shared via the pool. So the first question: is there any way to share subprocess pipes with other processes (the Celery workers)?

I also tried saving the pipes to a separate regular dict. Then, when a worker gets a subprocess from the pool, the pipes are wired back onto the subprocess:

proc.stdin = dict_of_pipes[proc_id]

This solution sometimes works, but sometimes the pipe is not found in the dictionary - I guess because sharing a regular dictionary between processes is not OK?

As "program" you can imagine /bin/bash. Locking is solved, dictionaries are never accessed by more than 1 process at a time...

Second question: is it possible to open a new pipe to an already running subprocess (from any Celery worker)? Or is there another solution?


2 Answers


After some experimenting I found out that it is not possible to open a new pipe to an already existing subprocess (my 2nd question), and that I cannot copy (share) existing pipes between processes (my main problem).

So I solved it like this: each subprocess is wrapped in a multiprocessing.Process that runs an XML-RPC server. These "wrappers" are started when Celery starts, or at any time by the Celery workers. After a wrapper process starts, it sends the port it is listening on back via a multiprocessing.Pipe; the ports are saved in the shared pool (multiprocessing.Manager().dict()). Celery workers can then call the running subprocesses via the XML-RPC wrappers without any trouble with pipes. XML-RPC is not necessary, but it makes the code simpler and easier to use.
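The shape of that setup might look roughly like the sketch below. This is only an illustration, assuming Python 3 module names (on Python 2 the modules are SimpleXMLRPCServer and xmlrpclib); run_wrapper, send_line and the "bash-1" key are made-up names, not taken from the original code.

import multiprocessing
import subprocess
from xmlrpc.server import SimpleXMLRPCServer
import xmlrpc.client

def run_wrapper(port_pipe):
    # Only this wrapper process ever touches the subprocess and its pipes,
    # so they never have to be shared between processes.
    proc = subprocess.Popen(["/bin/bash"], stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)

    # Port 0 lets the OS pick a free port; report it back to the parent.
    server = SimpleXMLRPCServer(("127.0.0.1", 0), allow_none=True, logRequests=False)
    port_pipe.send(server.server_address[1])

    def send_line(line):
        proc.stdin.write(line + "\n")
        proc.stdin.flush()
        return proc.stdout.readline()

    server.register_function(send_line)
    server.serve_forever()

if __name__ == "__main__":
    pool = multiprocessing.Manager().dict()   # proc_id -> XML-RPC port

    parent_end, child_end = multiprocessing.Pipe()
    multiprocessing.Process(target=run_wrapper, args=(child_end,), daemon=True).start()
    pool["bash-1"] = parent_end.recv()

    # Any worker that can see `pool` can now talk to the subprocess.
    proxy = xmlrpc.client.ServerProxy("http://127.0.0.1:%d" % pool["bash-1"])
    print(proxy.send_line("echo hello"))

The important part is that the pipes never leave the wrapper process; the workers only share port numbers, which are plain integers and therefore safe to keep in the Manager dict.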


It is possible: you can send the new pipe through an existing pipe. There is a question about that: Python 2.6 send connection object over Queue / Pipe / etc

That answer works for me.

from multiprocessing import Pipe, reduction

# Somewhere in the main process code ("in" is a Python keyword, so the
# ends of the new pipe are named new_in / new_out here):
new_in, new_out = Pipe()
reduced = reduction.reduce_connection(new_out)   # picklable recipe for one end
in_old_pipe.send(reduced)                        # send it through the existing pipe

# Somewhere else in the subprocess code:
reduced = out_old_pipe.recv()
new_out = reduced[0](*reduced[1])                # rebuild the Connection on this side

This way, you can use a main pipe to connect newly instantiated subprocesses.
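For what it's worth, on Python 3 the explicit reduce/rebuild step should no longer be needed: as far as I can tell, multiprocessing's own pickler can transfer Connection objects directly, so a sketch like the one below (all names are illustrative) ought to work.

from multiprocessing import Process, Pipe

def worker(channel):
    new_conn = channel.recv()                  # receive the Connection object itself
    new_conn.send("hello over the forwarded pipe")
    new_conn.close()

if __name__ == "__main__":
    main_in, main_out = Pipe()                 # the long-lived "main" pipe
    extra_in, extra_out = Pipe()               # the new pipe we want to hand over

    p = Process(target=worker, args=(main_out,))
    p.start()

    main_in.send(extra_out)                    # forward one end of the new pipe
    print(extra_in.recv())                     # -> "hello over the forwarded pipe"
    p.join()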
