
I am trying to get a non-blocking response from the Micronaut Kafka implementation, but the return value is not working.

public class ProductManager implements IProductManager{
    private final ApplicationContext applicationContext;
    
    public ProductManager(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public ProductViewModel findFreeText(String text) {
        final ProductViewModel model = new ProductViewModel();
        IProductProducer client = applicationContext.getBean(IProductProducer.class);
        client.findFreeText(text).subscribe(item -> {
            System.out.println(item);
        });
        return model;
    }
}

The callback passed to subscribe is never invoked; the debugger never reaches it. I want to get the value back from the Kafka listener.
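Separately from whether the callback fires, the shape of findFreeText matters: it returns right after registering the callback. Here is a minimal sketch of that timing using only the JDK. It assumes nothing about Micronaut or Kafka: CompletableFuture is a stand-in for Flowable, and AsyncReturnSketch, findFreeTextAsync, returnBeforeCallback, returnAsyncType and the gate latch are hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncReturnSketch {

    // Hypothetical stand-in for client.findFreeText(text): the value is produced
    // on another thread, and only after `gate` has been opened.
    static CompletableFuture<String> findFreeTextAsync(String text, CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "result for " + text;
        });
    }

    // Same shape as ProductManager.findFreeText: register a callback, then return at once.
    static String[] returnBeforeCallback(String text, CountDownLatch gate) {
        String[] holder = new String[1];
        findFreeTextAsync(text, gate).thenAccept(item -> holder[0] = item);
        return holder; // the callback has not run yet, so holder[0] is still null
    }

    // Alternative shape: hand the asynchronous type back to the caller.
    static CompletableFuture<String> returnAsyncType(String text, CountDownLatch gate) {
        return findFreeTextAsync(text, gate);
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        String[] holder = returnBeforeCallback("shoe", gate);
        System.out.println("returned early: " + holder[0]);

        CompletableFuture<String> future = returnAsyncType("shoe", gate);
        gate.countDown(); // let the simulated responses arrive
        System.out.println("from async type: " + future.join());
    }
}
```

In this sketch the caller of returnBeforeCallback cannot see the value at the moment the method returns, while the caller of returnAsyncType can wait for it or chain further work on it. Whether the same applies to the Flowable returned by the Kafka client here is an assumption I have not verified.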

Kafka producer

@KafkaClient
public interface IProductProducer {
    
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    Flowable<ProductViewModel> findFreeText(String text);
}

Kafka Listener

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class ProductListener {
    private static final Logger LOG = LoggerFactory.getLogger(ProductListener.class);

    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    public Flowable<Product> findByFreeText(String text) {
        LOG.info("Listening value = {}", text);
        
        return Flowable.just(new Product("This is the test", 0,"This is test description"));
    }
}

Micronaut documentation for non-blocking method

https://docs.micronaut.io/1.0.0.M3/guide/index.html#_reactive_and_non_blocking_method_definitions

San Jaisy
  • Are you using Micronaut 1.0.0.M3? – Jeff Scott Brown Oct 07 '20 at 15:46
  • @JeffScottBrown No, I am using Micronaut 2.1.0; that was just the documentation I was referring to – San Jaisy Oct 07 '20 at 16:02
  • This isn't an answer to your question but you should use the docs which correspond to the version of the software you are using. Those likely include https://micronaut-projects.github.io/micronaut-kafka/2.1.0/guide/index.html and https://docs.micronaut.io/2.1.0/guide/index.html. – Jeff Scott Brown Oct 07 '20 at 16:45
  • Yeah, that makes sense; however, those documents also have the same instructions – San Jaisy Oct 07 '20 at 16:59
  • This doesn't explain why your breakpoint never gets hit, but isn't the `findFreeText` method in `ProductManager` prone to return before `model` ever gets populated? – Jeff Scott Brown Oct 07 '20 at 20:25
  • @JeffScottBrown Even I don't know why it never gets called. Do you have any sample code to return the value from the Kafka listener. – San Jaisy Oct 08 '20 at 01:18
  • "Do you have any sample code to return the value from the Kafka listener." - https://github.com/micronaut-projects/micronaut-kafka/blob/606599bb1898342278d001faad25c645bf0da944/kafka/src/test/groovy/io/micronaut/configuration/kafka/docs/consumer/reactive/ProductListener.java – Jeff Scott Brown Oct 08 '20 at 13:39
  • @JeffScottBrown Thanks for the link. I am still not able to solve the issue. Can you please have a look at my code and see what mistake I am making? Here is the repo - https://github.com/anandjaisy/micronautKafka/tree/develop/src/fete-bird-product/src/main/java/fete/bird/service – San Jaisy Oct 08 '20 at 16:07

0 Answers