5

I have code that fetches a book and the library card associated with it:

// mimic http requests
const fetchBook = (bookId: number) => {
    const title = 'Book' + bookId;
    return timer(200).pipe(mapTo({ bookId, title }));
}
const fetchLibraryCard = (bookId: number) => {
    const borrowerName = 'Borrower of Book' + bookId;
    return timer(300).pipe(mapTo({ borrowerName }));
}

const bookId$ = new Subject<number>();

const book$ = bookId$.pipe(
    switchMap(bookId => fetchBook(bookId)),
    shareReplay(1)
);

// e.g. 'Refresh library card' button
const libraryCardUpdater$ = new BehaviorSubject<void>(undefined);

const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$]).pipe(
    switchMap(([bookId]) => fetchLibraryCard(bookId)),
    shareReplay(1)
);

combineLatest([book$, libraryCard$]).subscribe(([book, libraryCard]) => {
    console.log('book:', book.title, '| borrower:', libraryCard.borrowerName)
})

bookId$.next(1);
setTimeout(() => bookId$.next(2), 500);
setTimeout(() => libraryCardUpdater$.next(), 1000);
setTimeout(() => bookId$.next(3), 1500);

The problem is that the subscriber receives inconsistent state:

book: Book1 | borrower: Borrower of Book1  <-- OK
book: Book2 | borrower: Borrower of Book1  <-- Not OK
book: Book2 | borrower: Borrower of Book2  <-- OK
book: Book2 | borrower: Borrower of Book2  <-- OK, but redundant
book: Book3 | borrower: Borrower of Book2  <-- Not OK
book: Book3 | borrower: Borrower of Book3  <-- OK

I am thinking of pushing undefined to libraryCard$ at the very moment bookId$ changes.

But how can I do that in a reactive manner?

Update:

The library card should always be consistent with the fetched book (or be undefined while loading). bookId$ can be changed by a user action at any time. The library card can also be refreshed manually by the user at any time (libraryCardUpdater$). An emission from libraryCardUpdater$ should re-fetch the card, but should not re-fetch the book.

Update 2: I just realized that the library card can be fetched sequentially after the book. That is acceptable, although not a perfect solution for the end user.

Artem

6 Answers

10

Testing your code in https://thinkrx.io/rxjs/ gives

[image: marble diagram]

where the last row is the same as your console.logs.

Using withLatestFrom instead of combineLatest removes the unsynchronized book/card emissions (#2, book 2 with card 1, and #5, book 3 with card 2).

[image: marble diagram]
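
To see the operator's behavior in isolation, here is a minimal synchronous sketch. The subjects `bookSketch$` and `cardSketch$` and their string values are made up for illustration and are not part of the code in this answer. `withLatestFrom` emits only when its source (here the card stream) emits, so a new book on its own produces no output:

```typescript
import { Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical stand-ins for book$ and libraryCard$
const bookSketch$ = new Subject<string>();
const cardSketch$ = new Subject<string>();

const pairs: string[] = [];
cardSketch$
  .pipe(withLatestFrom(bookSketch$))
  .subscribe(([card, book]) => pairs.push(`${book}|${card}`));

bookSketch$.next('b1');
cardSketch$.next('c1'); // emits b1|c1
bookSketch$.next('b2'); // no emission: only the card stream triggers output
cardSketch$.next('c2'); // emits b2|c2

console.log(pairs);
```

This relies on the card always arriving after its book, which holds for the timings used in this answer but is an assumption about the real requests.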

This is the code, with the following changes:

  • labels abbreviated
  • timings divided by 10
  • added id to cardUpdater$ and used Subject() with explicit .next() at start (cosmetic - still works with original BehaviorSubject).
const { rxObserver } = require('api/v0.3');
const rx = require('rxjs');
const { timer } = rx;
const { switchMap, map, mapTo, combineLatest, withLatestFrom, shareReplay } 
  = require('rxjs/operators');

// mimic http requests
const fetchBook = (bookId) => {
 return timer(20).pipe(mapTo({ bookId, title: 'b' + bookId }));
}
const fetchLibraryCard = (bookId) => {
  return timer(30).pipe(mapTo({ name: `c${bookId}` }));
}

const bookId$ = new rx.Subject();

const book$ = bookId$.pipe(
  switchMap(bookId => fetchBook(bookId)),
  shareReplay(1)
);

// e.g. 'Refresh library card' button
const cardUpdater$ = new rx.Subject();

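const libraryCard$ = rx.combineLatest([bookId$, cardUpdater$])
.pipe(
  switchMap(([bookId, cardId]) => fetchLibraryCard(bookId)),
  shareReplay(1)
)

// pipeable operators are used here because prototype methods
// such as .withLatestFrom() need the compat layer in RxJS 6+
const combined$ = libraryCard$
.pipe(
  withLatestFrom(book$),
  map(([card,book]) => `b${book.title[1]}|c${card.name[1]}`),
)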

// Marbles
bookId$.subscribe(rxObserver('id'))
book$.pipe(map(book=>book.title)).subscribe(rxObserver('book'))
cardUpdater$.subscribe(rxObserver('card update'))
libraryCard$.pipe(map(card=>card.name)).subscribe(rxObserver('libraryCard$'))
combined$.subscribe(rxObserver('combined'))

// Events
bookId$.next(1);
cardUpdater$.next(1)
setTimeout(() => bookId$.next(2), 50);
setTimeout(() => cardUpdater$.next(2), 100);
setTimeout(() => bookId$.next(3), 150);

One thing that puzzles me is this emission you want to remove:

book: Book2 | borrower: Borrower of Book2  <-- OK, but redundant

It's triggered by the cardUpdater$ event. It can be removed with distinctUntilChanged() in combined$, but doing so makes the card refresh superfluous.
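
As a rough sketch of that trade-off (the subject `combinedSketch$` and the label strings are hypothetical, chosen to mirror the abbreviated labels above), `distinctUntilChanged()` drops the repeated value, which is exactly the emission produced by the refresh:

```typescript
import { Subject } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Hypothetical stand-in for combined$, emitting already-formatted labels
const combinedSketch$ = new Subject<string>();

const seen: string[] = [];
combinedSketch$
  .pipe(distinctUntilChanged())
  .subscribe(label => seen.push(label));

combinedSketch$.next('b1|c1');
combinedSketch$.next('b2|c2');
combinedSketch$.next('b2|c2'); // result of a card refresh: filtered out
combinedSketch$.next('b3|c3');

console.log(seen);
```

Because the comparison here is on the formatted string, a refresh that returns an identical card is invisible to the subscriber.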

It feels like you want a cardId which changes on card refresh, and re-issues the same book on the new card.

Something like this has a more orthogonal feel

const { rxObserver } = require('api/v0.3');
const rx = require('rxjs');
const { timer } = rx;
const { switchMap, map, mapTo, combineLatest, withLatestFrom, shareReplay } 
  = require('rxjs/operators');

const fetchBook = (bookId) => {
 return timer(20).pipe(mapTo({ bookId, title: 'b' + bookId }));
}
const fetchLibraryCard = (cardId) => {
  return timer(30).pipe(mapTo({ name: `c${cardId}` }));
}

const bookId$ = new rx.Subject();
const book$ = bookId$.pipe(
  switchMap(bookId => fetchBook(bookId)),
  shareReplay(1)
);

const cardUpdater$ = new rx.Subject();
const card$ = cardUpdater$.pipe(
  switchMap(cardId => fetchLibraryCard(cardId)),
  shareReplay(1)
);

const issue$ = rx.merge(book$, card$).pipe(
  switchMap(() => card$.pipe(withLatestFrom(book$))),
  map(([card,book]) => `${book.title}|${card.name}`),
)

// Marbles
bookId$.subscribe(rxObserver('id'))
book$.pipe(map(book=>book.title)).subscribe(rxObserver('book'))
cardUpdater$.subscribe(rxObserver('card update'))
card$.pipe(map(card=>card.name)).subscribe(rxObserver('libraryCard$'))
issue$.subscribe(rxObserver('combined'))

// Events
bookId$.next(1);
cardUpdater$.next(1)
setTimeout(() => bookId$.next(2), 50);
setTimeout(() => cardUpdater$.next(2), 100);
setTimeout(() => bookId$.next(3), 150);

Eddy Gilmour
4

An example with zip

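import { BehaviorSubject, Subject, combineLatest, timer, zip } from 'rxjs';
import { map, mapTo, shareReplay, switchMap } from 'rxjs/operators';

// mimic http requests
const fetchBook = (bookId: number) => {
    const title = 'Book' + bookId;
    return timer(200).pipe(mapTo({ bookId, title }));
}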
const fetchLibraryCard = (bookId: number) => {
    const borrowerName = 'Borrower of Book' + bookId;
    return timer(300).pipe(mapTo({ borrowerName }));
}

// Borrow a book
const bookId$ = new Subject<number>();

// Refresh library card
const libraryCardUpdater$ = new BehaviorSubject<void>(undefined);

// re-emit book2 on card update to allow zip to pair card and book correctly
const book$ = combineLatest([bookId$, libraryCardUpdater$])
.pipe(
  switchMap(([bookId, _]) => fetchBook(bookId)),
  shareReplay(1)
);

const libraryCard$ = combineLatest([bookId$, libraryCardUpdater$])
.pipe(
  switchMap(([bookId, _]) => fetchLibraryCard(bookId)),
  shareReplay(1)
);

zip(book$, libraryCard$)
.pipe(
  map(([book, libraryCard]) => `book: ${book.title} | borrower: ${libraryCard.borrowerName}`),
)
.subscribe(console.log)

bookId$.next(1);
setTimeout(() => bookId$.next(2), 500);
setTimeout(() => libraryCardUpdater$.next(), 1000);
setTimeout(() => bookId$.next(3), 1500);

Output

book: Book1 | borrower: Borrower of Book1  <-- OK
book: Book2 | borrower: Borrower of Book2  <-- OK
book: Book2 | borrower: Borrower of Book2  <-- OK, but repeat due to libraryCardUpdater$
book: Book3 | borrower: Borrower of Book3  <-- OK
user16695029
1

You have to turn things around. Your source of truth needs to be bookId$, and from an observable constructed on top of it you can get both the book and the libraryCard:

const bookId$ = new ReplaySubject<number>(1);

const libraryCardUpdater$ = new Subject<void>();

const libraryCardBook$ = combineLatest([
  bookId$.pipe(
    distinctUntilChanged(),
    switchMap(bookId => fetchBook(bookId))
  ),
  libraryCardUpdater$.pipe(
    switchMap(() => bookId$),
    switchMap((bookId) => fetchLibraryCard(bookId))
  )
]).pipe(
  map(([ book, libraryCard ]) => ({ book, libraryCard })),
  startWith({ book: undefined, libraryCard: undefined }),
  shareReplay(1)
);

const book$ = libraryCardBook$.pipe(map(({ book }) => book));
const libraryCard$ = libraryCardBook$.pipe(map(({ libraryCard }) => libraryCard));
Raz Luvaton
Poul Kruijt
  • Is your recommendation to subscribe to the `libraryCardBook$` pair? The problem is that in the real app the book and libraryCard belong to different services, so I would have to add knowledge about library cards (and dozens of other related entities, actually) to booksService. I don't think that is a good idea – Artem Aug 11 '21 at 11:01
  • @Artem you want both to be available and related to each other. Then you need a service which combines both. You can keep whatever logic in the corresponding services, but in my understanding, the result should be done as in my answer :) – Poul Kruijt Aug 11 '21 at 17:17
  • Yes, some service should combine values, but it shouldn't fetch all related entities at once. – Artem Aug 11 '21 at 18:51
  • @Artem but how can it not, as they are linked with each other. At least the libraryCard is linked to the bookId – Poul Kruijt Aug 11 '21 at 19:22
0

Rather than combineLatest() (which emits when any observable emits a value), I would use zip() which emits when all observables emit their value.
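
A minimal synchronous sketch of that difference, using two made-up subjects (`left$` and `right$` are illustrative names, not part of the question's code):

```typescript
import { Subject, combineLatest, zip } from 'rxjs';

// Hypothetical sources, driven by hand to keep the example synchronous
const left$ = new Subject<number>();
const right$ = new Subject<string>();

const latestPairs: string[] = [];
const zippedPairs: string[] = [];

combineLatest([left$, right$]).subscribe(([n, s]) => latestPairs.push(`${n}${s}`));
zip(left$, right$).subscribe(([n, s]) => zippedPairs.push(`${n}${s}`));

left$.next(1);
right$.next('a');
left$.next(2);  // combineLatest pairs the new 2 with the stale 'a'
right$.next('b');

console.log(latestPairs); // [ '1a', '2a', '2b' ]
console.log(zippedPairs); // [ '1a', '2b' ]
```

Note that zip pairs values strictly by position, so it stays aligned only while both sources emit the same number of values.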

Also, I've updated some of your code to make things more declarative/reactive. While I know they were placeholders for an API call, there's no need for those functions when you can switchMap() from your bookId Subject.

const bookId = new Subject<number>();

const book$ = bookId.pipe(
  switchMap((bookId) =>
    timer(200).pipe(mapTo({ bookId, title: `Book${bookId}` }))
  ),
  shareReplay(1)
);

const libraryCard$ = bookId.pipe(
  switchMap((bookId) =>
    timer(300).pipe(mapTo({ borrowerName: `Borrower of Book${bookId}` }))
  ),
  shareReplay(1)
);

zip(book$, libraryCard$).subscribe(([book, libraryCard]) => {
  console.log("book:", book.title, "| borrower:", libraryCard.borrowerName);
});

bookId.next(1);
setTimeout(() => bookId.next(2), 500);
setTimeout(() => bookId.next(2), 1000); // library card refresh
setTimeout(() => bookId.next(3), 1500);
Joshua McCarthy
  • In general, this does not work. See the updates in the question – Artem Aug 11 '21 at 12:00
  • @Artem How are users able to update the state of a library card when it should be in tandem with the book ID? – Joshua McCarthy Aug 11 '21 at 12:04
  • The user can click 'Refresh card', for example, and get the new state of the card from the server. It is synthetic code – Artem Aug 11 '21 at 12:06
  • Refresh means... getting the same library card again? That can be a method that just does a `take(1)` on `libraryCard$`. But since we're dealing with observables, I'm not sure I understand why this refresh requirement exists. – Joshua McCarthy Aug 11 '21 at 12:09
  • The borrower can return the book while the user is on the page. It is a synthetic example. The real app is much more complex and not about books – Artem Aug 11 '21 at 12:12
  • And in the case that a borrower returns a book, then the library card should emit `undefined`? – Joshua McCarthy Aug 11 '21 at 12:13
  • Yes, it can be switched to `undefined` or to another person. I am searching for a solution that resets `libraryCard$` at the very same moment `bookId$` changes. Right now I do that imperatively with Subjects, but I don't like that solution – Artem Aug 11 '21 at 12:17
  • Then I would have that refresh method re-emit the same id from the `bookId` Subject. The only downside is that `book$` makes a duplicate API call. But given your console must log two values that must be synchronous to one another, that creates limitations. – Joshua McCarthy Aug 11 '21 at 12:32
  • @Artem Saw the bounty posted on this, so I'm curious as to what requirement is not being met in this answer. – Joshua McCarthy Aug 17 '21 at 09:57
0

Here's my take on this:

const bookId$ = new Subject<number>();
const libraryCardUpdater$ = new Subject<void>();

const result$ = bookId$.pipe(
  switchMap((bookId) => {
    const book$ = concat(of(null), fetchBook(bookId));

    const libraryCard$ = concat(
      of(null),
      concat(of(null), libraryCardUpdater$).pipe(
        concatMap(() => fetchLibraryCard(bookId))
      )
    );

    return combineLatest([book$, libraryCard$]);
  })
);

This will guarantee the following:

  • whenever bookId$ changes, we start by emitting only once as expected: [null, null]
  • then, there's a race between fetchBook and fetchLibraryCard calls. So we could either end up with [someBook, null] or [null, someLibraryCard]. At some point they should both be there and there will be another emission with [someBook, someLibraryCard]
  • in the case where we've got for example [someBook, null] and the bookId changes, we'll then emit straight away [null, null]. Same for all the cases where the bookId changes
  • as for the libraryCardUpdater$, it'll only ever trigger fetchLibraryCard and nothing else

Hopefully that matches what you wanted, always happy to edit my answer if I've missed anything

maxime1992
  • Note: for the race between `fetchBook` and `fetchLibraryCard` this will allow you to display something as soon as you've got one of them. If you prefer to wait to have both before showing anything you can pipe a filter and make sure both values are here. – maxime1992 Aug 14 '21 at 11:14
  • It works, but I don't like that the book and the card are fetched within the same pipe. See comments - https://stackoverflow.com/a/68740504/5171110 – Artem Aug 18 '21 at 09:07
  • Happy to review any other solutions but so far I'm fairly confident that the answers I've seen have at least 1 thing that is wrong (at runtime). I believe this one does what you want for all the different cases (and there are quite a few to keep in mind). No pb though, I hope you find what you want – maxime1992 Aug 18 '21 at 20:25
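
The filter mentioned in the first comment above could look roughly like this. It is only a sketch: `resultSketch$` stands in for `result$`, and the `Book` and `LibraryCard` interfaces are assumed shapes based on the question's mock data.

```typescript
import { Subject } from 'rxjs';
import { filter } from 'rxjs/operators';

// Assumed shapes, based on the mock responses in the question
interface Book { title: string }
interface LibraryCard { borrowerName: string }
type Pair = [Book | null, LibraryCard | null];

// Hypothetical stand-in for result$, driven by hand
const resultSketch$ = new Subject<Pair>();

const complete: string[] = [];
resultSketch$
  .pipe(
    // Type guard: let a pair through only when both halves are loaded
    filter((pair): pair is [Book, LibraryCard] => pair[0] !== null && pair[1] !== null)
  )
  .subscribe(([book, card]) => complete.push(`${book.title}|${card.borrowerName}`));

resultSketch$.next([null, null]);                // loading: dropped
resultSketch$.next([{ title: 'Book1' }, null]);  // card still missing: dropped
resultSketch$.next([{ title: 'Book1' }, { borrowerName: 'Borrower of Book1' }]);

console.log(complete);
```

With this filter the subscriber never sees the loading state, so a consumer that wants to show a spinner would need the unfiltered stream as well.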
-1

Sequential (not simultaneous) fetches make things easier:

const libraryCard$ = combineLatest([book$, libraryCardUpdater$]).pipe(
    switchMap(([book]) => concat(of({ borrowerName: <string>undefined }), fetchLibraryCard(book.bookId))),
    shareReplay(1)
);

The only other thing I need is a zero-length debounce:

combineLatest([book$, libraryCard$]).pipe(debounceTime(0)).subscribe(...
Artem