4

I'm trying to configure a Saga that works in the following way:

  1. Saga receives a Shipping Order Message. That shipping order has a RouteId property that I can use to correlate Shipping Orders for the same "truck"
  2. These shipping orders are created by another system, that can use a batch process to send this orders. But, this system can't group the shipping orders for the same address.
  3. After some amount of seconds, I sent another message only with this RouteId. I need to grab all the Shipping Orders of the received RouteId, group them by Address and them translate it to another object and send to another web service.

But I'm facing two issues:

  1. If I send two messages "at the same time" to the first handler, each one comes and even with properties that correlate that messages, the IsNew property not changes after the first message processed
  2. In the second handler, I wish to access all data related to those Saga, but I can't because the data seems to be the data as it was in the revision of those messages was deferred.

Relevant Code:

Bus configuration for saga

Bus = Configure.With(Activator)
   .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
   .Logging(l => l.ColoredConsole())
   .Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
   .Sagas(s => {
       s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
          if (enforceExclusiveAccess)
          {
              s.EnforceExclusiveAccess();
          }
       })
   .Options(o =>
       {
         if (maxDegreeOfParallelism > 0)
         {
            o.SetMaxParallelism(maxDegreeOfParallelism);
         }
         if (maxNumberOfWorkers > 0)
         {
            o.SetNumberOfWorkers(maxNumberOfWorkers);
         }
      })
   .Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
   .Start();

SagaData class:

public class RouteListSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();

    public long RoutePlanId { get; set; }

    public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
    public bool SentToLisa { get; set; }

    public void AddShippingActivity(LisaShippingActivity shippingActivity)
    {
        if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
        {
            _shippingActivities.Add(shippingActivity);
        }
    }

    public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}

CorrelateMessages method

protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
    config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
    config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}

Handle for the message that supose to initiate the Saga and send the DefferedMessage if saga IsNew

public async Task Handle(ShippingOrder message)
{
  try
  {
    var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);

    if (Data.ShippingActivities.Contains(lisaActivity))
      return;

    Data.RoutePlanId = message.RoutePlanId;
    Data.AddShippingActivity(lisaActivity);
    var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);

    if (IsNew)
    {
      await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
    }
 }
 catch (Exception err)
 {
   Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
   throw;
 }
}

And, finally, the handler for the deffered message:

public Task Handle(VerifyRouteListIsComplete message)
{
  try
  {
    if (!Data.SentToLisa)
    {
      var lisaData = Data.GroupShippingActivitiesToLisaActivities();

      _lisaService.SyncRouteList(lisaData).Wait();

      Data.SentToLisa = true;
    }
    MarkAsComplete();
    return Task.CompletedTask;
  }
  catch (Exception err)
  {
    Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
    _serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
    MarkAsUnchanged();
    return Task.CompletedTask;
  }
}

Any help is appreciated!

Gerson Dias
  • 77
  • 2
  • 7

1 Answers1

4

I am not sure I understand the symptoms, you're experiencing, correctly.

If I send two messages "at the same time" to the first handler, each one comes and even with properties that correlate that messages, the IsNew property not changes after the first message processed

If EnforceExclusiveAccess is called, I would expect the messages to be handled in a serial fashion, the first one with IsNew == true and the second one with IsNew == false.

If not, I would expect both messages to be handled in parallel with IsNew == true, but then – when the sage data is inserted – I would expect one of them to succeed and the other one to fail with a ConcurrencyException.

After the ConcurrencyException, the message would be processed again, this time with IsNew == false.

Is that not what you're experiencing?

In the second handler, I wish to access all data related to those Saga, but I can't because the data seems to be the data as it was in the revision of those messages was deferred.

Are you saying that the data in the saga data seems to be in the state that it was in when the VerifyRouteListIsComplete message was deferred?

That sounds really weird, and also pretty unlikely could you maybe try again and see if it really is so?


UPDATE: I have found out why you are experiencing this weird behavior: You have accidentally set up your saga handler instance to be re-used across messages.

You did it by registering it like this (WARNING: Don't do this!):

_sagaHandler = new ShippingOrderSagaHandler(_subscriber);

_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);

where the Subscribe method then makes this call on BuiltinHandlerActivator (WARNING: Don't do this!):

activator.Register(() => handlerInstance);

This reason why this is bad (especially for a saga handler), is because the handler instance itself is stateful – it has a Data property containing the current state of the process, and that also includes the IsNew property.

What you should ALWAYS DO, is to ensure that a new handler instance is created every time a message comes in – your code should be changed to something like this:

_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();

which can be done if the implementation of Subscribe is changed into this:

public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
    _activator.Register((bus, context) => getHandler());
    await _activator.Bus.Subscribe<T>();
}

That will solve your exclusive access problem :)

There's another problem with your code: You have a potential race condition between registering your handler and starting the subscriber bus instance, because you could in theory be unfortunate and start receiving messages in between the bus gets started and you register your handler.

You should change your code to ensure that all handlers are registered before you start the bus (and thus start receiving messages).

mookid8000
  • 18,258
  • 2
  • 39
  • 63
  • I uploaded a git repo at https://github.com/GersonDias/RebusSagaConcurrencyStackOverflow that reproduces my architecture and the error that I'm facing. If you run the project Rebus.Publisher it will publish 4 messages to a Queue. After that runs the Rebus.Subscriber project and you will see in database that are 4 defered messages besides the code to defer a message is inside a `if(IsNew)`. I called the `EnforceExclusiveAccess` as well and did not notice a concurrency exception. Can you try to take a look at this code to help me to figure out what I did wrong? I expected the same as you – Gerson Dias May 21 '18 at 21:06
  • but, the symptoms are exactly that, the messages are not handled in a serial fashion, but the saga is inserted/updated almost correctly (I can't be sure, but I feel like something messed out, especially with collections and the correct correlation based on message properties). – Gerson Dias May 21 '18 at 21:25
  • Should the problem reside in the fact that I'm sending many messages of the type defined in IAmInitiatedBy interface? – Gerson Dias May 24 '18 at 12:51
  • No, that should not make a difference. If you can reproduce the issue, e.g. in a unit test or a small console app, then I'll be happy to debug it for you. – mookid8000 May 24 '18 at 19:08
  • Tks a lot, @mookid8000! In this repo https://github.com/GersonDias/RebusSagaConcurrencyStackOverflow you can see the problem happing... One test that I'm doing using this code, is to start the Rebus.Publisher console app to send the messages and them start the Rebus.Subscriber project. You will see in database that are 4 defered messages, besides the ```if (IsNew)``` before the call to send the message... – Gerson Dias May 25 '18 at 08:48