I am fairly new to developing distributed applications with messaging, and to Spring Cloud Stream in particular. I am currently wondering about best practices on how to deal with errors on the broker side.
In our application, we need to both consume and produce messages from/to multiple sources/destinations like this:
Consumer side
For consuming, we have defined multiple @Bean
s of type java.util.function.Consumer
. The configuration for those looks like this:
spring.cloud.stream.bindings.consumeA-in-0.destination=inputA
spring.cloud.stream.bindings.consumeA-in-0.group=$Default
spring.cloud.stream.bindings.consumeB-in-0.destination=inputB
spring.cloud.stream.bindings.consumeB-in-0.group=$Default
This part works quite well - wenn starting the application, the exchanges "inputA" and "inputB" as well as the queues "inputA.$Default" and "inputB.$Default" with corresponding binding are automatically created in RabbitMQ.
Also, in case of an error (e.g. a queue is suddenly not available), the application gets notified immediately with a QueuesNotAvailableException
and continuously tries to re-establish the connection.
My only question here is: Is there some way to handle this exception in code? Or, what are best practices to deal with failures like this on broker side?
Producer side
This one is more problematic. Producing messages is triggered by some internal logic, we cannot use function @Bean
s here. Instead, we currently rely on StreamBridge
to send messages. The problem is that this approach does not trigger creation of exchanges and queues on startup. So when our code calls streamBridge.send("outputA", message)
, the message is sent (result is true
), but it just disappears into the void since RabbitMQ automatically drops unroutable messages.
I found that with this configuration, I can at least get RabbitMQ to create exchanges and queues as soon as the first message is sent:
spring.cloud.stream.source=produceA;produceB
spring.cloud.stream.default.producer.requiredGroups=$Default
spring.cloud.stream.bindings.produceA-out-0.destination=outputA
spring.cloud.stream.bindings.produceB-out-0.destination=outputB
I need to use streamBridge.send("produceA-out-0", message)
in code to make it work, which is not too great since it means having explicit configuration hardcoded, but at least it works.
I also tried to implement the producer in a Reactor style as desribed in this answer, but in this case the exchange/queue also is not created on application startup and the sent message just disappears even though the return status of the sending method is "OK".
Failures on the broker side are not registered at all with this approach - when I simulate one e.g. by deleting the queue or the exchange, it is not registered by the application. Only when another message is sent, I get in the logs:
ERROR 21804 --- [127.0.0.1:32404] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'produceA-out-0' in vhost '/', class-id=60, method-id=40)
But still, the result of StreamBridge#send
was true
in this case. But we need to know that sending did actually fail at this point (we persist the state of the sent object using this boolean return value). Is there any way to accomplish that?
Any other suggestions on how to make this producer scenario more robust? Best practices?
EDIT
I found an interesting solution to the producer problem using correlations:
...
CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
messageHeaderAccessor.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, correlation);
Message<String> message = MessageBuilder.createMessage(payload, messageHeaderAccessor.getMessageHeaders());
boolean sent = streamBridge.send(channel, message);
try {
final CorrelationData.Confirm confirm = correlation.getFuture().get(30, TimeUnit.SECONDS);
if (correlation.getReturned() == null && confirm.isAck()) {
// success logic
} else {
// failed logic
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// failed logic
} catch (ExecutionException | TimeoutException e) {
// failed logic
}
using these additional configurations:
spring.cloud.stream.rabbit.default.producer.useConfirmHeader=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
This seems to work quite well, although I'm still clueless about the return value of StreamBridge#send
, it is always true and I cannot find information in which cases it would be false. But the rest is fine, I can get information on issues with the exchange or the queue from the correlation or the confirm.
But this solution is very much focused on RabbitMQ, which causes two problems:
- our application should be able to connect to different brokers (e.g. Azure Service Bus)
- in tests we use Kafka binder and I don't know how to configure the application context to make it work in this case, too
Any help would be appreciated.