I feel like I'm so close to getting this working but can't seem to get it over the line...

I have an ASP.NET Core application with a saga/state machine that works well for the most part. It:

  1. Receives a request
  2. Publishes an event that is picked up by a routing slip
  3. When the routing slip completes it publishes an event
  4. The event is picked up by the saga
  5. The saga then sends a Request to a request/response consumer
  6. The response comes back to the saga
  7. But when I then call RespondAsync to send a response back to the original calling controller, nothing arrives

My controller looks like:

private readonly IRequestClient<IRequestLink, IRequestLinkCompleted> _client;

public async Task<IActionResult> CreateLinkAsync([FromBody] CreateLinkRequest request)
{
    var requestLink = new RequestLink
    {
        GroupName = $"{request.Name} Group",
        Name = request.Name,
        LinkId = Guid.NewGuid()
    };

    var result = await _client.Request(requestLink).ConfigureAwait(false);

    return Ok();
}

A cut down version of my saga looks like:

Request(() => LinkRequest, x => x.RequestId, cfg =>
{
    cfg.ServiceAddress = new Uri("rabbitmq://localhost/request_end_point_name");
    cfg.SchedulingServiceAddress = new Uri("rabbitmq://localhost/request_end_point_name");
    cfg.Timeout = TimeSpan.FromSeconds(30);
});

During(RequestReceived,
    When(LinkCreatedEvent)
        .Request(LinkRequest, context => new SelectUrlByPublicId(context.Data.DatabaseId, context.Data.LinkId))
        .TransitionTo(LinkRequest.Pending));

During(LinkRequest.Pending,
    When(LinkRequest.Completed)
        .ThenAsync(context => context.RespondAsync(new RequestLinkCompleted
        {
            CorrelationId = LinkRequest.GetRequestId(context.Instance),
            DatabaseId = context.Data.DatabaseId
        }))
        .Finalize());

And finally in my start up code I configure the request/response as such:

services.AddScoped<IRequestClient<IRequestLink, IRequestLinkCompleted>>(x =>
    new MessageRequestClient<IRequestLink, IRequestLinkCompleted>(
        _bus,
        new Uri($"{messageBusSettings.Host}/create_link_saga"),
        TimeSpan.FromSeconds(30)));

I'm guessing that the RespondAsync call isn't using the right/original RequestId, but I have no idea how to check or change that. Can anyone help?

Ben Thomson

2 Answers

Since the context of the original request is lost, you essentially need to do yourself what RespondAsync would have done:

  1. Save the ResponseAddress from the request message context
  2. Save the RequestId from the same context

In your saga, when it's time to respond, call context.GetSendEndpoint(context.Instance.SavedResponseAddress) and then call Send, setting the RequestId in the delegate to match the RequestId saved from the original context.

You might need to carry these values in routing slip variables, since your saga doesn't receive the command, only the subsequent events. The net effect is the same: the original request message is gone and is never seen by the saga.
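
To make the reasoning concrete, here is a minimal, library-free sketch of why both values have to be saved. It is only a model, not MassTransit code: Envelope, SagaInstance, ClientAccepts and the queue address are hypothetical stand-ins, and the only fact it relies on is that the waiting request client accepts a reply that arrives on its own response address and carries the RequestId of the pending request.

```csharp
using System;

// Library-free model of the pattern. None of these types are MassTransit APIs.
// Envelope stands in for a sent message plus the two headers that matter here.
record Envelope(Uri Destination, Guid? RequestId, object Message);

// Hypothetical saga instance holding the two values the answer says to save.
record SagaInstance(Uri SavedResponseAddress, Guid? SavedRequestId);

static class Demo
{
    // Stand-in for the waiting request client: a reply is accepted only if it
    // arrives on the client's address and carries the pending RequestId.
    static bool ClientAccepts(Envelope reply, Uri clientAddress, Guid pendingRequestId) =>
        reply.Destination == clientAddress && reply.RequestId == pendingRequestId;

    static void Main()
    {
        // Assumed address of the controller's response queue (illustrative only).
        var clientAddress = new Uri("rabbitmq://localhost/client_response_queue");
        var pendingRequestId = Guid.NewGuid();

        // Steps 1 and 2: while the original request headers are still available,
        // copy ResponseAddress and RequestId into the saga instance.
        var saga = new SagaInstance(clientAddress, pendingRequestId);

        // Later the saga is handling a different message. A reply tagged with
        // that message's own request id does not match what the client awaits.
        var replyWithOtherId = new Envelope(clientAddress, Guid.NewGuid(), "RequestLinkCompleted");

        // A reply sent to the saved address with the saved RequestId does match.
        var replyWithSavedId = new Envelope(saga.SavedResponseAddress, saga.SavedRequestId, "RequestLinkCompleted");

        Console.WriteLine(ClientAccepts(replyWithOtherId, clientAddress, pendingRequestId)); // False
        Console.WriteLine(ClientAccepts(replyWithSavedId, clientAddress, pendingRequestId)); // True
    }
}
```

In the real saga, the second reply corresponds to getting the send endpoint for the saved address and setting RequestId in the Send callback, as described above.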

Chris Patterson
  • I had the same problem and you saved my life after 2 days struggling with this. – Mohammad Reza Sadreddini Jul 30 '18 at 12:45
  • I thought that was what I needed to do but I can't figure out how to set the RequestId on the send! Do I have to write a pipe? – Ben Thomson Jul 30 '18 at 23:11
  • I've just found the overload with the SendContext callback. I don't know how I didn't see that yesterday (I spent hours looking for it). So for anyone reading: I got the endpoint as described in the answer above and then did endpoint.Send(responseObject, callback: sendContext => sendContext.RequestId = context.Instance.SavedRequestId) – Ben Thomson Jul 30 '18 at 23:40
  • @BenThomson can you please post the code that contains the solution? – spartacus Jan 31 '19 at 12:02
  • @spartacus Sorry, how do you mean? The answer was not mine. – Ben Thomson Jan 31 '19 at 22:38
  • You can get the ResponseAddress and RequestId from CreateConsumeContext() – promontis Jul 15 '21 at 12:10

For those looking for a clearer picture with some code snippets, here's what we did to get the saga returning a response to the initial requester. I marked the answer above as correct, but it still took us a few hours to get our implementation working: we are still fairly new to sagas, and because of the way we structured our saga we couldn't figure out where to access the original context rather than just our message. Hopefully this saves someone else some time.

Initial Saga setup:

public PaymentStateMachine()
{
    InstanceState(x => x.CurrentState);

    this.ConfigureCorrelationIds();
    .....
    //Saga workflow registration
    .....

    this.During(ValidationSucceeded,
        SetPaymentGatewaySubmissionHandler());

    SetCompletedWhenFinalized();
}

We had to register our initial event so that some information from the initial message is saved into the saga table. The key pieces to note are context.ResponseAddress and context.RequestId being saved by the saga factory.

private void ConfigureCorrelationIds()
{
    Event(() => PaymentSubmittedEvent,
        x =>
        {
            x.CorrelateBy<int>(pay => pay.OrderId, context => context.Message.Order.Id)
                .SelectId(c => c.Message.CorrelationId);

            x.InsertOnInitial = true;

            x.SetSagaFactory(context => new PaymentSagaState()
            {
                ResponseAddress = context.ResponseAddress.ToString(),
                CorrelationId = context.CorrelationId.Value,
                RequestId = context.RequestId,
                OrderId = context.Message.Order.Id,
                Created = DateTime.Now,
                Updated = DateTime.Now,
                CurrentState = Initial.Name
            });
        });
    ....
    //Other events registered
}

From there, we used the approach from the answer above to send the response back to the original message's response address once the saga had finished processing. The new PaymentSubmittedResponse is just a private implementation, within the saga, of the message contract that the client is waiting for on the front end.

private EventActivityBinder<PaymentSagaState, IPaymentGatewaySubmittedEvent> SetPaymentGatewaySubmissionHandler() =>
    When(PaymentGatewaySubmittedEvent)
        .Then(c => this.UpdateSagaState(c, PaymentGatewayComplete.Name))
        // ThenAsync so the logging task is awaited rather than discarded
        .ThenAsync(c => Console.Out.WriteLineAsync(
            $"{DateTime.Now} PaymentGatewayComplete: {c.Data.Status} to {c.Instance.CorrelationId}"))
        .ThenAsync(async c =>
        {
            // Send the response back to the original requester once we are done with this step
            var responseEndpoint =
                await c.GetSendEndpoint(new Uri(c.Instance.ResponseAddress));

            // Set RequestId to the saved value so the waiting request client can match the response
            await responseEndpoint.Send(
                new PaymentSubmittedResponse(c.Data),
                callback: sendContext => sendContext.RequestId = c.Instance.RequestId);
        })
        .TransitionTo(ClientPaymentSubmissionResponded);

private void UpdateSagaState(BehaviorContext<PaymentSagaState> sagaContext, string status)
{
    sagaContext.Instance.CurrentState = status;
    sagaContext.Instance.Updated = DateTime.Now;
}

private class PaymentSubmittedResponse : IPaymentSubmittedEvent
{
    public string Status { get; set; }

    public OrderDto Order { get; set; }

    public Guid CorrelationId { get; set; }
    public DateTime TimeStamp { get; set; }

    public PaymentSubmittedResponse(IPaymentBase paymentMessage)
    {
        Order = paymentMessage.Order;
        CorrelationId = paymentMessage.CorrelationId;
        Status = paymentMessage.Status;
        TimeStamp = paymentMessage.TimeStamp;
    }
}

We're not sure whether it's strictly necessary or just a consequence of how we implemented it, but we had to introduce one more saga state, ClientPaymentSubmissionResponded, to handle the response message event being sent back to the original requester.

Cbs25