I am using a MassTransitStateMachine example based on https://github.com/MassTransit/Sample-ShoppingWeb. Everything works fine when only one instance of the state machine application is running. But when more than one instance of the state machine application is running, it seems that the wrong instance processes the event published by the consumer.
For example: StateMachineA sends a command to ConsumerA. ConsumerA consumes it and publishes an event. The event is not routed back to StateMachineA; instead, it is delivered to StateMachineB.
I would like to deploy the state machine in multiple containers, but I can't get past this problem.
I tried Googling this topic, but samples and discussions of MassTransitStateMachine seem hard to come by. Or maybe I have not read the documentation closely enough.
Commands and events
/// <summary>
/// Base contract shared by every command and event in the pricing workflow.
/// </summary>
public interface IMessage
{
    /// <summary>Identifier used to correlate a message with its saga instance.</summary>
    Guid CorrelationId { get; }

    /// <summary>The pricing specification carried by the message.</summary>
    PricingSpec PricingSpec { get; }
}
/// <summary>
/// Event published by the client to request pricing; it is the initial event
/// handled by <c>AutocallablePricingStateMachine</c>.
/// </summary>
public interface IPricingRequested : IMessage { }
/// <summary>
/// Command sent by the state machine to the "sub-pricer" endpoint and
/// consumed by <c>SubPricingRequestConsumer</c>.
/// </summary>
public interface ISubPricingRequest : IMessage { }
/// <summary>
/// Event published by <c>SubPricingRequestConsumer</c> once sub-pricing is done;
/// the state machine handles it while in the Processing state.
/// </summary>
public interface ISubPricingProcessed : IMessage { }
/// <summary>
/// Event signalling that pricing has completed.
/// </summary>
/// <remarks>
/// NOTE(review): presumably implemented by the <c>PricingProcessed</c> class that the
/// state machine publishes and handled by the client's <c>PricingProcessedConsumer</c>;
/// neither type is visible here, so confirm.
/// </remarks>
public interface IPricingProcessed : IMessage { }
Client
/// <summary>
/// Console client: starts a RabbitMQ bus and publishes one
/// <c>IPricingRequested</c> event each time Enter is pressed.
/// </summary>
class Program
{
    // Cleared by the Ctrl+C handler so the publish loop ends after the current iteration.
    private static bool _continueRunning = true;

    /// <summary>
    /// Configures logging, starts the bus, then publishes a pricing request per
    /// Enter key press until Ctrl+C is received. The bus is stopped on exit.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Debug()
            .MinimumLevel.Override("MassTransit", LogEventLevel.Warning)
            .Enrich.FromLogContext()
            .WriteTo.Console(new CompactJsonFormatter())
            .CreateLogger();

        Console.CancelKeyPress += Console_CancelKeyPress;

        var bus = CreateBus();
        try
        {
            Console.WriteLine("Starting Pricing Requester");
            Console.ReadLine();

            while (_continueRunning)
            {
                PricingSpec pricingSpec = new PricingSpec()
                {
                    // "MM" is the month and "HH" the 24-hour clock. The previous format
                    // ("yyyymmddhhmmss") repeated the minutes in the month position and
                    // used a 12-hour clock, so symbols were wrong and could collide.
                    Symbol = $"Symbol{DateTime.Now.ToString("yyyyMMddHHmmss")}",
                    Underlyings = new List<string>() { "AOT" },
                };

                // Wait for the publish to complete so failures surface here instead of
                // being lost in an unobserved task.
                TaskUtil.Await(() => bus.Publish<IPricingRequested>(new { CorrelationId = Guid.NewGuid(), PricingSpec = pricingSpec }));

                Console.WriteLine(pricingSpec.ToString());
                Console.ReadLine();
            }
        }
        finally
        {
            // Release the broker connection that CreateBus opened.
            TaskUtil.Await(() => bus.StopAsync());
        }
    }

    /// <summary>
    /// Ctrl+C handler: cancels process termination and asks the publish loop to stop.
    /// </summary>
    private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        e.Cancel = true;
        _continueRunning = false;
    }

    /// <summary>
    /// Creates and starts a RabbitMQ bus with a "pricing-requester" receive endpoint
    /// hosting <c>PricingProcessedConsumer</c>.
    /// </summary>
    /// <returns>The started bus; the caller is responsible for stopping it.</returns>
    private static IBusControl CreateBus()
    {
        var rabbitHost = new Uri("rabbitmq://localhost:5672/saga");
        var user = "guest";
        var password = "guest";
        var inputQueue = "pricing-requester";

        var bus = Bus.Factory.CreateUsingRabbitMq(configurator =>
        {
            var host = configurator.Host(rabbitHost, h =>
            {
                h.Username(user);
                h.Password(password);
            });

            configurator.ReceiveEndpoint(host, inputQueue, c =>
            {
                c.Consumer(() => new PricingProcessedConsumer());
            });
        });

        TaskUtil.Await(() => bus.StartAsync());
        return bus;
    }
}
State Machine
/// <summary>
/// Saga state machine for the pricing workflow: on <c>IPricingRequested</c> it stores the
/// pricing spec, sends an <c>ISubPricingRequest</c> command to the "sub-pricer" endpoint and
/// moves to <see cref="Processing"/>; on <c>ISubPricingProcessed</c> it publishes
/// <c>PricingProcessed</c> and finalizes.
/// </summary>
/// <remarks>
/// NOTE(review): both events are correlated purely by CorrelationId. If several instances
/// of this service consume from the same queue, any of them may receive
/// <c>ISubPricingProcessed</c>; the saga is then only found if all instances share one
/// saga repository. The repository configuration is not visible here, so confirm.
/// </remarks>
public class AutocallablePricingStateMachine : MassTransitStateMachine<AutocallablePricingState>
{
    public AutocallablePricingStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // PricingRequested also selects the id used for a newly created saga instance.
        this.Event(() => this.PricingRequested, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
        this.Event(() => this.SubPricingProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));

        Initially(
            When(PricingRequested)
                .Then(context =>
                {
                    this.UpdateSagaState(context.Instance, context.Data.PricingSpec);
                })
                .Then(InterceptPricingRequested)
                .ThenAsync(context => this.SendCommand<ISubPricingRequest>("sub-pricer", context))
                .TransitionTo(Processing));

        During(Processing,
            When(SubPricingProcessed)
                .Then(context =>
                {
                    InterceptSubPricingProcessed(context);
                })
                .Publish(context => new PricingProcessed(context.Data.CorrelationId, context.Data.PricingSpec))
                .Finalize());

        SetCompletedWhenFinalized();
    }

    /// <summary>Logs that the sub-pricing command is about to be sent.</summary>
    private void InterceptPricingRequested(BehaviorContext<AutocallablePricingState, IPricingRequested> obj)
    {
        Console.WriteLine($"Sending ISubPricingRequest Command Correlation {obj.Data.CorrelationId}");
    }

    /// <summary>Logs that the sub-pricing result event has been received.</summary>
    private void InterceptSubPricingProcessed(BehaviorContext<AutocallablePricingState, ISubPricingProcessed> obj)
    {
        Console.WriteLine($"Receiving ISubPricingProcessed Event Correlation {obj.Data.CorrelationId}");
    }

    /// <summary>Copies the requested pricing spec onto the saga instance.</summary>
    /// <param name="state">The saga instance to update.</param>
    /// <param name="pricingSpec">The pricing spec taken from the incoming message.</param>
    private void UpdateSagaState(AutocallablePricingState state, PricingSpec pricingSpec)
    {
        state.PricingSpec = pricingSpec;
    }

    /// <summary>
    /// Sends a <typeparamref name="TCommand"/> carrying the current message's correlation id
    /// and pricing spec to the named queue on the local RabbitMQ "saga" virtual host.
    /// </summary>
    /// <typeparam name="TCommand">The command contract to send.</typeparam>
    /// <param name="endpointKey">Queue name appended to the broker address.</param>
    /// <param name="context">Behavior context of the message being handled.</param>
    private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<AutocallablePricingState, IMessage> context)
        where TCommand : class, IMessage
    {
        var sendEndPoint = await context.GetSendEndpoint(new Uri($"rabbitmq://localhost:5672/saga/{endpointKey}"));
        await sendEndPoint.Send<TCommand>(new
        {
            CorrelationId = context.Data.CorrelationId,
            PricingSpec = context.Data.PricingSpec,
        });
    }

    /// <summary>State entered after the sub-pricing command has been sent.</summary>
    public SagaState Processing { get; private set; }

    /// <summary>Initial event that starts a pricing saga.</summary>
    public Event<IPricingRequested> PricingRequested { get; private set; }

    /// <summary>Event handled while in <see cref="Processing"/>.</summary>
    public Event<ISubPricingProcessed> SubPricingProcessed { get; private set; }
}
/// <summary>
/// Saga instance data held for each pricing request handled by
/// <c>AutocallablePricingStateMachine</c>.
/// </summary>
public class AutocallablePricingState : SagaStateMachineInstance
{
    /// <summary>Creates an instance bound to the given correlation id.</summary>
    /// <param name="correlationId">Identifier of the saga instance.</param>
    public AutocallablePricingState(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    /// <summary>Identifier of this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state, as registered through <c>InstanceState</c> in the state machine.</summary>
    public string CurrentState { get; set; }

    /// <summary>Pricing spec stored when the pricing request is received.</summary>
    public PricingSpec PricingSpec { get; set; }
}
Consumer
/// <summary>
/// Consumes <c>ISubPricingRequest</c> commands, assigns a random premium to the pricing
/// spec after a two-second delay, and publishes <c>ISubPricingProcessed</c>.
/// </summary>
public class SubPricingRequestConsumer : IConsumer<ISubPricingRequest>
{
    // One generator shared by all consumer instances: creating a new Random per call can
    // produce identical values for near-simultaneous messages on time-seeded runtimes.
    private static readonly Random PremiumGenerator = new Random();

    // Random is not thread-safe, so access to the shared generator is serialized.
    private static readonly object PremiumGeneratorGate = new object();

    /// <summary>
    /// Handles one sub-pricing request and publishes the result with the same correlation id.
    /// </summary>
    /// <param name="context">Consume context carrying the request message.</param>
    public async Task Consume(ConsumeContext<ISubPricingRequest> context)
    {
        Console.WriteLine($"Sub Pricing for symbol {context.Message.PricingSpec.Symbol}");

        // Fixed delay before the premium is assigned.
        await Task.Delay(2000);

        this.UpdatePricingSpec(context.Message.PricingSpec);

        await context.Publish<ISubPricingProcessed>(new
        {
            CorrelationId = context.Message.CorrelationId,
            PricingSpec = context.Message.PricingSpec,
        });
    }

    /// <summary>Assigns a random non-negative premium to the spec and logs it.</summary>
    /// <param name="pricingSpec">The spec to update in place.</param>
    private void UpdatePricingSpec(PricingSpec pricingSpec)
    {
        double premium;
        lock (PremiumGeneratorGate)
        {
            premium = PremiumGenerator.Next();
        }

        Console.WriteLine($"Sub Pricing for symbol {pricingSpec.Symbol}, Premium {premium}");
        pricingSpec.Premium = premium;
    }
}
I expected that if I launched multiple instances of the AutocallablePricingStateMachine application, each instance would consume the events published by the consumer that processed the command sent by that same AutocallablePricingStateMachine instance (or pod/container).