We are trying to publish and subscribe to an MQTT broker using SmallRye Reactive Messaging. We managed to publish a message to a specific topic/channel with the following simple code:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class Publish {
    
    @Outgoing("pao")
    public Multi<String> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> "A Message in here");
    }
}

What we want is to be able to call the generate() method whenever we like, with a dynamic topic defined by the user. That was our problem, but then we found these classes in that repo on GitHub, in the package io.smallrye.reactive.messaging.mqtt.

For example, we found a class that appears to make a publish call to an MQTT broker (in our case a running Mosquitto server).

In the statement

    SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false);

we get a red underline under SendingMqttMessage<String> with the message: 'SendingMqttMessage(java.lang.String, java.lang.String, io.netty.handler.codec.mqtt.MqttQoS, boolean)' is not public in 'io.smallrye.reactive.messaging.mqtt.SendingMqttMessage'. Cannot be accessed from outside package

UPDATE (publish done): We finally managed to publish to the MQTT broker (a Mosquitto server) with a dynamic topic supplied by the user. As we found out, the SendingMqttMessage class mentioned above was not supposed to be used directly at all. We also found out that we needed an Emitter to publish with a dynamic topic:

    @Inject
    @Channel("panatha")
    Emitter<String> emitter;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createUser(Device device) {
        System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
        emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
        return Response.ok().status(Response.Status.CREATED).build();
    }

Now we need to find out how to subscribe to a topic dynamically.

DKafetzis
Panagiss
  • Did you find any solution for this? I kind of have the same problem – Ahmed Anwar Nov 14 '20 at 15:05
  • Unfortunately no. I have also put the project on standby, so I haven't looked into it further, but I couldn't find anything. Maybe we are doing something wrong, I don't know. – Panagiss Nov 15 '20 at 11:46

1 Answer

First, to get us on the same page:
Reactive messaging does not work with topics, but with channels. That is important to note, because a channel can either be read from or written to, never both. So if you want to do both, you need to configure two channels pointing at the same topic, one incoming and one outgoing.

To answer your question:

You made a pretty good start with Emitters, but you still lack the dynamic nature you'd like. In your example, you acquired that Emitter through CDI.
That is all we need to make this dynamic, since we can look up beans programmatically at runtime using CDI, like this:

Sending Messages

    private Emitter<byte[]> dynamicEmitter(String topic) {
        return CDI.current().select(new TypeLiteral<Emitter<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

Please also note that I am creating an Emitter of type byte[], as according to its documentation this is currently the only type supported by the smallrye-mqtt connector (version 3.4.0).
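The ChannelAnnotation class used in the lookup above is not shown here. A minimal sketch of what such a helper might look like follows, assuming it is simply a runtime instance of the @Channel qualifier that carries the channel name. The Channel annotation declared in the sketch is a local stand-in so that the example compiles on its own, and ChannelLiteral is a hypothetical name. In a real application one would more likely extend javax.enterprise.util.AnnotationLiteral<Channel> and implement the MicroProfile @Channel annotation, which takes care of equals and hashCode.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for org.eclipse.microprofile.reactive.messaging.Channel,
// declared here only so the sketch compiles without extra dependencies.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
@interface Channel {
    String value();
}

// Hypothetical counterpart of the ChannelAnnotation helper: an instance of
// @Channel whose value (the channel name) is chosen at runtime.
final class ChannelLiteral implements Channel {
    private final String name;

    ChannelLiteral(String name) {
        this.name = name;
    }

    @Override
    public String value() {
        return name;
    }

    @Override
    public Class<? extends Annotation> annotationType() {
        return Channel.class;
    }

    // Annotations compare by member values, so two literals with the
    // same channel name must be equal for qualifier matching to work.
    @Override
    public boolean equals(Object other) {
        return other instanceof Channel && name.equals(((Channel) other).value());
    }

    // Follows the hash code contract documented on java.lang.annotation.Annotation.
    @Override
    public int hashCode() {
        return (127 * "value".hashCode()) ^ name.hashCode();
    }

    @Override
    public String toString() {
        return "@Channel(\"" + name + "\")";
    }
}

class ChannelLiteralDemo {
    public static void main(String[] args) {
        Channel first = new ChannelLiteral("panatha");
        Channel second = new ChannelLiteral("panatha");
        System.out.println(first + " equals " + second + ": " + first.equals(second));
    }
}
```

Note that the string passed to such a literal is matched against configured channel names, so the "topic" argument of dynamicEmitter effectively selects a channel.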

Receiving Messages

To read messages from a reactive messaging channel, you can use the counterpart of the Emitter, which is the Publisher.
It can be used analogously:

    private Publisher<byte[]> dynamicReceiver(String topic) {
        return CDI.current().select(new TypeLiteral<Publisher<byte[]>>() {}, new ChannelAnnotation(topic)).get();
    }

You can then process this data in any way you like. As a demo, I hung it on a simple REST endpoint:

    // Streams every message received on the given channel as server-sent events
    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<String> stream(@QueryParam("topic") String topic) {
        return Multi.createFrom().publisher(dynamicReceiver(topic))
                .onItem().transform(String::new); // byte[] payload -> String (platform default charset)
    }

    // Publishes the given message on the given channel
    @GET
    @Path("/publish")
    public boolean publish(@QueryParam("msg") String msg, @QueryParam("topic") String topic) {
        dynamicEmitter(topic).send(msg.getBytes());
        return true;
    }

One more thing

When creating this solution I hit a few pitfalls you should know about:

  1. Quarkus removes any CDI beans that are "unused". So if you want to look them up dynamically, you need to exclude them from removal, or turn that feature off.
  2. All channels injected that way must be configured. Otherwise the injection will fail.
  3. For some reason (even with removal completely disabled), I was unable to inject Emitters dynamically unless the same Emitter was also injected somewhere else.
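As an illustration of the first two pitfalls, a configuration along the following lines might be a starting point. It is only a sketch: the channel name panatha is taken from the question, while panatha-in, the topic devices/demo and the broker address are placeholders. It assumes the smallrye-mqtt connector and uses a second, incoming channel pointing at the same topic, since one channel cannot be both read and written.

```properties
# Pitfall 1: keep beans that are only looked up programmatically
quarkus.arc.remove-unused-beans=false

# Pitfall 2: every channel that is looked up must be configured.
# Outgoing channel (placeholder topic and broker address)
mp.messaging.outgoing.panatha.connector=smallrye-mqtt
mp.messaging.outgoing.panatha.host=localhost
mp.messaging.outgoing.panatha.port=1883
mp.messaging.outgoing.panatha.topic=devices/demo

# Separate incoming channel pointing at the same topic
mp.messaging.incoming.panatha-in.connector=smallrye-mqtt
mp.messaging.incoming.panatha-in.host=localhost
mp.messaging.incoming.panatha-in.port=1883
mp.messaging.incoming.panatha-in.topic=devices/demo
```

Instead of disabling bean removal globally, marking individual beans with Quarkus's @Unremovable annotation may be an alternative; whether that is sufficient for Emitters looked up this way is not something this answer verified.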
Omega1001