5

I am trying to send data from kafka using Spring cloud stream to Rsocket and then represent data on React

Here is my configuration.

@Configuration
public class RsocketConsumerConfiguration {
    
    @Bean
    public Sinks.Many<Data> sender(){
        return Sinks.many().multicast().directBestEffort();
    }
    

}

@Controller public class ServerController {

@Autowired
private Sinks.Many<Data> integer;

@MessageMapping("integer")
public Flux<Data> integer() {
    return  integer.asFlux();
}
@EnableBinding(IClientProcessor.class)
public class Listener {

    @Autowired
    private Sinks.Many<Data> integer;

    @StreamListener(IClientProcessor.INTEGER)
    public void integer(Data val) {
        System.out.println(val);
        integer.tryEmitNext(val);
    }

}

   let  client = new RSocketClient({
    transport: new RSocketWebSocketClient(
        {
            url: 'ws://localhost:7000/ws',
            wsCreator: (url) => new WebSocket(url),
            debug: true,
        },
        BufferEncoders,
    ),
    setup: {
        dataMimeType: "application/json",
        metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
        keepAlive: 5000,
        lifetime: 60000,
    },
});

  client
            .then(rsocket => {
                console.log("Connected to rsocket");
                rsocket.requestStream({
                    metadata: Buffer.from(encodeCompositeMetadata([
                        [MESSAGE_RSOCKET_ROUTING, encodeRoute("integer")],
                    ])),
                 
                })
                    .subscribe({
                        onSubscribe: s => {
                            s.request(2147483647)
                        },
                        onNext: (p) => {
                            let newData = {
                                time: new Date(JSON.parse(p.data).time).getUTCSeconds(),
                                integer: JSON.parse(p.data).integer
                            }
                           newData.integer >100?setInteger(currentData => [newData, ...currentData]):setInt(currentData => [newData, ...currentData])
                           console.log(newData)
                        },
                        onError: (e) => console.error(e),
                        onComplete: () => console.log("Done")
                    });

spring.cloud.stream.bindings.integer.destination=integer Not able to see it in react app. Please advise. What I am doing wrong?

meuhedet meuhedet
  • 465
  • 2
  • 5
  • 9

1 Answers1

1

Given the data appears to be going directly from Kafka (via Spring) to the client, perhaps it would make more sense to stream Kafka messages via an internet-messaging broker to Internet-facing clients over WebSockets.

Disclosure: I am not the author of that article, but do work at that company where the author works. We see this use case frequently, so expect this approach may be useful.

Matthew O'Riordan
  • 7,981
  • 4
  • 45
  • 59