For arbitrary promise implementation, the deferred pattern (not to be confused with antipattern) may may look like:
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
deferred
object holds unsettled promise that can be passed to other function scopes by reference. All promise chains will be executed on promise settlement, it doesn't matter if deferred.promise
was settled before chaining with then
or after. The state of promise cannot be changed after it was settled.
As the answer suggests, the initial choices are ReplaySubject
and AsyncSubject
.
For the given setup (a demo)
var subject = new Rx.AsyncSubject;
var deferred = subject.first();
deferred.subscribe(
console.log.bind(console, 'Early result'),
console.log.bind(console, 'Early error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Late result'),
console.log.bind(console, 'Late error')
);
});
This results in desirable behaviour:
subject.error('one');
subject.next('two');
Early error one
Late error one
This results in undesirable behaviour:
subject.error('one');
subject.next('two');
subject.complete();
Early error one
Late result two
This results in undesirable behaviour:
subject.next('two');
subject.complete();
subject.next('three');
Early result two
Late result three
The results from ReplaySubject
differ but are still inconsistent with expected results. next
values and error
errors are treated separately, and complete
doesn't prevent the observers from receiving new data. This may work for single next
/error
, the problem is that next
or error
may be called multiple times unintentionally.
The reason why first()
is used is because subscribe
s are one-time subscriptions, and I would like to remove them to avoid leaks.
How should it be implemented with RxJS observables?