My question:
I have implemented RSocket on my back-end (Java, Spring Boot) and on my front-end (React.js).
My front-end and back-end are able to communicate. However, I would like to know if it is possible to broadcast messages to multiple connected clients from my back-end.
Is this possible?
Asked
Active
Viewed 428 times
2

August Jelemson
- 962
- 1
- 10
- 29
1 Answer
1
Yes it is possible.
Here is an example of an RSocket endpoint using Spring WebFlux and Kotlin coroutines which multicasts events to all its observers.
This uses MutableSharedFlow, but this can be achieved with a PublishSubject or a Flowable in rxjava3.
In this example, the RSocket server broadcasts a timestamp every 30 seconds to all connected clients.
Controller:
/**
 * Messaging controller exposing the `broadcast` route.
 *
 * The handler simply hands back whatever stream [ChatManager.broadcastStream]
 * provides; this class holds no state of its own.
 */
@Controller
class ApiResource(private val chatManager: ChatManager) {

    /** Handles requests routed to `broadcast` by returning the manager's stream. */
    @MessageMapping("broadcast")
    suspend fun broadcast(): Flow<Message> {
        return chatManager.broadcastStream()
    }
}
Stream manager as well as a demo publisher:
/**
 * Owns the flow that carries broadcast messages and, for demo purposes,
 * publishes a timestamp message into it on a fixed schedule.
 */
@Service
class ChatManager {

    /** Flow into which every broadcast message is emitted. */
    val broadcast = MutableSharedFlow<Message>()

    /** Returns the flow that callers collect to receive broadcasts. */
    fun broadcastStream() = broadcast

    /**
     * Emits one message whose second field is the current epoch time in
     * milliseconds. Scheduled at a fixed rate of 30 * 1000 ms.
     */
    @Scheduled(fixedRate = 30 * 1000)
    fun sendBroadcast() {
        // emit is called from a non-suspending scheduled method, hence runBlocking.
        runBlocking {
            val tick = Message(
                "SpringServer",
                System.currentTimeMillis().toString(),
                "broadcast",
            )
            broadcast.emit(tick)
        }
    }
}
React component BroadcastApp.tsx:
import React, {useEffect, useState} from 'react';
import {
BufferEncoders, encodeAndAddCustomMetadata, encodeAndAddWellKnownMetadata,
MESSAGE_RSOCKET_COMPOSITE_METADATA, MESSAGE_RSOCKET_ROUTING,
RSocketClient,
toBuffer, createBuffer
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
// Metadata is sent as composite metadata: message/x.rsocket.composite-metadata.v0
const metadataMimeType = MESSAGE_RSOCKET_COMPOSITE_METADATA.string;
const dataMimeType = 'application/json';
const websocketEndpoint = 'ws://localhost:5051/rsocket';
// Route name of the server-side stream this component subscribes to.
const endpoint = 'broadcast';

/**
 * Builds a new WebSocket transport for a single connection attempt.
 *
 * A transport instance is not reused across attempts: once a
 * RSocketWebSocketClient has errored or been closed it cannot be connected
 * again, so sharing one module-level instance would make "Connect" fail
 * after the first failure or disconnect.
 */
const createTransport = () => new RSocketWebSocketClient({
    url: websocketEndpoint,
}, BufferEncoders);

/** Shape of the JSON payload carried by each stream event. */
interface Message {
    senderId: string;
    text: string;
    channelId: string;
}

type CONNECT_STATUS = 'disconnected' | 'connecting' | 'connected';

/**
 * Demo component that connects to the RSocket server on demand, subscribes to
 * the `broadcast` stream and lists the `text` of each received message,
 * newest first.
 *
 * The connection is opened when the user presses "Connect" and is closed when
 * the user presses "Disconnect", when the stream fails, or when the component
 * unmounts.
 */
function BroadcastApp() {
    const [connectStatus, setConnectStatus] = useState<CONNECT_STATUS>('disconnected');
    const [msgs, setMsgs] = useState<string[]>([]);

    // Holds the function that releases the current connection (if any).
    // A ref is used so it survives re-renders without causing them.
    const teardownRef = React.useRef<(() => void) | null>(null);

    // Releases the current connection at most once; safe to call when idle.
    const teardown = () => {
        const release = teardownRef.current;
        teardownRef.current = null;
        if (release) {
            release();
        }
    };

    // Routing metadata: a length-prefixed route name, as the routing
    // metadata format expects.
    const defaultMetadata = encodeAndAddWellKnownMetadata(
        createBuffer(0),
        MESSAGE_RSOCKET_ROUTING,
        toBuffer(String.fromCharCode(endpoint.length) + endpoint)
    );
    const constructMetadataWithChannelId = (cid: string) => encodeAndAddCustomMetadata(
        defaultMetadata,
        'messaging/x.chat.client-id',
        toBuffer(String.fromCharCode(cid.length) + cid)
    );

    // Close whatever is still open when the component goes away.
    useEffect(() => () => {
        const release = teardownRef.current;
        teardownRef.current = null;
        if (release) {
            release();
        }
    }, []);

    useEffect(() => {
        if (connectStatus === 'connecting') {
            console.log(`rsocket client connecting...`);

            // One client (and transport) per attempt instead of one per render.
            const client = new RSocketClient({
                setup: {
                    keepAlive: 60000,
                    lifetime: 180000,
                    dataMimeType,
                    metadataMimeType,
                },
                transport: createTransport(),
            });

            let cancelConnect: (() => void) | null = null;
            let cancelStream: (() => void) | null = null;
            teardownRef.current = () => {
                if (cancelConnect) {
                    cancelConnect();
                }
                if (cancelStream) {
                    cancelStream();
                }
                client.close();
            };

            client
                .connect()
                .subscribe({
                    onError: error => {
                        console.log(`error: client connect: ${error}`)
                        cancelConnect = null;
                        teardownRef.current = null;
                        setConnectStatus('disconnected');
                    },
                    onSubscribe: cancel => {
                        cancelConnect = cancel;
                    },
                    onComplete: (sock) => {
                        // Connected: nothing left to cancel on the connect side.
                        cancelConnect = null;
                        sock.requestStream({
                            metadata: constructMetadataWithChannelId('broadcast')
                        }).subscribe({
                            onSubscribe: (subscription) => {
                                cancelStream = () => subscription.cancel();
                                console.log(`rsocket client connected ✅`);
                                setConnectStatus('connected');
                                subscription.request(1000)
                            },
                            onNext: (event: any) => {
                                console.log(`received event from channel: ${JSON.stringify(event)}`);
                                const value = JSON.parse(event.data) as Message;
                                setMsgs(prev => [value.text, ...prev]);
                            },
                            onError: (error) => {
                                console.log(`err with rsocket subscribe: ${error}`)
                                // The stream is already terminated, so only the
                                // connection itself needs releasing; then let the
                                // user reconnect instead of staying "connected".
                                cancelStream = null;
                                teardown();
                                setConnectStatus('disconnected');
                            }
                        });
                    }
                });
        }
    }, [connectStatus])

    const handleConnect = () => {
        setConnectStatus('connecting');
    };

    const handleDisconnect = () => {
        teardown();
        setConnectStatus('disconnected');
    }

    return (
        <div style={{ padding: 20}}>
            <div>
                {(connectStatus === 'connected') ? (
                    <div style={{margin: 10, fontSize: 18}}>
                        <button
                            onClick={handleDisconnect}
                            style={{
                                padding: 10,
                                borderRadius: 10,
                                fontSize: 28,
                                margin: 10,
                            }}
                        >
                            Disconnect
                        </button>
                    </div>
                ) : (<div>
                    {connectStatus === 'disconnected' ? (<button
                        onClick={handleConnect}
                        style={{
                            padding: 10,
                            borderRadius: 10,
                            fontSize: 28,
                        }}
                    >
                        Connect
                    </button>) : (
                        <></>
                    )}
                </div>)}
            </div>
            <div>
                {msgs.map((item, index) => (
                    <div
                        // Messages are only ever prepended, so the distance from
                        // the end of the list is a stable, unique key.
                        key={msgs.length - index}
                        style={{
                            backgroundColor: 'lightgreen',
                            fontSize: 18,
                            width: 300,
                            padding: 10,
                            borderRadius: 10,
                            margin: 10,
                        }}>
                        {item}
                    </div>
                ))}
            </div>
        </div>
    );
}
export default BroadcastApp;