
I want to use this extension: [Quarkus Smallrye Reactive Messaging Kafka]

But in my application the topic names are not known in advance; they are determined at runtime from the message received from the user. How can I specify the topic name and the topic-related settings programmatically, without annotations? (I only need to send messages to Kafka, i.e. produce.)

@ApplicationScoped
public class PriceGenerator {

    private Random random = new Random();

    // Don't want to use this 
    // "generated-price" not known at build time
    @Outgoing("generated-price")                       
    public Multi<Integer> generate() {                  
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> random.nextInt(100));
    }

}

Alternatively, these settings would need to be set programmatically at runtime:

mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

Because I could not find a way to do this, I fell back to the native Kafka client:

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-kafka-client</artifactId>
    </dependency>

Properties props = new Properties();
props.put("bootstrap.servers", "85.93.89.115:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topicName.toString(), messageFactory.MessageToString(message)));

EbiPenMan
  • You've got the topic name in the producer.send call. What's the issue? – OneCricketeer Feb 18 '21 at 14:59
  • @OneCricketeer Thanks for your reply. I am using the `quarkus-kafka-client` dependency, but I don't want to. I want to use `smallrye-kafka`; I only used `quarkus-kafka-client` because I had no choice. `smallrye-kafka` uses an annotation for the topic name. – EbiPenMan Feb 19 '21 at 16:21
  • @OneCricketeer I'm looking for a way to use `smallrye-kafka` to manually set the topic name without the use of annotations. – EbiPenMan Feb 19 '21 at 16:35
  • I'm not super familiar with the smallrye library, but I think it requires the annotations so isn't able to dynamically redefine the outgoing channel – OneCricketeer Feb 20 '21 at 13:53
  • now it's possible with https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/kafka/kafka.html#_dynamic_topic_names – bdeweer Mar 09 '21 at 12:03
  • @bertranddeweer thank you. Yes, that's what I wanted. But your link gives me a 404 error. I put the link with the new version https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.9/kafka/kafka.html#_dynamic_topic_names – EbiPenMan May 23 '22 at 04:17
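
The idea behind the "dynamic topic names" feature linked in the comments above is that the topic travels with each message instead of being fixed on the channel. A minimal, library-free sketch of that idea follows. Every name in it (`RecordSender`, `DynamicTopicPublisher`, `RecordingSender`, `topicFromPrefix`) is a hypothetical stand-in invented for illustration, not part of Quarkus, SmallRye, or the Kafka client, and the "prefix before the colon" rule is only an assumed way of deriving a topic from a user message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DynamicTopicSketch {

    /**
     * Hypothetical stand-in for "send this value to this topic".
     * With the plain client from the question, an implementation would wrap
     * producer.send(new ProducerRecord<>(topic, value)). With SmallRye, the
     * linked docs describe carrying the topic in the message's
     * OutgoingKafkaRecordMetadata instead (not shown here).
     */
    interface RecordSender {
        void send(String topic, String value);
    }

    /** Resolves the topic per message at runtime, then delegates to the sender. */
    static final class DynamicTopicPublisher {
        private final Function<String, String> topicResolver;
        private final RecordSender sender;

        DynamicTopicPublisher(Function<String, String> topicResolver, RecordSender sender) {
            this.topicResolver = topicResolver;
            this.sender = sender;
        }

        void publish(String message) {
            String topic = topicResolver.apply(message);
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("No topic resolved for message: " + message);
            }
            sender.send(topic, message);
        }
    }

    /** In-memory sender so the sketch runs without a broker. */
    static final class RecordingSender implements RecordSender {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String value) {
            sent.add(topic + " <- " + value);
        }
    }

    /** Assumed rule: the text before the first ':' names the topic. */
    static String topicFromPrefix(String message) {
        int idx = message.indexOf(':');
        return idx > 0 ? message.substring(0, idx) : null;
    }

    public static void main(String[] args) {
        RecordingSender sender = new RecordingSender();
        DynamicTopicPublisher publisher =
                new DynamicTopicPublisher(DynamicTopicSketch::topicFromPrefix, sender);
        publisher.publish("prices:42");
        publisher.publish("orders:7");
        sender.sent.forEach(System.out::println);
    }
}
```

Keeping the Kafka call behind a small interface like this means the topic-selection logic does not depend on which client library ends up doing the send.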

1 Answer


You can override the topic value dynamically, at startup or at any other time you need. The following snippet shows how to override the predefined topic value:

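import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import javax.enterprise.context.ApplicationScoped;  // jakarta.* on Quarkus 3 and later
import javax.enterprise.event.Observes;

@ApplicationScoped
public class AppLifecycleBean {

    void onStart(@Observes StartupEvent ev) {
        // Override the configured topic of the "generated-price" channel
        System.setProperty("mp.messaging.outgoing.generated-price.topic", "example");
    }

    void onStop(@Observes ShutdownEvent ev) {
    }

}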
LW001
  • This is "hacky" because you are overriding application config. I'm also not sure you can use this at "any time": once the outgoing channel is bound to some magic handler object, it won't respond to changes in configuration – apocalypz Jan 25 '22 at 18:25
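
The concern in the last comment can be illustrated with plain Java. The sketch below is only an analogy: `BoundChannel` is a hypothetical class that reads a system property once in its constructor, and it is an assumption, not a confirmed description, that the real outgoing channel reads its configuration the same way. If it does, a later `System.setProperty` call would go unnoticed by a channel that already exists.

```java
public class BoundConfigSketch {

    /** Hypothetical stand-in for a channel that reads its topic once, when it is created. */
    static final class BoundChannel {
        private final String topic;

        BoundChannel(String propertyKey) {
            // The property is read here and never consulted again.
            this.topic = System.getProperty(propertyKey, "unset");
        }

        String topic() {
            return topic;
        }
    }

    public static void main(String[] args) {
        String key = "mp.messaging.outgoing.generated-price.topic";

        System.setProperty(key, "example");
        BoundChannel channel = new BoundChannel(key);

        // Changing the property afterwards does not affect the existing instance.
        System.setProperty(key, "changed-later");

        System.out.println(channel.topic());               // prints "example"
        System.out.println(new BoundChannel(key).topic()); // prints "changed-later"
    }
}
```

Under that assumption, overriding the property is only reliable before the channel is created, which is why a per-message topic is the safer fit for topics chosen at runtime.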