
Currently I have code like this:

testReports() {
  this.dataSeries.forEach((x, index) => {
    setTimeout(() => {
      x.Status = FileStatus.PENDING;

      this._service.validateReport(x.Location).subscribe({
        next: y => this.convertResponseToGridView(y, x),
        error: () => console.error('Issues in validation')
      });
    }, index * 1500)
  });
}

I was curious if there is a better way to do this with RxJS. I do not know the number of observables that will come out of 'this.dataSeries' ahead of time, as its entries are essentially location strings that may change based on end-user input. Once they are obtained, 'this._service.validateReport(x.Location).subscribe...' kicks off for each one to get data. It all works, but the 1500 milliseconds is more of a guess, and sometimes a request is further along than expected by the time the next one fires, and sometimes it is not. Is there a better way to run these operations when you do NOT know the observables statically?

I looked up this answer: "How to make one Observable sequence wait for another to complete before emitting?" and concat seemed promising, but I was not sure whether you could first build an array of observables with some method and then hand it to concat for execution. Any help is much appreciated.

djangojazz
  • Do you really need to do the requests sequentially, or just to know when they all complete or there's an error? If the latter, see forkJoin() – GreyBeardedGeek Jun 05 '23 at 18:06
  • @GreyBeardedGeek Unfortunately the middle tier is hooked up to an SSRS WCF service, yuck. So it literally kicks me off if I try to do too many operations concurrently. So going sequential is important for now. – djangojazz Jun 05 '23 at 18:20

2 Answers


You can map your dataSeries array to an array of the corresponding requests and then use concat to execute them all sequentially.

testReports() {
  // map each element of the array to its (cold) request observable
  const requests = this.dataSeries.map((x) =>
    this._service.validateReport(x.Location).pipe(
      tap({
        // the `subscribe` handler of tap needs RxJS 7.3 or later
        subscribe: () => x.Status = FileStatus.PENDING,
        next: y => this.convertResponseToGridView(y, x),
        error: () => console.error('Issues in validation')
      })
    )
  );
  
  //execute them all sequentially
  concat(...requests).subscribe();
}
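
One caveat worth checking: with concat, an error from any single request ends the whole chain, so the remaining reports would never be validated (the setTimeout version in the question kept going). Below is a minimal sketch of one way to isolate failures, assuming each request should fail independently. `fakeValidate` and the location strings are made up for the demonstration and stand in for `this._service.validateReport`; the imports assume RxJS 7.2 or later.

```typescript
import { EMPTY, Observable, catchError, concat, defer, of, tap, throwError } from 'rxjs';

const log: string[] = [];

// Hypothetical stand-in for this._service.validateReport(location).
// defer() keeps it cold: nothing runs until concat subscribes to it.
function fakeValidate(location: string): Observable<string> {
  return defer(() => {
    log.push(`start ${location}`);
    return location === 'bad'
      ? throwError(() => new Error(`cannot validate ${location}`))
      : of(`ok ${location}`);
  });
}

const locations = ['a', 'bad', 'c'];

const requests = locations.map((location) =>
  fakeValidate(location).pipe(
    tap((result) => log.push(result)),
    // Swallow the failure of this one request so concat moves on to the next.
    catchError((e: Error) => {
      log.push(`failed: ${e.message}`);
      return EMPTY;
    })
  )
);

concat(...requests).subscribe({ complete: () => log.push('all done') });

console.log(log);
```

Here 'c' still starts after 'bad' fails. Without the catchError, the chain would stop at 'bad' and neither 'start c' nor 'all done' would be logged.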
akotech

Despite it being "scary" at first, having a higher-order observable (a big word for an observable of observables) is actually often the best approach for these situations.

Another issue with your code is that you're mixing imperative and reactive programming. Having a subscribe inside a method is usually a code smell: if that function is called more than once, each call starts its own subscription and you could end up with race conditions.

So the first thing to do here is to turn your method into a subject. This way, you can emit an event from that observable saying: hey, please test the reports.

public testReports$$ = new Subject<void>();

And from the view we can have something like this:

<button (click)="testReports$$.next()">Test report</button>

Now that we've got a way to be notified when there's an event, we need to react to it. For that, we will subscribe to our subject from the constructor.

constructor() {
  this.testReports$$.subscribe()
}

The first thing to think of here is that for any subscribe, we must make sure we don't forget to unsubscribe, to avoid memory leaks. To do that in a reactive way, you can take advantage of the recent inject function and DestroyRef token (available since Angular 16) to create a reusable function that looks like this:

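import { DestroyRef, inject } from '@angular/core';
import { Observable, Subject, takeUntil } from 'rxjs';

// Call this from an injection context (constructor or field initializer),
// because inject() only works there.
export function untilDestroyed() {
  const subject = new Subject<void>();

  // emit once and complete when the owning component/service is destroyed
  inject(DestroyRef).onDestroy(() => {
    subject.next();
    subject.complete();
  });

  return <T>(obs: Observable<T>) =>
    obs.pipe(takeUntil<T>(subject.asObservable()));
}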

Put it somewhere shared so you can reuse it.

Then we can update the stream we just subscribed to:

constructor() {
  this.testReports$$.pipe(untilDestroyed()).subscribe()
}

Excellent, so far we know we'll keep our event stream open until the component is destroyed and then shut it down.

Next, we know we'll have to map each event to an inner observable, so the choice is pretty much between switchMap, mergeMap, concatMap and exhaustMap. I'm guessing here that if a user clicks twice on the button while the checks are still running, we should cancel the current run and redo everything from scratch. That's what switchMap is for:

constructor() {
  this.testReports$$.pipe(
    switchMap(() => TODO),
    untilDestroyed()
  ).subscribe()
}
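
To make the operator choice concrete, here is a small sketch (not part of the component) of what happens when a second click arrives while the first run is still in flight. The `simulate` helper, the `Flatten` type and the hand-completed subjects are all hypothetical test scaffolding; the imports assume RxJS 7.2 or later.

```typescript
import { Observable, OperatorFunction, Subject, concatMap, exhaustMap, switchMap } from 'rxjs';

type Flatten = (
  project: (click: number) => Observable<string>
) => OperatorFunction<number, string>;

// Simulates two clicks, the second arriving while the first run is still in flight.
// Each run is a Subject that we complete by hand, so no timers are needed.
function simulate(flatten: Flatten): string[] {
  const log: string[] = [];
  const clicks = new Subject<number>();
  const runs = new Map<number, Subject<string>>();

  const startRun = (click: number): Observable<string> => {
    log.push(`start ${click}`);
    const run = new Subject<string>();
    runs.set(click, run);
    return run;
  };

  clicks.pipe(flatten(startRun)).subscribe((value) => log.push(value));

  clicks.next(1);
  clicks.next(2);

  // Finish whichever runs were actually started, in click order.
  for (const click of [1, 2]) {
    const run = runs.get(click);
    run?.next(`result ${click}`);
    run?.complete();
  }
  return log;
}

const withSwitchMap = simulate((project) => switchMap(project));
const withExhaustMap = simulate((project) => exhaustMap(project));
const withConcatMap = simulate((project) => concatMap(project));

console.log({ withSwitchMap, withExhaustMap, withConcatMap });
```

With switchMap the first run is dropped and only 'result 2' arrives; with exhaustMap the second click is ignored while run 1 is active; with concatMap run 2 is queued and starts only after run 1 completes.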

Then, let's focus on the bulk of the logic. I'll share the code and go over it, bear with me:

from(
  this.dataSeries.map((dataSerie) =>
    validateReport(dataSerie.Location).pipe(
      tap((res) => convertResponseToGridView(res, dataSerie)),
      catchError((e) => {
        console.error(`Issues in validation`, e);
        return EMPTY;
      })
    )
  )
).pipe(concatAll())

The main idea here is to loop over the data series and create, for each entry, a cold observable that will be ready to validate the report once subscribed to.

We then wrap all that up in a from, which takes the array of observables we've just built and turns it into an observable of observables. Effectively going from Array<Observable<T>> to Observable<Observable<T>>, where T is whatever validateReport emits.

Finally, we use the higher-order operator concatAll to subscribe to those inner observables one after the other (waiting for each to complete before subscribing to the next!).
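
The ordering described above can be checked with a small sketch. It compares from(...).pipe(concatAll()) with the equivalent from(items).pipe(concatMap(...)) form, and with mergeAll as a contrast. The `run` helper and `fakeRequest` are hypothetical: each fake request stays open until it is resolved by hand, which makes it visible when the next one starts. The imports assume RxJS 7.2 or later.

```typescript
import { Observable, Subscriber, concatAll, concatMap, from, mergeAll } from 'rxjs';

type Request = (location: string) => Observable<string>;

function run(build: (locations: string[], request: Request) => Observable<string>): string[] {
  const log: string[] = [];
  const pending: { location: string; subscriber: Subscriber<string> }[] = [];

  // Cold fake request: it only "starts" when something subscribes to it.
  const fakeRequest: Request = (location) =>
    new Observable<string>((subscriber) => {
      log.push(`start ${location}`);
      pending.push({ location, subscriber });
    });

  build(['a', 'b'], fakeRequest).subscribe((result) => log.push(result));

  // Resolve in-flight requests one at a time, oldest first.
  while (pending.length > 0) {
    const { location, subscriber } = pending.shift()!;
    subscriber.next(`done ${location}`);
    subscriber.complete();
  }
  return log;
}

const viaConcatAll = run((locations, request) =>
  from(locations.map((location) => request(location))).pipe(concatAll())
);
const viaConcatMap = run((locations, request) =>
  from(locations).pipe(concatMap((location) => request(location)))
);
const viaMergeAll = run((locations, request) =>
  from(locations.map((location) => request(location))).pipe(mergeAll())
);

console.log({ viaConcatAll, viaConcatMap, viaMergeAll });
```

Both concat variants log 'start a', 'done a', 'start b', 'done b', while mergeAll starts 'a' and 'b' together before either is done. The concatMap form also avoids building the intermediate array of observables, which may read more simply.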

Here's the final code for our component:

@Component({
  selector: 'app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
  standalone: true,
})
export class AppComponent {
  public testReports$$ = new Subject<void>();

  private dataSeries: DataSerie[] = [
    { Location: { x: 0, y: 0 } },
    { Location: { x: 1, y: 1 } },
    { Location: { x: 2, y: 2 } },
  ];

  constructor() {
    this.testReports$$
      .pipe(
        switchMap(() =>
          from(
            this.dataSeries.map((dataSerie) =>
              validateReport(dataSerie.Location).pipe(
                tap((res) => convertResponseToGridView(res, dataSerie)),
                catchError((e) => {
                  console.error(`Issues in validation`, e);
                  return EMPTY;
                })
              )
            )
          ).pipe(concatAll())
        ),
        untilDestroyed()
      )
      .subscribe();
  }
}

Have a look at this live demo, where I've mocked the data. Open up the console and click on the button "Test report". You'll see that all the requests are done one after another.

maxime1992
  • Akotech and I replied at the same time. But Akotech's answer is simpler and I'd go for that. – maxime1992 Jun 05 '23 at 19:19
  • Yeah good call on the destroy. This was just me spit balling and having a rough draft more than a final release. – djangojazz Jun 05 '23 at 22:43
  • I'm curious about your example, though: if I ever want to fire a cancel event and invoke a destroy, would it be simpler, since you have an outer and an inner collection? Sometimes I would want to say "Stop the presses!" with a button and just stop everything. I wonder if that invocation may be simpler. – djangojazz Jun 06 '23 at 00:54
  • You could put a takeUntil on Akotech's answer too. I don't think in my case it adds much flexibility for this use case – maxime1992 Jun 06 '23 at 06:48
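
For the "Stop the presses!" case from the comments, here is a rough sketch of what a takeUntil on the concat chain might look like. `cancel$`, `fakeValidate` and the `inFlight` queue are hypothetical names introduced only for this demonstration; in a component, cancel$ would be a Subject triggered by the stop button. The imports assume RxJS 7.2 or later.

```typescript
import { Observable, Subject, concat, takeUntil } from 'rxjs';

const log: string[] = [];
const inFlight: Array<() => void> = [];

// Hypothetical stand-in for validateReport: stays open until finished by hand.
function fakeValidate(location: string): Observable<string> {
  return new Observable<string>((subscriber) => {
    log.push(`start ${location}`);
    let finished = false;
    inFlight.push(() => {
      finished = true;
      subscriber.next(`ok ${location}`);
      subscriber.complete();
    });
    // Teardown: runs on completion and on unsubscribe; only log real cancels.
    return () => {
      if (!finished) {
        log.push(`cancelled ${location}`);
      }
    };
  });
}

const cancel$ = new Subject<void>();
const requests = ['a', 'b', 'c'].map((location) => fakeValidate(location));

concat(...requests)
  .pipe(takeUntil(cancel$))
  .subscribe({
    next: (result) => log.push(result),
    complete: () => log.push('complete'),
  });

inFlight[0](); // 'a' finishes normally, which lets 'b' start
cancel$.next(); // cancel while 'b' is in flight

console.log(log);
```

In this sketch 'b' is unsubscribed mid-flight and 'c' never starts. Note that takeUntil completes the stream, so the subscriber sees a normal completion rather than an error when the run is cancelled.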