4

I have a spec-compliant ECMAScript Observable, specifically from the wonka library. I am trying to convert this type of observable to an rxjs 6 observable with no luck.

It seems this may have been possible with rxjs 5. I have tried this:

import { pipe, toObservable, fromArray } from 'wonka';
import { from } from 'rxjs';
...
from(
  pipe(
    fromArray([1, 2, 3]),
    toObservable
  )
);

I get this error in the browser:

ERROR TypeError: You provided an invalid object where a stream was expected. 
You can provide an Observable, Promise, Array, or Iterable.

and then this:

Argument of type 'observableT<any>' is not assignable to parameter
of type 'ObservableInput<any>'

in the Visual Code dialog.

I can convert it to a zen-observable by doing this:

npm i zen-observable
npm i --save-dev @types/zen-observable
import { from } from 'zen-observable';
...
getObservable(): Observable<any> {
  return from(
    pipe(
      fromArray([1, 2, 3]),
      toObservable
    ) as any) as any;
}

However, a zen-observable is not the same thing, and does not allow me to use all the rxjs methods etc...

How to I convert this to an rxjs observable?

Thanks J

Jonathan
  • 3,893
  • 5
  • 46
  • 77

3 Answers3

10

Write a conversion function that takes advantage of the fact that the subscription APIs are identical:

const zenToRx = <T>(zenObservable: Zen.Observable<T>): Rx.Observable<T> =>
  new Rx.Observable(
    observer => zenObservable.subscribe(observer)
  );
backtick
  • 2,685
  • 10
  • 18
0

Modifying backtick's answer, got it to work:

return new Observable((observer: any) => {
  pipe(
    fromArray([1, 2, 3]),
    toObservable
  ).subscribe(observer);
});
Jonathan
  • 3,893
  • 5
  • 46
  • 77
0

For some reason, none of the other answers, as of 08 Jun 2023, worked for me.

I ended up writing my own little injectable service class.

Here it is:

// Class: ZenToRxService (injectable service)
// Author: UChin Kim
// Date: 08 Jun 2023.
//
// Notes: The idea here is to "convert" a Zen Observable to an Rx
// Observable, so that you can use nifty Rx features, like pipes and
// utilities that make use of pipes, such as @ngneat/until-destroy.
//
// Strategically, we mirror the Zen Observable's events onto an
// Rx Observable. Easiest way to do this is to use the Rx Subject type, which
// is both an Observable and an event emitter. We create our own Zen
// subscription, where we simply emit the event value using the Subject, which
// is, again, both an event emitter and an Observable.
//
// You can take the resulting Subject<T> and handle it as an Rx Observable<T>.
// Or, you could handle it as an Rx Subject<T>. Whatever floats your boat :).
//
// The advantage of Rx Observables/Subjects over Zen Observables is that you
// can use the Rx pipes, and subsequently, do nifty stuff like use
// @ngneat/until-destroy to automatically clean up subscriptions, without
//  needing to maintain an explicit reference to them.
//
// Note that we automatically unsubscribe our custom Zen subscription when
// the Subject<T> object is unsubscribed or terminated.
//
// You are responsibile  for cleaning up the subscription that you obtain on
// the returned Subject<T> object.
//
// I assume you already know how to inject an Angular service into a component.
//
//
import { Injectable } from '@angular/core';
import { Subject, finalize } from 'rxjs';
import { Observable } from 'zen-observable-ts';

@Injectable({
  providedIn: 'root',
})
export class ZenToRxService {
  convertZenObservable<T>(zenObservable: Observable<T>): Subject<T> {
    const subject = new Subject<T>();
    const zenSubscription = zenObservable.subscribe((e: T) => subject.next(e));

    return subject.pipe(
      finalize(() => {
        zenSubscription.unsubscribe();
        console.log('zen subscription was unsubscribed!');
      })
    ) as Subject<T>;
  }
}
Gino
  • 1,593
  • 17
  • 22