I'm trying to measure the latency of messages sent through a shared/hot observable. I noticed that when I have multiple observers on a single shared observable, a single observer appears to get invoked n times for a single message (where n is the number of observers on the shared observable).
I ran the code below with 10 observers and 1 message per observer, and each observer gets invoked 10 times per message (implying 100 total observer.next() calls). From my understanding of observers and observables, each observer should only get invoked once per message. Am I just using the share() operator incorrectly here? Or is my understanding of it in general flawed?
const getMessageLatency = (observersCount, messagesPerObserver) => {
    const completedMessages = [];
    const source = new Subject();
    const sharedObservable = source.pipe(
        tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
        share()
    );

    // Setup observers
    for (i = 0; i < observersCount; ++i) {
        sharedObservable
            .pipe(
                tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
                filter((message) => message.id === getObserverId(i)),
                tap(() => console.log(`Filtered for ${getObserverId(i)}`))
            )
            .subscribe((message) => {
                const date = new Date();
                message.endTime = date.getMilliseconds();
                completedMessages.push(message);
            })
    }

    // send out messages
    for (i = 0; i < observersCount; ++i) {
        for (j = 0; j < messagesPerObserver; ++j) {
            const date = new Date();
            const message = {
                id: getObserverId(i),
                startTime: date.getMilliseconds()
            }
            // send message
            source.next(message);
        }
    }

    // process data (get average message latency)
    const totalMessageLatency = completedMessages.reduce(
        (accumulatedLatency, currentMessage) => {
            const currentMessageLatency =
                currentMessage.endTime - currentMessage.startTime;
            return accumulatedLatency + currentMessageLatency;
        }, 0);
    const averageLatency = totalMessageLatency / completedMessages.length;

    console.log("==============================================================================");
    console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
    console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
    console.log(`Total Messages Received: ${completedMessages.length}`);
    console.log(`Average Latency per Message: ${averageLatency}`);
    console.log("==============================================================================");

    return averageLatency;
}
Once this is done running, if "Total Messages Sent" is x, then "Total Messages Received" will be x^2.
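
One possible cause, offered as a sketch under assumptions rather than a confirmed diagnosis: both loops use `for (i = 0; ...)` without `let`, so every `filter` callback reads the same `i` at the moment a message arrives, not the value it had when the observer was set up. The snippet below reproduces the counting without RxJS. `makeEmitter` is a hypothetical stand-in for `Subject` that only fans values out to its listeners, and `getObserverId` is a guess at the helper that is not shown above.

```javascript
// Hypothetical stand-in for a Subject: forwards each value to every listener once.
const makeEmitter = () => {
    const listeners = [];
    return {
        subscribe: (fn) => { listeners.push(fn); },
        next: (value) => { listeners.forEach((fn) => fn(value)); },
    };
};

// Hypothetical id helper; the real getObserverId is not shown in the question.
const getObserverId = (index) => `observer-${index}`;

const countReceived = (observersCount, useBlockScopedIndex) => {
    const source = makeEmitter();
    let received = 0;

    if (useBlockScopedIndex) {
        // `let` in the loop header gives each iteration its own binding,
        // so each callback keeps the index it was created with.
        for (let i = 0; i < observersCount; ++i) {
            source.subscribe((message) => {
                if (message.id === getObserverId(i)) received += 1;
            });
        }
        for (let i = 0; i < observersCount; ++i) {
            source.next({ id: getObserverId(i) });
        }
    } else {
        // A single variable shared by both loops, mimicking `for (i = 0; ...)`.
        let i;
        for (i = 0; i < observersCount; ++i) {
            source.subscribe((message) => {
                // Reads the current value of the shared `i` when the message arrives.
                if (message.id === getObserverId(i)) received += 1;
            });
        }
        for (i = 0; i < observersCount; ++i) {
            // While sending, `i` is the sender's index, so every listener matches.
            source.next({ id: getObserverId(i) });
        }
    }
    return received;
};

console.log(countReceived(10, false)); // shared index
console.log(countReceived(10, true));  // block-scoped index
```

If this is what is happening in the code above, `share()` is doing its job: each of the n subscribers sees every message once (so the "SharedObservable: Incoming" log appears n times per message), and it is the filter that lets every message through. Declaring the loop index with `let` would let each filter compare against its own id.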