16

For an arbitrary promise implementation, the deferred pattern (not to be confused with the antipattern) may look like:

const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });

The deferred object holds an unsettled promise that can be passed to other function scopes by reference. All promise chains are executed on promise settlement; it doesn't matter whether deferred.promise was settled before or after chaining with then. The state of the promise cannot be changed once it has settled.
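For reference, a deferred with these properties can be sketched on top of the native Promise constructor. This is only a minimal sketch under that assumption, not the implementation of any particular promise library; the `Deferred` name matches the snippet above, but its body is assumed.

```javascript
// Minimal Deferred sketch built on the native Promise.
// Assumed implementation, not taken from any specific library.
class Deferred {
  constructor() {
    // The executor runs synchronously, so resolve/reject are captured
    // on the instance before the constructor returns.
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

const deferred = new Deferred();

// Chained before settlement.
deferred.promise.then((result) => console.log('Early result', result));

deferred.resolve('one');
// Has no effect: the promise is already resolved.
deferred.reject(new Error('two'));

// Chained after settlement; this handler still receives 'one'.
deferred.promise.then((result) => console.log('Late result', result));
```

Both handlers receive 'one', and the later reject call is ignored, which is the behaviour the observable-based version is expected to mirror.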


As the answer suggests, the initial choices are ReplaySubject and AsyncSubject.

For the given setup (a demo):

var subject = new Rx.AsyncSubject;
var deferred = subject.first();

deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});

This results in desirable behaviour:

subject.error('one');
subject.next('two');

Early error one

Late error one

This results in undesirable behaviour:

subject.error('one');
subject.next('two');
subject.complete();

Early error one

Late result two

This results in undesirable behaviour:

subject.next('two');
subject.complete();
subject.next('three');

Early result two

Late result three

The results from ReplaySubject differ but are still inconsistent with the expected results. next values and error errors are treated separately, and complete doesn't prevent the observers from receiving new data. This may work for a single next/error; the problem is that next or error may be called multiple times unintentionally.

first() is used because the subscriptions are one-time, and I would like to remove them afterwards to avoid leaks.

How should it be implemented with RxJS observables?

Estus Flask

2 Answers

3

You are probably looking for an Rx.ReplaySubject(1) (or an Rx.AsyncSubject(), depending on your use case).

For a more detailed explanation of subjects, see What are the semantics of different RxJS subjects?.

Basically, a subject can be passed around by reference, like a deferred. As long as you hold that reference, you can emit values to it: resolve would be a 'next' (Rxjs v5) or 'onNext' (Rxjs v4) followed by 'complete' (v5) or 'onCompleted()' (v4).

You can have any number of subscribers to a subject, similar to the then calls on a deferred. If you use a ReplaySubject(1), any subscriber will receive the last emitted value, which should cover your requirement that "it doesn't matter if deferred.promise was settled before chaining with then or after". In Rxjs v4, a ReplaySubject will emit its last value to a subscriber subscribing after it has completed. I am not sure about the behaviour in Rxjs v5.

Update

The following code executed with Rxjs v4 :

var subject = new Rx.AsyncSubject();
var deferred = subject;

deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});

subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');

produces the following output:

First result one
Second result one

However, the same code executed with Rxjs v5 produces a different output:

First result one
Second result four

So basically that means that subjects' semantics have changed in Rxjs v5! That really is a breaking change to be aware of. Anyway, you could consider moving back to Rxjs v4, or use the workaround suggested by artur grzesiak in his answer. You could also file an issue on the GitHub site. I believe the change is intentional, but in the event it is not, filing the issue might help clarify the situation. In any case, whatever behaviour is chosen should definitely be documented properly.

The question about subjects' semantics features a link showing the async subject in relation to multiple and late subscriptions.
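To make the v4 output above easier to follow, here is a dependency-free model of the async-subject semantics described in this answer: the last value is delivered only on completion, calls after completion are ignored, and late subscribers receive the cached result. This is a hand-written sketch, not RxJS source code; `ModelAsyncSubject` and its fields are hypothetical names, and the second subscription is made synchronously rather than in a setTimeout.

```javascript
// Hand-written model of the async-subject semantics described above.
// NOT RxJS code: `ModelAsyncSubject` and its fields are hypothetical.
class ModelAsyncSubject {
  constructor() {
    this.observers = [];
    this.stopped = false;   // true after onCompleted or onError
    this.hasValue = false;
    this.value = undefined; // last value seen before completion
    this.hasError = false;
    this.error = undefined;
  }

  subscribe(onNext, onError) {
    if (!this.stopped) {
      // Not settled yet: remember the observer for later delivery.
      this.observers.push({ onNext, onError });
    } else if (this.hasError) {
      if (onError) onError(this.error);
    } else if (this.hasValue) {
      // Late subscriber: replay the cached final value.
      onNext(this.value);
    }
  }

  onNext(value) {
    if (this.stopped) return; // ignored after settlement
    this.value = value;
    this.hasValue = true;
  }

  onCompleted() {
    if (this.stopped) return;
    this.stopped = true;
    if (this.hasValue) this.observers.forEach((o) => o.onNext(this.value));
    this.observers = [];
  }

  onError(error) {
    if (this.stopped) return;
    this.stopped = true;
    this.hasError = true;
    this.error = error;
    this.observers.forEach((o) => o.onError && o.onError(error));
    this.observers = [];
  }
}

const log = [];
const subject = new ModelAsyncSubject();
subject.subscribe((v) => log.push('First result ' + v));
subject.onNext('one');
subject.onCompleted();
subject.onNext('two'); // ignored: the subject has already completed
subject.subscribe((v) => log.push('Second result ' + v));
console.log(log.join('\n'));
```

In this model both subscribers end up with 'one', matching the v4 output shown above.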

user3743222
  • Thank you, finally, a use case for subjects. So the only difference between replay and async in this case is that replay subscriptions will be triggered synchronously, and async will be triggered on the next tick, isn't it? I've tried [replay subject here](http://plnkr.co/edit/gQQwiZFumXSVFy2JFTbI), added `first` to clear subscriptions. The problem I see is that deferred resolution can be (accidentally) changed with `next`, it would be different with promises. How can it be treated? Please, update the answer if possible. – Estus Flask Jul 01 '16 at 14:13
  • I already answered that: the equivalent of 'resolving', which by the way is terminology that does not fit observables, would be next followed by complete. Read the link on subjects, it helps understand. Once a subject is complete, all the following next calls are ignored. For the difference between async and replay, read the link and the documentation – user3743222 Jul 02 '16 at 02:27
  • Like 99% the behavior is a bug in RxJS5 - created issue: https://github.com/ReactiveX/rxjs/issues/1800. Still to mirror `resolve-reject-promise` structure of `deferred` properly you need some sort of additional control mechanism. – artur grzesiak Jul 04 '16 at 05:59
  • Alright I saw the issue seems to be already solved. Now I would be curious to know if that impacts other areas of Rxjs v5. Anyways, back to the additional control mechanism you mention, why would that be? Once a subject is completed ('resolved') you can send anything else through it, that will be ignored. Same for erroring ('reject'). Errors are final, you send another error or else, it won't go through. And you don't have to use 'first' as it is done here, because calling `complete` does that already. So basically the control mechanism you mention is already built in. – user3743222 Jul 05 '16 at 20:11
  • No - you still need it as you do not want to pass the whole `deferred` around. You create `deferred` and share only the `observable/promise`. But only those who have access to the `deferred` itself can `resolve/reject` it. With a good discipline you could use `.asObservable()` and pass it around, but definitely not the whole `AsyncSubject`. – artur grzesiak Jul 06 '16 at 04:14
  • I understand that the whole point of a 'deferred' object is to be passed around by reference to any function whose responsibility is to 'resolve' or reject it (which I would term as a 'producer') and does not have the deferred in its scope. For any function who is only a 'consumer' of the deferred value, `.asObservable()` precisely does the job as you say of hiding the subject's observer interface and prevent unwanted side-effects. – user3743222 Jul 09 '16 at 17:34
  • The implementation of subject is such that if you have several producers (i.e. 'resolvers'), the second and subsequent 'resolve' or 'error' calls will fail SILENTLY. If that is not a desirable behaviour, then an extra guard is useful, for example to throw an exception, or at least issue a warning as you have in your proposed code. Otherwise I think that regular behaviour of subject and the use of `.asObservable()` is enough. Now it is also true that an explicit guard or wrapped-up promise-like API around subject is never going to hurt and I am big believer in defensive programming. – user3743222 Jul 09 '16 at 17:45
  • Last thing I want to say, is though I do not doubt at all that estus has a good reason to use an observable/subject here, but there are a number of cases where using directly a promise instead of a surrogate object is a better choice. Promises can be bridged to observable and the other way around. Cf. `.toPromise()` in Rxjs 4 : https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/topromise.md. The operator seems to exist also in Rxjs 5 http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-toPromise – user3743222 Jul 09 '16 at 17:49
  • Thanks for the detailed answer. The comments are pretty much informative as well. I keep promises in mind, probably will use `toPromise` to sort this out. – Estus Flask Jul 11 '16 at 13:53
  • I have a feeling that @arturgrzesiak is right on his assumptions that it is a bug that may be fixed later. I hope that it is so, `next` after `complete` doesn't make much sense to me. I will keep my eye on rxjs 5 development. – Estus Flask Jul 11 '16 at 13:56
2

As @user3743222 wrote, AsyncSubject may be used in a deferred implementation, but it has to be private and guarded against multiple resolves / rejects.

Below is a possible implementation mirroring resolve-reject-promise structure:

const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error
  const end = (result) => {
    if (pending.isStopped) {
      console.warn('Deferred already resolved/rejected.'); // optionally throw
      return;
    }
    
    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  }
  return {
    resolve: (value) => end({isValue: true, value: value }),
    reject: (error) => end({isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
}

// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));

// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000)
}, 1000);

// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>
artur grzesiak