1

I have a Observable from a server sent event which returns data und errors using the following solution Allow creation of Observable from EventSource (server-sent events): enhancement

To solve connection issues I want to separate the

const eventStream = new EventSource(evtSourceAdress); part into an separate Observable

if connection succeeds do further logic

using the solution from

streamHandler(evtSourceAdress: string) {

    return new Observable<MessageEvent<any>>(subscriber => {

      const eventStream = new EventSource(evtSourceAdress);

      eventStream.onmessage = e => subscriber.next(e);
      eventStream.onerror = e => subscriber.error(e);
      /* add if else clause if error is returned */


      return () => {
        if (eventStream.readyState === 1) {

          /* TODO add reconnect  */
          eventStream.close()
        }
      }


    })

  }

I modified the code to get the eventStream as an Observable

 openEventSource(url: string): Observable<any> {
    return new Observable<MessageEvent>(subscribe => {
       const sse = new EventSource(url);
      //  return sse.readyState
    })
    // return new Observable<MessageEvent>(subscriber => {
    //   const sse = new EventSource(url);
  }


this.eventStreamHandlerServ.openEventSource('http://localhost:3000/sse/event').subscribe(e => console.log(e))

which does not return anything

adding a return statement return sse.readyState

gives me Type 'number' is not assignable to type 'TeardownLogic'.

is there a way to only get the event as an observable ?

considered answers Creating an RxJS Observable from a (server sent) EventSource

Sator
  • 636
  • 4
  • 13
  • 34
  • Well, did you try to literally copy the code inside the linked github comment and paste it in your solution (without modifying it at all) and give that a go? – Octavian Mărculescu Sep 19 '22 at 12:34
  • the solution returns data and error from the eventsource not the eventsource itself which is what I want – Sator Sep 19 '22 at 12:41
  • 1
    Ah, okay. So you want the source, and then set up the `onmessage` and `onerror` handlers yourself? Why do you need an observable for that? Can't you just `return new EventSource(url);` and be done with it? – Octavian Mărculescu Sep 19 '22 at 12:44
  • underlying issue is that ` const sse = new EventSource(url);` always tries to connect , I want to try first if connection is available if so do further logic else try to reconnect if fails unsubscribe – Sator Sep 20 '22 at 08:45

1 Answers1

1

You can retry the connection using retry.

openEventSource(url: string): Observable<any> {
  return new Observable<any>((subscriber) => {
    const eventSource = new EventSource(url);
    eventSource.onmessage = (ev) => subscriber.next(ev.data);
    eventSource.onerror = (ev) => subscriber.error(ev);

    return () => eventSource.close();
  });
}

And in order to introduce the retry logic, you just use the retry operator mentioned above:

this.eventSourceService
  .openEventSource(url)
  // retry the connection 5 times, with 500 milliseconds delay
  .pipe(retry({ count: 5, delay: 500 }))
  .subscribe({
    next: (data) => {
      console.log('data from event source', data);
    },
    error: (error) => {
      console.log('event source has an error', error);
    },
  });
Octavian Mărculescu
  • 4,312
  • 1
  • 16
  • 29