I'm trying to schedule Celery task from Java.
I'm sending task to RabbitMQ like this:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
boolean durable = true;
channel.queueDeclarePassive("celery");
channel.exchangeDeclarePassive("celery");
String MESSAGE_FORMAT = "{\"id\": \"%s\", " + "\"task\": \"%s\", " + "\"args\": [\"%s\"]}";
message = String.format(MESSAGE_FORMAT, UUID.randomUUID().toString(), "celery.tasks.add", "2501");
channel.basicPublish("celery", "celery", null, message.getBytes("UTF-8"));
The messages is sent:
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+-----------------------------------------------------------------------------------------------------------+---------------+------------------+------------+-------------+
| celery | | 0 | {"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]} | 105 | string | | False
But when I start worker:
celery -A tasks worker --loglevel=info
I get a message:
[2016-03-02 16:23:08,457: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"id": "76e0fc3b-3ae0-4603-9099-3cd6bb15ffa9", "task": "celery.tasks.add", "args": ["2501"]}' (105b)
{content_type:None content_encoding:None
delivery_info:{'redelivered': False, 'delivery_tag': 2, 'routing_key': 'celery', 'exchange': '', 'consumer_tag': 'None4'} headers={}}
So the task is not run, because the format is somewhat wrong. I followed: http://docs.celeryproject.org/en/latest/internals/protocol.html
The task is declared in tasks.py
like this:
from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')
@app.task
def add(materialId):
f = open('logg','w')
f.write('processing task: ' + materialId)
f.close()
So I'm not sure what should be corrected. Could you help me on this?
If it's not possible I will fall back to invoking python process from java but this will complicate the deployment process so I would like to avoid it.
Edit:
To test my method I tried to schedule task this way from python also, but the error is the same:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='celery', durable=True)
channel.basic_publish(exchange='celery', routing_key='celery', body='{"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]}')
Edit2:
This is not a duplicate of Interoperating with Django/Celery From Java since the answer does not solve my problem: adjusting routing key and exchange did not solve the problem.