1

I have one scheduler which produces one event. My consumer consumes this event. The payload of this event is a json with below fields:

private String topic;
private String partition;
private String filterKey;
private long CustId;  

Now I need to trigger one more consumer which will take all this information which I get a response from first consumer.

@KafkaListener(topics = "<**topic-name-from-first-consumer-response**>", groupId = "group" containerFactory = "kafkaListenerFactory")
    public void consumeJson(List<User> data, Acknowledgment acknowledgment,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                            @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
                            // consumer code goes here...}

I need to create some dynamic variable which I can pass in place of topic name.

similarly, I am using the filtering in the configuration file and I need to pass key dynamically in the configuration.

factory.setRecordFilterStrategy(new RecordFilterStrategy<String, Object>() {
            @Override
            public boolean filter(ConsumerRecord<String, Object> consumerRecord) {
                if(consumerRecord.key().equals("**Key will go here**")) {
                    return false;
                }
                else {
                    return true;
                }
            }

        });

How can we dynamically inject these values from the response of first consumer and trigger the second consumer. Both the consumers are in same application

Sumit Sood
  • 441
  • 7
  • 23

1 Answers1

3

You cannot do that with an annotated listener, the configuration is only used during initialization; you would need to create a listener container yourself (using the ConcurrentKafkaListenerContainerFactory) to dynamically create a listener.

EDIT

Here's an example.

@SpringBootApplication
public class So69134055Application {

    public static void main(String[] args) {
        SpringApplication.run(So69134055Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69134055").partitions(1).replicas(1).build();
    }

}

@Component
class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    private static final Method otherListen;

    static {
        try {
            otherListen = Listener.class.getDeclaredMethod("otherListen", List.class);
        }
        catch (NoSuchMethodException | SecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private final MessageHandlerMethodFactory methodFactory;

    private final KafkaAdmin admin;

    private final KafkaTemplate<String, String> template;

    public Listener(ConcurrentKafkaListenerContainerFactory<String, String> factory, KafkaAdmin admin,
            KafkaTemplate<String, String> template, KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp) {

        this.factory = factory;
        this.admin = admin;
        this.template = template;
        this.methodFactory = bpp.getMessageHandlerMethodFactory();
    }

    @KafkaListener(id = "so69134055", topics = "so69134055")
    public void listen(String topicName) {
        try (AdminClient client = AdminClient.create(this.admin.getConfigurationProperties())) {
            NewTopic topic = TopicBuilder.name(topicName).build();
            client.createTopics(List.of(topic)).all().get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            log.error("Failed to create topic", e);
        }
        ConcurrentMessageListenerContainer<String, String> container =
                this.factory.createContainer(new TopicPartitionOffset(topicName, 0));
        BatchMessagingMessageListenerAdapter<String, String> adapter =
                new BatchMessagingMessageListenerAdapter<>(this, otherListen);
        adapter.setHandlerMethod(new HandlerAdapter(
                this.methodFactory.createInvocableHandlerMethod(this, otherListen)));
        FilteringBatchMessageListenerAdapter<String, String> filtered =
                new FilteringBatchMessageListenerAdapter<>(adapter, record -> !record.key().equals("foo"));
        container.getContainerProperties().setMessageListener(filtered);
        container.getContainerProperties().setGroupId("group.for." + topicName);
        container.setBeanName(topicName + ".container");
        container.start();
        IntStream.range(0, 10).forEach(i -> this.template.send(topicName, 0, i % 2 == 0 ? "foo" : "bar", "test" + i));
    }

    void otherListen(List<String> others) {
        log.info("Others: {}", others);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

Output - showing that the filter was applied to the records with bar in the key.

Others: [test0, test2, test4, test6, test8]
Gary Russell
  • 166,535
  • 14
  • 146
  • 179