5

I have a MassTransitStateMachine that orchestrates a process which involves creating multiple events.

Once all of the events are done, I want the state to transition to a 'clean up' phase.

Here is the relevant state declaration and filter function:

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark this source as done. 
                .Then(MarkImportCompletedForLocation),

            When(DataImported, IsAllDataImported)
                // Once all are done, we can transition to cleaning up...
                .Then(CleanUpSources)
                .TransitionTo(CleaningUp)
        );


    ...snip...


    private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
    {
        return ctx.Instance.Locations.Values.All(x => x);
    }

So while the state is ImportingData, I expect to receive multiple DataImported events. Each event marks its location as done so that that IsAllDataImported method can determine if we should transition to the next state.

However, if the last two DataImported events arrive at the same time, the handler for transitioning to the CleaningUp phase fires twice, and I end up trying to perform the clean up twice.

I could solve this in my own code, but I was expecting the state machine to manage this. Am I doing something wrong, or do I just need to handle the contention myself?

Robert
  • 1,487
  • 1
  • 14
  • 26

2 Answers2

6

The solution proposed by Chris won't work in my situation because I have multiple events of the same type arriving. I need to transition only when all of those events have arrived. The CompositeEvent construct doesn't work for this use case.

My solution to this was to raise a new AllDataImported event during the MarkImportCompletedForLocation method. This method now handles determining whether all sub-imports are complete in a thread safe way.

So my state machine definition is:

            During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark the URI in the locations list as done. 
                .Then(MarkImportCompletedForLocation),

            When(AllDataImported)
                // Once all are done, we can transition to cleaning up...
                .TransitionTo(CleaningUp)
                .Then(CleanUpSources)
        );

The IsAllDataImported method is no longer needed as a filter.

The saga state has a Locations property:

public Dictionary<Uri, bool> Locations { get; set; }

And the MarkImportCompletedForLocation method is defined as follows:

    private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
    {
        lock (ctx.Instance.Locations)
        {
            ctx.Instance.Locations[ctx.Data.ImportSource] = true;
            if (ctx.Instance.Locations.Values.All(x => x))
            {
                var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
                this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
            }
        }
    }

(I've just written this so that I understand how the general flow will work; I recognise that the MarkImportCompletedForLocation method needs to be more defensive by verifying that keys exist in the dictionary.)

Robert
  • 1,487
  • 1
  • 14
  • 26
  • Interesting solution, I'm dealing with a similar issue myself where I need to track the completion of a number of events of the same type. How did you manage storing the Dictionary collection to the saga state? I'm using the EntityFramework extensions to persist the saga state, but this doesn't support storing Dictionaries. Are you using the NHibernate extension library? – simon_d Jul 11 '16 at 12:53
  • 1
    Sorry @simon_d I only just saw your comment. I'm using some trickery whereby I persist my saga state as a JSON structure in to my repository. This means I don't need to bother with DB schema changes when my state structure changes (e.g. when I update my application). As such, a dictionary type property serialises/deserialises without any issue. – Robert Dec 09 '16 at 01:08
3

You can use a composite event to accumulate multiple events into a subsequent event that fires when the dependent events have fired. This is defined using:

CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);

During(ImportingData,
    When(DataImported)
        .Then(context => { do something with data }),
    When(MoreDataImported)
        .Then(context => { do smoething with more data}),
    When(AllDataImported)
        .Then(context => { okay, have all data now}));

Then, in your state machine state instance:

class DataImportSagaState :
    SagaStateMachineInstance
{
    public int ImportStatus { get; set; }
}

This should address the problem you are trying to solve, so give it a shot. Note that event order doesn't matter, they can arrive in any order as the state of which events have been received is in the ImportStatus property of the instance.

The data of the individual events is not saved, so you'll need to capture that into the state instance yourself using .Then() methods.

Chris Patterson
  • 28,659
  • 3
  • 47
  • 59
  • Hey, thanks for the answer. I'm wondering if this will actually solve my issue though. Your example seems to work with multiple events of different types, whereas I have multiple event instances of the same type. – Robert Apr 18 '16 at 08:26
  • The data import is split among multiple handlers that will each be responsible for a subset of the total data to be imported. As each one completes, I need to decide if it's appropriate to move on to the next stage. As such, each handler will fire the same event type, but with a different identifier. – Robert Apr 18 '16 at 08:35
  • Ah, well, yeah, that's where you need to keep track of what you've received yourself, since you'll likely be dealing with ordering and other things as well. I don't think that's a built-in style of work. You could publish a different event back to yourself when you're _done_ though. – Chris Patterson Apr 18 '16 at 20:41
  • Yeah - order is not actually important in this case but I see what you are saying. I'll post my solution as a separate answer to help anyone else that has this question. – Robert Apr 18 '16 at 21:16