
I want to find the function traverse with apply for Observable[A] in JavaScript. First, in words:

I have an Observable[A list] and I want to apply a function (A -> Observable[B]) to every item in the list, in parallel, and join the results into an Observable[B list] that I can then consume.

Or in type signatures, I would like to find this function: traverse : Observable[A list] -> (A -> Observable[B]) -> Observable[B list]. How to do it?

Henrik
  • Little unclear on what you are looking for. You want something that is a mapping function of `Observable[A] -> Observable[B]` into `Observable[List[A]] -> Observable[List[B]]`? – paulpdaniels Nov 11 '15 at 16:43
  • I want to supply a function that maps each value in a list to an observable value, and pass it to an RxJS function that takes this function and an observable of a list of values, and returns an observable that has been 'flatmapped' and 'waited for' for each of the values in the transformed list. I.e. I'm looking for 'traverseM' for Observables. – Henrik Nov 11 '15 at 17:16

2 Answers


You could use source.flatMap(function(a_list){ return Rx.Observable.zip(a_list.map(f)); }) for example.

source :: Observable List A
a_list :: List A
f :: A -> Observable B
a_list.map(f) :: List Observable B
Rx.Observable.zip :: List Observable B -> Observable List B
flatMap :: Observable List A -> (List A -> Observable List B) -> Observable List B

Rx.Observable.combineLatest can be used in place of zip with the same signature. flatMap could be replaced by flatMapLatest (which exists only at instance level, not as Rx.Observable.flatMapLatest), or by concatMap if you want the order of the source lists respected.

So you can match the required signature, but is that the logic you want? zip completes as soon as any one of the observables in List Observable B completes. combineLatest repeats the previously emitted values for all observables in List Observable B except the one that just emitted. Does either of these work for your use case? You could also engineer a version of zip that completes only when all observables in the list complete: cf. RXJS: alternately combine elements of streams (remove the last line .concatMap(function (list) { return fromArray(list); })).
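
To picture the completion behavior of zip, here is a toy model in plain JavaScript. It is not RxJS: each observable is assumed to be represented by the array of values it emits before completing, timing is ignored, and `zipColumns` is a hypothetical helper name used only for illustration.

```javascript
// Toy model (an assumption, not the RxJS API): an "observable" is the
// array of values it would emit before completing.

// zip-like combination: emit one list per index j, holding the j-th value
// of every stream, and stop as soon as the shortest stream has run out.
function zipColumns(streams) {
  if (streams.length === 0) return [];
  const shortest = Math.min(...streams.map(s => s.length));
  const out = [];
  for (let j = 0; j < shortest; j++) {
    out.push(streams.map(s => s[j]));
  }
  return out;
}

const streams = [['a1', 'a2', 'a3'], ['b1', 'b2'], ['c1', 'c2', 'c3']];
const zipped = zipColumns(streams);
// zipped is [['a1', 'b1', 'c1'], ['a2', 'b2', 'c2']]
```

In this model 'a3' and 'c3' never appear in the output, because the middle stream has already completed after two values.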

Cf. http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html for operators combining sequences.

user3743222
  • I think this is correct, apart from zip being used; this page https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md seems to imply zip would be of type `List Observable B -> Observable (B * B * ... * B)` where ... is (bList.length - 3) number of Bs. – Henrik Nov 11 '15 at 17:13
  • About `zip`: yes, you get out the same number of B that you passed in. If you don't provide a selector function, what is returned is a list of the B, which in their terms is an array. So the signature is `List Observable B -> Observable List B`. If you provide a selector function, you can also return an array of B there yourself. – user3743222 Nov 11 '15 at 19:36
  • Last note, if you think of List A as a row of As (column 0), the time dimension makes the columns. That means column `j` will be for each `a` in row `i`, the `j`th emitted value by the `i`th `Observable B`. The `zip` version here gives you the aforedescribed matrix column by column. The version you gave in your solution gives it row by row. – user3743222 Nov 11 '15 at 20:18

Here's how I solved it in the end, something I may very well PR to RxJS as 'traverse' and get more feedback on:

Rx.Observable.prototype.traverse = function(f) {
  return this
    // handle each emitted list xs in turn, preserving source order
    .concatMap(xs =>
      Rx.Observable.from(xs) // lift the list into an Observable: O[a]
        .concatMap(x => f(x)) // flatten to O[b] in the same order as xs
        .toArray()); // collect O[b] into O[b list] once it completes
};

const myStream = // O[A list] -> O[B list]
  filesSubj.traverse(file => fileReader(file).asDataURL())
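
To check what ordering and list shape this gives, here is a toy model of the same pipeline in plain JavaScript. It is only a sketch, not RxJS: an observable is assumed to be represented by the array of values it emits before completing, timing is ignored, and `traverseModel` is a hypothetical name.

```javascript
// Toy model (an assumption, not the RxJS API): an "observable" is the
// array of values it would emit before completing.
function traverseModel(lists, f) {
  // Outer concatMap: handle each emitted list in source order.
  return lists.map(xs =>
    // Inner concatMap + toArray: append the emissions of f(x) in the
    // order of xs, then emit the collected array once.
    xs.reduce((acc, x) => acc.concat(f(x)), []));
}

// f emits exactly one value per input: output list lines up with input list.
const single = traverseModel([[1, 2, 3]], x => [x * 10]);

// f emits two values per input: the output list is longer than the input.
const multi = traverseModel([[1, 2]], x => [x, x + 0.5]);
```

In this model `single` is [[10, 20, 30]] and `multi` is [[1, 1.5, 2, 2.5]], so the result is only a true B list of the same length as the A list when f emits exactly one value per item.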
Henrik
  • Note that `toArray` will wait to have all the elements of the source, i.e. it will wait for the source to complete before emitting. The array of B you emit might then be out of order, as varying A inputs will lead to observables that take varying times to complete, while I suppose that in this understanding of traverse you want a predictable order. That should easily be solved with `concatMap` though. – user3743222 Nov 11 '15 at 19:50
  • @user3743222 that's a really good point; more details: http://fernandocejas.com/2015/01/11/rxjava-observable-tranformation-concatmap-vs-flatmap/ I've updated flatMap -> concatMap since f may produce a sequence of observables completing after differing intervals, but the outer flatMap will only ever get a single Obs – Henrik Nov 12 '15 at 07:58
  • `the outer flatMap will only ever get a single Obs`: why is that? The source is Observable List A, so you could have several List A, right? If you also want to keep the order between their traversals, you should use `concatMap` there too. In any case, if you want to propose your custom operator for the general case, you should document any changes in the relative ordering between the source and the output, if they happen. – user3743222 Nov 12 '15 at 10:04