I have the following helper RxJS operator:
import { share } from 'rxjs/operators';

export const shareResetOnError = <T>() => share<T>({
  resetOnError: true,
  resetOnComplete: false
});
I also have the following spec for this operator:
import { Observable } from 'rxjs';
import { shareResetOnError } from './rxjs';

fdescribe('shareResetOnError', () => {
  it('should share last emitted value', async () => {
    const expectedValue = 123;
    let count = 0;
    const observable = new Observable(subscriber => {
      count++;
      subscriber.next(-expectedValue);
      subscriber.next(expectedValue);
      subscriber.complete();
    }).pipe(shareResetOnError());
    for (let i = 0; i < 3; i++) {
      await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
    }
    expect(count).toBe(1);
  });

  it('should reset value on error', async () => {
    const expectedError = new Error('test');
    const expectedValue = 123;
    let expectError = true;
    let errorsCount = 0;
    let valuesCount = 0;
    const observable = new Observable(subscriber => {
      if (expectError) {
        errorsCount++;
        subscriber.error(expectedError);
      } else {
        valuesCount++;
        subscriber.next(expectedValue);
      }
      subscriber.complete();
    }).pipe(shareResetOnError());
    for (let i = 0; i < 4; i++) {
      await expectAsync(observable.toPromise()).toBeRejectedWithError(expectedError.message);
    }
    expect(errorsCount).toBe(4);
    expectError = false;
    for (let i = 0; i < 3; i++) {
      await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
    }
    expect(valuesCount).toBe(1);
  });
});
For some reason, expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue) is failing because the promise resolves to undefined instead of expectedValue. I've also tried lastValueFrom instead of toPromise, but it makes no difference. Before switching from RxJS 6 to 7, I had the following definition for shareResetOnError:
import { AsyncSubject, ConnectableObservable, Observable, pipe, Subscription } from 'rxjs';
import { refCount } from 'rxjs/operators';

function publishLastResetOnError<T>() {
  return (source: Observable<T>) => {
    let subject: AsyncSubject<T>;
    let subscription: Subscription;
    resetSubject();
    return new ConnectableObservable(source, () => subject);

    function resetSubject() {
      subscription?.unsubscribe();
      subject = new AsyncSubject<T>();
      subscription = subject.subscribe({
        error: resetSubject
      });
    }
  };
}

export const shareResetOnError = <T>() => pipe(publishLastResetOnError<T>(), refCount());
That version worked as expected and the spec passed. Why does observable.toPromise() not resolve to the expected value with the RxJS 7 operator?
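One difference between the two versions that may be relevant: the RxJS 6 code multicasts through an AsyncSubject, while share in RxJS 7 uses a plain Subject unless a connector is supplied. The sketch below compares the two for a synchronous source. It is only an illustration under that assumption, not a confirmed fix; shareLastResetOnError and collect are hypothetical names introduced here.

```typescript
import { AsyncSubject, Observable } from 'rxjs';
import { share } from 'rxjs/operators';

// Hypothetical variant of shareResetOnError: same flags as in the question,
// plus an explicit AsyncSubject connector (an assumption, not a confirmed fix).
const shareLastResetOnError = <T>() =>
  share<T>({
    // AsyncSubject keeps only the last value and delivers it on completion,
    // including to subscribers that arrive after the source has completed.
    connector: () => new AsyncSubject<T>(),
    resetOnError: true,
    resetOnComplete: false
  });

// Illustration helper: collects values synchronously, which only works here
// because the source below emits and completes synchronously.
function collect<T>(source: Observable<T>): T[] {
  const values: T[] = [];
  source.subscribe({ next: value => values.push(value), error: () => undefined });
  return values;
}

let subscriptions = 0;
const source = new Observable<number>(subscriber => {
  subscriptions++;
  subscriber.next(-123);
  subscriber.next(123);
  subscriber.complete();
});

// Same configuration as the operator in the question (default Subject connector).
const withDefaultSubject = source.pipe(share({ resetOnError: true, resetOnComplete: false }));
const withAsyncSubject = source.pipe(shareLastResetOnError<number>());

const firstDefault = collect(withDefaultSubject); // [-123, 123]
const lateDefault = collect(withDefaultSubject);  // []: completed Subject, nothing replayed
const firstAsync = collect(withAsyncSubject);     // [123]
const lateAsync = collect(withAsyncSubject);      // [123]: last value replayed
```

With the default connector, a late subscriber only receives the completion, which would be consistent with toPromise() resolving to undefined in the spec.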