I'm trying to configure a Saga that works in the following way:
- Saga receives a Shipping Order Message. That shipping order has a RouteId property that I can use to correlate Shipping Orders for the same "truck"
- These shipping orders are created by another system, which may use a batch process to send them. However, that system can't group the shipping orders for the same address.
- After a certain number of seconds, I send another message containing only this RouteId. I need to grab all the Shipping Orders for the received RouteId, group them by Address, then translate them into another object and send it to another web service.
But I'm facing two issues:
- If I send two messages "at the same time" to the first handler, each one arrives and, even though they have properties that correlate them, the IsNew property does not change after the first message has been processed
- In the second handler, I want to access all the data related to that Saga, but I can't, because the data seems to be the data as it was at the revision when the message was deferred.
Relevant Code:
Bus configuration for saga
// Configures and starts the bus: RabbitMQ transport, console logging, type-based routing,
// SQL Server storage for sagas and timeouts, and optional parallelism/worker limits.
Bus = Configure.With(Activator)
    .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
    .Logging(l => l.ColoredConsole())
    // Message types from the assembly that declares IEventContract are routed to publisherQueue.
    .Routing(r => r.TypeBased().MapAssemblyOf<IEventContract>(publisherQueue))
    .Sagas(s =>
    {
        s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");

        // Exclusive saga access is opt-in through the enforceExclusiveAccess flag.
        if (enforceExclusiveAccess)
        {
            s.EnforceExclusiveAccess();
        }
    })
    .Options(o =>
    {
        // Non-positive values leave the library defaults untouched.
        if (maxDegreeOfParallelism > 0)
        {
            o.SetMaxParallelism(maxDegreeOfParallelism);
        }

        if (maxNumberOfWorkers > 0)
        {
            o.SetNumberOfWorkers(maxNumberOfWorkers);
        }
    })
    .Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
    .Start();
SagaData class:
/// <summary>
/// Saga state for a single route plan: the shipping activities collected so far
/// and a flag telling whether they have already been sent to Lisa.
/// </summary>
public class RouteListSagaData : ISagaData
{
    /// <summary>Saga instance identifier.</summary>
    public Guid Id { get; set; }

    /// <summary>Saga data revision number.</summary>
    public int Revision { get; set; }

    // Backing store for ShippingActivities; only mutated through AddShippingActivity.
    private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();

    /// <summary>Route plan identifier used to correlate messages with this saga.</summary>
    public long RoutePlanId { get; set; }

    /// <summary>Read-only view of the shipping activities collected so far.</summary>
    public IEnumerable<LisaShippingActivity> ShippingActivities
    {
        get { return _shippingActivities; }
    }

    /// <summary>True once the grouped activities have been sent to Lisa.</summary>
    public bool SentToLisa { get; set; }

    /// <summary>
    /// Adds <paramref name="shippingActivity"/> unless an equal activity is already stored.
    /// </summary>
    /// <param name="shippingActivity">The activity to record.</param>
    public void AddShippingActivity(LisaShippingActivity shippingActivity)
    {
        foreach (var existing in _shippingActivities)
        {
            if (existing.Equals(shippingActivity))
            {
                // Already recorded; nothing to do.
                return;
            }
        }

        _shippingActivities.Add(shippingActivity);
    }

    /// <summary>
    /// Returns the result of <c>LisaShippingActivity.GroupedByRouteIdAndAddress</c>
    /// applied to the collected activities.
    /// </summary>
    public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities()
    {
        return LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
    }
}
CorrelateMessages method
/// <summary>
/// Correlates both incoming message types with the saga data through their
/// <c>RoutePlanId</c> property.
/// </summary>
/// <param name="config">Correlation configuration for <see cref="RouteListSagaData"/>.</param>
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
    config.Correlate<ShippingOrder>(
        order => order.RoutePlanId,
        sagaData => sagaData.RoutePlanId);

    config.Correlate<VerifyRouteListIsComplete>(
        verification => verification.RoutePlanId,
        sagaData => sagaData.RoutePlanId);
}
Handler for the message that is supposed to initiate the Saga and send the deferred message if the saga IsNew
/// <summary>
/// Records the shipping activity derived from <paramref name="message"/> in the saga data.
/// When the saga is new, also defers a <see cref="VerifyRouteListIsComplete"/> message
/// for the same route plan.
/// </summary>
/// <param name="message">The incoming shipping order.</param>
public async Task Handle(ShippingOrder message)
{
    try
    {
        var activity = message.AsLisaShippingActivity(_commissionerUserName);
        var alreadyRecorded = Data.ShippingActivities.Contains(activity);

        if (!alreadyRecorded)
        {
            Data.RoutePlanId = message.RoutePlanId;
            Data.AddShippingActivity(activity);

            // Fall back to 60 seconds when no delay has been configured.
            var delaySeconds = _lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60;
            var delay = TimeSpan.FromSeconds(delaySeconds);

            if (IsNew)
            {
                var verification = new VerifyRouteListIsComplete(message.RoutePlanId);
                await _serviceBus.DeferLocal(delay, verification, _environment);
            }
        }
    }
    catch (Exception err)
    {
        // Log for diagnostics, then rethrow so the failure still reaches the caller.
        Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
        throw;
    }
}
And, finally, the handler for the deferred message:
/// <summary>
/// Sends the grouped shipping activities to Lisa (unless already sent) and marks the saga
/// as complete. On failure the error is logged, the same message is deferred again for
/// 5 seconds, and the saga data is marked as unchanged.
/// </summary>
/// <param name="message">The deferred verification message for a route plan.</param>
/// <returns>A task that completes when the handler has finished.</returns>
public async Task Handle(VerifyRouteListIsComplete message)
{
    try
    {
        if (!Data.SentToLisa)
        {
            var lisaData = Data.GroupShippingActivitiesToLisaActivities();

            // Awaited instead of blocked with .Wait(): frees the worker thread and surfaces
            // the real exception rather than an AggregateException wrapper.
            await _lisaService.SyncRouteList(lisaData);
            Data.SentToLisa = true;
        }

        MarkAsComplete();
    }
    catch (Exception err)
    {
        Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);

        // Retry later by deferring the same message; awaited for the same reason as above.
        await _serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]);
        MarkAsUnchanged();
    }
}
Any help is appreciated!