0

How can I do this? I want to start a saga from the payment controller of a microservice, wait until the other microservices have performed the necessary actions, and then get the result of the saga so that I can inform the client about it. How can I get the result of the saga in the controller action that publishes the payment event? All `Publish` / `Send` overloads return `Task`, so there is no response value to await.

I want to get something like this

saga

/// <summary>
/// Saga state machine that tracks a single investment offer from the moment it
/// appears on the bus until it is either approved (and added to the portfolio)
/// or revoked by order management.
/// </summary>
public class InvetsmentBuyerStateMachine : MassTransitStateMachine<InvetsmentBuyerState>
    {
        /// <summary>
        /// Log
        /// </summary>
        private readonly ILog _log;

        /// <summary>
        /// Configures the events, their correlation and the state transitions of the saga.
        /// </summary>
        /// <param name="portfolioManager">Portfolio that approved investments are added to.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="portfolioManager"/> is null.
        /// </exception>
        public InvetsmentBuyerStateMachine(IPortfolioManager portfolioManager)
        {
            // Fail fast: the manager is captured by the behaviors below and would
            // otherwise only blow up later, while a message is being consumed.
            if (portfolioManager == null)
            {
                throw new ArgumentNullException(nameof(portfolioManager));
            }

            _log = LogProvider.For<InvetsmentBuyerStateMachine>();

            State(() => WaitingForResponce);

            // The initiating event matches on the investment id and supplies a fresh
            // Guid as the id of the saga instance it creates.
            Event(() => InvestmentNew, x => x.CorrelateById(os => os.Investment.Id, ctx => ctx.Message.Investment.Id).SelectId(context => Guid.NewGuid()));

            // All follow-up events are matched to the instance by the investment id.
            Event(() => InvetsmentStrategyApproved, x => x.CorrelateById(os => os.Investment.Id, ctx => ctx.Message.Investment.Id));
            Event(() => InvetsmentStrategyRevoked, x => x.CorrelateById(os => os.Investment.Id, ctx => ctx.Message.Investment.Id));

            Event(() => OrderManagementApproved, x => x.CorrelateById(os => os.Investment.Id, ctx => ctx.Message.Investment.Id));
            Event(() => OrderManagementRevoked, x => x.CorrelateById(os => os.Investment.Id, ctx => ctx.Message.Investment.Id));

            // Composite of both approvals; progress is tracked in BuyApprovementState.
            CompositeEvent(() => InvestmentApproved, x => x.BuyApprovementState, InvetsmentStrategyApproved, OrderManagementApproved);

            Initially(
                When(InvestmentNew)
                    // Set received Investment to machine state
                    .Then(ctx => ctx.Instance.Investment = ctx.Data.Investment)
                    // Log (synchronous call, so no Task.Run / ThenAsync is needed)
                    .Then(ctx => _log.Debug($"Investment {ctx.Instance.Investment} added to buyer"))
                    .TransitionTo(WaitingForResponce));

            During(WaitingForResponce,
                // NOTE(review): a strategy revocation is ignored here, so only the
                // composite approval or OrderManagementRevoked finalizes the saga —
                // confirm this is intended.
                Ignore(InvetsmentStrategyRevoked),

                When(InvestmentApproved)
                    // Add investment into portfolio
                    .Then(ctx => portfolioManager.Add(ctx.Instance.Investment))
                    // Log
                    .Then(ctx => _log.Debug($"Investment {ctx.Instance.Investment} bought into portfolio"))
                    // Publish new portfolio. ThenAsync awaits the publish Task, so the
                    // saga is finalized only after the publish completed and a publish
                    // fault is surfaced instead of being silently dropped.
                    .ThenAsync(ctx => ctx.Publish<InvetsmentBuyerState, IPortfolio>(new { Investments = portfolioManager.GetInvestments() }))
                    .Finalize(),

                When(OrderManagementRevoked)
                    // Log
                    .Then(ctx => _log.Debug($"Investment {ctx.Instance.Investment} revoked"))
                    .Finalize());

            SetCompletedWhenFinalized();
        }

        /// <summary>
        /// State waiting for response from decision process
        /// </summary>
        public State WaitingForResponce { get; private set; }

        /// <summary>
        /// New investment occurred on bus
        /// </summary>
        public Event<IInvestmentOfferNew> InvestmentNew { get; private set; }

        /// <summary>
        /// Composite event investment approved
        /// </summary>
        public Event InvestmentApproved { get; private set; }

        /// <summary>
        /// Strategy approved the investment
        /// </summary>
        public Event<IInvestmentOfferStrategyApproved> InvetsmentStrategyApproved { get; private set; }

        /// <summary>
        /// Strategy revoked the investment
        /// </summary>
        public Event<IInvestmentOfferStrategyRevoked> InvetsmentStrategyRevoked { get; private set; }

        /// <summary>
        /// Order management approved the investment
        /// </summary>
        public Event<IInvestmentOfferOrderManagementApproved> OrderManagementApproved { get; private set; }

        /// <summary>
        /// Order management revoked the investment
        /// </summary>
        public Event<IInvestmentOfferOrderManagementRevoked> OrderManagementRevoked { get; private set; }
    }

Publish and get response

// Desired usage (illustrative only): publish the new-investment event from the
// controller action and obtain the saga outcome to return to the client.
// As stated in the question, every Publish / Send overload returns Task, so this
// await produces no value that could be assigned to `response` as written.
var response = await _bus.Publish<IInvestmentOfferNew>(new
                {
                    Investment = investment
                });
   

return Json(response) // OrderManagementRevoked or OrderManagementApproved

Arthur
  • 13
  • 1
  • 2
  • Just how long do you expect that API controller to wait for all the orchestrated behaviors to complete? And if you want to get a response, you can use the request client and respond to the original request (you'll need to keep the ResponseAddress/RequestId around), but it can be done if the state machine is expected to complete quickly enough. – Chris Patterson Apr 22 '21 at 23:23
  • @ChrisPatterson, I can wait up to 10-15 seconds for a response – Arthur Apr 23 '21 at 05:52
  • Likely a duplicate of this question, and the right answer: https://stackoverflow.com/a/51593956/1882 – Chris Patterson Apr 23 '21 at 12:27

0 Answers0