0

I'm trying to test latency of messages sent through a shared/hot observable. I noticed when I have multiple observers on a single shared observable, a single observer gets invoked n times from a single message (where n is the number of observers on the shared observable).

I ran the code below with 10 Observers and 1 Message Per Observer, and each observer gets invoked 10 times per message (implying 100 total observer.next() calls). From my understanding of observers/observable, each observer should only get invoked once per message. Am I just using the share() operator incorrectly here? Or is my understanding of it in general flawed?

const getMessageLatency = (observersCount, messagesPerObserver) => {
    const completedMessages = [];
    const source = new Subject();
    const sharedObservable = source.pipe(
        tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
        share()
    );

    // Setup observers
    for (i = 0; i < observersCount; ++i) {
        sharedObservable
        .pipe(
            tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
            filter((message) => message.id === getObserverId(i)),
            tap(() => console.log(`Filtered for ${getObserverId(i)}`))
        )
        .subscribe((message) => {
            const date = new Date();
            message.endTime = date.getMilliseconds();
            completedMessages.push(message);
        })
    }

    // send out messages
    for (i = 0; i < observersCount; ++i) {
        for (j = 0; j < messagesPerObserver; ++j) {
            const date = new Date();
            const message = {
                id: getObserverId(i),
                startTime: date.getMilliseconds()
            }

            // send message
            source.next(message);
        }
    }

    // process data (get average message latency)
    const totalMessageLatency = completedMessages.reduce(
        (accumulatedLatency, currentMessage) => {
            const currentMessageLatency = 
                currentMessage.endTime - currentMessage.startTime;
            return accumulatedLatency + currentMessageLatency;
        }, 0);
    const averageLatency = totalMessageLatency / completedMessages.length;

    console.log("==============================================================================");
    console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
    console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
    console.log(`Total Messages Received: ${completedMessages.length}`);
    console.log(`Average Latency per Message: ${averageLatency}`);
    console.log("==============================================================================");

    return averageLatency;
}

Once this is done running, if "Total Messages Sent" is x, then "Total Message Received" will be x^2

buuchan
  • 51
  • 5
  • I suspect part of the problem/misunderstanding relates to the `for` loop `var`. See https://stackoverflow.com/q/750486/6680611 – cartant Sep 19 '18 at 20:45
  • mmm...Thank you! – buuchan Sep 19 '18 at 20:55
  • Also, be aware that logging to the console is super slow and that milliseconds are coarse - use [`performance.now()`](https://developer.mozilla.org/en-US/docs/Web/API/Performance/now) – cartant Sep 19 '18 at 20:57
  • Thanks! I removed all the console logs that occur during the test. Also switched to performance.now() (or at least the Node equivalent that i could find). Here's the actual test file in question for reference: https://github.com/KeijiBranshi/npm-packages/blob/master/rxjs-filter-test/index.js – buuchan Sep 19 '18 at 22:57

1 Answers1

0

Added let to the declarations of my for-loops. Y'all can tell I'm new to JavaScript too.

Thank you cartant

buuchan
  • 51
  • 5