2

I have been using @StreamListener to consume single messages, Now I want to consume in a batch. As per documentation, @StreamListener doesn't work with batches, So moving to functional style, but I am not able to consume the message. When I run the application, I don't see these kafka properties set, like I see for other consumers which are using @Streamlistener

Here is my code:

@Component
@Slf4j
@Configuration
@RequiredArgsConstructor
@Profile({"processing"})
public class ActivityBatchSubscriber {

  public final ActivityBatchProcessor activityBatchProcessor;
 

  @Bean
  public Consumer<List<Message<Event>>> nodeConfigEvents() {
  
     return messages -> {
         try {
            log.info("Received activity message with message length {} attempt",
            messages.size());
            Message<Event> eventGenericMessage = messages.get(0);
            Event payload = eventGenericMessage.getPayload();
            log.info("payload" + payload);
             List<Event> eventList = messages.stream().map(Message::getPayload).collect(Collectors.toList());
          activityBatchProcessor.processActivity(eventList);

        } catch (Exception e) {
           log.error(e.getMessage());
        }
     };
   }
}

In "processing" profile , have these properties.

spring.cloud.stream.function.definition=nodeConfigEvents
spring.cloud.stream.bindings.nodeConfigEvents-in-0.destination=TOPIC-NAME
spring.cloud.stream.bindings.nodeConfigEvents-in-0.contentType=application/json
spring.cloud.stream.bindings.nodeConfigEvents-in-0.consumer.batch-mode=true
spring.cloud.stream.bindings.nodeConfigEvents-in-0.group=LOCAL1
spring.cloud.stream.bindings.nodeConfigEvents-in-0.consumer.auto-startup=true

spring-cloud-stream-kafka version ---> 3.0.x

What am I doing wrong? Stream binding is not registered for this functional model, I see this question, having similar issue, though In comment @Oleg mentioned that it's fixed on 3.0 snapshot, and I am using 3.0.11.

My application is logging this info,

Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration

Can I not use both in the same project? Though they all are in different profiles.

@Oleg Zhurakousky @Gary

APK
  • 155
  • 1
  • 15
  • Does this spring-cloud-stream-binder-kafka functional stye starts from a particular version??? – APK Sep 19 '21 at 10:30
  • You cannot use both functional model and `EnableBinding` in the same application even if they are under different profiles. – sobychacko Sep 20 '21 at 15:19

0 Answers0