13

As I understand it, message brokers like RabbitMQ let applications written in different languages and on different platforms communicate with each other. Since Celery can use RabbitMQ as its message broker, I believe we can queue a task to Celery from any application, even if the producer isn't written in Python.

Now I am trying to figure out how to queue a task to Celery from an application written in C# via RabbitMQ, but I have not found any such example yet.

The closest information I found is this SO question, where the accepted answer suggests using the Celery message protocol to queue messages to RabbitMQ from Java. However, the link given in that answer only describes the message format and does not include an example.

Also, the message format says a task id (UUID) is required to communicate in this protocol. How is my C# application supposed to know the task id of the Celery task? As I understand it, it can only know the task name, not the task id.

Joe

3 Answers

8

I don't know whether the question is still relevant, but hopefully the answer will help others.

Here is how I succeeded in queuing a task to the Celery example worker.

  1. You'll need to establish a connection between your producer (client) and RabbitMQ, as described here.

        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = username;
        factory.Password = password;
        factory.VirtualHost = virtualhost;
        factory.HostName = hostname;
        factory.Port = port;
    
        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();
    

    In the default RabbitMQ configuration there is only the guest user, which can only be used for local connections (from 127.0.0.1). An answer to this question explains how to define users in RabbitMQ.

  2. Next, create a callback to receive results. This example uses Direct reply-to, so the answer listener looks like this:

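        // Listen on the Direct reply-to pseudo-queue for the task result.
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            // In RabbitMQ.Client 6.x and later, ea.Body is a ReadOnlyMemory<byte>; use ea.Body.ToArray() there.
            var ansBody = ea.Body;
            var ansMessage = Encoding.UTF8.GetString(ansBody);
            Console.WriteLine(" [x] Received {0}", ansMessage);
            Console.WriteLine(" [x] Done");
        };
        // Direct reply-to requires no-ack mode; the parameter is named autoAck in RabbitMQ.Client 5.0 and later.
        channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);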
    
  3. Create the task message that Celery will consume:

        IDictionary<string, object> headers = new Dictionary<string, object>();
        headers.Add("task", "tasks.add");
        // The task id is generated by the producer; any new UUID works.
        Guid id = Guid.NewGuid();
        headers.Add("id", id.ToString());
    
        IBasicProperties props = channel.CreateBasicProperties();
        props.Headers = headers;
        props.CorrelationId = (string)headers["id"];
        props.ContentEncoding = "utf-8";
        props.ContentType = "application/json";
        props.ReplyTo = "amq.rabbitmq.reply-to";
    
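        // Positional arguments for tasks.add.
        object[] taskArgs = new object[] { 1, 200 };
    
        // Message body: [args, kwargs, embed]; the two empty objects serialize to {}.
        object[] arguments = new object[] { taskArgs, new object(), new object()};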
    
        MemoryStream stream = new MemoryStream();
        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[]));
        ser.WriteObject(stream, arguments);
        stream.Position = 0;
        StreamReader sr = new StreamReader(stream);
        string message = sr.ReadToEnd();
    
        var body = Encoding.UTF8.GetBytes(message);
    
  4. Finally, publish the message to RabbitMQ:

        channel.BasicPublish(exchange: "",
                             routingKey: "celery",
                             basicProperties: props,
                             body: body);
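
The message-building part of step 3 can also be pulled into a small helper that does not touch RabbitMQ at all, which makes it easier to check what is actually sent. The following is only a sketch: `CeleryTaskMessage`, `BuildHeaders` and `BuildBody` are hypothetical names, not part of Celery or the RabbitMQ client, and it assumes `System.Text.Json` is available (.NET Core 3.0 or later) in place of `DataContractJsonSerializer`. It also shows that the task id is simply a UUID chosen by the producer.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical helper, not part of any Celery or RabbitMQ library.
public static class CeleryTaskMessage
{
    // Headers for a task message: the task name plus an id the producer generates itself.
    public static Dictionary<string, object> BuildHeaders(string taskName, Guid taskId)
    {
        return new Dictionary<string, object>
        {
            ["task"] = taskName,
            ["id"] = taskId.ToString()
        };
    }

    // Body layout used in step 3: [positional args, keyword args, embed options].
    public static byte[] BuildBody(object[] args, Dictionary<string, object> kwargs = null)
    {
        var payload = new object[]
        {
            args,
            kwargs ?? new Dictionary<string, object>(), // no keyword args -> {}
            new Dictionary<string, object>()            // empty embed object -> {}
        };
        return JsonSerializer.SerializeToUtf8Bytes(payload);
    }
}
```

With this helper, `props.Headers` would be set to the result of `BuildHeaders("tasks.add", Guid.NewGuid())`, `props.CorrelationId` to its `"id"` entry, and the `body` passed to `BasicPublish` would come from `BuildBody(new object[] { 1, 200 })`, which serializes to `[[1,200],{},{}]`.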
    
Igor Kleinerman
  • @Igor..Thanks for the answer, please could you create a github gist showing the celery code that accepts the task. – FrankDupree Jun 21 '18 at 13:31
  • @FrankDupree, It's very simple. [Basic example](http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application) in Celery documentation is what you are looking for. It also includes steps to setup and run a worker. – Igor Kleinerman Jun 27 '18 at 20:14
  • @IgorKleinerman planning to use this to have a C# API pass work to be done to Pandas functions.. Have you had any issues with this approach? – Giannis May 03 '19 at 13:55
6

Flower, a monitoring tool for Celery that is installed separately, provides a REST API for managing tasks: https://flower.readthedocs.io/en/latest/api.html#post--api-task-async-apply-(.+) In most cases this will be much simpler and more robust than creating task messages manually and publishing them to the message queue.
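
As a rough illustration of the REST approach, the request for the `async-apply` endpoint linked above could be built from C# like this. It is a sketch under assumptions: `FlowerRequests` and `BuildAsyncApply` are hypothetical names, the base address `http://localhost:5555/` assumes a locally running Flower instance on its default port, and `tasks.add` is the example task used elsewhere on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;

// Hypothetical helper for building a Flower async-apply request.
public static class FlowerRequests
{
    public static HttpRequestMessage BuildAsyncApply(Uri flowerBase, string taskName, object[] args)
    {
        // Flower expects a JSON object; positional arguments go under "args".
        string json = JsonSerializer.Serialize(new Dictionary<string, object> { ["args"] = args });

        // POST <base>/api/task/async-apply/<task name>
        var uri = new Uri(flowerBase, "api/task/async-apply/" + Uri.EscapeDataString(taskName));
        return new HttpRequestMessage(HttpMethod.Post, uri)
        {
            Content = new StringContent(json, Encoding.UTF8, "application/json")
        };
    }
}
```

The request can then be sent with `HttpClient.SendAsync`. According to the Flower API documentation, the JSON response carries the id of the queued task in a `task-id` field, so the C# side never has to generate one itself.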

Giannis
4

According to this article, the Celery .NET client uses the default TaskScheduler that comes with the .NET Framework, which knows how to generate an ID for your task. The article also points to some examples here.