I'm trying to schedule a Celery task from Java.

I'm sending the task to RabbitMQ like this:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

boolean durable = true;
channel.queueDeclarePassive("celery");
channel.exchangeDeclarePassive("celery");

String MESSAGE_FORMAT = "{\"id\": \"%s\", " + "\"task\": \"%s\", " + "\"args\": [\"%s\"]}";
message = String.format(MESSAGE_FORMAT, UUID.randomUUID().toString(), "celery.tasks.add", "2501");
channel.basicPublish("celery", "celery", null, message.getBytes("UTF-8"));

The message is sent:

| routing_key | exchange | message_count |                                                  payload                                                  | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+-----------------------------------------------------------------------------------------------------------+---------------+------------------+------------+-------------+
| celery      |          | 0             | {"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]} | 105           | string           |            | False       |

But when I start the worker:

celery -A tasks worker --loglevel=info

I get this warning:

[2016-03-02 16:23:08,457: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

The full contents of the message body was: body: '{"id": "76e0fc3b-3ae0-4603-9099-3cd6bb15ffa9", "task": "celery.tasks.add", "args": ["2501"]}' (105b)
{content_type:None content_encoding:None
  delivery_info:{'redelivered': False, 'delivery_tag': 2, 'routing_key': 'celery', 'exchange': '', 'consumer_tag': 'None4'} headers={}}

So the task is not run, apparently because the message format is wrong. I followed the message protocol documentation: http://docs.celeryproject.org/en/latest/internals/protocol.html

The task is declared in tasks.py like this:

from celery import Celery

app = Celery('tasks')
app.config_from_object('celeryconfig')


@app.task
def add(materialId):
    # The context manager closes the file even if write() raises
    with open('logg', 'w') as f:
        f.write('processing task: ' + materialId)

So I'm not sure what should be corrected. Could you help me with this?

If it's not possible, I will fall back to invoking a Python process from Java, but that would complicate deployment, so I would like to avoid it.

Edit:

To test my approach, I also tried scheduling the task the same way from Python, but the error is the same:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='celery', durable=True)
channel.basic_publish(exchange='celery', routing_key='celery', body='{"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]}')

Edit2:

This is not a duplicate of "Interoperating with Django/Celery From Java", since the answer there does not solve my problem: adjusting the routing key and exchange did not help.

nuoritoveri
  • Possible duplicate of [Interoperating with Django/Celery From Java](http://stackoverflow.com/questions/6933833/interoperating-with-django-celery-from-java) – scytale Mar 03 '16 at 10:15

3 Answers

2

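You need to use the BasicProperties builder to set the content type of your message to JSON. The worker log in the question shows content_type:None, so the worker has no way of knowing that the body should be decoded as JSON.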

// basicPublish(exchange, routingKey, props, body): the properties replace the null
channel.basicPublish("celery", "celery",
    new AMQP.BasicProperties.Builder().contentType("application/json").build(),
    message.getBytes("UTF-8"));

For usage details and more examples, see the RabbitMQ documentation: https://www.rabbitmq.com/api-guide.html#publishing

For the content types Celery supports, see: http://docs.celeryproject.org/en/latest/internals/protocol.html#serialization
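
As a rough illustration of the point above, here is a minimal sketch of how the body and the content type could be prepared together. `CeleryMessageSketch`, `buildBody` and `encode` are hypothetical names made up for this example, and the body only carries the three fields used in the question. The task name `tasks.add` is an assumption based on the task living in tasks.py (the question sends `celery.tasks.add`). The sketch does no JSON escaping, so it assumes the values contain no quotes or backslashes. The publishing call is left as a comment because it needs the RabbitMQ Java client and a running broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper for illustration; not part of Celery or the RabbitMQ client.
final class CeleryMessageSketch {

    // Properties the worker uses to decide how to decode the body.
    static final String CONTENT_TYPE = "application/json";
    static final String CONTENT_ENCODING = "utf-8";

    // Builds a JSON body with the fields from the question: id, task, args.
    // Assumption: the values need no JSON escaping.
    static String buildBody(String taskId, String taskName, String arg) {
        return String.format(
            "{\"id\": \"%s\", \"task\": \"%s\", \"args\": [\"%s\"]}",
            taskId, taskName, arg);
    }

    // Encodes the body as UTF-8 bytes, matching CONTENT_ENCODING.
    static byte[] encode(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "tasks.add" is an assumed task name, see the note above.
        String body = buildBody(UUID.randomUUID().toString(), "tasks.add", "2501");
        System.out.println(CONTENT_TYPE + " " + body);

        // With the RabbitMQ Java client on the classpath, publishing could look like:
        //   AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        //       .contentType(CONTENT_TYPE)
        //       .contentEncoding(CONTENT_ENCODING)
        //       .build();
        //   channel.basicPublish("celery", "celery", props, encode(body));
    }
}
```

Keeping the content type next to the code that builds the body makes it harder to publish JSON without also telling the worker that it is JSON.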

kegfault
-1

You're going to need to capture the raw task request that the Python client code sends and compare it against what you're generating in Java.

scytale
  • Thanks, I hadn't thought of that. It showed me that I should declare the exchange as well (I will edit my question). However, I'm still getting the same error, even when I exactly duplicate the message sent from the Python client (I only changed the UUID). – nuoritoveri Mar 03 '16 at 09:13
-1

Alternatively, you could try submitting tasks through the HTTP gateway in the examples directory of the Celery source, although I'm not sure how well maintained that part of the code is.

scytale