5

I'd like to build a wrapper class that does something before and after each value emitted by an Observable.

Here's what I came up with:

class Wrapper<T> {
    wrapped$: Observable<T>;

    _dataSubject = new Subject<T>();
    data$ = this._dataSubject.pipe(
        tap(_ => console.log("BEFORE")),
        //
        // map( ??? )
        //
    );

    constructor(wrapped$: Observable<T>) {
        this.wrapped$ = wrapped$.pipe(
            tap(_ => console.log("AFTER"))
        );
    }
}

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")

The console output should be:

BEFORE
foo
AFTER

I can't figure out how to connect the wrapped$ Observable with the _dataSubject.

But maybe I'm completely wrong and it needs a different approach.

Benjamin M
  • There's something quite odd with your code, since you declare two subjects. You would never use the one created in _dataSubject with your current implementation. Thus, nothing will happen when you call next on your subject. What use case are you trying to achieve? – Quentin Fonck Mar 22 '18 at 08:53
  • I know that my code isn't working at all, else I wouldn't have asked a question here :D I just wanted to show how I'd like to use the `Wrapper` class. Though the only really important lines are the last 4 (`let subject = ...` to `subject.next("foo");`) – Benjamin M Mar 22 '18 at 09:01

5 Answers

12

What about something like this

import {Observable} from 'rxjs';

export class DoBeforeAfter<T> {
    wrapped$: Observable<T>;


    constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
        this.wrapped$ = Observable.of(null)
            .do(_ => console.log("BEFORE"))
            .switchMap(_ => wrapped$)
            .do(doFunction)
            .do(_ => console.log('AFTER'));
    }

}

It can be consumed like this:

const source = Observable.of('NOW');
const beforeAfter = new DoBeforeAfter(source, data => console.log(data));

beforeAfter.wrapped$.subscribe(
    null,
    error => console.error(error),
    () => console.log('DONE')
);

It looks a bit cumbersome, but maybe it can help

Picci
  • You're a genius. It's really that easy. Thank you. Sometimes it's impossible to see the wood for the trees. :D The only strange thing is `Observable.of(null)`. Doesn't look that clean, but at least it works as expected. – Benjamin M Mar 22 '18 at 09:13
  • I know the feeling of being deep into the trees – Picci Mar 22 '18 at 09:22
  • Why use `switchMap` here? – kenset Mar 14 '22 at 14:29
4

So, from what I understood, you can do something like this:

class Wrapper<T> {

    _dataSubject: Subject<T>;
    wrapped$: Observable<T>;

    constructor(wrapped$: Subject<T>) {
        this._dataSubject = wrapped$;
        this.wrapped$ = this._dataSubject.pipe(
          tap(_ => console.log("BEFORE")),
          tap(data => console.log(data)),
          tap(_ => console.log("AFTER"))
        );
    }
}

And then:

let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.wrapped$.subscribe();
subject.next("foo")
Quentin Fonck
  • Yes, that's basically the same as Picci did. Works! – Benjamin M Mar 22 '18 at 09:24
  • Why did you undo my edit? There's no `asObservable` method you can use inside `pipe`. And additionally `pipe` already returns an `Observable` type (no `Subject`). – Benjamin M Mar 22 '18 at 09:53
  • You're right, it's not in the pipe, but it is a method on the subject. It is there to prevent calls like `wrapper.wrapped$.next('hello world');`. And after testing it, it is no longer a subject at this point, so you were right ^^ – Quentin Fonck Mar 22 '18 at 10:01
  • 1
    I know what you're trying to achieve, but still: `pipe` returns an `Observable` (even when you call `subject.pipe()`). The `pipe` function is part of the `Observable` class ( https://github.com/ReactiveX/rxjs/blob/master/src/internal/Observable.ts ), **not** part of the `Subject` class ( https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subject.ts ). Though your code will throw an error when using TypeScript. – Benjamin M Mar 22 '18 at 10:05
  • Yes, you're right, I have just tested it in my current project. I forgot this part about the pipe function's return value when I wrote my answer. I should test the code instead of making assumptions when I post an answer :) – Quentin Fonck Mar 22 '18 at 10:12
3

The best way (though complicated) is to create a new operator, similar to tap, that does something before and after the value is emitted.

You can see it working in the example (it's in ES6, as SO code snippets don't accept TypeScript, but you'll get the idea)

function wrap(before, after) {
    return function wrapOperatorFunction(source) {
        return source.lift(new WrapOperator(before, after));
    };
}
class WrapOperator {
    constructor(before, after) {
        this.before = before;
        this.after = after;
    }
    call(subscriber, source) {
        return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
    }
}

class WrapSubscriber extends Rx.Subscriber {
    constructor(destination, before, after) {
        super(destination);
        this.before = before;
        this.after = after;
    }
    _next(value) {
        if (this.before) this.before(value);
        this.destination.next(value);
        if (this.after) this.after(value);
    }
}

// Now:

const observable = Rx.Observable.from([1, 2, 3, 4]);

observable.pipe(
   wrap(value => console.log('before', value), value => console.log('after', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));


// For what you want:
// let's simulate that, for each value in the array, we'll fetch something from an external service:
// we want the before to be executed when we make the request, and the after to be executed when it finishes.
// In this case, we just combine two wrap operators and flatMap, for example:

observable.pipe(
  wrap(value => console.log('BEFORE REQUEST', value)),
  Rx.operators.flatMap(value => {
    const subject = new Rx.Subject();
    setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
    return subject;
  }),
  wrap(undefined, value => console.log('AFTER REQUEST', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
<script src="https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js"></script>

As stated, maybe a bit complicated, but it integrates seamlessly with RxJS operators and it is always a good example to know how to create our own operators :-)

Regarding your comment, you can check the last example. There, I combine two wrap operators. The first one only uses the before callback, so it only executes something before a value is emitted. As you can see, because the source observable is created from an array, the four before callbacks are executed immediately. Then, we apply flatMap. After it, we apply a new wrap, but this time with just the after callback. So, this callback is only called after the observables returned by flatMap yield their values.

Of course, if instead of an observable from an array you had one made from an event listener, you'd have:

  1. The before callback is executed just before the fired event is pushed by the observable.
  2. The asynchronous observable executes.
  3. The asynchronous observable yields values after a time t.
  4. The after callback is executed.

This is where having operators pays off, as they're easily combined. Hope this suits you.
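
For newer RxJS versions, where `lift` is marked as deprecated, the same per-value idea can be written without `lift`, an `Operator` class or a `Subscriber` subclass. What follows is only a minimal TypeScript sketch, assuming RxJS 6 or 7. It reuses the `wrap` name from above for illustration; it is not an RxJS built-in, and the `log` array is a hypothetical stand-in for `console.log` so the order is easy to inspect.

```typescript
import { MonoTypeOperatorFunction, Observable, of } from 'rxjs';

// Hypothetical helper (not part of RxJS): calls `before`, forwards the value
// downstream, then calls `after`. Both callbacks are optional.
function wrap<T>(
  before?: (value: T) => void,
  after?: (value: T) => void
): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    new Observable<T>(subscriber =>
      // Returning the inner subscription lets unsubscription propagate upstream.
      source.subscribe({
        next: (value: T) => {
          if (before) before(value);
          subscriber.next(value); // downstream subscribers run here
          if (after) after(value);
        },
        error: (err: unknown) => subscriber.error(err),
        complete: () => subscriber.complete(),
      })
    );
}

// Illustrative usage: collect the order of events in an array.
const log: string[] = [];
of('foo')
  .pipe(wrap<string>(() => log.push('BEFORE'), () => log.push('AFTER')))
  .subscribe(value => log.push(value));

console.log(log.join(', ')); // BEFORE, foo, AFTER
```

Because the callbacks run around `subscriber.next`, this ordering only holds while everything downstream of `wrap` is synchronous. With an asynchronous step in between, two separate `wrap` calls are still needed, as in the flatMap example above.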

Oscar Paz
  • Thank you. Looks really complicated compared to the other answers. But it works well. – Benjamin M Mar 22 '18 at 09:29
  • The only downside is that `before` will wait to get executed until the wrapped Observable emits a value. Though if the wrapped observable takes 5 seconds, then `before` will get executed **after** the 5 seconds. Would be nice, if it could get executed **before** the wrapped observable emitted a value. – Benjamin M Mar 22 '18 at 09:49
  • Not sure if I understand. `before` is executed JUST before the value is emitted, and `after` JUST after. If the source observable is throttled, it'll still work like that. Not sure what exactly you mean – Oscar Paz Mar 22 '18 at 09:52
  • Difficult to explain. I'll try using a `Subject`. When you call `subject.next(...)`, then the `before` method should be instantly called. Now there might be an HTTP call (or some other long running function) within the `pipe` using for example `switchMap`. When this long running function is finished, the output of this function and the `after` function should be called. The result would then look like this: **1.** `BEFORE`, **2.** (a few seconds delay because there's some long running function), **3.** result of the long running function + `AFTER` – Benjamin M Mar 22 '18 at 10:26
  • 1
    Got it. Check my modifications. – Oscar Paz Mar 22 '18 at 11:04
0

Thanks for all your input!

I have now come up with my own solution. The AFTER and DATA outputs are in the wrong order, but I figured out that this isn't important as long as they come at the same time.

The code still needs some refactoring, but at least it works for now!

class Wrapper {
    _taskDoneSubject = new Subject<boolean>();
    taskDone$ = this._taskDoneSubject.asObservable();

    wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
        return switchMap<T, R>(val => { // can also be done using mergeMap
            this._taskDoneSubject.next(false);
            return wrapped(val).pipe(
                tap(_ => this._taskDoneSubject.next(true))
            );
        });
    }
}

Usage:

test(valueEmittingObservable: Observable<string>) {

    let wrapper = new Wrapper();
    wrapper.taskDone$.subscribe(val => console.log("task done? ", val));

    valueEmittingObservable.pipe(
        // wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
        wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000))) // simulated
    ).subscribe(val => console.log("emitted value: ", val));

}

Output:

task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo

Or if it's emitting faster than 5 seconds:

task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
Benjamin M
0

If you want to do this without a class, using RxJS 6:

return of(null)
    .pipe(tap(_ => {
        console.log('BEFORE');
    }))
    .pipe(switchMap(_ => wrappedObservable))
    .pipe(tap(_ => {
        console.log('AFTER');
    }))
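
The `of(null)` plus `switchMap` pair only serves to postpone the BEFORE side effect until someone subscribes. As a hedged alternative, `defer` expresses that directly. The sketch below assumes RxJS 6 or 7; `wrapped$` and the `events` array are illustrative names, with the array standing in for `console.log`.

```typescript
import { defer, of } from 'rxjs';
import { tap } from 'rxjs/operators';

const events: string[] = [];
const wrapped$ = of('foo'); // stand-in for the wrapped Observable

const result$ = defer(() => {
  // Runs once per subscription, right before wrapped$ is subscribed to.
  events.push('BEFORE');
  return wrapped$;
}).pipe(
  // Runs for each value, before the subscriber's own callback sees it.
  tap(() => events.push('AFTER'))
);

// Nothing has happened yet: defer is lazy until subscribe() is called.
const eventsBeforeSubscribe = events.length; // 0

result$.subscribe(value => events.push(value));

console.log(events.join(', ')); // BEFORE, AFTER, foo
```

As in the snippet above, BEFORE fires once per subscription rather than once per emitted value, and the `tap` that logs AFTER runs before the subscriber's callback receives the value.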
kenset