I have been using @StreamListener to consume single messages. Now I want to consume in batches. According to the documentation, @StreamListener doesn't work with batches, so I am moving to the functional style, but I am not able to consume any messages. When I run the application, I don't see the Kafka consumer properties being set for this consumer, as I do for the other consumers that use @StreamListener.
Here is my code:
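import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.Message;

// Event and ActivityBatchProcessor are application classes (imports omitted).
@Slf4j
@Configuration // already a @Component, so a separate @Component is redundant
@RequiredArgsConstructor
@Profile({"processing"})
public class ActivityBatchSubscriber {

    private final ActivityBatchProcessor activityBatchProcessor;

    // Bound as nodeConfigEvents-in-0; receives a whole batch when batch-mode=true.
    @Bean
    public Consumer<List<Message<Event>>> nodeConfigEvents() {
        return messages -> {
            try {
                log.info("Received activity batch with {} messages", messages.size());
                Event payload = messages.get(0).getPayload();
                log.info("First payload: {}", payload);
                List<Event> eventList = messages.stream()
                        .map(Message::getPayload)
                        .collect(Collectors.toList());
                activityBatchProcessor.processActivity(eventList);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is logged.
                log.error("Failed to process activity batch", e);
            }
        };
    }
}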
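To rule out the lambda itself, the unwrap-and-delegate logic can be exercised without Spring or Kafka. This is only a sketch under assumptions: Event and Envelope below are hypothetical stand-ins for my real Event class and for org.springframework.messaging.Message, and batchConsumer is an illustrative helper that mirrors the shape of nodeConfigEvents(), not code from my project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class BatchConsumerSketch {

    // Hypothetical stand-in for the real Event payload type.
    public static final class Event {
        public final String id;
        public Event(String id) { this.id = id; }
    }

    // Hypothetical stand-in for org.springframework.messaging.Message<T>.
    public static final class Envelope<T> {
        private final T payload;
        public Envelope(T payload) { this.payload = payload; }
        public T getPayload() { return payload; }
    }

    // Same shape as nodeConfigEvents(): unwrap every payload in the batch,
    // then hand the whole list to the processor in one call.
    public static Consumer<List<Envelope<Event>>> batchConsumer(Consumer<List<Event>> processor) {
        return messages -> {
            List<Event> events = messages.stream()
                    .map(Envelope::getPayload)
                    .collect(Collectors.toList());
            processor.accept(events);
        };
    }

    public static void main(String[] args) {
        List<Event> received = new ArrayList<>();
        batchConsumer(received::addAll).accept(List.of(
                new Envelope<>(new Event("a")),
                new Envelope<>(new Event("b"))));
        System.out.println(received.size()); // prints 2
    }
}
```

If this behaves as expected, the function body is fine and the problem lies in how (or whether) the binding gets registered.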
In the "processing" profile, I have these properties:
spring.cloud.stream.function.definition=nodeConfigEvents
spring.cloud.stream.bindings.nodeConfigEvents-in-0.destination=TOPIC-NAME
spring.cloud.stream.bindings.nodeConfigEvents-in-0.contentType=application/json
spring.cloud.stream.bindings.nodeConfigEvents-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.nodeConfigEvents-in-0.group=LOCAL1
spring.cloud.stream.bindings.nodeConfigEvents-in-0.consumer.auto-startup=true
Spring Cloud Stream Kafka version: 3.0.x
What am I doing wrong? The stream binding is not registered for this functional model. I found this question describing a similar issue; in a comment there, @Oleg mentioned that it was fixed in a 3.0 snapshot, and I am using 3.0.11.
My application logs this message:
Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration
Can I not use both styles in the same project, even though they are all in different profiles?