Before raising the question: Spring Cloud Stream Kafka routing expression and Spring Cloud Function - Separate routing-expression for different Consumer
I am looking to send few orders and based on condition certain consumers only consumed it
application.yml
spring:
cloud:
stream:
bindings:
generateThreeOrderTypes-out-0:
destination: ordertopic
content-type: application/json
functionRouter-in-0:
destination: ordertopic
content-type: application/json
readOrder-in-0:
destination: orderprocessor
content-type: application/json
function:
routing:
enabled: true
function:
definition: generateThreeOrderTypes;readINDOrder;readUSAOrder
routing-expression: "headers['orderCountry'] == 'IND' ? 'readINDOrder' : 'readUSAOrder'"
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
MainApp.java
package com.example.demo;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import com.example.demo.model.Order;
import reactor.core.publisher.Flux;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Supplier<Flux<Message<Order>>> generateThreeOrderTypes(){
List<Message<Order>> orders = new ArrayList<>();
orders.add(MessageBuilder.withPayload(Order.builder()
.orderId(ThreadLocalRandom.current().nextLong(1, 100))
.userId(ThreadLocalRandom.current().nextLong(1, 100))
.productId(ThreadLocalRandom.current().nextLong(1, 100))
.build())
.setHeader("orderCountry", "IND")
.build());
orders.add(MessageBuilder.withPayload(Order.builder()
.orderId(ThreadLocalRandom.current().nextLong(1, 100))
.userId(ThreadLocalRandom.current().nextLong(1, 100))
.productId(ThreadLocalRandom.current().nextLong(1, 100))
.build())
.setHeader("orderCountry", "USA")
.build());
orders.add(MessageBuilder.withPayload(Order.builder()
.orderId(ThreadLocalRandom.current().nextLong(1, 100))
.userId(ThreadLocalRandom.current().nextLong(1, 100))
.productId(ThreadLocalRandom.current().nextLong(1, 100))
.build())
.setHeader("orderCountry", "IND")
.build());
return () -> Flux.fromIterable(orders);
}
@Bean
public Consumer<Order> readINDOrder(){
return order -> System.out.println("== Received Order (IND): "+ order + " at "+ Instant.now().toString());
}
@Bean
public Consumer<Order> readUSAOrder(){
return order -> System.out.println("-- Received Order (USA): "+ order + " at "+ Instant.now().toString());
}
}