I am working with RxJS and trying to handle sequential operations. There is a similar question, "RxJS queueing dependent tasks", and I would have posted a comment there, but I'm new :) I have it nearly working; what I lack is the ability to take an action after a parent operation finishes. In the procedural world, I would think of this as a nested loop with some processing at the end of each iteration of the outer loop.

With the code below, the cleanup ("cleaned up") runs after each child, whereas I want it to run once per parent, after all of that parent's children are done. I believe this is because concatMap flattens everything, so the boundary between parents is lost, but I don't see a good way to keep track of when a single parent finishes. One not-so-good way I came up with was to use doOnCompleted(); however, that doesn't appear to let you return an observable to keep chaining.

import {Observable} from 'rx';

let parents = [
  { children: ['a', 'b'] },
  { children: ['c', 'd'] }
];

let asyncCleanup = (data) => {
  return Observable.of(data)
    .do(() => console.log('cleaned up', data));
};

let processParent = (parent) => {
  let children = Observable.from(parent.children);
  return children
    .concatMap((child) => processChild(child))
    .concatMap(() => asyncCleanup(parent));
};

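let processChild = (child) => {
  // Observable.of emits the child as a single value;
  // Observable.from on a string would emit it character by character.
  return Observable.of(child)
    .do(() => console.log('processed', child));
};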

Observable.from(parents)
  .concatMap((parent) => processParent(parent))
  .subscribe(
    (data) => null,
    (err) => console.error(err),
    () => console.log('completed')
  );

Current output:
processed a
cleaned up { children: [ 'a', 'b' ] }
processed b
cleaned up { children: [ 'a', 'b' ] }
processed c
cleaned up { children: [ 'c', 'd' ] }
processed d
cleaned up { children: [ 'c', 'd' ] }
completed

Desired output:
processed a
processed b
cleaned up { children: [ 'a', 'b' ] }
processed c
processed d
cleaned up { children: [ 'c', 'd' ] }
completed

Update: I found a solution: calling toArray() before .concatMap(() => asyncCleanup(parent)). I also tried bufferWithCount(parent.children.length), which worked too and is a little easier to read in my opinion.

  • Could you update your question with the expected output vs. the current output? It could just be that your logging is wrong, not your functions. – user3743222 Apr 12 '16 at 09:11
  • Also you could add `finally` at the end of `processParent(parent)` to do something when the observable completes, so you could use that in lieu of `.do(() => console.log('processed parent', parent));` – user3743222 Apr 12 '16 at 09:18
  • I added the current and desired output. I'm using the logs to help me trace how the observables are working since I'm not really sure how to debug them yet. I also tried finally(), and it worked like doOnCompleted, where a console.log works, but something returning another observable does not. In my real project, I wanted to be able to do some database cleanup steps at that point. – Josh Eilers Apr 12 '16 at 15:59
  • I found a solution that works for the purposes of this SO question, I'll have to see how well it works in practice. If I do a toArray() before the cleanup, it works. – Josh Eilers Apr 12 '16 at 16:09
  • `finally` also works if the observable terminates due to error, so you might want to see how that fits your use case – user3743222 Apr 12 '16 at 16:11
