I have some issues with the new routing feature in Spring Cloud Stream.

I tried to implement a simple scenario: I send a message with the header spring.cloud.function.definition set to consume1 or consume2.

I expect consume1 or consume2 to be called based on the value sent in the header, but the methods are called randomly.

I publish the message to the exchange named consumer using the RabbitMQ management console.

I get the following logs:

2020-02-27 14:48:25.896  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume1 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=9a4dff25-88ef-4d76-93e2-c8719cda122d, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, sourceData=(Body:'[B@3a92faa7(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-gGChFNCKIVd25yyR9H6-fQ, consumerQueue=consumer.app]), timestamp=1582811303347}]]
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-02-27 14:48:25.984  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-02-27 14:48:25.991  INFO 22132 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 7 ms
2020-02-27 14:48:26.037  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-1
2020-02-27 14:48:26.111  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-1' has 1 subscriber(s).
2020-02-27 14:48:26.116  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2020-02-27 14:48:26.123  INFO 22132 --- [oundedElastic-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory.publisher#32438e24:0/SimpleConnection@3e58666d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62514]
2020-02-27 14:48:26.139  INFO 22132 --- [-1.customer-1-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:48:26.140  INFO 22132 --- [-1.customer-1-1] com.example.demo.TestSink                : Data received customer-1...body
2020-02-27 14:49:14.185  INFO 22132 --- [ consumer.app-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.194  INFO 22132 --- [ consumer.app-1] com.example.demo.TestConsumer            : ==============>consume2 messge [[payload=ok, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=#, amqp_receivedExchange=consumer, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=consumer.app, amqp_redelivered=false, id=33581edb-2832-1c92-b765-a05794512b34, spring.cloud.function.definition=consume1, amqp_consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, sourceData=(Body:'[B@8159793(byte[2])' MessageProperties [headers={spring.cloud.function.definition=consume1}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=consumer, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-RIp2nZdcG2a0hNQeImwtBw, consumerQueue=consumer.app]), timestamp=1582811354186}]]
2020-02-27 14:49:14.203  INFO 22132 --- [oundedElastic-1] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel customer-2
2020-02-27 14:49:14.213  INFO 22132 --- [oundedElastic-1] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.customer-2' has 1 subscriber(s).
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
2020-02-27 14:49:14.216  INFO 22132 --- [-2.customer-2-1] com.example.demo.TestSink                : Data received customer-2...body

application.yml

spring:
  main:
    allow-bean-definition-overriding: true
spring.cloud.stream:
  function.definition: supplier;receive1;receive2;consume1;consume2
  function.routing:
    enabled: true

  bindings:
    consume1-in-0.destination: consumer
    consume1-in-0.group: app
    consume2-in-0.destination: consumer
    consume2-in-0.group: app
    receive1-in-0.destination: customer-1
    receive1-in-0.group: customer-1
    receive2-in-0.destination: customer-2
    receive2-in-0.group: customer-2

DemoApplication.java

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.commons.logging.Log
import org.apache.commons.logging.LogFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.HttpStatus
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import java.util.function.Consumer
import java.util.function.Supplier


@SpringBootApplication
class DemoApplication

fun main(args: Array<String>) {
    runApplication<DemoApplication>(*args)
}

@RestController
class DynamicDestinationController(private val jsonMapper: ObjectMapper) {

    private val processor: EmitterProcessor<Message<String>> = EmitterProcessor.create<Message<String>>()

    @RequestMapping(path = ["/api/dest/{destName}"], method = [GET], consumes = ["*/*"])
    @ResponseStatus(HttpStatus.ACCEPTED)
    fun handleRequest(@PathVariable destName:String) {
        val message: Message<String> = MessageBuilder.withPayload("body")
                .setHeader("spring.cloud.stream.sendto.destination", destName).build()
        processor.onNext(message)
    }

    @Bean
    fun supplier(): Supplier<Flux<Message<String>>> {
        return Supplier { processor }
    }
}

const val destResourceUrl = "http://localhost:8080/api/dest"
@Component
class TestConsumer() {

    private val restTemplate: RestTemplate = RestTemplate()
    private val logger: Log = LogFactory.getLog(javaClass)

    @Bean
    fun consume1(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume1 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-1", String::class.java)
    }

    @Bean
    fun consume2(): Consumer<Message<String>> = Consumer {
        logger.info("==============>consume2 messge [[payload=${it.payload}, headers=${it.headers}]]")
        restTemplate.getForEntity("$destResourceUrl/customer-2", String::class.java)
    }
}


@Component
class TestSink {
    private val logger: Log = LogFactory.getLog(javaClass)
    @Bean
    fun receive1(): Consumer<String> = Consumer {
        logger.info("Data received customer-1...$it")
    }

    @Bean
    fun receive2(): Consumer<String> = Consumer {
        logger.info("Data received customer-2...$it")
    }
}

Any idea how to fix the routing to the consumers?

Thanks in advance.

demo-repo

GUISSOUMA Issam
  • why do you have source, processor and sink all in a single app? I mean is it just for testing and the intention is to break it apart? – Oleg Zhurakousky Feb 27 '20 at 13:55
  • This is just a test, but in a real project I need to have multiple functions in the same app. I'm studying the migration to the new functional paradigm. In the current project, a single app has multiple input and output bindings with conditions on the inputs... – GUISSOUMA Issam Feb 27 '20 at 14:00
  • The reason why I asked is that it will not work at the moment with how you approach it. The EmitterProcessor is tied to a single Supplier, hence the data always goes there and so on. . . The good news is that very shortly we'll merge an issue that would allow you to do that. Basically once [this](https://github.com/spring-cloud/spring-cloud-stream/pull/1918#pullrequestreview-365688063) is merged, I'll follow up here with complete example. – Oleg Zhurakousky Feb 27 '20 at 14:35
  • Ok thanks for the answer. – GUISSOUMA Issam Feb 27 '20 at 15:05
  • I'm not sure my problem is the EmitterProcessor. Can you please look at this simplified app, https://github.com/iguissouma/cloud-stream-functions-sample, and confirm whether it's the same problem? I published a message to the RabbitMQ queue ha_consumer_queue with the header spring.cloud.function.definition=simpleConsumer1, but the method executed is not always simpleConsumer1. Thanks in advance – GUISSOUMA Issam Feb 27 '20 at 16:49

2 Answers


Actually, I am a bit confused, so let's take it one step at a time. Here is a working app (modelled after yours) that uses the sendto feature, which lets you send messages to specific (existing and/or dynamically resolved) destinations.

(It's in Java, but you can rework it in Kotlin.)

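import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

// @SpringBootApplication is needed so that SpringApplication.run auto-configures the app.
@SpringBootApplication
@Controller
public class WebSourceApplication {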

    public static void main(String[] args) {
        SpringApplication.run(WebSourceApplication.class,
                "--spring.cloud.function.definition=supplier;consA;consB",
                "--spring.cloud.stream.bindings.consA-in-0.destination=consumerA",
                "--spring.cloud.stream.bindings.consA-in-0.group=consumerA-grp",
                "--spring.cloud.stream.bindings.consB-in-0.destination=consumerB",
                "--spring.cloud.stream.bindings.consB-in-0.group=consumerB-grp"
                );
    }

    EmitterProcessor<Message<String>> processor = EmitterProcessor.create();

    @RequestMapping(path = "/api/dest/{destName}", consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@RequestBody String body, @PathVariable String destName) {
        Message<String>  message = MessageBuilder.withPayload(body)
            .setHeader("spring.cloud.stream.sendto.destination", destName)
            .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<Message<String>>> supplier() {
        return () -> processor;
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

And when I curl it, I get consistent invocation of the appropriate consumer:

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerA
log: Consuming from consA:  Hello Spring Cloud Stream
. . .

curl -H "Content-Type: application/json" -X POST -d "Hello Spring Cloud Stream" http://localhost:8080/api/dest/consumerB
log: Consuming from consB:  Hello Spring Cloud Stream
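The sendto behaviour shown above can be modelled in plain Java, without Spring or a broker. This is only a rough sketch of the idea, not the framework's implementation: the class `SendToSketch`, its methods, and the in-memory map standing in for broker destinations are all hypothetical, and the fallback to a default destination (here `supplier-out-0`) when the header is absent is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of header-driven output routing.
class SendToSketch {

    // Header name taken from the example above.
    static final String SEND_TO_HEADER = "spring.cloud.stream.sendto.destination";

    // Stand-in for broker destinations: destination name -> delivered payloads.
    private final Map<String, List<String>> destinations = new LinkedHashMap<>();

    // Delivers the payload to the destination named in the header, creating the
    // destination on first use. Falls back to defaultDestination (an assumption
    // of this sketch) when the header is missing. Returns the chosen destination.
    String send(Map<String, String> headers, String payload, String defaultDestination) {
        String destination = headers.getOrDefault(SEND_TO_HEADER, defaultDestination);
        destinations.computeIfAbsent(destination, name -> new ArrayList<>()).add(payload);
        return destination;
    }

    // Returns what has been delivered to a destination so far (empty if none).
    List<String> delivered(String destination) {
        return destinations.getOrDefault(destination, Collections.emptyList());
    }

    public static void main(String[] args) {
        SendToSketch broker = new SendToSketch();
        Map<String, String> headers = Collections.singletonMap(SEND_TO_HEADER, "consumerA");
        String chosen = broker.send(headers, "Hello Spring Cloud Stream", "supplier-out-0");
        System.out.println("Delivered to " + chosen + ": " + broker.delivered(chosen));
    }
}
```

In this model the header only decides where the output goes. Which function then consumes from `consumerA` or `consumerB` is decided by the bindings, as in the app above.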

Notice: there is no routing-enabled property here. That feature is mainly meant for always calling one function, functionRouter, and having it call other functions on your behalf. It is a feature of spring-cloud-function, which means it works outside of spring-cloud-stream, channels, destinations, etc.

Isn't that what you are trying to accomplish? Sending a message to a different destination based on some path variable in your HTTP request?

Oleg Zhurakousky
  • The sendto part is working fine. What I want to achieve is like the old style, one input with two listeners and conditions, but I didn't get how routing to a consumer works – GUISSOUMA Issam Feb 27 '20 at 18:47
  • So I added TestConsumer with two functions, consume1 and consume2. How does another microservice call them? With the old style, I send a message to that input and the method whose condition is satisfied is executed. How do I achieve the same behaviour? – GUISSOUMA Issam Feb 27 '20 at 18:50
  • Your issue is that you are testing everything in a single app, which goes against the design of the framework and creates certain conflicts. For example, you can't really have both the `spring.cloud.function.definition` and `spring.cloud.stream.function.routing.enabled` properties. So let me post a different example. But that is why I asked initially why you have all that in a single app. – Oleg Zhurakousky Feb 27 '20 at 18:52

Here is an example of a different microservice that receives messages on the routing function, which then routes them to different functions.

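import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

// @SpringBootApplication is needed so that SpringApplication.run auto-configures the app.
@SpringBootApplication
public class FunctionRoutingApplication {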

    public static void main(String[] args) {
        SpringApplication.run(FunctionRoutingApplication.class,
                "--spring.cloud.stream.function.routing.enabled=true"
                );
    }

    @Bean
    public Consumer<String> consA() {
        return v -> {
            System.out.println("Consuming from consA:  " + v);
        };
    }

    @Bean
    public Consumer<String> consB() {
        return v -> {
            System.out.println("Consuming from consB:  " + v);
        };
    }
}

And that's pretty much it. Go to your broker and send data to the functionRouter-in-0 exchange while providing the header spring.cloud.function.definition=consA or spring.cloud.function.definition=consB, and you will see consistent invocations.
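To make the idea of a single routing function concrete, here is a minimal plain-Java sketch of header-based dispatch. It is not the actual RoutingFunction from spring-cloud-function: the class `RoutingSketch`, its `register` and `route` methods, and the map used as a function registry are hypothetical names chosen for illustration. Only the header name and the consA/consB function names come from the example above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model: one entry point that delegates to named functions by header value.
class RoutingSketch {

    // Header name taken from the question and the answer above.
    static final String ROUTING_HEADER = "spring.cloud.function.definition";

    // Stand-in for a function registry: function name -> consumer.
    private final Map<String, Consumer<String>> functions = new HashMap<>();

    void register(String name, Consumer<String> function) {
        functions.put(name, function);
    }

    // Every message arrives here. The header value selects the target function;
    // a missing or unknown value is rejected instead of picking a function arbitrarily.
    void route(Map<String, String> headers, String payload) {
        String target = headers.get(ROUTING_HEADER);
        Consumer<String> function = functions.get(target);
        if (function == null) {
            throw new IllegalArgumentException("No function registered for: " + target);
        }
        function.accept(payload);
    }

    public static void main(String[] args) {
        RoutingSketch router = new RoutingSketch();
        router.register("consA", v -> System.out.println("Consuming from consA:  " + v));
        router.register("consB", v -> System.out.println("Consuming from consB:  " + v));

        Map<String, String> headers = new HashMap<>();
        headers.put(ROUTING_HEADER, "consB");
        router.route(headers, "ok");
    }
}
```

The point of the design is that only the router is bound to the input destination, so the header is the sole thing that selects the function. In the question's setup, by contrast, consume1 and consume2 are each bound to the same destination and group.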

Am I still missing something?

Oleg Zhurakousky
  • Should the functionRouter-in-0 exchange be created automatically or manually? Can it be shared by other microservices? My initial requirement was to create a microservice that schedules the sending of messages to other microservices (hence the need for dynamic destinations). The microservice can schedule/reschedule/unschedule the sending of a message, so I want to have one input channel and have the method called based on a routing expression. – GUISSOUMA Issam Feb 27 '20 at 19:46
  • The name `functionRouter` is [constant](https://github.com/spring-cloud/spring-cloud-function/blob/master/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java#L54), and `-in-0` is its default binding name and destination if not [overridden](https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_binding_and_binding_names). The destinations (exchanges, queues, topics, etc.), like any other destinations, are either created (if they don't exist) or we use the existing ones. – Oleg Zhurakousky Feb 27 '20 at 19:53
  • Also, maybe this will help. The output of a function can be routed to destinations via the `spring.cloud.stream.sendto.destination` header. The input, on the other hand, which arrives at a destination, can be routed to a specific function by the RoutingFunction based on the rules described here: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_event_routing. – Oleg Zhurakousky Feb 27 '20 at 19:56
  • Thank you for taking time to explain all this. – GUISSOUMA Issam Feb 27 '20 at 20:32
  • What if I'm receiving two different types of messages, which have different header fields and values? E.g. my consuming service receives the following messages (on an "order" queue and a "shipment" queue): `Order1: { Header: {catalog:groceries} } , Order2: { Header: {catalog:tools} }` , `Shipment1: { Header: {region:Europe} }, Shipment2: { Header: {region:America} }` I will have 2 bindings for the 2 queues, but I cannot route each message to its own Consumer function, because I can set the routing condition only globally for either `headers['catalog']` or `headers['region']`. – Danny Apr 09 '20 at 22:29
  • You can set conditions within message headers (not just globally). It's all explained in the documentation linked earlier. Also keep in mind that SO discourages long comment threads, especially if you are changing the original question (however slightly). If you still believe you have a question, feel free to ask it as a new question and optionally link to this one for context. – Oleg Zhurakousky Apr 10 '20 at 06:17
  • Thank you for the quick answer. I posted that as a new question with some deeper questioning https://stackoverflow.com/questions/61135632/spring-cloud-function-separate-routing-expression-for-different-consumer – Danny Apr 10 '20 at 07:18