I've got a function that takes a DOM element, looks for some identifying data, and (in theory) returns an Observable that combines that data with some other values via withLatestFrom.

My problem is that the returned Observable doesn't emit any values, even though the primaries$ and highlights$ observables it's built from do.

I'm sorry if this is poorly explained. I'm new to ReactiveX/RxJS and doing my best; if you need further information, just ask.

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()

  // Create return observable from highlights and primaries
  const ret$ = highlights$.withLatestFrom(primaries$)

  // These work
  primaries$.subscribe(x => { console.log("primaries:", x) })
  highlights$.subscribe(x => { console.log("highlights:", x) })

  // This gives me nothing
  ret$.subscribe(x => { console.log("return:", x) })

  return ret$
}

Thank you!

dontexist

2 Answers

I notice that you are using some observables twice, in which case you should probably share the common source observable. For example, highlights$ will be subscribed to twice: once while producing values for ret$ and once while producing values for primaries$.

In addition, you can do your logging with the do or tap operator, which performs the logging side effect without affecting the stream. You still need one subscribe to kick off the data flow: when you subscribe to ret$, it subscribes to highlights$ and primaries$ in turn, going up the chain of subscriptions.
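To make the "side effect without affecting the stream" idea concrete, here is a minimal sketch in plain JavaScript. It models a stream as an array, and tapEach is a hypothetical helper written for this illustration, not the RxJS operator itself.

```javascript
// Hypothetical array-based stand-in for a tap/do step (not the RxJS operator).
// It runs sideEffect for every value and passes each value through unchanged.
function tapEach(values, sideEffect) {
  return values.map(v => {
    sideEffect(v);
    return v;
  });
}

const seen = [];
const input = [1, 2, 3];

// The logging step sits in the middle of the chain; the later map still
// receives the original values.
const output = tapEach(input, v => seen.push('saw ' + v)).map(v => v * 2);
```

The values reaching the final map are the same with or without the tapEach step, which is why logging this way is safer than adding extra subscribe calls.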

One final note: multiple subscriptions are tricky in RxJS. You need to be aware of the hot vs. cold dichotomy to understand what happens. You can find a detailed examination of the inner workings here.

So, something like this:

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)
    .share()
    .tap(console.log.bind(console, 'highlights:'))

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()
    .tap(console.log.bind(console, 'primaries:'))

  // Create return observable from highlights and primaries
  const ret$ = highlights$.withLatestFrom(primaries$)

  // A single subscription kicks off the whole chain
  ret$.subscribe(x => { console.log("return:", x) })

  return ret$
}
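
As a rough illustration of the hot vs. cold point above, the sketch below uses plain JavaScript rather than RxJS. coldFrom and shareOnce are hypothetical helpers made up for this example; shareOnce behaves more like an explicit publish/connect pair than like share(), which connects automatically on the first subscription.

```javascript
// Hypothetical "cold" source: every subscription re-runs the producer.
function coldFrom(values, onRun) {
  return function subscribe(next) {
    onRun(); // count how often the producer runs
    values.forEach(v => next(v));
  };
}

// Hypothetical sharing wrapper: run the producer once and fan values out
// to every observer registered before connect() is called.
function shareOnce(subscribe) {
  const observers = [];
  return {
    subscribe(next) { observers.push(next); },
    connect() { subscribe(v => observers.forEach(o => o(v))); },
  };
}

// Two subscriptions to a cold source: the producer runs twice.
let coldRuns = 0;
const cold = coldFrom([1, 2, 3], () => { coldRuns += 1; });
const a = [];
const b = [];
cold(v => a.push(v));
cold(v => b.push(v));

// Two subscriptions to the shared source: the producer runs once.
let sharedRuns = 0;
const shared = shareOnce(coldFrom([1, 2, 3], () => { sharedRuns += 1; }));
const c = [];
const d = [];
shared.subscribe(v => c.push(v));
shared.subscribe(v => d.push(v));
shared.connect();
```

Here coldRuns ends up at 2 and sharedRuns at 1, while all four observers receive the same three values. Note that an observer added after connect() would see nothing, which is the kind of timing issue that makes sharing a synchronous source tricky.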
user3743222

Building on what user3743222 said in their answer, it turns out I also needed to pause/buffer highlights$ when creating ret$ and wait for primaries$ to emit something before continuing. Here's how I did that using Rx.Observable.prototype.pausableBuffered:

function randomFunction(element) {
  // Create an observable sequence of text nodes from an array
  const textNodes$ = Rx.Observable.from(getAllTextNodesFrom(element))

  // Get "highlights" by doing stuff with text nodes
  const highlights$ = textNodes$
    .map(doSomeStuff)
    .zip(textNodes$, (res, node) => ({res, node}))
    .filter(({res, node}) => res.length && node.isConnected)
    .share()
    .tap(console.log.bind(console, 'highlights:'))

  // Get "primaries" by doing stuff with "highlights"
  const primaries$ = highlights$
    .map(x => x.res)
    .flatMap(x => x.filter(y => y.matches && y.isPrimary))
    .map(x => x.id)
    .toSet()
    .tap(console.log.bind(console, 'primaries:'))

  // Observable that outputs true when primaries$ outputs anything
  const primariesExist$ = primaries$.map(() => true)

  // Create return observable from highlights and primaries
  return highlights$
    .pausableBuffered(primariesExist$)
    .withLatestFrom(primaries$)
}
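
For anyone wondering why the buffering matters, here is a simplified model in plain JavaScript, assuming that withLatestFrom ignores source values until the other stream has produced at least one value. makeWithLatestFrom and makeGate are hypothetical helpers written for this sketch, not RxJS internals.

```javascript
// Hypothetical model of withLatestFrom: source values are dropped until
// the other stream has delivered at least one value.
function makeWithLatestFrom(emit) {
  let hasLatest = false;
  let latest;
  return {
    other(value) { hasLatest = true; latest = value; },
    source(value) { if (hasLatest) emit([value, latest]); },
  };
}

// Hypothetical gate: buffers values until release() is called, then
// flushes the buffer in order and passes later values straight through.
function makeGate(next) {
  const buffer = [];
  let isOpen = false;
  return {
    push(value) {
      if (isOpen) next(value);
      else buffer.push(value);
    },
    release() {
      isOpen = true;
      while (buffer.length) next(buffer.shift());
    },
  };
}

// Without buffering: both highlights arrive before the set and are lost.
const withoutGate = [];
const plain = makeWithLatestFrom(pair => withoutGate.push(pair));
plain.source('h1');
plain.source('h2');
plain.other('primaries-set');

// With buffering: highlights wait until the set exists, then flow through.
const withGate = [];
const combined = makeWithLatestFrom(pair => withGate.push(pair));
const gate = makeGate(v => combined.source(v));
gate.push('h1');
gate.push('h2');
combined.other('primaries-set');
gate.release();
```

In this model withoutGate stays empty, while withGate ends up holding both highlights paired with the set, which matches the behaviour I saw before and after adding pausableBuffered.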
dontexist