We are trying to serialize the processing of a list of business objects using a Saga.

Right now, without a Saga, we simply loop through the list and fire off an async bus.Send(new ProcessBusinessObejct(obj)) for each object so that handlers execute. The processing therefore happens more or less in parallel, depending on this setting, I believe:

endpointConfiguration.LimitMessageProcessingConcurrencyTo( 4 );

This has worked fine, but the number of concurrent handlers is now hard on the database.

It would be OK to trigger these handlers in series, i.e. continue with the next only when the current one has finished (failed or succeeded). We don't want to set the concurrency to 1, because that would affect all handlers in the endpoint.

The idea is to use the Scatter/Gather pattern and a Saga to keep track of the number of objects, update the state machine with the counts (total count, failed count, success count), and finally fire an event when the list is done/empty.

The problem is

A) I'm not sure how to keep track of the list in the saga. Would the SagaData need a List to hold all the objects, and then remove an instance when a handler signals it's done processing? As far as I know, saga data does not support hierarchical data and hence no List properties. I believe this is still the case in NSB v7.

And B) Is this use of a saga feasible or overkill, or is there a much simpler way to accomplish this?

We are using SQL Server persistence and transport and NSB 7.

Any input is much appreciated!

John

1 Answer

I think you are looking to do this. Mind you, depending on the persistence layer you are using, you might need to separate the actual import from updating the saga state. I have blogged about this here.

Saga data can also store a List, but I think in most scenarios you can get away with counts. Another important note (although it should be obvious) is that if a message fails to process and goes to the error queue (e.g. an uncaught exception in ImportData), the whole saga will be left incomplete until that message is retried and processed.
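For reference, the saga that follows relies on a saga data class and message contracts that are not shown. A minimal sketch of what they could look like is below. The property names are taken from the saga code; the `int` type for the object IDs and the `ICommand` marker interfaces are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using NServiceBus;

// Saga state: only counters are kept, no per-object list.
public class MySagaData : ContainSagaData
{
    public Guid JobID { get; set; }            // correlation property
    public int ObjectsToImport { get; set; }   // total number of objects in the job
    public int SuccessImport { get; set; }     // objects imported successfully so far
    public int FailedImport { get; set; }      // objects that failed so far
}

// Starts a job. The ID type (int) is an assumption.
public class StartTheProcess : ICommand
{
    public int ObjectCount { get; set; }
    public List<int> ObjectIdsToImport { get; set; }
}

// One message per object to import.
public class ImportData : ICommand
{
    public Guid JobID { get; set; }
    public int ObjectIdToImport { get; set; }
}

// Sent by the saga to itself once all objects are accounted for.
public class ImportFinished : ICommand
{
    public Guid JobID { get; set; }
}
```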

public class MySaga : Saga<MySagaData>,
     IAmStartedByMessages<StartTheProcess>,
     IHandleMessages<ImportData>,
     IHandleMessages<ImportFinished>
{
    //Note: a ConfigureHowToFindSaga override that maps the messages to Data.JobID is also required; it is omitted here.

    public async Task Handle(StartTheProcess message, IMessageHandlerContext context)
    {
        Data.ObjectsToImport = message.ObjectCount;
        Data.JobID = Guid.NewGuid(); //To generate a correlation ID to connect future messages back to this saga instance

        foreach(var id in message.ObjectIdsToImport)
        {
            await context.SendLocal(new ImportData
            {
                JobID = Data.JobID, //You need this to correlate messages back to the saga
                //Anything else you need to pass on to ImportData
                ObjectIdToImport = id
            });
        }
    }

    public async Task Handle(ImportData message, IMessageHandlerContext context)
    {
        //import the data and increment the counter
        var result = ImportData(message.ObjectIdToImport);
        if(result == Result.Success)
        {
            Data.SuccessImport++;
        }
        else
        {
            Data.FailedImport++;
        }

        await CheckIfFinished(context);
    }

    public Task Handle(ImportFinished message, IMessageHandlerContext context)
    {
        //do any post cleanups or Mark as complete
        MarkAsComplete();
        return Task.CompletedTask;
    }

    private async Task CheckIfFinished(IMessageHandlerContext context)
    {
        if(Data.SuccessImport + Data.FailedImport == Data.ObjectsToImport)
        {
            //Everything is done
            await context.SendLocal(new ImportFinished { JobID = Data.JobID });
        }
    }
}
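
The saga above sends all ImportData messages at once, so they are still processed concurrently. If the objects must be handled strictly one at a time, one possible variation is to keep the remaining IDs in the saga data and send the next message only when the previous one reports back. This is a rough sketch, not tested against your setup. All type names (`StartSequentialImport`, `ImportOne`, `ImportOneDone`, `SequentialImportCompleted`), the `int` ID type and the `TryImportAsync` stub are hypothetical, and it assumes your persistence can store a List in saga data and that the sender supplies the JobID.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus;

public class StartSequentialImport : ICommand
{
    public Guid JobID { get; set; }                 // assumed to be generated by the sender
    public List<int> ObjectIdsToImport { get; set; }
}
public class ImportOne : ICommand
{
    public Guid JobID { get; set; }
    public int ObjectIdToImport { get; set; }
}
public class ImportOneDone : IMessage
{
    public Guid JobID { get; set; }
    public bool Succeeded { get; set; }
}
public class SequentialImportCompleted : IEvent
{
    public Guid JobID { get; set; }
    public int SuccessCount { get; set; }
    public int FailedCount { get; set; }
}

public class SequentialSagaData : ContainSagaData
{
    public Guid JobID { get; set; }
    public List<int> Remaining { get; set; } = new List<int>();
    public int SuccessImport { get; set; }
    public int FailedImport { get; set; }
}

public class SequentialImportSaga : Saga<SequentialSagaData>,
    IAmStartedByMessages<StartSequentialImport>,
    IHandleMessages<ImportOneDone>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SequentialSagaData> mapper)
    {
        mapper.ConfigureMapping<StartSequentialImport>(m => m.JobID).ToSaga(s => s.JobID);
        mapper.ConfigureMapping<ImportOneDone>(m => m.JobID).ToSaga(s => s.JobID);
    }

    public Task Handle(StartSequentialImport message, IMessageHandlerContext context)
    {
        Data.Remaining = new List<int>(message.ObjectIdsToImport);
        return SendNextOrFinish(context);
    }

    public Task Handle(ImportOneDone message, IMessageHandlerContext context)
    {
        if (message.Succeeded) Data.SuccessImport++; else Data.FailedImport++;
        return SendNextOrFinish(context);
    }

    // Sends exactly one ImportOne at a time; publishes the completion event when the list is empty.
    private Task SendNextOrFinish(IMessageHandlerContext context)
    {
        if (Data.Remaining.Count == 0)
        {
            MarkAsComplete();
            return context.Publish(new SequentialImportCompleted
            {
                JobID = Data.JobID,
                SuccessCount = Data.SuccessImport,
                FailedCount = Data.FailedImport
            });
        }
        var next = Data.Remaining[0];
        Data.Remaining.RemoveAt(0);
        return context.SendLocal(new ImportOne { JobID = Data.JobID, ObjectIdToImport = next });
    }
}

// The actual import runs in a plain handler, outside the saga.
public class ImportOneHandler : IHandleMessages<ImportOne>
{
    public async Task Handle(ImportOne message, IMessageHandlerContext context)
    {
        var succeeded = await TryImportAsync(message.ObjectIdToImport);
        await context.Reply(new ImportOneDone { JobID = message.JobID, Succeeded = succeeded });
    }

    // Placeholder for the real import logic.
    private static Task<bool> TryImportAsync(int objectId) => Task.FromResult(true);
}
```

Because only one ImportOne message is in flight per job, the endpoint-wide concurrency limit can stay as it is. The same caveat applies as before: if an ImportOne message ends up in the error queue, the job stalls until that message is retried.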
Hadi Eskandari
  • Thank you for your reply. The loop in your saga is basically what I have, without the saga. That loop wouldn't process the objects one by one, i.e. process the next message only after the current one has completed. And yes, the actual import is done separately in a handler, not in the saga. The saga is only there to manage the state of the state machine. – John Sep 07 '18 at 09:29
  • That's why you need the saga. Not saying you can't do this with vanilla handlers + database but you'll have to manage the state (plus transactions, etc.) yourself. Also, you can keep the saga as an orchestrator and push the actual processing messages to another endpoint/handler. – Hadi Eskandari Sep 09 '18 at 03:44