
Since Spring Cloud Stream 3.1, the main annotation-based API for working with queues (@EnableBinding and related annotations) is deprecated. The class comment says:

Deprecated as of 3.1 in favor of functional programming model

I searched the web extensively for a solution but didn't find a solid end-to-end explanation of how I should migrate.

Looking for examples for:

  1. read from queue
  2. write to queue

If there are several ways to do this (as I saw on the web), I'd appreciate an explanation and the typical use case for each option as well.

orirab
Rami Loiferman
  • Does this answer your question? [@EnableBinding @deprecated as of 3.1 in favor of functional programming model](https://stackoverflow.com/questions/65576021/enablebinding-deprecated-as-of-3-1-in-favor-of-functional-programming-model) – szmozes Dec 17 '22 at 15:17

1 Answer

  1. I'm assuming you are already familiar with the main concepts, so I will focus on the migration.
  2. I'm using Kotlin for the demo code to reduce verbosity.

First, some references which may help:

  • Here is the initial relevant doc: link
  • This is an explanation for the naming scheme in the new functional format: link
  • This is a more detailed explanation with some more advanced scenarios: link

TL;DR

Instead of working with annotation-based configuration, Spring now detects beans of type Consumer/Function/Supplier and uses them to define your streams for you.
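To make the three roles concrete, here is a minimal sketch in plain Kotlin using only java.util.function, with no Spring involved. The Student fields and the names newStudent, greeting, deliver and delivered are assumptions made for illustration. In a real application each of these would be exposed as a @Bean so the framework can bind it.

```kotlin
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier

// Hypothetical payload type; the single `name` field is an assumption.
data class Student(val name: String)

// Supplier: no input, one output. Plays the role of a source (writes to a destination).
val newStudent: Supplier<Student> = Supplier { Student("Adam") }

// Function: one input, one output. Plays the role of a processor (reads, then writes).
val greeting: Function<Student, String> = Function { s -> "Welcome, ${s.name}" }

// Consumer: one input, no output. Plays the role of a sink (reads from a destination).
val delivered = mutableListOf<String>()
val deliver: Consumer<String> = Consumer { delivered += it }
```

Chaining them by hand, for example `deliver.accept(greeting.apply(newStudent.get()))`, mimics what the binder does when it wires these shapes to message destinations.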

Input/Consumer

Whereas before you had code looking like this:

interface BindableGradesChannel {
    @Input
    fun gradesChannel(): SubscribableChannel

    companion object {
        const val INPUT = "gradesChannel"
    }
}

and the usage was similar to:

@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)
    
    @StreamListener(BindableGradesChannel.INPUT)
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

Now the binding interface is no longer needed, and the listener can be written like so:

@Service
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)

    @Bean
    fun gradesChannel(): Consumer<Grade> {
        return Consumer { listen(grade = it) }
    }
    
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

Notice how the Consumer bean replaced both the @StreamListener and the @Input.
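One side effect worth noting: because the listener is now an ordinary java.util.function.Consumer, it can be exercised without a broker or a Spring context. The sketch below is a stripped-down variant of the class above, with the @Service and @Bean annotations deliberately omitted so it runs with the standard library alone. The Grade fields and the received list are assumptions added for illustration.

```kotlin
import java.util.function.Consumer

// Hypothetical payload type; the original answer does not show Grade's fields.
data class Grade(val student: String, val score: Int)

// Same shape as the GradesListener above, minus the Spring annotations.
class GradesListener {
    // Records what was handled so the behaviour can be observed in a test.
    val received = mutableListOf<Grade>()

    // In the real application this method would carry @Bean.
    fun gradesChannel(): Consumer<Grade> = Consumer { listen(grade = it) }

    fun listen(grade: Grade) {
        received += grade
    }
}
```

Calling `GradesListener().gradesChannel().accept(Grade("Adam", 90))` invokes listen directly, which is roughly what the framework does for each incoming message after conversion.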

As for configuration, if your application.yml previously looked like this:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

it should now look like this:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel-in-0:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

Notice how gradesChannel was replaced by gradesChannel-in-0. For the full naming convention, see the naming-scheme link at the top.
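As a quick illustration of that convention, a binding name is the function (bean) name, then in or out, then an index, which is 0 for functions with a single input or output. The helper below is purely hypothetical and not part of any Spring API; it only spells the rule out.

```kotlin
// Hypothetical helper, for illustration only: builds a binding name
// following the <functionName>-<in|out>-<index> pattern.
enum class Direction(val token: String) { IN("in"), OUT("out") }

fun bindingName(functionName: String, direction: Direction, index: Int = 0): String =
    "$functionName-${direction.token}-$index"
```

Under this rule a Consumer bean named gradesChannel gets gradesChannel-in-0, and a Supplier bean named studentsChannel gets studentsChannel-out-0.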

Some details:

  1. If you have more than one such bean in your application, you need to list the ones to bind in the spring.cloud.function.definition property.
  2. You can give your channels custom names, so if you'd like to keep using gradesChannel you can set spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel and then use gradesChannel everywhere in the configuration.
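Put together, the two settings above might look like the following fragment. This is a sketch, assuming the application declares both the gradesChannel consumer and the studentsChannel supplier used in this answer; multiple function names are separated by a semicolon.

```yaml
spring:
  cloud:
    function:
      # Beans to bind; needed once more than one functional bean exists.
      definition: gradesChannel;studentsChannel
    stream:
      function:
        bindings:
          # Map the generated binding name back to the old channel name.
          gradesChannel-in-0: gradesChannel
      bindings:
        # With the mapping above, the custom name is used here.
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
```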

Output/Supplier

The concept here is similar. Code looking like this:

interface BindableStudentsChannel {
    @Output
    fun studentsChannel(): MessageChannel
}

and

@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
    fun publish(message: Message<Student>) {
        studentsChannel.studentsChannel().send(message)
    }
}

can now be replaced by:

@Service
class StudentsQueueWriter {
    @Bean
    fun studentsChannel(): Supplier<Student> {
        return Supplier { Student("Adam") }
    }
}

As you can see, there is a major difference: when is it called, and by whom?

Before, we could trigger it manually, but now Spring triggers it, every second by default. This is fine for use cases such as publishing sensor data every second, but not when you want to send a message in response to an event. Besides using a Function (which publishes in response to an incoming message), Spring offers two alternatives:

StreamBridge - link

Using StreamBridge you can define the target explicitly like so:

@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
    fun publish(message: Message<Student>) {
        streamBridge.send("studentsChannel-out-0", message)
    }
}

This way you don't define the target channel as a bean, but you can still send the message. The downside is that you have some explicit configuration in your class.

Reactor API - link

The other way is to use a reactive mechanism such as Sinks.Many and expose it as a Flux. With this approach your code will look similar to:

@Service
class StudentsQueueWriter {
    val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()
    @Bean
    fun studentsChannel(): Supplier<Flux<Student>> {
        return Supplier { students.asFlux() }
    }
}

and the usage may be similar to:

class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
    fun newStudent() {
        studentsQueueWriter.students.tryEmitNext(Student("Adam"))
    }
}
orirab
  • Also, please see the [relevant section in the doc](https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-producing-consuming-messages) as well as these two blog posts - https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified and https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive – Oleg Zhurakousky Jan 31 '21 at 15:18
  • Nice references, thanks! May I add them to the reference section at the top? – orirab Feb 04 '21 at 08:47
  • If you think that helps, sure – Oleg Zhurakousky Feb 04 '21 at 13:24
  • Beautiful answer. Thanks! – DKWoo Mar 22 '21 at 08:31
  • Beautiful answer, with detailed example :) – ericlee Mar 26 '21 at 09:34
  • @orirab Unfortunately, `EmitterProcessor` is also deprecated now and will be removed soon. Do you have an example for another alternative? I want to avoid the explicit configuration in code like in the StreamBridge example. – codebat Jan 18 '22 at 08:33
  • @codebat I think you can try using `val students: Sinks.Many = Sinks.many().multicast().onBackpressureBuffer()` instead, and in the bean have `Supplier { students.asFlux() }` and the usage should be: `students.tryEmitNext(Student("Hello"))` – orirab Jan 18 '22 at 10:52
  • If this works for you, we can edit the question to reflect this – orirab Jan 18 '22 at 10:52
  • @orirab Hard to tell if it works - I implemented it as you suggested in my code, and the result of `tryEmitNext` is "OK", but no message shows up in rabbitmq. But I'm also not sure if the bindings are configured correctly. I think I will post a separate question regarding this. – codebat Jan 18 '22 at 15:39
  • Does it work if you use the deprecated EmitterProcessor? If so, the problem is with the Sink, otherwise it is probably a configuration error – orirab Jan 19 '22 at 14:12
  • Yeah apparently it's some config error, it also does not work with EmitterProcessor. Although I can see that the beans are created, no exchange or queue shows up and the messages just disappear. I will try to check the config. – codebat Jan 20 '22 at 10:14
  • can you mix the old producer (not refactored - who sends the output) and have the new functional consumer(who receives the input)? – Gligorije Nikolic Feb 14 '22 at 15:06
  • I think you can, but untested. Also take note - the old approach is deprecated, so in a few versions it will probably be removed. – orirab Feb 14 '22 at 17:56
  • btw, I checked removing the `EmitterProcessor` and it works - I'll update the answer to use the non-deprecated option – orirab Feb 21 '22 at 18:56