3

I am using a MassTransitStateMachine example based on https://github.com/MassTransit/Sample-ShoppingWeb. Everything works fine when only one instance of the state machine application is running. But when more than one instance of the state machine application is running, it seems that the wrong instance processes the event published by the consumer.

For example, StateMachineA sends a command to ConsumerA. ConsumerA consumes it and publishes an event. The event is not routed back to StateMachineA; instead, it is delivered to StateMachineB.

I would like to deploy the state machine in multiple containers, but I couldn't get past this problem.

I tried Googling this topic, but samples and discussions about MassTransitStateMachine seem hard to come by. Or maybe I have not read the documentation closely enough.

Commands and events

/// <summary>
/// Base contract shared by every pricing command and event in this sample.
/// </summary>
public interface IMessage
{
    /// <summary>
    /// Identifier carried on every message; the state machine correlates
    /// incoming events to a saga instance by this value.
    /// </summary>
    Guid CorrelationId { get; }

    /// <summary>
    /// The pricing specification the message is about.
    /// </summary>
    PricingSpec PricingSpec { get; }
}

/// <summary>
/// Event published by the client to request a pricing run. The state machine
/// handles it in its initial state, which starts a new saga instance.
/// </summary>
public interface IPricingRequested : IMessage { }

/// <summary>
/// Command sent by the state machine to the "sub-pricer" endpoint and
/// handled by <c>SubPricingRequestConsumer</c>.
/// </summary>
public interface ISubPricingRequest : IMessage { }

/// <summary>
/// Event published by <c>SubPricingRequestConsumer</c> after it has priced the
/// specification; the state machine handles it while in the Processing state.
/// </summary>
public interface ISubPricingProcessed : IMessage { }

/// <summary>
/// Contract for the final "pricing processed" notification.
/// </summary>
/// <remarks>
/// NOTE(review): presumably implemented by the <c>PricingProcessed</c> message the
/// state machine publishes when it finalizes, and consumed by the client's
/// <c>PricingProcessedConsumer</c>. Neither type is shown here, so confirm.
/// </remarks>
public interface IPricingProcessed : IMessage { }

Client

/// <summary>
/// Console client that publishes an <c>IPricingRequested</c> event each time the
/// user presses Enter, and hosts a <c>PricingProcessedConsumer</c> on the
/// "pricing-requester" queue. Ctrl+C ends the publishing loop.
/// </summary>
class Program
{
    // Written by the Ctrl+C handler, which the runtime invokes on a different
    // thread than Main, so the flag is volatile to make the write visible to the loop.
    private static volatile bool _continueRunning = true;

    /// <summary>
    /// Configures logging, starts the bus, and publishes one pricing request per
    /// Enter key press until Ctrl+C is pressed. The bus is stopped on exit.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Debug()
            .MinimumLevel.Override("MassTransit", LogEventLevel.Warning)
            .Enrich.FromLogContext()
            .WriteTo.Console(new CompactJsonFormatter())
            .CreateLogger();

        Console.CancelKeyPress += Console_CancelKeyPress;
        var bus = CreateBus();
        try
        {
            Console.WriteLine("Starting Pricing Requester");
            Console.ReadLine();
            while (_continueRunning)
            {
                // "MM" is the month and "HH" the 24-hour clock; the previous
                // "yyyymmddhhmmss" put minutes where the month belongs and used a
                // 12-hour clock. Invariant culture keeps the symbol independent
                // of the machine's regional calendar settings.
                string timestamp = DateTime.Now.ToString(
                    "yyyyMMddHHmmss",
                    System.Globalization.CultureInfo.InvariantCulture);

                PricingSpec pricingSpec = new PricingSpec()
                {
                    Symbol = $"Symbol{timestamp}",
                    Underlyings = new List<string>() { "AOT" },
                };

                // Wait for the publish to complete so a broker failure surfaces
                // here instead of being lost in an unobserved task.
                TaskUtil.Await(() => bus.Publish<IPricingRequested>(
                    new { CorrelationId = Guid.NewGuid(), PricingSpec = pricingSpec }));
                Console.WriteLine(pricingSpec.ToString());
                Console.ReadLine();
            }
        }
        finally
        {
            // Shut the bus down so the broker connection is released cleanly.
            TaskUtil.Await(() => bus.StopAsync());
        }
    }

    /// <summary>
    /// Ctrl+C handler: suppresses process termination and asks the publishing
    /// loop in <see cref="Main"/> to stop instead.
    /// </summary>
    private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        e.Cancel = true;
        _continueRunning = false;
    }

    /// <summary>
    /// Creates a RabbitMQ bus with a "pricing-requester" receive endpoint hosting
    /// <c>PricingProcessedConsumer</c>, and starts it.
    /// </summary>
    /// <returns>The started bus; the caller is responsible for stopping it.</returns>
    private static IBusControl CreateBus()
    {
        var rabbitHost = new Uri("rabbitmq://localhost:5672/saga");
        var user = "guest";
        var password = "guest";
        var inputQueue = "pricing-requester";
        var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
        {
            var host = configurator.Host(rabbitHost, h =>
            {
                h.Username(user);
                h.Password(password);
            });

            configurator.ReceiveEndpoint(host, inputQueue, c =>
            {
                c.Consumer(() => new PricingProcessedConsumer());
            });
        });

        TaskUtil.Await(() => bus.StartAsync());
        return bus;
    }
}

State Machine

/// <summary>
/// Saga state machine for a pricing run: on <c>IPricingRequested</c> it stores the
/// pricing specification, sends an <c>ISubPricingRequest</c> command to the
/// "sub-pricer" endpoint and moves to <see cref="Processing"/>; on
/// <c>ISubPricingProcessed</c> it publishes <c>PricingProcessed</c> and finalizes.
/// </summary>
/// <remarks>
/// Both events are correlated by the message's <c>CorrelationId</c>. When several
/// instances of the hosting service run side by side, the broker load-balances
/// events across them, so the saga repository must be shared between the
/// instances (for example a Redis-backed repository). An in-memory repository
/// is per-process, so the instance that receives <c>ISubPricingProcessed</c> may
/// not hold the saga that sent the command.
/// </remarks>
public class AutocallablePricingStateMachine : MassTransitStateMachine<AutocallablePricingState>
{
    // Broker/virtual-host address that command endpoints are resolved against;
    // the endpoint (queue) name is appended in SendCommand.
    private const string BrokerBaseAddress = "rabbitmq://localhost:5672/saga";

    /// <summary>
    /// Declares the event correlations and the state transitions of the saga.
    /// </summary>
    public AutocallablePricingStateMachine()
    {
        InstanceState(x => x.CurrentState);
        this.Event(() => this.PricingRequested, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
        this.Event(() => this.SubPricingProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));

        Initially(
            When(PricingRequested)
            .Then(context =>
            {
                this.UpdateSagaState(context.Instance, context.Data.PricingSpec);
            })
            .Then(InterceptPricingRequested)
            .ThenAsync(context => this.SendCommand<ISubPricingRequest>("sub-pricer", context))
            .TransitionTo(Processing));

        During(Processing,
            When(SubPricingProcessed)
            .Then(context =>
            {
                InterceptSubPricingProcessed(context);
            })
            .Publish(context => new PricingProcessed(context.Data.CorrelationId, context.Data.PricingSpec))
            .Finalize());

        SetCompletedWhenFinalized();
    }

    /// <summary>Logs that the sub-pricing command is about to be sent.</summary>
    private void InterceptPricingRequested(BehaviorContext<AutocallablePricingState, IPricingRequested> obj)
    {
        Console.WriteLine($"Sending ISubPricingRequest Command Correlation {obj.Data.CorrelationId}");
    }

    /// <summary>Logs that the sub-pricing result event has been received.</summary>
    private void InterceptSubPricingProcessed(BehaviorContext<AutocallablePricingState, ISubPricingProcessed> obj)
    {
        Console.WriteLine($"Receiving ISubPricingProcessed Event Correlation {obj.Data.CorrelationId}");
    }

    /// <summary>Copies the requested pricing specification onto the saga instance.</summary>
    /// <param name="state">Saga instance to update.</param>
    /// <param name="pricingSpec">Specification taken from the incoming event.</param>
    private void UpdateSagaState(AutocallablePricingState state, PricingSpec pricingSpec)
    {
        state.PricingSpec = pricingSpec;
    }

    /// <summary>
    /// Sends a <typeparamref name="TCommand"/> carrying the current message's
    /// correlation id and pricing specification to the given endpoint.
    /// </summary>
    /// <typeparam name="TCommand">Command contract to send.</typeparam>
    /// <param name="endpointKey">Queue name appended to the broker base address.</param>
    /// <param name="context">Behavior context of the event being handled.</param>
    private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<AutocallablePricingState, IMessage> context)
        where TCommand : class, IMessage
    {
        var sendEndPoint = await context.GetSendEndpoint(new Uri($"{BrokerBaseAddress}/{endpointKey}"));
        await sendEndPoint.Send<TCommand>(new
        {
            CorrelationId = context.Data.CorrelationId,
            PricingSpec = context.Data.PricingSpec,
        });
    }

    /// <summary>State entered after the sub-pricing command has been sent.</summary>
    public SagaState Processing { get; private set; }

    /// <summary>Event that starts a saga instance.</summary>
    public Event<IPricingRequested> PricingRequested { get; private set; }

    /// <summary>Event that completes a saga instance.</summary>
    public Event<ISubPricingProcessed> SubPricingProcessed { get; private set; }
}

/// <summary>
/// Saga instance data for <c>AutocallablePricingStateMachine</c>: the current state
/// name, the pricing specification under processing, and the correlation id.
/// </summary>
public class AutocallablePricingState : SagaStateMachineInstance
{
    /// <summary>Name of the state the saga is currently in.</summary>
    public string CurrentState { get; set; }

    /// <summary>Pricing specification captured from the initiating request.</summary>
    public PricingSpec PricingSpec { get; set; }

    /// <summary>Identifier used to correlate messages to this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Creates a saga instance with the given correlation id.</summary>
    /// <param name="correlationId">Identifier of the new saga instance.</param>
    public AutocallablePricingState(Guid correlationId)
    {
        CorrelationId = correlationId;
    }
}

Consumer

/// <summary>
/// Handles <c>ISubPricingRequest</c> commands: assigns a random premium to the
/// pricing specification and publishes an <c>ISubPricingProcessed</c> event with
/// the same correlation id.
/// </summary>
public class SubPricingRequestConsumer : IConsumer<ISubPricingRequest>
{
    // One generator for the whole process. Creating a new Random per message can
    // yield identical values for messages handled at nearly the same time on
    // runtimes that seed from the clock. Random is not thread-safe, so access
    // is serialized through _randomLock.
    private static readonly Random _random = new Random();
    private static readonly object _randomLock = new object();

    /// <summary>
    /// Prices the received specification (after a simulated 2 second delay) and
    /// publishes the result.
    /// </summary>
    /// <param name="context">Consume context carrying the command.</param>
    public async Task Consume(ConsumeContext<ISubPricingRequest> context)
    {
        Console.WriteLine($"Sub Pricing for symbol {context.Message.PricingSpec.Symbol}");
        await Task.Delay(2000);
        this.UpdatePricingSpec(context.Message.PricingSpec);
        await context.Publish<ISubPricingProcessed>(new
        {
            CorrelationId = context.Message.CorrelationId,
            PricingSpec = context.Message.PricingSpec,
        });
    }

    /// <summary>
    /// Sets <c>Premium</c> on the specification to a random non-negative integer value.
    /// </summary>
    /// <param name="pricingSpec">Specification to update in place.</param>
    private void UpdatePricingSpec(PricingSpec pricingSpec)
    {
        double premium;
        lock (_randomLock)
        {
            premium = _random.Next();
        }

        Console.WriteLine($"Sub Pricing for symbol {pricingSpec.Symbol}, Premium {premium}");
        pricingSpec.Premium = premium;
    }
}

I expected that if I launched multiple instances of the AutocallablePricingStateMachine application, each instance would consume the events published by the consumer in response to the commands sent by that same AutocallablePricingStateMachine instance (or pod/container).

Wit B
  • 53
  • 1
  • 5
  • Where do you configure your state machine and give it to the bus? – Alexey Zimarev Aug 23 '19 at 08:24
  • You can probably put the whole thing to a github repository so it would be easier to look at as a whole. – Alexey Zimarev Aug 23 '19 at 08:25
  • Thank you @AlexeyZimarev. I just added the project to GitHub: https://github.com/tboonte/MassTransit_Saga_FullCircle. I think that when the consumer publishes the event, the two sagas compete to process it, and RabbitMQ simply assigns it using round robin. How do we get the event to the original saga container/instance? – Wit B Aug 23 '19 at 09:05
  • You log quite a few things to the console, can you update the question with the console output? – Alexey Zimarev Aug 23 '19 at 11:33
  • 3
    you're using an in-memory saga repository, and running multiple instances of the service – so it's going to load balance the events to whichever instance has capacity, but it may not have the state machine. If you're using multiple instances, you need a shared state machine saga repository. – Chris Patterson Aug 23 '19 at 16:14
  • Thank you Chris and Alex, using InMemorySagaRepository was the culprit. I tried with RedisSagaRepository and it works as expected. Thanks again for your help! – Wit B Aug 26 '19 at 02:26

0 Answers0