I'm trying to implement server-sent events (SSE) in my test application. The server side is already set up and I'm just consuming the endpoint (api/v1/sse/document). The scenario is that when I do a scan, the scan should appear in my test client as well as in my main application. The problem is that it only appears in my test client when I refresh the page. I've written some code, but I'm still unable to update the events in my test client automatically.

This is the code I wrote:

[sse.service.ts] (posted as a screenshot, not reproduced here)

[document-list.component.ts]

public ngOnInit(): void {
        this.getDocuments();
        this.registerServerSentEvent();
    }

    public ngOnDestroy(): void {
        this.closeServerSentEvent();
    }

    /**
     * getDocuments: call the api service to get the list of documents.
     */
    public getDocuments(): void {
        this.aService.getDocuments().subscribe((documents: Documents[]) => {
            this.documents = documents;
        });
    }

    /**
     * markDocumentAsProcessed: call the api service to mark the document as processed.
     */
    public markDocumentAsProcessed(document: Documents): void {
        this.aService.markDocumentProcessed(document.document.id).subscribe({
            next: () => {
                // Remove the processed document from the list
                this.documents = this.documents.filter((doc) => doc.document.id !== document.document.id);
                this.closeDialog();
            },
            error: (error) => {
                console.log("markDocumentProcessed Error:", error);
                // Handle the error here
            },
        });
    }

    /**
     * showDetails: call the api service to get the document image and display it in a dialog.
     */
    public showDetails(document: Documents): void {
        this.aService.getDocumentImage(document.document.id).subscribe((image: Blob) => {
            const url = window.URL.createObjectURL(image);
            const safeUrl: SafeUrl = this.sanitizer.bypassSecurityTrustUrl(url);
            this.selectedDocument = {...document, imageDataURL: safeUrl};
            this.displayDialog = true;
        });
    }

    /**
     * closeDialog: close the dialog after pressing the process button.
     */
    private closeDialog(): void {
        this.displayDialog = false;
        this.selectedDocument = null;
    }

    private registerServerSentEvent(): void {
        const sseUrl = `${this.aService.config.url}api/v1/sse/document`;
        this.sseSubscription = this.sseService.getServerSentEvent(sseUrl).subscribe((event: MessageEvent) => {
            const documentEvent = JSON.parse(event.data);
            const eventType = documentEvent.type;
            const eventData = documentEvent.data;

            switch (eventType) {
                case "NewDocument": {
                    // Process new document event
                    break;
                }

                case "ViewDocument": {
                    // Process view document event
                    break;
                }

                case "WatchlistStatusUpdate": {
                    // Process watchlist status update event
                    break;
                }

                case "DocumentProcessed": {
                    // Process document processed event
                    const processedDocumentId = eventData.documentId;
                    this.updateProcessedDocument(processedDocumentId);
                    break;
                }
            }
        });
    }

    private updateProcessedDocument(processedDocumentId: string): void {
        // Find the processed document in the documents list
        const processedDocumentIndex = this.documents.findIndex((doc) => doc.document.id === processedDocumentId);
        if (processedDocumentIndex !== -1) {
            // Remove the processed document from the list
            this.documents.splice(processedDocumentIndex, 1);
            // Update any other UI-related logic or perform additional actions as needed
        }
    }

    private closeServerSentEvent(): void {
        if (this.sseSubscription) {
            this.sseSubscription.unsubscribe();
            this.sseService.closeEventSource();
        }
    }
}

And this is my service that fetches the documents, marks them as processed, and loads their images:

[a.service.ts]

public getDocuments(): Observable<Documents[]> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        return this.http
            .get<Documents[]>(`${url}api/v1/document/`, {
                headers: {
                    authorization: this.config.authorization,
                },
            })
            .pipe(
                map((data) => {
                    return data;
                }),
                catchError((error) => {
                    throw error;
                })
            );
    }

    /**
     * markDocumentProcessed: calls the api service to mark the document as processed.
     *
     */
    public markDocumentProcessed(documentId: string): Observable<Documents[]> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        const requestBody = {
            DocumentId: documentId,
        };

        return this.http
            .post<Documents[]>(`${url}api/v1/document/processed`, requestBody, {
                headers: {
                    authorization: this.config.authorization,
                },
            })
            .pipe(
                map((data) => {
                    return data;
                }),
                catchError((error) => {
                    throw error;
                })
            );
    }

    /**
     * getDocumentImage: calls the api service to get the document image.
     *
     */
    public getDocumentImage(documentId: string): Observable<Blob> {
        let url = this.config.url;
        if (!url.endsWith("/")) {
            url += "/";
        }

        return this.http.get(`${url}api/v1/document/${documentId}/image/Photo`, {
            responseType: "blob",
            headers: {
                authorization: this.config.authorization,
            },
        });
    }

I get no errors in the console and the endpoint returns a 200, but my test client doesn't receive the events automatically; I have to refresh the page to see the changes.

Marc Glisse
mksk23

1 Answer

The problem is that you are using the SSE endpoint as a regular GET endpoint. A plain GET only returns data when you issue the request, so you get updates only when you ask for them, which in your case means reloading the page.

This is what you are doing (taken from your SSE service screenshot):

public getServerSentEvent(url: string): Observable<MessageEvent> {
  const token = this.aService.config.authorization;
  // WARNING: the browser's EventSource has no API for custom headers, so this won't carry over to a proper SSE connection
  const headers = new HttpHeaders({
    Authorization: token,
  });

  return new Observable<MessageEvent>((observer) => {
    // HERE: this issues a one-off HTTP GET instead of opening an EventSource
    this.http.get(url, {headers, responseType: "text"}).subscribe({
      //...
    })
  })
}

So you call HTTP GET on your SSE endpoint and then dress the response up to look like an SSE event, which, as you can see, doesn't work.

Open a new EventSource instead; that works the way SSE is supposed to:

// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
  // Close event source if current instance of SSE service has some
  if (this.eventSource) {
    this.closeSseConnection();
    this.eventSource = null;
  }
  // Open new channel, create new EventSource - provide your own URL
  this.eventSource = new EventSource(this.yourSSEurl);

  // Process default event
  this.eventSource.onmessage = (event: MessageEvent) => {
    this.zone.run(() => this.processSseEvent(event));
  };

  // Add handling for your own named events...
  /*
    enum SSE_EVENTS {
      NewDocument = 'NewDocument',
      //...
    }
  */
  Object.values(SSE_EVENTS).forEach((eventName) => {
    this.eventSource.addEventListener(eventName, (event: MessageEvent) => {
      this.zone.run(() => this.processSseEvent(event));
    });
  });

  // Process connection opened
  this.eventSource.onopen = () => {
    this.reconnectFrequencySec = 1;
  };

  // Process error
  this.eventSource.onerror = (error: any) => {
    this.reconnectOnError();
  };
}
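
Stripped of the Angular specifics, the wiring above can be sketched as a small framework-free helper. This is only an illustration, not the service from the linked answer: `openSseStream`, `EventSourceLike`, `SseMessage` and the `factory` parameter are hypothetical names introduced here, and the factory is passed in so the helper can be exercised without a browser.

```typescript
// Minimal structural types so the sketch does not depend on the DOM typings.
// (Hypothetical names; the browser's EventSource/MessageEvent satisfy this shape.)
interface SseMessage {
  type: string;
  data: string;
}

interface EventSourceLike {
  onmessage: ((event: SseMessage) => void) | null;
  onerror: ((error: unknown) => void) | null;
  addEventListener(type: string, listener: (event: SseMessage) => void): void;
  close(): void;
}

// Opens the stream through `factory`, forwards default and named events
// to `onEvent`, and returns a function that closes the connection.
function openSseStream(
  factory: (url: string) => EventSourceLike,
  url: string,
  eventNames: string[],
  onEvent: (event: SseMessage) => void,
  onError: (error: unknown) => void = () => {},
): () => void {
  const source = factory(url);

  // Events sent without an "event:" field arrive here.
  source.onmessage = onEvent;

  // Events sent with "event: <name>" need an explicit listener per name.
  for (const eventName of eventNames) {
    source.addEventListener(eventName, onEvent);
  }

  source.onerror = onError;

  return () => source.close();
}
```

In a browser the factory would simply create a real `EventSource` for the given URL. Inside an Angular service you would still wrap the `onEvent` callback in `this.zone.run(...)` as in the snippet above, and call the returned function from `ngOnDestroy`.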

See my old answer, "Handling SSE reconnect on error". There you can find a more or less complete, working SSE service with everything you need.
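
The `processSseEvent` handler referenced in the snippet is not shown here. As a rough sketch of what it could do with the list, assuming the payload shape used in the question (`{ type, data: { documentId } }` serialized as JSON in `event.data`), the update can be written as a pure function. `applyDocumentEvent`, `DocumentItem` and `DocumentEvent` are hypothetical names, and only `DocumentProcessed` is handled because the other payloads are not shown in the question.

```typescript
// Shapes assumed from the question's code; the real API may differ.
interface DocumentItem {
  document: { id: string };
}

interface DocumentEvent {
  type: string;
  data?: { documentId?: string };
}

// Parses the raw SSE payload; returns null when it is not usable.
function parseDocumentEvent(rawData: string): DocumentEvent | null {
  try {
    const parsed = JSON.parse(rawData);
    return parsed && typeof parsed.type === "string" ? (parsed as DocumentEvent) : null;
  } catch {
    return null; // payload was not valid JSON
  }
}

// Returns the next document list for one event.
// Unknown or malformed events return the same array instance unchanged.
function applyDocumentEvent<T extends DocumentItem>(documents: T[], rawData: string): T[] {
  const event = parseDocumentEvent(rawData);
  if (event === null) {
    return documents;
  }

  switch (event.type) {
    case "DocumentProcessed": {
      const processedId = event.data?.documentId;
      // filter() builds a new array instead of mutating the old one.
      return documents.filter((doc) => doc.document.id !== processedId);
    }
    default:
      // NewDocument, ViewDocument, WatchlistStatusUpdate: payloads unknown here.
      return documents;
  }
}
```

In the component this would be used as `this.documents = applyDocumentEvent(this.documents, event.data)`. Returning a new array rather than splicing in place also keeps the update visible if the list is rendered under OnPush change detection. For `NewDocument`, one simple option is to call `getDocuments()` again.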

mat.hudak