How to do an infinite requestStream() in RSocket-JS ?
RSocket-JS does not provide by default any "re-subscription" mechanism when a connection is closed : Meaning when your connection is closed or completed, you can not receive events on this connection anymore.
This is due to the fact that events are managed by a Subscription / Subscriber mechanism, internal to R-Socket : When the connection is in 'CLOSED' or 'ERROR' status, the subscription is cancelled.
You can see it in their RSocketClient.js, line 91 : https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-core/src/RSocketClient.js
This means that if you would like to provide an infinite requestStream(), you need to wrap it with a retry
mechanism, in order to create a new Connection in case you lost the previous one (Which can
happens quite often, for example when the server is not available during an update, you loose the connection, ...).
But you also need to take care of this subscription, otherwise, your solution will not run properly either.
What happens when subscription.request() runs out?
The number you provide calling subscription.request()
is the number of events your
subscription will be able to listen to. In other words, the maximum number of time you will go through the
onNext()
method to manage an event.
After you reached this maximum number, if I'm correct, your connection will still be alive, but you won't
receive any events in your onNext()
method anymore.
Meaning this needs to be taken into account as well when you're creating your retry mechanism to
create your infinite requestStream.
Then how to create the infinite requestStream() ?
To create your infinite requestStream() and to properly manage the subscription issue as well, you can inspire you from an example provided by RSocket, which takes care of the connection and its related Subscription : https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/ReconnectExample.js
Personally, I used it in the current project I'm working on, but I re-worked it a bit, in order to add a few additional things :
- A maximum number of retry, in order to still kill the connection in case of a big issue.
- An exponential time interval before each retry, including some randomness to avoid all WebSocket clients to reconnect at the same time (and avoid a flooding of requests at server side).
Cleaning up the specific solution code, client requestStream request would give something like :
export function infiniteRequestStream(maxRetry: number, retryIntervalInMs: number): Promise<ICureRSocket<any, any>> {
const auth = encodeSimpleAuthMetadata(username, password)
const setup = {
keepAlive: 1000000,
lifetime: 100000,
dataMimeType: APPLICATION_JSON.string,
metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string
};
const clientFactory = () => new RSocketClient({
serializers: {
data: {
deserialize: JsonSerializer.deserialize,
serialize: (x) => Buffer.from(JSON.stringify(x))
},
metadata: IdentitySerializer,
},
setup,
transport: new RSocketWebSocketClient({
url: `ws://localhost:8080/rsocket`,
debug: true,
wsCreator: (url) => {
return new WebSocket(url) as any;
},
}, BufferEncoders)
});
const requestStreamFlowable = (socket: ICureRSocket<unknown, string | Buffer | Uint8Array>, auth: Buffer) => {
return new Flowable((subscriber) => {
socket.requestStream({
data: Buffer.from('request-stream'),
metadata: encodeCompositeMetadata([
[MESSAGE_RSOCKET_ROUTING, encodeRoute('your-rsocket-route')],
[MESSAGE_RSOCKET_AUTHENTICATION, auth]
])
}
).subscribe(subscriber)
});
}
return new Promise(async (resolve, reject) => {
const socket = new ICureRSocket(clientFactory, reject);
await socket.connect(maxRetry, retryIntervalInMs)
const request = requestStreamFlowable(socket, auth);
request
.lift(actual => new ResubscribeOperator(request, actual))
.subscribe({
onSubscribe: (sub) => sub.request(2147483647),
onComplete: () => console.log(`Request-Stream Completed`),
onNext: (payload: any) => {
console.log(`Your next event is ${payload}`)
},
onError: error => {
console.log(`Request-Stream Error ${error}`)
}
})
resolve(socket)
})
}
And find the complete implementation here : https://github.com/icure-io/icure-medical-device-js-sdk/blob/master/src/utils/rsocket.ts