2

I am trying to get messages from kafka and send it to RSocket using Spring. Posting Server side on Spring Java and client side with React

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

    @Bean
    public Function<Integer, Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,
                                                            RsocketConsumerProperties rsocketConsumerProperties) {
        RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
        return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
    }
}
@EnableBinding(Sink.class)
public class Listener {

    @Autowired
    private Function<Integer, Mono<Integer>> rsocketConsumer;


    @StreamListener(Sink.INPUT)
    public void fireAndForget(Integer val) {
        System.out.println(val);
        rsocketConsumer.apply(val).subscribe();
    }
}
@Controller
public class ServerController {

    @MessageMapping("data")
    public Mono<Integer> hello(Integer integer) {
        return Mono.just(integer);
    }

}

What do i do wrong in server side because my client is connected but not able to get new messages

  client.connect().subscribe({
    onComplete: socket => {
        socket.fireAndForget({
          data: { message: "hello from javascript!" },
          metadata: null
        });
      },
      onError: error => {
        console.log("got error");
        console.error(error);
      },
      onSubscribe: cancel => {
        /* call cancel() to abort */
        console.log("subscribe!");
        console.log(cancel);
        // cancel.cancel();
      }
    });
   
meuhedet meuhedet
  • 465
  • 2
  • 5
  • 9

1 Answers1

2

You do this requester.route("input").data("Welcome to Rsocket").send(); where we have this:

   /**
     * Perform a {@link RSocket#fireAndForget fireAndForget} sending the
     * provided data and metadata.
     * @return a completion that indicates if the payload was sent
     * successfully or not. Note, however that is a one-way send and there
     * is no indication of whether or how the event was handled on the
     * remote end.
     */
    Mono<Void> send();

You see - Mono? That means that it has to be subscribed to initiate a reactive stream processing. See project Reactor for more info: https://projectreactor.io/

On the other hand it is not clear what is server and what is client in your case... you do this:

    /**
     * Build an {@link RSocketRequester} with an
     * {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
     * the given URL. The requester can be used to make requests
     * concurrently. Requests are made over a shared connection that is also
     * re-established as needed when further requests are made.
     * @param uri the URL to connect to
     * @return the created {@code RSocketRequester}
     * @since 5.3
     */
    RSocketRequester websocket(URI uri);

And I would say it means client in the code you show. The server is on the other side where that 7000 port is opened for ws:// protocol. So, be sure that you understand and configure all the parts properly. For example I don't see why do you need a @RestController in your Listener class...

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Hi Artem. Thanks for you reply. I corrected restcontroller as oit was not correct. What i am trying to do is to get messeges from kafka and then send them to rsocket. What would be you advice how to do it? I want to use spring to read messages from kafka and send them to react (javascript) – meuhedet meuhedet Dec 23 '20 at 14:58
  • You go right way with the `RSocketRequester`. Only what you miss is a `subscribe()` to that `send()` result. you also may take a look into the latest Spring Cloud Stream with functional API and also investigate this project: https://spring.io/projects/spring-cloud-stream-applications. It has an `RScoket Sink` application for tasks like yours. Together with Kafka Binder you would achieve your goal with no code writing! – Artem Bilan Dec 23 '20 at 15:03
  • Hi Again. I am trying to use this one. Look like it what i need. https://spring.io/blog/2020/08/03/creating-a-function-for-consuming-data-and-generating-spring-cloud-stream-sink-applications. But I getting an error Field rsocketConsumer in com.healthservices.streaming.client.listener.Listener required a bean of type 'org.springframework.cglib.core.internal.Function' that could not be found. Tring to inject rsocketConsumer – meuhedet meuhedet Dec 23 '20 at 15:26
  • Why do you write a custom app? What is wrong with that RSocket Sink from out-of-the-box? Anyway I think you are missing this one `spring.cloud.function.definition=rsocketConsumer` in the `application.properties` for your custom sink. – Artem Bilan Dec 23 '20 at 15:50
  • 1
    @meuhedetmeuhedet this is a helpful answer. You should consider accepting it (Using the V icon and for sure upvote it – royB Dec 23 '20 at 19:02
  • i see that it fires with rsocketConsumer.apply(val).subscribe(); Checking it with controller. But do not see it in react app – meuhedet meuhedet Dec 24 '20 at 09:32
  • Probably that’s different story which deserves its own SO thread... – Artem Bilan Dec 24 '20 at 18:34
  • https://stackoverflow.com/questions/65463438/get-message-from-kafka-send-to-rsocket-and-revice-it-from-react-client – meuhedet meuhedet Dec 27 '20 at 06:38