
I am fairly new to developing distributed applications with messaging, and to Spring Cloud Stream in particular. I am currently wondering about best practices for dealing with errors on the broker side.

In our application, we need to both consume and produce messages from/to multiple sources/destinations like this: [infrastructure diagram]

Consumer side

For consuming, we have defined multiple @Beans of type java.util.function.Consumer. The configuration for those looks like this:

spring.cloud.stream.bindings.consumeA-in-0.destination=inputA
spring.cloud.stream.bindings.consumeA-in-0.group=$Default
spring.cloud.stream.bindings.consumeB-in-0.destination=inputB
spring.cloud.stream.bindings.consumeB-in-0.group=$Default
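For reference, here is a minimal sketch of what the two consumer beans might look like. The bean names consumeA and consumeB are what produce the binding names consumeA-in-0 and consumeB-in-0 used in the properties above. The class name, the String payload type and the println bodies are assumptions for illustration.

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the bean names determine the binding names
@Configuration
public class MessagingConsumersConfig {

    // Bound as "consumeA-in-0" -> destination "inputA", group "$Default"
    @Bean
    public Consumer<String> consumeA() {
        return payload -> System.out.println("consumeA received: " + payload);
    }

    // Bound as "consumeB-in-0" -> destination "inputB", group "$Default"
    @Bean
    public Consumer<String> consumeB() {
        return payload -> System.out.println("consumeB received: " + payload);
    }
}
```

With more than one functional bean in the context, Spring Cloud Stream generally expects you to name the ones to bind, e.g. spring.cloud.function.definition=consumeA;consumeB, which is not shown in the configuration above.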

This part works quite well: when the application starts, RabbitMQ automatically creates the exchanges "inputA" and "inputB", as well as the queues "inputA.$Default" and "inputB.$Default" with the corresponding bindings. Also, in case of an error (e.g. a queue suddenly becomes unavailable), the application is notified immediately with a QueuesNotAvailableException and continuously tries to re-establish the connection.

My only question here is: Is there some way to handle this exception in code? Or, what are best practices to deal with failures like this on broker side?

Producer side

This one is more problematic. Producing messages is triggered by internal logic, so we cannot use function @Beans here. Instead, we currently rely on StreamBridge to send messages. The problem is that this approach does not create exchanges and queues on startup. So when our code calls streamBridge.send("outputA", message), the message is sent (the result is true), but it just disappears into the void, since RabbitMQ drops messages that cannot be routed to any queue.

I found that with this configuration, I can at least get RabbitMQ to create exchanges and queues as soon as the first message is sent:

spring.cloud.stream.source=produceA;produceB
spring.cloud.stream.default.producer.requiredGroups=$Default
spring.cloud.stream.bindings.produceA-out-0.destination=outputA
spring.cloud.stream.bindings.produceB-out-0.destination=outputB

I need to use streamBridge.send("produceA-out-0", message) in code to make it work. This is not ideal, since it hardcodes the binding name, but at least it works.
I also tried to implement the producer in a Reactor style as described in this answer, but in that case the exchange/queue is not created on application startup either, and the sent message just disappears even though the sending method returns "OK".

Failures on the broker side are not registered at all with this approach: when I simulate one, e.g. by deleting the queue or the exchange, the application does not notice. Only when another message is sent do I get this in the logs:

ERROR 21804 --- [127.0.0.1:32404] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'produceA-out-0' in vhost '/', class-id=60, method-id=40)

Still, StreamBridge#send returned true in this case. However, we need to know that sending actually failed at this point, because we persist the state of the sent object based on this boolean return value. Is there any way to accomplish that?

Any other suggestions on how to make this producer scenario more robust? Best practices?

EDIT

I found an interesting solution to the producer problem using correlations:

...
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
messageHeaderAccessor.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, correlation);
Message<String> message = MessageBuilder.createMessage(payload, messageHeaderAccessor.getMessageHeaders());

boolean sent = streamBridge.send(channel, message);

try {
    final CorrelationData.Confirm confirm = correlation.getFuture().get(30, TimeUnit.SECONDS);
    if (correlation.getReturned() == null && confirm.isAck()) {
        // success logic
    } else {
        // failed logic
    }

} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    // failed logic
} catch (ExecutionException | TimeoutException e) {
    // failed logic
}

using these additional configuration properties:

spring.cloud.stream.rabbit.default.producer.useConfirmHeader=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

This seems to work quite well, although I'm still puzzled by the return value of StreamBridge#send: it is always true, and I cannot find any information about the cases in which it would be false. Apart from that, everything is fine: I can get information about problems with the exchange or the queue from the correlation or the confirm.
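The correlation approach above can be wrapped in a small helper that reduces the confirm and the return to the single boolean the application persists. This is a minimal sketch, assuming the three properties listed above are set and that the Spring AMQP version provides CorrelationData#getReturned() (which the snippet above already uses). The class and method names are hypothetical, and the helper is still RabbitMQ-specific.

```java
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical helper; assumes useConfirmHeader=true, publisher-confirm-type=correlated
// and publisher-returns=true are configured
public class ConfirmedRabbitSender {

    private final StreamBridge streamBridge;

    public ConfirmedRabbitSender(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Returns true only if the broker acked the message and did not return it
    public boolean sendAndConfirm(String bindingName, Object payload, Duration timeout) {
        CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
        Message<Object> message = MessageBuilder.withPayload(payload)
                .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, correlation)
                .build();
        if (!streamBridge.send(bindingName, message)) {
            return false;
        }
        return awaitConfirm(correlation, timeout);
    }

    // Blocks until the confirm arrives or the timeout expires
    static boolean awaitConfirm(CorrelationData correlation, Duration timeout) {
        try {
            CorrelationData.Confirm confirm =
                    correlation.getFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            // A nack (e.g. the channel was closed because the exchange is gone) or a
            // returned message (no queue bound to the exchange) both count as failure
            return confirm.isAck() && correlation.getReturned() == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // No confirm within the timeout: treat the send as failed
            return false;
        }
    }
}
```

For example, sender.sendAndConfirm("produceA-out-0", payload, Duration.ofSeconds(30)) would mirror the 30-second wait in the snippet above. Blocking on every send trades throughput for a definite result per message.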

But this solution is very much focused on RabbitMQ, which causes two problems:

  • our application should be able to connect to different brokers (e.g. Azure Service Bus)
  • in tests we use the Kafka binder, and I don't know how to configure the application context to make it work in that case, too

Any help would be appreciated.

codebat

1 Answer

On the consumer side, you can listen for an event such as the ListenerContainerConsumerFailedEvent.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#consumer-events
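A minimal sketch of such a listener, assuming the binder's listener containers publish their events to the application context (which is what this suggestion relies on). ListenerContainerConsumerFailedEvent exposes getReason(), getThrowable() and isFatal(). What to do on failure (alerting, a health indicator, etc.) is application-specific, so the counter and the class name here are hypothetical placeholders.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

// Hypothetical component reacting to consumer failures in the Rabbit listener containers
@Component
public class ConsumerFailureListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerFailureListener.class);

    private final AtomicInteger failures = new AtomicInteger();

    @EventListener
    public void onConsumerFailed(ListenerContainerConsumerFailedEvent event) {
        int count = failures.incrementAndGet();
        if (event.isFatal()) {
            // Fatal: the container will not restart this consumer
            log.error("Consumer stopped: {}", event.getReason(), event.getThrowable());
        } else {
            // Non-fatal: the container will try to restart the consumer
            log.warn("Consumer failed ({} so far), retrying: {}", count, event.getReason());
        }
        // Placeholder: update a health indicator, raise an alert, etc.
    }

    public int failureCount() {
        return failures.get();
    }
}
```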

On the producer side, producers only know about exchanges, not any queues bound to them; hence the requiredGroups property which causes the queue to be bound.

You only need spring.cloud.stream.default.producer.requiredGroups=$Default - you can send to arbitrary destinations using the StreamBridge and the infrastructure will be created.

@SpringBootApplication
public class So70769305Application {

    public static void main(String[] args) {
        SpringApplication.run(So70769305Application.class, args);
    }

    @Bean
    ApplicationRunner runner(StreamBridge bridge) {
        return args -> bridge.send("foo", "test");
    }

}

with this in application.properties:

spring.cloud.stream.default.producer.requiredGroups=$Default


Gary Russell
  • Thanks, I will have a look into the listener. For the producer, yes, I know that I can send to arbitrary destinations and that they will be created on send. But what can I do if, after the first message has been sent to such a destination, something fails (e.g. the exchange or queue is broken, lost, or whatever)? Is there any way to notice that in my application? I would need some kind of confirmation that the message was actually delivered to an existing queue. – codebat Jan 20 '22 at 07:10
  • You need to use publisher confirms and returns to evaluate the success or failure of a delivery. https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.2.1/reference/html/spring-cloud-stream-binder-rabbit.html#publisher-confirms – Gary Russell Jan 20 '22 at 14:20
  • Right, I found out about that in the meantime. Please see my edited question. – codebat Jan 20 '22 at 15:04
  • There is no general binder abstraction for this functionality. – Gary Russell Jan 20 '22 at 15:08