32

Can an effect wait for two actions, like Promise.all? Example:

@Effect()
public addUser() {
   return this.actions$.ofType(user.ADD)
      .switchMap(() => {
         return this.userService.add();
      })
      .map(() => {
         return new user.AddSuccessAction();
      });
}

@Effect()
public addUserOptions() {
   return this.actions$.ofType(userOptions.ADD)
      .switchMap(() => {
         return this.userOptionsService.add();
      })
      .map(() => {
         return new userOptions.AddSuccessAction();
      });
}

@Effect()
public complete() {
   return this.actions$.ofType(user.ADD_SUCCESS, userOptions.ADD_SUCCESS)
      // how to make this work like Promise.all?
      .switchMap(() => {
         return this.statisticService.add();
      })
      .map(() => {
         return new account.CompleteAction();
      });
}

UPDATED What I want to achieve is behavior similar to Promise.all: dispatch two effects in parallel, wait until both are resolved, then dispatch a third action. Something like https://redux-saga.js.org/docs/advanced/RunningTasksInParallel.html With promises it was quite obvious:

Promise.all([fetch1, fetch2]).then(fetch3);

Is it possible in ngrx/effects? Or is it the wrong approach in ngrx/effects?

ANSWER

There are a few options you can use:

1) Do not use generic actions.

Follow the rules from Mike Ryan's presentation: https://youtu.be/JmnsEvoy-gY

Pros: easier to debug

Cons: tons of boilerplate and actions

2) Use a complex stream with nested actions.

Check this article: https://bertrandg.github.io/ngrx-effects-complex-stream-with-nested-actions/

Here is a simple example for two actions:

@Effect()
public someAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.SOME_ACTION),
        map((action: actions.SomeAction) => action.payload),
        mergeMap((payload) => {
            const firstActionSuccess$ = this.actions$.pipe(
                ofType(actions.FIRST_ACTION_SUCCESS),
                takeUntil(this.actions$.pipe(ofType(actions.FIRST_ACTION_FAIL))),
                first(),
            );

            const secondActionsSuccess$ = this.actions$.pipe(
                ofType(actions.SECOND_ACTION_SUCCESS),
                takeUntil(this.actions$.pipe(ofType(actions.SECOND_ACTION_FAIL))),
                first(),
            );

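            // Subscribe here, before the actions returned below are dispatched,
            // so that neither success action is missed. forkJoin emits once,
            // after both inner streams have emitted and completed.
            forkJoin([firstActionSuccess$, secondActionsSuccess$]).subscribe(() => {
                // do something
            });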

            return [
                new actions.FirstAction(),
                new actions.SecondAction(),
            ];
        }),
    );
}

Pros: you can achieve what you want

Cons: the stream is hard to maintain, looks ugly, and can quickly turn into hell. The observables do not unsubscribe until a success or fail action arrives, which means that, in theory, any third-party action of the same type can feed these observables.

3) Use the aggregator pattern.

Check Victor Savkin's presentation about State Management Patterns and Best Practices with NgRx: https://www.youtube.com/watch?v=vX2vG0o-rpM

Here is a simple example:

First, create actions with a correlationId parameter. The correlationId should be unique; it may be a GUID, for example. You will use this ID throughout your chain of actions to identify the actions that belong together.

export class SomeAction implements Action {
    public readonly type = SOME_ACTION;

    constructor(public readonly correlationId?: string | number) { }
    // if you need a payload, make correlationId the second argument
    // constructor(public readonly payload: any, public readonly correlationId?: string | number) { }
}

export class SomeActionSuccess implements Action {
    public readonly type = SOME_ACTION_SUCCESS;

    constructor(public readonly correlationId?: string | number) { }
}

export class FirstAction implements Action {
    public readonly type = FIRST_ACTION;

    constructor(public readonly correlationId?: string | number) { }
}

export class FirstActionSuccess implements Action {
    public readonly type = FIRST_ACTION_SUCCESS;

    constructor(public readonly correlationId?: string | number) { }
}

// the same actions for SecondAction and ResultAction

Then our effects:

@Effect()
public someAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.SOME_ACTION),
        mergeMap((action: actions.SomeAction) => {
            return [
                new actions.FirstAction(action.correlationId),
                new actions.SecondAction(action.correlationId),
            ];
        }),
    );
}

@Effect()
public firstAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.FIRST_ACTION),
        switchMap((action: actions.FirstAction) => {
            // something
            ...map(() => new actions.FirstActionSuccess(action.correlationId));
        }),
    );
}
// the same for secondAction

@Effect()
public resultAction(): Observable<Action> {
    return this.actions$.pipe(
        ofType(actions.SOME_ACTION),
        switchMap((action: actions.SomeAction) => {
            const firstActionSuccess$ = this.actions$.pipe(
                ofType(actions.FIRST_ACTION_SUCCESS),
                filter((t: actions.FirstActionSuccess) => t.correlationId === action.correlationId),
                first(),
            );

            const secondActionsSuccess$ = this.actions$.pipe(
                ofType(actions.SECOND_ACTION_SUCCESS),
                filter((t: actions.SecondActionSuccess) => t.correlationId === action.correlationId),
                first(),
            );

            return zip(firstActionSuccess$, secondActionsSuccess$).pipe(
                map(() => new actions.resultSuccessAction()),
            )
        }),
    );
}

Pros: the same as point 2, but without interference from third-party actions.

Cons: the same as points 1 and 2.
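To make the correlation idea concrete, here is a minimal, library-free sketch of what the aggregator does: it remembers which success actions are still outstanding for each correlationId and fires once the last one arrives. This is only a model of the matching logic, not NgRx or RxJS code; the `Aggregator` class, its `start` and `handle` methods, and the string action types are hypothetical names chosen for illustration.

```typescript
// Library-free model of correlation matching. All names are hypothetical.
interface CorrelatedAction {
    type: string;
    correlationId: string;
}

class Aggregator {
    // correlationId -> success types that have not arrived yet
    private pending: { [correlationId: string]: string[] } = {};
    private awaitedTypes: string[];
    private onComplete: (correlationId: string) => void;

    constructor(awaitedTypes: string[], onComplete: (correlationId: string) => void) {
        this.awaitedTypes = awaitedTypes;
        this.onComplete = onComplete;
    }

    // Call when the parent action (SOME_ACTION in the text) is dispatched.
    start(correlationId: string): void {
        this.pending[correlationId] = this.awaitedTypes.slice();
    }

    // Call for every dispatched action; unrelated actions are ignored.
    handle(action: CorrelatedAction): void {
        if (!Object.prototype.hasOwnProperty.call(this.pending, action.correlationId)) {
            return; // no chain with this id is waiting
        }
        const remaining = this.pending[action.correlationId];
        const index = remaining.indexOf(action.type);
        if (index === -1) {
            return; // not awaited, or already received for this chain
        }
        remaining.splice(index, 1);
        if (remaining.length === 0) {
            delete this.pending[action.correlationId];
            this.onComplete(action.correlationId);
        }
    }
}

const completed: string[] = [];
const aggregator = new Aggregator(
    ['FIRST_ACTION_SUCCESS', 'SECOND_ACTION_SUCCESS'],
    (id) => { completed.push(id); },
);

aggregator.start('a');
aggregator.start('b');
aggregator.handle({ type: 'FIRST_ACTION_SUCCESS', correlationId: 'a' });
aggregator.handle({ type: 'SECOND_ACTION_SUCCESS', correlationId: 'b' }); // belongs to another chain
aggregator.handle({ type: 'SECOND_ACTION_SUCCESS', correlationId: 'a' }); // completes chain 'a'
console.log(completed); // [ 'a' ]
```

Chain 'b' never completes here because its first success action never arrives, which is the isolation the correlationId is meant to provide.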

4) Do not use effects for API calls. Use good old services which emulate effects but return an Observable.

In your service:

public dispatchFirstAction(): Observable<void> {
    this.store.dispatch(new actions.FirstAction(filter));

    return this.service.someCoolMethod().pipe(
        map((data) => this.store.dispatch(new actions.FirstActionSuccess(data))),
        catchError((error) => {
            this.store.dispatch(new actions.FirstActionFail());

            return throwError(error); // RxJS 6+: import { throwError } from 'rxjs'
        }),
    );
}

You can then combine them anywhere later, for example:

const result1$ = this.service.dispatchFirstAction();
const result2$ = this.service.dispatchSecondAction();

forkJoin(result1$, result2$).subscribe();

5) Use ngxs: https://github.com/ngxs/store

Pros: less boilerplate, feels like native Angular, and is growing fast

Cons: has fewer features than ngrx

E. Efimov
  • Your question is not clear. Can you provide more details? – Harshit Jun 16 '17 at 17:34
  • I am trying to achieve the same. Were you able to do so? I also looked at the video, which uses the zip operator, but I am not able to achieve the result – Karan Garg Jul 07 '18 at 14:29
  • Hi @KaranGarg. I updated my answer. – E. Efimov Jul 18 '18 at 22:22
  • Hello there, I see you have added an answer right into the body of your question. It's better to post it as a separate answer instead, since this is how this site works. – Neurotransmitter Jul 03 '20 at 12:01
  • "Use complex stream with nested actions": I don't find it a "really complex" stream. It perfectly fits my need, where the others didn't really please me: 1: I cannot maintain all that duplication for no real "good" reason. 2: . 3: I don't want to add this correlation logic, which would require managing their lifecycles. 4 and 5: I love NgRx too much for that. Thank you very much @E.Efimov! – Hadrien TOMA Jan 07 '21 at 12:39

4 Answers

15

I am new to RxJS, but what about this?

You can remove {dispatch: false} if you change the tap to a switchMap.

@Effect({dispatch: false})
public waitForActions(): Observable<any> {
    const waitFor: string[] = [
        SomeAction.EVENT_1,
        SomeAction.EVENT_2,
        SomeAction.EVENT_3,
    ];

    return this._actions$
        .pipe(
            ofType(...waitFor),
            distinct((action: IAction<any>) => action.type),
            bufferCount(waitFor.length),
            tap(console.log),
        );
}
Neurotransmitter
codeBelt
  • How can I 'restore' this effect? If I emit 3 actions for the first time, it works, but when I do it the second time it won't trigger. – bartosz.baczek Mar 21 '19 at 10:38
  • This could probably be from the distinct. Is it needed? ofType is already distinct by definition – Nico Jan 07 '20 at 18:30
  • @Nico without `distinct` it will fire as soon as `waitFor.length` actions from the `waitFor` list have been dispatched, even if they are all of the same type, rather than when **all** of them have been dispatched. – Neurotransmitter Jul 03 '20 at 15:39
  • @bartosz.baczek Check out my answer; it works more than once. https://stackoverflow.com/a/64464568/9092944 – Moshe Yamini Oct 21 '20 at 13:50
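The "works only once" behavior discussed in these comments is consistent with how `distinct` works: it remembers every key it has seen for the lifetime of the stream, so after the first round all three action types are filtered out for good. As a library-free illustration of the intended logic (wait until every listed type has been seen at least once, fire, then start over), here is a small sketch. The `createWaitForAll` helper is hypothetical and is not part of RxJS or NgRx; it assumes the listed types are unique.

```typescript
// Hypothetical helper that models "wait for all types, then reset".
// Assumes `types` contains no duplicates.
function createWaitForAll(
    types: string[],
    onAll: (seen: string[]) => void,
): (type: string) => void {
    let seen: string[] = [];
    return (type: string): void => {
        if (types.indexOf(type) === -1 || seen.indexOf(type) !== -1) {
            return; // not an awaited type, or already seen in this round
        }
        seen.push(type);
        if (seen.length === types.length) {
            const round = seen;
            seen = []; // reset, so the next round can fire again
            onAll(round);
        }
    };
}

const rounds: string[][] = [];
const handleType = createWaitForAll(
    ['EVENT_1', 'EVENT_2', 'EVENT_3'],
    (round) => { rounds.push(round); },
);

// Two full rounds, with duplicates inside each round.
['EVENT_1', 'EVENT_1', 'EVENT_3', 'EVENT_2', 'EVENT_2', 'EVENT_3', 'EVENT_1']
    .forEach((type) => handleType(type));

console.log(rounds.length); // 2
```

In RxJS itself, `distinct` accepts an optional second argument (a "flushes" observable) that clears its memory, which may be one way to get the same reset inside the effect.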
10

This worked for me in ngrx 8

waitFor2Actions$ = createEffect(() =>
    combineLatest([
      this.actions$.pipe(ofType(actions.action1)),
      this.actions$.pipe(ofType(actions.action2)),
    ]).pipe(
      switchMap(() => ...),
    )
  );
Moshe Yamini
  • This won't work. Initially, it will wait for both actions to be dispatched, but after that it will trigger again and again whenever either one of the actions is dispatched. – RollerCosta Feb 24 '23 at 09:52
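The difference this comment points at can be shown without any library. The sketch below only counts how many outputs a given sequence of dispatched actions would produce under combineLatest-style semantics (every emission counts once both sources have emitted) versus zip-style semantics (emissions are consumed in pairs, as in the `zip` used in the aggregator example). The function names and the `Source` type are hypothetical; this is a model of the semantics, not RxJS code.

```typescript
// Library-free model; names are hypothetical.
type Source = 'action1' | 'action2';

// combineLatest-style: once both sources have emitted at least once,
// every further emission from either source produces an output.
function countCombineLatest(sequence: Source[]): number {
    let seen1 = false;
    let seen2 = false;
    let outputs = 0;
    for (const source of sequence) {
        if (source === 'action1') { seen1 = true; } else { seen2 = true; }
        if (seen1 && seen2) { outputs++; }
    }
    return outputs;
}

// zip-style: the n-th emission of one source is paired with the n-th
// emission of the other, so each emission is used at most once.
function countZip(sequence: Source[]): number {
    let unpaired1 = 0;
    let unpaired2 = 0;
    let outputs = 0;
    for (const source of sequence) {
        if (source === 'action1') { unpaired1++; } else { unpaired2++; }
        if (unpaired1 > 0 && unpaired2 > 0) {
            unpaired1--;
            unpaired2--;
            outputs++;
        }
    }
    return outputs;
}

const dispatched: Source[] = ['action1', 'action2', 'action1', 'action1', 'action2'];
console.log(countCombineLatest(dispatched)); // 4
console.log(countZip(dispatched)); // 2
```

With this sequence the combineLatest-style count keeps growing after the first pair, while the zip-style count fires once per completed pair, which is closer to a repeatable Promise.all.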
5

Using Observable.combineLatest works for me.

@Effect()
  complete$ = this.actions$.ofType<Action1>(ACTION1).combineLatest(this.actions$.ofType<Action2>(ACTION2),
    (action1, action2) => {

      return new Action3();
    }
  ).take(1);

take(1) results in dispatching Action3() only once.

deumax
2

Another combineLatest version with pipes and switchMap

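import { Action } from '@ngrx/store'
import { Effect, ofType } from '@ngrx/effects'
import { Observable, of } from 'rxjs'
import { combineLatest, switchMap } from 'rxjs/operators'

@Effect()
someEffect$: Observable<Action> = this.actions$.pipe(
  ofType(Action1),
  // the combineLatest operator is deprecated in later RxJS 6 releases;
  // RxJS 7 offers combineLatestWith instead
  combineLatest(this.actions$.pipe(ofType(Action2))),
  switchMap(() => of({ type: Action3 }))
)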
Roman Rhrn Nesterov