6

I have developed asynchronous Spring Cloud Stream services, and I am trying to develop an edge service that uses @MessagingGateway to provide synchronous access to services that are async by nature.

I am currently getting the following stack trace:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

My @MessagingGateway:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

If I consume the message on the reply channel via a @StreamListener, it works just fine:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
  @StreamListener(AccountChannels.ACCOUNT_CREATED)
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
    try {
      if (log.isInfoEnabled()) {
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
      }
    } catch (JsonProcessingException e) {
      log.error(e.getMessage(), e);
    }
  }

On the producer side, I am configuring requiredGroups to ensure that multiple consumers can process the message, and correspondingly, the consumers have matching group configurations.

Consumer:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created

Producer:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created

The bit of code on the producer side that processes the request and sends the response:

  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());

I can debug and see that the request is received and processed, but when the response is sent to the reply channel, that's when the error occurs.

To get the @MessagingGateway working, what configurations and/or code am I missing? I know I'm combining Spring Integration and Spring Cloud Gateway, so I'm not sure if using them together is causing the issues.

Keith Bennett
  • 733
  • 11
  • 25

3 Answers3

7

It's good question and really good idea. But it isn't going to work so easy.

First of all we have to determine for ourselves that gateway means request/reply, therefore correlation. And this available in @MessagingGateway via replyChannel header in face of TemporaryReplyChannel instance. Even if you have an explicit replyChannel = AccountChannels.ACCOUNT_CREATED, the correlation is done only via the mentioned header and its value. The fact that this TemporaryReplyChannel is not serializable and can't be transferred over the network to the consumer on another side.

Luckily Spring Integration provide some solution for us. It is a part of the HeaderEnricher and its headerChannelsToString option behind HeaderChannelRegistry:

Starting with Spring Integration 3.0, a new sub-element <int:header-channels-to-string/> is available; it has no attributes. This converts existing replyChannel and errorChannel headers (when they are a MessageChannel) to a String and stores the channel(s) in a registry for later resolution when it is time to send a reply, or handle an error. This is useful for cases where the headers might be lost; for example when serializing a message into a message store or when transporting the message over JMS. If the header does not already exist, or it is not a MessageChannel, no changes are made.

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher

But in this case you have to introduce an internal channel from the gateway to the HeaderEnricher and only the last one will send the message to the AccountChannels.CREATE_ACCOUNT_REQUEST. So, the replyChannel header will be converted to a string representation and be able to travel over the network. On the consumer side when you send a reply you should ensure that you transfer that replyChannel header as well, as it is. So, when the message will arrive to the AccountChannels.ACCOUNT_CREATED on the producer side, where we have that @MessagingGateway, the correlation mechanism is able to convert a channel identificator to the proper TemporaryReplyChannel and correlate the reply to the waiting gateway call.

Only the problem here that your producer application must be as single consumer in the group for the AccountChannels.ACCOUNT_CREATED - we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has that TemporaryReplyChannel in its memory.

More info about gateway: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway

UPDATE

Some code for help:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

@Bean
public IntegrationFlow headerEnricherFlow() {
   return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
            .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
            .channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
            .get();

}

UPDATE

Some simple application to demonstrate the PoC:

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();


        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    private static final String ENRICH = "enrich";


    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    }

}

The application.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

I use spring-cloud-starter-stream-rabbit.

The

MessageBuilder.withPayload(request.getPayload().toUpperCase())
            .copyHeaders(request.getHeaders())
            .build()

Does the trick copying request headers to the reply message. So, the gateway is able on the reply side to convert channel identifier in the headers to the appropriate TemporaryReplyChannel to convey the reply properly to the caller of gateway.

The SCSt issue on the matter: https://github.com/spring-cloud/spring-cloud-stream/issues/815

dschulten
  • 2,994
  • 1
  • 27
  • 44
Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Thanks for the quick reply, Artem. I think I understand the gist of what you are stating, but I want to make sure I understand. Are you stating that I need to create a HeaderEnricher bean on the side with the @MessagingGateway? If so, how would I configure its inputChannel and outputChannel attributes given that the current requestChannel is AccountChannels.CREATE_ACCOUNT_REQUEST? – Keith Bennett Dec 13 '17 at 19:58
  • Also, I'm not clear on how the TemporaryReplyChannel plays into the creation of the HeaderEncricher bean. – Keith Bennett Dec 13 '17 at 20:01
  • The Gateway creates `TemporaryReplyChannel` and populates it into the message it sends to the `requestChannel`. – Artem Bilan Dec 13 '17 at 20:03
  • That `requestChannel` must be as input for the `HeaderEncricher`, something internal, not binding destination. The `outputChannel` of the `HeaderEncricher` will be already `AccountChannels.CREATE_ACCOUNT_REQUEST` – Artem Bilan Dec 13 '17 at 20:04
  • So, if my bean has `@Transformer(inputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST_HEADERS, outputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST)`, which channel do I specify as the argument to `headerChannelRegistry.channelToChannelName(channel)`? Sorry if I'm missing something obvious here. – Keith Bennett Dec 13 '17 at 20:38
  • You don't specify any channel there, you have to take a look into Java DSL `HeaderEnricherSpec` and use its `headerChannelsToString()` operation. The raw `HeaderEnricher` doesn't expose that option. So, you should declare a simple `IntegrationFlow` instead of `@Transformer` – Artem Bilan Dec 13 '17 at 20:50
  • I've been trying to figure out how to do what you're suggesting and have hit a wall. I haven't worked with the Spring Integration DSL before. Is it possible for you to share the beans I need to configure to get this done, taking into account the code I originally shared? If so, I really appreciate it. – Keith Bennett Dec 13 '17 at 22:12
  • See an UPDATE in my answer. – Artem Bilan Dec 13 '17 at 22:17
  • Your update helped. However, I am still getting `2017-12-13 16:37:46.582 WARN [accounts-edge,84460ed05e3dabdf,de980fadff1f95df,false] 24308 --- [nt-created-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{...`. I see in my logs that the replyChannel is populated with the channel name I specified, but is it possible that I should specify the same name as I have configured in `requiredGroups`? Could that be why the message isn't being sent? – Keith Bennett Dec 13 '17 at 22:50
  • No, `replyChannel` header must be exactly a `TemporaryReplyChannel` instance or its string representation after converting to string via that header-enricher function. Looks like you somehow overrides it somewhere downstream or even in the consumer on the other side. – Artem Bilan Dec 14 '17 at 01:40
  • I can see on the @MessagingGateway side that the message is received. I see this in the logs: `headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=account-created, amqp_deliveryTag=2, amqp_consumerQueue=account-created.accounts-edge-account-created, amqp_redelivered=false, messageSent=true, spanTraceId=38056f7842484161, spanId=c02b30b3a8dc7fb6, amqp_receivedRoutingKey=account-created, replyChannel=account-created, id=81f5b33a-0e37-3425-c692-5b4197a828b2, spanSampled=0, amqp_consumerTag=amq.ctag-giNMTOHkOzGxVRTqOwjXrA, contentType=application/json;charset=UTF-8}]`. – Keith Bennett Dec 14 '17 at 15:54
  • But the message is never processed. What I don't understand is that on the service side, I set the replyChannel to the TemporaryReplyChannel value, but the amqp_consumerQueue is still set to the value I used when I had everything wired up to process the message with a @StreamListener. I'm trying, on the processing side, to both return a response to the MessagingGateway AND send it through the asynchronous channel to other processes. Also, I'm not clear why I would set the replyChannel to `AccountChannels.ACCOUNT_CREATED` when on the processing side I set it to the TemporaryReplyChannel value. – Keith Bennett Dec 14 '17 at 16:02
  • ??? You shouldn't set anything by your self. That's wrong. The gateway does that automatically because it is about request/reply and therefore correlation. If you override it, you get unexpected behavior. Would it be so hard for you to share Spring Boot app with us to play with the code – Artem Bilan Dec 14 '17 at 16:04
  • I sure wish I could share the code, but I can't due to NDA with my client. In fact, I've had to mask the actual code I'm developing and posting here. Has Pivotal developed any examples where Spring Cloud Stream is used to provide an asynchronous, event-driven set of services while at the same time providing for a synchronous messaging gateway to the services? Specifically, what I'm doing is using the groups/requiredGroups configurations to allow for multiple asynchronous nodes to receive messages. But, I want to also allow for a messaging gateway to send and receive a response. – Keith Bennett Dec 14 '17 at 16:17
  • I see, but I really don't understand what and how overrides the `replyChannel` header to that `account-created` value. The `MessagingGateway` doesn't do that. There is no such a sample. I'll try to cook something. Although need to understand the requirements why would one do the request/reply with streaming?.. – Artem Bilan Dec 14 '17 at 16:24
  • Well, actually I requested **simple** Spring Boot project. I don't need the whole client one and I'm sure I won't understand it. Only what we need a short snippet in the scope of the problem to reproduce and play. – Artem Bilan Dec 14 '17 at 16:25
  • I want to do the request/reply in combination with streaming because I want all of the transactions-based services (i.e., those that create/update/delete data) to be asynchronous and using Spring Cloud Stream. At the same time, I want to be able to provide request/reply messaging gateways for clients who need to/want to wait for a response (e.g., a web client who sends the account data and waits for the account to be created so it can be displayed in the UI). Does that give you enough of an idea about what I'm trying to do? – Keith Bennett Dec 14 '17 at 16:34
  • M-m-m, my friend. I see you don't carry request headers to the reply. You even don't care about them in your `@StreamListener`. You have to copy request message headers into the reply message before sending to the `accountChannels.accountCreated()` – Artem Bilan Dec 14 '17 at 19:56
  • Which headers from the request message should I copy into the reply message? Another oddity is that until my request times out on the producer side (i.e., @MessagingGateway side), the mesage isn't received by the consumer. Once the request times out, I see the message hit my CREATE_ACCOUNT_REQUEST @StreamListener. The request processes and sends the response message, but I want the message to be sent back to the @MessagingGateway and at the same time be sent out over the existing channel to the asynchronous @StreamListeners that are identified in my configuration with `requiredGroups.` – Keith Bennett Dec 14 '17 at 20:12
  • Right, bring for us a simple app to let us understand your requirements and current PoC, otherwise it isn't clear. Would be better if you copy all the request header, but most important are `replyChannel` and `errorChannel` – Artem Bilan Dec 14 '17 at 20:15
  • See an UPDATE in my answer. – Artem Bilan Dec 14 '17 at 20:21
  • Artem, one more question. When using this approach, string to object conversion isn't happening via the Jackson message converter. Is there an additional configuration to the IntegrationFlow that needs to be done to get the required Content-Type configured? – Keith Bennett Dec 18 '17 at 19:51
  • There is `JsonToObjectTransformer` in SI on the matter to be used in the `IntegrationFlow`: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#json-transformers. But that all sounds like a fully separate story, not related to this Gateway subject. – Artem Bilan Dec 18 '17 at 20:09
  • When I get the message via the RabbitMQ UI, I see that the content-type is text/plain. That appears to probably be the culprit. Normally, when I build the REST controllers, I just specify the produces/consumes attributes on the mapping annotations and the correct headers are generated. When using Spring Integration, and specifically via the headerEnricherFlow example you provided, how is content-type negotiation done? – Keith Bennett Dec 18 '17 at 20:33
  • There is no "content-type negotiation" at all. We talk here only about some header changes. Nothing is done for `payload`. When you send the message to the `AccountChannels.CREATE_ACCOUNT_REQUEST`, that's where bidner does payload conversion and `content-type` header population. Not sure why do you think `HeaderEnricher` is some kind of bottleneck here... – Artem Bilan Dec 18 '17 at 20:47
  • I don't think it is. I am currently receiving `org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [java.lang.String] to type x.y.z.Account`, so it looks to me like the Accept/Content-Type headers aren't getting set in my code. I was just wondering if I had to set those headers within the HeaderEnricher or if I need to do that somewhere else. I apologize about my ignorance on SI as I'm still new to it. Given the solution, and SI in general, do you have advice on how best to set these headers so complex objects can be mapped? – Keith Bennett Dec 18 '17 at 20:52
  • Please, start a new SO thread. That's too complicate to figure out what's going on. Thanks – Artem Bilan Dec 18 '17 at 20:57
  • Artem, referring back to your comment: "Only the problem here that your producer application must be as single consumer in the group for the AccountChannels.ACCOUNT_CREATED - we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has that TemporaryReplyChannel in its memory." My concern is that the @MessagingGateway service can't scale since there can only be one instance of it running. Is there any way to get this solution working with more than a single instance? – Keith Bennett Feb 05 '18 at 16:48
  • As I said before: this is home-made solution. There is no guarantee how it is going to work in any possible scenario. And right, some limitations might apply. You can scale though, as far as you have proper partition sticking. – Artem Bilan Feb 05 '18 at 16:59
2

With Artem's help, I've found the solution I was looking for. I have taken the code Artem posted and split it into two services, a Gateway service and a CloudStream service. I also added a @RestController for testing purposes. This essentially mimics what I was wanting to do with durable queues. Thanks Artem for your assistance! I really appreciate your time! I hope this helps others who want to do the same thing.

Gateway Code

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  interface GatewayChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    String process(String payload);
  }

  private static final String ENRICH = "enrich";

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<String> getUser(@PathVariable("string") String string) {
      return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
    }
  }

}

Gateway Config (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          producer:
            required-groups: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          group: gateway-to-uppercase-reply
server:
  port: 8080

CloudStream Code

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public Message<?> process(Message<String> request) {
    return MessageBuilder.withPayload(request.getPayload().toUpperCase())
        .copyHeaders(request.getHeaders()).build();
  }

}

CloudStream Config (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          producer:
            required-groups: gateway-to-uppercase-reply
server:
  port: 8081
Keith Bennett
  • 733
  • 11
  • 25
1

Hmm, I am a bit confused as well as to what you are trying to accomplish, but let's se if we can figure this out. Mixing SI and SCSt is definitely natural as one builds on another so all should work: Here is an example code snippet I just dug up from an old sample that exposes REST endpoint yet delegates (via Gateway) to Source's output channel. See if that helps:

@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication {
    . . . 

    @Autowired
    private Source channels;

    @Autowired
    private CompletionService completionService;

    @RequestMapping("/complete")
    public String completeRequest(@RequestParam int id) {
        this.completionService.complete("foo");
        return "OK";
    }

    @MessagingGateway
    interface CompletionService {
        @Gateway(requestChannel = Source.OUTPUT)
        void complete(String message);
    }
}
Oleg Zhurakousky
  • 5,820
  • 16
  • 17