I have a React app that POST over Klein and runs a Celery job async (via RabbitMQ). I would like to display all tasks in some table with status updates via a WAMP Pub/Sub (I'm using Crossbar as a router). My React table component gets the initial data and subscribes to changes as such:
class Table extends React.Component {
componentWillMount(){
this.props.session //Autobahn session variable passed down as prop
.subscribe("celery.update.task", (args,kwargs,details)=>{update state})
}
componentDidMount(){
//Gets all tasks in db (SQLite)
this.props.session.call("celery.all.tasks", (a,k,d)=>{set initial state}
}
render() {
//render table with data from state
}
I'm currently trying to get the WAMP components on the backend (using AutobahnPython) to publish "celery.update.task" on celery events. This application will be internet-facing so I would like to use secure websockets (wss) and so followed this tutorial for running the WAMP components. My current issue is that the WAMP component meant to listen to celery events does not publish async (once the python process is ended, all publishes then get sent):
import threading, time, ast, six
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunnner
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet._sslverify import OpenSSLCertificateAuthorities
from twisted.internet.ssl import CertificateOptions
from OpenSSL import crypto
class MonitorThread(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
from app_queue import celery #Celery instance
self.celery_app = celery
self.interval = 1
self.state = self.celery_app.events.State()
self.thread = threading.Thread(target=self.run, args=())
self.thread.daemon = True
self.thread.start()
yield
@inlineCallbacks
#there are analogous callbacks for "received", "started" and "failed"
def handle_task_success(self, event):
...
res = yield self.publish("celery.task.update", "some_task_update")
resultValue(res)
def run(self):
while True:
try:
with self.celery_app.connection() as connection:
recv = self.select_app.events.Receiver(connection, handlers={
"task-succeeded": self.handle_task_success
})
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
pass
time.sleep(self.interval)
if __name__ == "__main__":
cert = crypto.load_certificate(crypto.FILETYPE_PEM,six.u(open('.crossbar/example.cert.pem', "r").read()))
options = CertificateOptions(trustRoot=OpenSSLCertificateAuthorities([cert]))
runner = ApplicationRunner(url=u"wss://127.0.0.1:443/ws", realm=u"realm1", ssl=options)
runner.run(MonitorThread)
The above code is able to monitor all events but the publish methods aren't pushed to subscribers until the python process is ended. I would like for the WAMP component to publish to "celery.task.update" on every Celery event (received, started, succeeded, failed) so that the Table component is live updated.
Is there a way to make this work? I've tried without success trying this and this.