
System/Dependencies details:

CPUs --> 4
Requirements --> celery==4.3.0, twisted==19.7.0, Python 3.7

Below is my Celery setup:

from threading import Thread

from celery import Celery, signals
from twisted.internet import reactor, threads

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    # Runs in every pool child process when it starts: run that process's
    # reactor in a background thread. The positional False is
    # installSignalHandlers, because signal handlers can only be installed
    # from the main thread.
    Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('started new thread')

@signals.worker_process_shutdown.connect
def shutdown_reactor(**kwargs):
    """
    This is invoked when the individual workers shut down. It just stops the Twisted reactor.
    @param kwargs: signal arguments sent by Celery (unused)
    @return: None
    """
    # reactor.stop is not thread-safe, so schedule it on the reactor thread.
    reactor.callFromThread(reactor.stop)
    print('REACTOR SHUTDOWN')

def getPage(inp):
    print(inp)
    return inp

def inThread():
    print('inside inthread method')
    try:
        result = threads.blockingCallFromThread(
            reactor, getPage, "http://twistedmatrix.com/")
    except Exception as exc:
        print(exc)
    else:
        print(result)


@app.task
def add(x, y):
    print('inside add method')
    inThread()
    return x + y

I run the Celery worker like this:

celery -A run worker --loglevel=info

Logs when Celery starts:

(2_env) ubuntu@gpy:~/app/env/src$ celery -A run worker --loglevel=info

[tasks]
  . run.add

[2020-04-09 07:25:29,357: WARNING/Worker-1] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-4] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-3] started new thread
[2020-04-09 07:25:29,364: WARNING/Worker-2] started new thread
[2020-04-09 07:25:29,367: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

I call the task like this:

>>> run.add.delay(1,2)
<AsyncResult: d41680fd-7cc1-4e75-81be-6496bad0cc16>
>>> 

Sometimes it works fine:

[2020-04-09 07:27:17,998: INFO/MainProcess] Received task: run.add[00934769-48c4-48b8-852c-8b746bdd5e03]
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside add method
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside inthread method
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: INFO/MainProcess] Task run.add[00934769-48c4-48b8-852c-8b746bdd5e03] succeeded in 0.00144551398989s: 3

Other times getPage is never called and the worker hangs, as in these logs:

[2020-04-09 07:27:22,318: INFO/MainProcess] Received task: run.add[d41680fd-7cc1-4e75-81be-6496bad0cc16]
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside add method
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside inthread method

Is there any issue with running reactor.run inside a Thread?

UPDATE

I added print statements to *twisted.internet.threads.blockingCallFromThread*:

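import queue as Queue  # stdlib queue module, referred to as Queue in this excerpt
from twisted.internet import defer
from twisted.python import failure

def blockingCallFromThread(reactor, f, *a, **kw):
    queue = Queue.Queue()
    def _callFromThread():
        print('inside _callFromThread')
        result = defer.maybeDeferred(f, *a, **kw)
        result.addBoth(queue.put)
    print('before calling _callFromThread')
    reactor.callFromThread(_callFromThread)
    print('after calling _callFromThread')
    # Blocks until _callFromThread has run in the reactor thread and put a result.
    result = queue.get()
    if isinstance(result, failure.Failure):
        result.raiseException()
    return result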

I can see that the Celery worker hangs only when _callFromThread is never called after reactor.callFromThread(_callFromThread), so queue.get() blocks indefinitely. When I manually stop the worker with CTRL+C, _callFromThread does get called.

Every time I stop a worker on which a job is hung, it starts processing that job.
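The instrumentation matches how blockingCallFromThread works: the calling thread schedules _callFromThread with reactor.callFromThread, which is supposed to wake the reactor thread, and then blocks on queue.get() until a result arrives. Below is a simplified, stdlib-only model of that handshake, not Twisted's implementation; ToyLoop, call_from_thread and blocking_call are hypothetical names, and the wake=False flag is an assumption used to simulate a missed wake-up. A timeout is added so the model reports the stall instead of hanging forever.

```python
import queue
import threading


class ToyLoop:
    """Hypothetical single-threaded event loop: a list of pending calls plus a
    wake-up flag, loosely modelled on a reactor's thread-call queue and waker."""

    def __init__(self):
        self._calls = []
        self._lock = threading.Lock()
        self._wakeup = threading.Event()
        self._running = True

    def run(self):
        while self._running:
            self._wakeup.wait()  # sleep until another thread wakes the loop
            self._wakeup.clear()
            with self._lock:
                calls, self._calls = self._calls, []
            for fn in calls:
                fn()

    def call_from_thread(self, fn, wake=True):
        # Queue fn for the loop thread. wake=False simulates a lost wake-up:
        # fn stays queued until something else wakes the loop.
        with self._lock:
            self._calls.append(fn)
        if wake:
            self._wakeup.set()

    def stop(self):
        def _stop():
            self._running = False
        self.call_from_thread(_stop)


def blocking_call(loop, f, *args, timeout=None, wake=True):
    # Same shape as blockingCallFromThread: post a callable to the loop, then
    # block on a queue until the loop thread puts the result there.
    # Error propagation is omitted for brevity.
    results = queue.Queue()
    loop.call_from_thread(lambda: results.put(f(*args)), wake=wake)
    return results.get(timeout=timeout)  # raises queue.Empty on timeout
```

With wake=False the call stalls the way the instrumented worker did, and the queued callable only runs once a later call (for example loop.stop()) wakes the loop, much like stopping the worker let the stuck job proceed. The model does not explain why the wake-up goes missing inside the Celery worker.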

UPDATE: 27 April 2020

The problem was solved by using crochet to run the Twisted reactor. I updated the function below:

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    from crochet import setup
    setup()
    print('started new thread')
Prashant Gaur

1 Answer

With some care, which you seem to have taken, you can run the Twisted reactor in one thread. However, you cannot run it in more than one thread, which I suspect is what happens when you use it with Celery. The reactor has both instance and global state that will get stomped on if it runs in more than one thread.

Instead, try using crochet to coordinate calls onto the reactor running in a single non-main thread from as many other threads as you like.

Jean-Paul Calderone
  • But this code worked fine with Python 2? I only started getting the above issue after migrating my code to Python 3. – Prashant Gaur Apr 24 '20 at 11:53
  • If it worked it was only by accident. Twisted makes no promises about what happens if you try to run a reactor in more than one thread. – Jean-Paul Calderone Apr 24 '20 at 11:57
  • On a re-reading of the question, though, I'm not sure my earlier understanding was correct. Do you know if Celery is starting multiple threads running the reactor? I see now that the only code that runs the reactor is configure_infrastructure which is perhaps only run once when things start up. I have not used Celery though, so I'm not sure. – Jean-Paul Calderone Apr 24 '20 at 11:59
  • I have updated the question with one more finding. Please have a look. – Prashant Gaur Apr 24 '20 at 12:30
  • If I use crochet to manage the reactor, things seem to work and I do not see any tasks hanging. I have updated the question too, please see the recent update. – Prashant Gaur Apr 27 '20 at 13:32
  • I have posted my findings here: https://stackoverflow.com/questions/61667244/celery-worker-hang-when-running-with-twisted Can you please have a look? – Prashant Gaur May 08 '20 at 05:08
  • @Jean-PaulCalderone Yes, the reactor runs only once per worker process, and the number of processes depends on the Celery worker's concurrency. With concurrency=4, Celery runs 4 worker processes, and each has its own reactor running in a separate process. – Harshit Agarwal May 09 '20 at 18:05