
In my application architecture, I have an API that publishes messages to RabbitMQ based on the requests it receives. A Windows service acts as the RabbitMQ consumer: it processes the messages and updates the DB or, in certain cases, uses SignalR callbacks.

Once the service receives a message, it generates multiple files, performs a couple of DB updates and other processing, and finally calls an external API once the data is ready.

I was thinking of using `SemaphoreSlim` to process the messages in parallel while throttling the number of concurrent threads. But after reading a couple of articles, I learned that .NET TPL Dataflow or Reactive Extensions can handle this scenario very well. Now I'm confused: which one should I choose?
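For reference, the `SemaphoreSlim` approach mentioned above might look roughly like this. It is a hypothetical sketch, not RabbitMQ code: `ThrottledProcessor`, `HandleAsync`, `RunDemoAsync` and the `Task.Delay` workload (standing in for `ProcessMessage`) are all made up for illustration. Each message gets its own task, and the semaphore caps how many of them run at the same time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs an async handler per message, at most N at a time.
public sealed class ThrottledProcessor
{
    private readonly SemaphoreSlim _gate;
    private readonly Func<string, Task> _process;
    private readonly object _sync = new object();
    private int _current;      // handlers running right now
    private int _maxObserved;  // peak concurrency, recorded for the demo

    public ThrottledProcessor(int maxConcurrency, Func<string, Task> process)
    {
        _gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        _process = process;
    }

    public int MaxObserved { get { lock (_sync) return _maxObserved; } }

    // Called once per incoming message (e.g. from the Received handler).
    public async Task HandleAsync(string message)
    {
        await _gate.WaitAsync(); // wait for a free slot
        try
        {
            lock (_sync)
            {
                _current++;
                if (_current > _maxObserved) _maxObserved = _current;
            }
            await _process(message);
        }
        finally
        {
            lock (_sync) _current--;
            _gate.Release(); // free the slot for the next waiting message
        }
    }

    // Demo: 20 messages arrive one after another; at most 3 are processed concurrently.
    public static async Task<(int Processed, int MaxObserved)> RunDemoAsync()
    {
        int processed = 0;
        var processor = new ThrottledProcessor(3, async message =>
        {
            await Task.Delay(20); // stand-in for files, DB updates, external API call
            Interlocked.Increment(ref processed);
        });

        var pending = new List<Task>();
        for (int i = 0; i < 20; i++)
            pending.Add(processor.HandleAsync($"message {i}"));
        await Task.WhenAll(pending);
        return (processed, processor.MaxObserved);
    }
}
```

One caveat with this design: the semaphore limits how many messages *execute* at once, not how many are waiting, so if messages arrive faster than they are processed, waiting tasks pile up in memory. On the RabbitMQ side, the channel's prefetch limit (`BasicQos`) is one way to cap how many unacknowledged messages are delivered to the consumer.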

All the guides and references I find online discuss processing a list of messages that is already available. In my case, though, the messages arrive dynamically and should be processed as the RabbitMQ .NET client invokes the `EventHandler` delegate.

This is the code snippet where I consume the messages:

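    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;

    // channel is an open IModel; ProcessMessage(string) is my own handler
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        // Decode the message body as UTF-8 text
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);

        // Runs on the consumer's dispatch thread, one message at a time
        ProcessMessage(message);

        // Acknowledge just this delivery
        channel.BasicAck(ea.DeliveryTag, multiple: false);
    };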

Could you guide me on whether this scenario can be accomplished, or whether it would overcomplicate my entire model?

Rahul S
  • Wrong site for such a question. – aybe Jul 06 '20 at 19:15
  • Related: [How do Reactive Framework, PLINQ, TPL and Parallel Extensions relate to each other?](https://stackoverflow.com/questions/2138361/how-do-reactive-framework-plinq-tpl-and-parallel-extensions-relate-to-each-oth) – Theodor Zoulias Jul 07 '20 at 01:33
  • The two libraries are *very* different. Rx is meant to *analyze* event streams, not process them in a pipeline. While you can create a pipeline and process events in parallel, you have to override defaults at every step. Event analytics *don't* require multiple threads, so Rx uses a single thread by default. OTOH, Dataflow is used to create a pipeline of *stateless* blocks, each of them running on a separate thread. You can easily modify the number of tasks per block, but any form of analytics requires significant coding. – Panagiotis Kanavos Jul 07 '20 at 17:12
  • You *can* combine the two, though: a Dataflow block can be a publisher in an Rx query that, e.g., produces sliding windows of events, or emits a new event only when X new events occur in a window. An Rx query can be used as a source for a Dataflow block too. – Panagiotis Kanavos Jul 07 '20 at 17:13
  • If you *don't* need any event analytics, the appropriate library is TPL Dataflow – Panagiotis Kanavos Jul 07 '20 at 17:14
  • @Panagiotis Kanavos This piece of information helps. I mostly don't need any event analytics for now, so TPL Dataflow would be the right candidate for this scenario. But I have been struggling to find any TPL Dataflow documentation that explains how to process messages as they are received from an external queue. Could you help me with this? – Rahul S Jul 07 '20 at 17:20
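Regarding the last question (how to feed TPL Dataflow from an external queue), here is a minimal sketch, assuming .NET Core 3.0 or later, where `System.Threading.Tasks.Dataflow` ships with the framework (on .NET Framework it comes from the NuGet package of the same name). `FakeConsumer` is a hypothetical stand-in for `EventingBasicConsumer`, and the `Task.Delay` stands in for `ProcessMessage`. The block is created once and lives as long as the service; each `Received` event just hands the message to it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for EventingBasicConsumer: raises Received for each message.
public sealed class FakeConsumer
{
    public event EventHandler<string> Received;
    public void Deliver(string message) => Received?.Invoke(this, message);
}

public static class DataflowConsumerDemo
{
    public static async Task<int> RunAsync(int messageCount, int maxParallelism)
    {
        int processed = 0;

        // One long-lived block: it is ready to accept messages at any time.
        var worker = new ActionBlock<string>(async message =>
        {
            await Task.Delay(10); // stand-in for ProcessMessage(message)
            Interlocked.Increment(ref processed);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism, // at most N messages processed at once
            BoundedCapacity = maxParallelism * 2     // limit how many messages the block holds
        });

        var consumer = new FakeConsumer();
        consumer.Received += (sender, message) =>
        {
            // The event handler is synchronous, so block here until the worker has room.
            // This applies backpressure instead of dropping messages (Post returns false when full).
            worker.SendAsync(message).GetAwaiter().GetResult();
        };

        for (int i = 0; i < messageCount; i++)
            consumer.Deliver($"message {i}"); // messages arrive one by one, like deliveries

        worker.Complete();       // no more input (e.g. on service shutdown)
        await worker.Completion; // wait for in-flight messages to finish
        return processed;
    }
}
```

Acknowledging is deliberately left out of this sketch. The RabbitMQ .NET client guidance is not to use one channel (`IModel`) from several threads concurrently, so acks issued from the block's worker threads would need to be serialized in some way.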

1 Answer


IMHO RX promotes a purist/functional/side-effect-free approach to things, while TPL Dataflow allows more freedom to use "dirty" workarounds. So although you could use either to solve the problem at hand, with RX you'll end up with a prettier solution, while with TPL Dataflow you'll end up with a working solution faster. Also, if you know nothing about either library, TPL Dataflow has a smoother learning curve, and its documentation is familiar and maintained by Microsoft, while RX has unfamiliar, centralized documentation that is unified for all platforms and languages.

Theodor Zoulias
  • Thank you for the information. I have gone through these libraries and their documentation before, but almost all the articles talk about processing a collection of data that is already available. In my case the data is dynamic: I have to process messages concurrently, with throttling, as they are received from the RabbitMQ queue. The message processing will go through multiple blocks. – Rahul S Jul 07 '20 at 17:02
  • @RahulS both RX and TPL Dataflow can process a stream of incoming messages. It's up to you if you want to do it with style (RX) or efficiency (TDF). – Theodor Zoulias Jul 07 '20 at 17:07
  • That is exactly what I'm looking for. Could you suggest some documentation where I can read about this kind of dynamic message processing using RX or TPL? That would really help. – Rahul S Jul 07 '20 at 17:12
  • @RahulS I don't have any specific documents in mind; I just know how these libraries work. With TPL Dataflow you create a pipeline, feed it using the `SendAsync` method of the first block in the pipeline, and the data flows through the pipeline. You can call `SendAsync` any time you like; the pipeline is ready to process messages all the time. With RX you transform your event source into an `Observable` using the `FromEvent` operator, transform the `Observable` further using the available operators, and finally subscribe to it to receive the incoming transformed notifications. – Theodor Zoulias Jul 07 '20 at 17:40
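Following the TPL Dataflow description in the last comment, a multi-step pipeline for the processing described in the question (files, DB updates, external API call) could be sketched like this. It is an illustration under assumptions: the `Job` class, the three step bodies (just a `Task.Delay` plus a marker string) and the option values are hypothetical; only the Dataflow types and members (`TransformBlock`, `ActionBlock`, `LinkTo`, `DataflowLinkOptions.PropagateCompletion`, `SendAsync`) are the real library API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical per-message state carried through the pipeline.
public sealed class Job
{
    public Job(string message) { Message = message; }
    public string Message { get; }
    public List<string> Steps { get; } = new List<string>();
}

public static class PipelineDemo
{
    public static async Task<List<Job>> RunAsync(IEnumerable<string> incoming)
    {
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 8 };
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var done = new List<Job>();

        // Step 1 (hypothetical): generate the files for a message.
        var generateFiles = new TransformBlock<string, Job>(async message =>
        {
            await Task.Delay(5);
            var job = new Job(message);
            job.Steps.Add("files");
            return job;
        }, options);

        // Step 2 (hypothetical): apply the DB updates.
        var updateDb = new TransformBlock<Job, Job>(async job =>
        {
            await Task.Delay(5);
            job.Steps.Add("db");
            return job;
        }, options);

        // Step 3 (hypothetical): call the external API once the data is ready.
        // Default MaxDegreeOfParallelism is 1, so adding to 'done' needs no lock.
        var callApi = new ActionBlock<Job>(async job =>
        {
            await Task.Delay(5);
            job.Steps.Add("api");
            done.Add(job);
        });

        generateFiles.LinkTo(updateDb, link);
        updateDb.LinkTo(callApi, link);

        // In the service, this loop would be the Received handler; SendAsync can be called at any time.
        foreach (var message in incoming)
            await generateFiles.SendAsync(message);

        generateFiles.Complete(); // completion flows down the links
        await callApi.Completion; // every message has passed through all three steps
        return done;
    }
}
```

By default, a `TransformBlock` emits its results in input order even with `MaxDegreeOfParallelism` greater than 1 (see `EnsureOrdered`), so the last step sees messages in arrival order. Each block takes its own options, so, for example, the external API step could be throttled more tightly than file generation.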