Regarding the answer posted for "How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels?", what is the correct way to handle errors at the @MessagingGateway that are returned from Spring Cloud Stream services?

To recap, I have a @MessagingGateway that provides synchronous access to asynchronous services built using Spring Cloud Stream. When an error occurs within my Spring Cloud Stream service layer, I create an error response and send it through a SubscribableChannel to other @StreamListener services that process the errors.

For example, when an account is created, I send a message to the accountCreated channel. When an error occurs I send an error response to the accountNotCreated channel.

This works fine, but I also want to send an error response to the client of the @MessagingGateway so that they receive it synchronously. The @MessagingGateway annotation has an errorChannel attribute, but the @Gateway annotation does not. The client of the @MessagingGateway should be able to block and wait for either 1) an account to be created or 2) an error response.

Again, the goal here is to build "backend" services that use Spring Cloud Stream for transactional operations (i.e., those that create, update, or delete data) while giving our clients "gateway" access that blocks and waits for the responses to be returned. The solution Artem Bilan provided works for the happy path, but I am not clear on how best to handle errors with Spring Integration.

UPDATE with code example

GatewayApplication.java

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  @Component
  public interface GatewayChannels {
    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    public static final String ENRICH = "enrich";

    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    StringWrapper process(StringWrapper payload) throws MyException;
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<StringWrapper> getUser(@PathVariable("string") String string) {
      try {
        StringWrapper result = gateway.process(new StringWrapper(string));
        // Instead of catching the exception in the below catch clause, here we have just a string
        // representation of the stack trace when an exception occurs.
        return new ResponseEntity<StringWrapper>(result, HttpStatus.OK);
      } catch (MyException e) {
        // Why is the exception not caught here?
        return new ResponseEntity<StringWrapper>(new StringWrapper("An error has occurred"),
            HttpStatus.INTERNAL_SERVER_ERROR);
      }
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(StreamGateway.ENRICH)
        .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          content-type: application/json
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          producer:
            required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
  port: 8088

StringWrapper.java (used across all three projects)

package com.example.demo;

import com.fasterxml.jackson.annotation.JsonProperty;

public class StringWrapper {
  @JsonProperty
  private String string;

  @JsonProperty
  private long time = System.currentTimeMillis();

  public StringWrapper() {
    super();
  }

  public StringWrapper(String string) {
    this.string = string;
  }

  public String getString() {
    return string;
  }


  public long getTime() {
    return time;
  }

  public void setString(String string) {
    this.string = string;
  }

  @Override
  public String toString() {
    return "StringWrapper [string=" + string + ", time=" + time + "]";
  }

}

CloudStreamApplication.java

package com.example.demo;

import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  @Component
  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  @Component
  public class Processor {

    @Autowired
    CloudStreamChannels channels;

    @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
    public void process(Message<StringWrapper> request) {
      StringWrapper uppercase = null;
      try {
        uppercase = toUppercase(request);
      } catch (MyException e) {
        channels.toUppercaseReply()
            .send(MessageBuilder.withPayload(e).setHeader("__TypeId__", e.getClass().getName())
                .copyHeaders(request.getHeaders()).build());
      }
      if (uppercase != null) {
        channels.toUppercaseReply()
            .send(MessageBuilder.withPayload(uppercase)
                .setHeader("__TypeId__", StringWrapper.class.getName())
                .copyHeaders(request.getHeaders()).build());
      }
    }

    private StringWrapper toUppercase(Message<StringWrapper> request) throws MyException {
      Random random = new Random();
      int number = random.nextInt(50) + 1;
      if (number > 25) {
        throw new MyException("An error occurred.");
      }
      return new StringWrapper(request.getPayload().getString().toUpperCase());
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          content-type: application/json
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          producer:
            required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
  port: 8088

StreamListenerApplication.java

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

@EnableBinding({StreamListenerApplication.CloudStreamChannels.class})
@SpringBootApplication
public class StreamListenerApplication {

  @Component
  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

  }

  public static void main(String[] args) {
    SpringApplication.run(StreamListenerApplication.class, args);
  }

  @Autowired
  CloudStreamChannels channels;

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public void processToUppercaseReply(Message<StringWrapper> message) {
    System.out.println("Processing message: " + message.getPayload());
  }

}

application.yml

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          group: stream-listener-to-uppercase-reply
server:
  port: 8089
Keith Bennett

1 Answer

There is only one global errorChannel on @MessagingGateway that is used for all @Gateway methods. If you have a gateway with multiple @Gateway methods, each method can set a message header to indicate which method failed.

If you send a Message<Throwable> to the gateway's reply channel (and there is no error channel) the payload will be thrown to the caller.

If the gateway method has a throws clause, an attempt to unwrap the cause tree is made looking for that exception.
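
To make the unwrapping rule concrete, here is a minimal plain-Java sketch of the idea. It is not Spring Integration's actual implementation. The names `CauseUnwrapSketch`, `DemoGateway`, and `findDeclaredCause` are hypothetical, and `MyException` is declared locally as a stand-in for the question's exception. The sketch assumes the declared types come from `Method.getExceptionTypes()` on the gateway method.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Illustrative sketch only; not Spring Integration's actual implementation.
class CauseUnwrapSketch {

    // Hypothetical checked exception standing in for the question's MyException.
    static class MyException extends Exception {
        MyException(String message) {
            super(message);
        }
    }

    // Hypothetical gateway interface; only its throws clause matters here.
    interface DemoGateway {
        String process(String payload) throws MyException;
    }

    // Walk the cause chain and return the first throwable whose type matches
    // one of the exception types declared in the method's throws clause.
    static Optional<Throwable> findDeclaredCause(Throwable failure, Class<?>... declaredTypes) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            for (Class<?> declared : declaredTypes) {
                if (declared.isInstance(t)) {
                    return Optional.of(t);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The interface has a single method, so index 0 is process(String).
        Method process = DemoGateway.class.getMethods()[0];

        // A declared exception buried two levels deep in wrapper exceptions.
        Throwable wrapped = new RuntimeException("messaging failure",
                new IllegalStateException("handler failed",
                        new MyException("An error occurred.")));

        // Fall back to the outermost failure when nothing declared is found.
        Throwable toThrow = findDeclaredCause(wrapped, process.getExceptionTypes())
                .orElse(wrapped);
        System.out.println(toThrow.getClass().getSimpleName() + ": " + toThrow.getMessage());
        // prints "MyException: An error occurred."
    }
}
```

If no declared type is found in the chain, the sketch falls back to the outermost failure. A caller that only catches `MyException` would therefore miss it.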

If you add an errorChannel, instead of throwing the exception to the caller, an ErrorMessage with the exception as its payload is sent to the error channel - you can then do any further post-processing on the error channel flow and throw some other exception to the caller if desired. It sounds like you don't need that, though.

So, putting it all together...

  1. Have the error handling service send some message to another destination.
  2. In the gateway service, add a @StreamListener for that destination.
  3. In the @StreamListener construct a Message with an Exception payload and send it to the gateway's reply channel.
  4. The gateway will then throw the payload to the caller.

Something like this should work...

@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload) throws MyException;

.

@StreamListener(CloudStreamChannels.TO_UPPERCASE_FAILURES)
public void failed(Message<FailInfo> failed) { // payload has info about the failure
    Message<MyException> m = MessageBuilder.withPayload(new MyException(failed.getPayload()))
         .copyHeaders(failed.getHeaders())
         .build();
    this.reply.send(m); // send directly to the gateway's reply channel (perhaps @Autowired)
}

It's important to propagate the reply channel header end to end, regardless of how many remote services are involved.
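
As a rough illustration of why this matters, the sketch below models message headers as plain maps. Real code would use Spring's `Message` and `MessageBuilder.copyHeaders(...)`. The class name, method name, and sample channel id are hypothetical. The `replyChannel` header name is the one Spring messaging uses, and the header enricher in the gateway flow converts it to a string id so that it survives the trip through the binder.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch with plain maps; not Spring's Message API.
class HeaderPropagationSketch {

    static final String REPLY_CHANNEL = "replyChannel";
    static final String TYPE_ID = "__TypeId__";

    // Build the headers for an outgoing message: copy every incoming header
    // first, then set the type hint so the new value is the one that remains.
    static Map<String, Object> replyHeaders(Map<String, Object> requestHeaders, String typeId) {
        Map<String, Object> headers = new HashMap<>(requestHeaders);
        headers.put(TYPE_ID, typeId);
        return headers;
    }

    public static void main(String[] args) {
        // Headers as they leave the gateway; the channel id is a made-up value.
        Map<String, Object> request = new HashMap<>();
        request.put(REPLY_CHANNEL, "hypothetical-channel-id");

        // Two remote hops, each copying the headers it received.
        Map<String, Object> afterFirstHop = replyHeaders(request, "com.example.demo.StringWrapper");
        Map<String, Object> afterSecondHop = replyHeaders(afterFirstHop, "com.example.demo.MyException");

        // The gateway can still correlate the reply with the waiting caller.
        System.out.println(afterSecondHop.get(REPLY_CHANNEL)); // prints "hypothetical-channel-id"

        // A hop that builds fresh headers drops the correlation information.
        Map<String, Object> broken = new HashMap<>();
        broken.put(TYPE_ID, "com.example.demo.MyException");
        System.out.println(broken.containsKey(REPLY_CHANNEL)); // prints "false"
    }
}
```

Any hop that builds its outgoing headers from scratch, like the `broken` map above, leaves the gateway with nothing to match the reply to the blocked caller.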

EDIT

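package com.example;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.json.JsonToObjectTransformer;
import org.springframework.integration.mapping.support.JsonHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication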
@EnableBinding(TwoAsyncPipes.class)
public class So47948454aApplication {

    public static void main(String[] args) {
        SpringApplication.run(So47948454aApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            System.out.println(gate.foo(new Foo("foo")));
            try {
                gate.foo(new Foo("fail"));
            }
            catch (MyException e) {
                System.out.println(e);
            }
        };
    }

    @MessagingGateway
    public interface Gate {

        @Gateway(requestChannel = "enrich", replyChannel = "transformed")
        Foo foo(Foo foo) throws MyException;

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from("enrich")
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel("gateOut").get();
    }

    @Bean
    public MessageChannel transformed() {
        return new DirectChannel();
    }

    @Transformer(inputChannel = "gateIn", outputChannel = "transformed")
    public Object jsonToObject(Message<?> in) {
        return jtot().transform(in);
    }

    @Bean
    public JsonToObjectTransformer jtot() {
        return new JsonToObjectTransformer();
    }

    @StreamListener("serviceIn")
    @SendTo("serviceOut")
    public Message<?> listen(Foo in) {
        if (in.foo.equals("fail")) {
            return MessageBuilder.withPayload(new MyException("failed"))
                    .setHeader(JsonHeaders.TYPE_ID,
                            MyException.class.getName())
                    .build();
        }
        else {
            return MessageBuilder.withPayload(new Foo("bar"))
                    .setHeader(JsonHeaders.TYPE_ID,
                            Foo.class.getName())
                    .build();
        }
    }

    public static class Foo {

        String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    @SuppressWarnings("serial")
    public static class MyException extends RuntimeException {

        private String error;

        public MyException() {
            super();
        }

        public MyException(String error) {
            this.error = error;
        }

        public String getError() {
            return this.error;
        }

        public void setError(String error) {
            this.error = error;
        }

        @Override
        public String toString() {
            return "MyException [error=" + this.error + "]";
        }

    }

    public interface TwoAsyncPipes {

        @Output("gateOut")
        MessageChannel gateOut();

        @Input("serviceIn")
        MessageChannel serviceIn();

        @Output("serviceOut")
        MessageChannel serviceOut();

        @Input("gateIn")
        MessageChannel gateIn();

    }

}

and

Foo [foo=bar]
MyException [error=failed]

POM

<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>so47948454a</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>so47948454a</name>
<description>Demo project for Spring Boot</description>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

Rabbit binder 1.3.0.RELEASE, Spring Integration 4.3.12

2017-12-26 13:56:18.121  INFO 39008 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SpringAMQP#7e87ef9e:0/SimpleConnection@45843650 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60995]
Foo [foo=bar]
MyException [error=failed]
2017-12-26 13:56:18.165  INFO 39008 --- [           main] com.example.So47948454aApplication       : Started So47948454aApplication in 3.422 seconds (JVM running for 3.858)

application.yml:

spring:
  cloud:
    stream:
      bindings:
        gateIn:
          destination: serviceOut
          content-type: application/json
        gateOut:
          destination: serviceIn
          content-type: application/json
        serviceIn:
          destination: serviceIn
          content-type: application/json
        serviceOut:
          destination: serviceOut
          content-type: application/json
Gary Russell
  • Thanks for your feedback, Gary. I have put together some example code and included it as an update in my original post. This is similar to what I'm attempting to accomplish, and I'm not sure why within GatewayApplication.java I'm not hitting the catch block of the @RestController class. Based on your answer, I would think this should be working. Do you see anything I'm forgetting? – Keith Bennett Dec 26 '17 at 16:08
  • Exceptions are not generally JSON-friendly (e.g. no-arg CTOR). That's why I suggested sending a plain message and building the exception locally. Are you seeing any errors in the logs on the gateway server? What happens if you add a no-arg CTOR and a setter to `MyException` ? Running both sides with DEBUG logging should help. – Gary Russell Dec 26 '17 at 16:53
  • Gary, I just updated my code again. It was a little off. I was referencing String instead of StringWrapper in a couple places. Now I'm seeing the following error: `{ "timestamp": 1514307185336, "status": 500, "error": "Internal Server Error", "exception": "org.springframework.core.convert.ConverterNotFoundException", "message": "No converter found capable of converting from type [java.lang.String] to type [com.example.demo.StringWrapper]", "path": "/string/hellobuddy" }`. I don't want to add another issue here, but can you run my example code to see the problem? – Keith Bennett Dec 26 '17 at 17:02
  • Does the happy path work? I don't see how the gateway service is getting any clues as to what object to unmarshall the JSON to. In any case, since you have two return types (`StringWrapper` and `MyException`) you will need more sophistication, such as a transformer and/or router on the reply channel path. – Gary Russell Dec 26 '17 at 17:22
  • When the return type is String, then the happy path works. When I have anything more complex, like StringWrapper, I get the converter exception. I guess I need some pointers on how best to handle the converter exception (String to StringWrapper) and how to propagate MyException back to the gateway. I was expecting the gateway to do one of two things: 1) either the StringWrapper is returned on the reply channel or 2) MyException is thrown by the framework. Isn't that what should be happening based on my code example? What is the correct way to inform the gateway what object to unmarshal to? – Keith Bennett Dec 26 '17 at 17:30
  • There are two ways to do it - use a custom converter in the conversion service that gets injected into the gateway - the converter would have to know (or figure out) whether to unmarshall to a `StringWrapper` or a `MyException`. The second way is to add a transformer between the stream channel and the reply channel; again, it would have to know which object to unmarshall to. What binder are you using? I can try to find some time to create an example. You can send a hint in a message header. – Gary Russell Dec 26 '17 at 17:42
  • See [JSON Transformers](https://docs.spring.io/spring-integration/reference/html/messaging-transformation-chapter.html#json-transformers); you can set the `__TypeId__` header to the required class name and the transformer will unmarshall to that. – Gary Russell Dec 26 '17 at 17:44
  • I'm using the RabbitMQ binder. I'd really appreciate it if you could take my example and modify and/or extend it to make what you are describing work. – Keith Bennett Dec 26 '17 at 17:46
  • I added the `__TypeId__` header and updated my example code. It didn't make a difference as I'm still getting the conversion exception. – Keith Bennett Dec 26 '17 at 18:01
  • You need a transformer for that to work; see my edit - it's all in one class, but hopefully self-explanatory. – Gary Russell Dec 26 '17 at 18:21
  • Gary, what versions of Spring Cloud/Spring Integration are you using? Running your example code, the app hangs and the last thing printed to the console is `Created new connection: SpringAMQP#300bb303:0/SimpleConnection@d1c5cf2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 59455]`. – Keith Bennett Dec 26 '17 at 18:47
  • Version shouldn't make a difference; I added the pom and YAML; I am using the latest boot and cloud releases. I suspect you're hanging waiting for a reply; add a timeout to the gateway. – Gary Russell Dec 26 '17 at 18:58
  • Thanks, Gary. Your example helps a lot. Now, to wrap this thing up, how can I have a separate `@StreamListener` listening on the `gateIn` reply channel, not knowing whether a Foo response or a MyException will be sent over that channel? This will handle the case where we want to add one to many listeners for events and handle both the happy and exception paths. – Keith Bennett Dec 26 '17 at 20:39
  • My very first example in my answer did exactly that (added a stream listener for a different destination for the exceptions). Just `@Autowired` a `MessageChannel` with the reply channel's name and send the exception to it. – Gary Russell Dec 26 '17 at 22:34