9

I'm creating a few microservices using NestJS.

For instance, I have services x, y, and z, all interconnected by gRPC, but I want service x to send updates to a web app whenever a particular entity changes, so I have considered server-sent events (I'm open to any better solution).

Following the NestJS documentation, the SSE route example runs a function at a fixed interval, which seems wasteful of resources. Is there a way to send events only when there is an update?

Let's say I have another API call in the same service that is triggered by a button click in another web app. How do I make the event fire only when the button is clicked, rather than sending events continuously? Also, if you know an idiomatic way to achieve this without getting hacky, that would be appreciated; I want a hack to be the last resort.

[BONUS Question]

I also considered MQTT for sending events, but I get the feeling that a single service can't use both MQTT and gRPC. I'm skeptical of using MQTT because of its latency and how it would affect internal message passing. It would be great if I could limit it to external clients (i.e., service x uses gRPC for internal connections and MQTT for the web app; only one route needs to be exposed over MQTT). (PS: I'm new to microservices, so please be comprehensive in your solutions :p)

Thanks in advance for reading to the end!

Nikhil.Nixel

4 Answers

15

You can. The important thing is that in NestJS, SSE is implemented with Observables, so as long as you have an observable you can add to, you can use it to send SSE events. The easiest way to work with this is with Subjects. I used to have an example of this somewhere, but generally, it would look something like this:

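import { Controller, Injectable, MessageEvent, Sse } from '@nestjs/common';
import { Observable, Subject } from 'rxjs';

@Injectable()
export class SseService {
  // Every value pushed into this Subject is forwarded to subscribed SSE clients.
  private readonly events = new Subject<MessageEvent>();

  addEvent(event: MessageEvent): void {
    this.events.next(event);
  }

  sendEvents(): Observable<MessageEvent> {
    return this.events.asObservable();
  }
}

@Controller()
export class SseController {
  constructor(private readonly sseService: SseService) {}

  // Note the casing: the decorator exported by @nestjs/common is Sse.
  @Sse()
  doTheSse(): Observable<MessageEvent> {
    return this.sseService.sendEvents();
  }
}

@Injectable()
export class ButtonTriggeredService {
  constructor(private readonly sseService: SseService) {}

  buttonClickedOrSomething(): void {
    // Placeholder payload; send whatever describes the change.
    this.sseService.addEvent({ data: { type: 'buttonClicked' } });
  }
}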

Pardon the pseudo-code nature of the above, but in general it does show how you can use Subjects to create observables for SSE events. So long as the @Sse() endpoint returns an observable with the proper shape, you're golden.
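
As a rough illustration of that shape: the NestJS docs describe each emitted value as a MessageEvent with a required `data` field and optional `id`, `type`, and `retry` fields. The sketch below mirrors that interface and shows how such an object maps onto the `text/event-stream` wire format. `SseMessage` and `formatSseMessage` are hypothetical names used only for illustration; they are not part of NestJS, and its own serializer may differ in details.

```typescript
// Mirrors the MessageEvent shape described in the NestJS SSE docs.
interface SseMessage {
  data: string | object;
  id?: string;
  type?: string;
  retry?: number;
}

// Hypothetical helper: serializes one message into text/event-stream format.
function formatSseMessage(message: SseMessage): string {
  const lines: string[] = [];
  if (message.type !== undefined) lines.push(`event: ${message.type}`);
  if (message.id !== undefined) lines.push(`id: ${message.id}`);
  if (message.retry !== undefined) lines.push(`retry: ${message.retry}`);
  const payload =
    typeof message.data === "string" ? message.data : JSON.stringify(message.data);
  // Each line of the payload gets its own "data:" field.
  for (const line of payload.split("\n")) lines.push(`data: ${line}`);
  // A blank line terminates the message.
  return lines.join("\n") + "\n\n";
}

const wire = formatSseMessage({
  type: "button-clicked",
  id: "1",
  data: { entityId: 42 },
});
```

On the browser side, a message with a `type` is received by a listener registered for that event name on the `EventSource`; messages without one arrive via the default `message` event.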

Jay McDoniel
  • 1
    is there any way to isolate that event subject to a particular user? For SSE in controller I'm doing something like @Sse('/status/update/:userId'). Thinking I should use a map. Any better solutions? – Nikhil.Nixel Apr 21 '21 at 20:02
  • The map doesn't sound like a terrible idea to me. I don't know much else about sse filtering, so it might be something to look into – Jay McDoniel Apr 21 '21 at 20:06
  • 1
    @Nikhil.Nixel have you found any solution to isolate for a particular user? Currently in the same situation as you are – Mohaimin Oct 23 '21 at 10:44
  • 1
    I did something like this: @Sse('/status/update/:userId'), then I could store subjects in a map with userId as the key. If you have a large number of users it's not ideal for RAM, since having that many subjects will keep hogging memory – Nikhil.Nixel Oct 23 '21 at 14:05
  • @Mohaimin look at the comment above – Nikhil.Nixel Oct 23 '21 at 14:05
  • I don't think SSE is ideal for these cases for isolating to specific entity. You could have better luck with using plain old sockets @Mohaimin – Nikhil.Nixel Oct 23 '21 at 14:20
  • How did you send back data for that specific key(userId)? And about the socket implementation, yeah thinking that too but at this point I'm just curious :P – Mohaimin Oct 23 '21 at 18:28
  • it returns the ids for me, but I don't use ids in my code. I want to know, how to create custom observables. – Bennison J Feb 27 '23 at 10:25
  • @BennisonJ Observables are just another construct, like Promises, to handle Async code. There are several creation operators like `of`, `from`, and `timer` depending on your needs. – Jay McDoniel Feb 27 '23 at 16:05
  • Actually, I am new to observable, can you explain to me how to create custom observables in nest js sse? – Bennison J Feb 27 '23 at 19:07
  • @BennisonJ by using a [creation operator](https://www.learnrxjs.io/learn-rxjs/operators/creation) – Jay McDoniel Feb 27 '23 at 20:46
  • @JayMcDoniel I have been facing a problem in Nest when using an observable with SSE. I asked about it as a question on this platform; kindly check it, and if you know a solution, let me know: https://stackoverflow.com/questions/75607348/server-sent-event-sse-with-nestjs – Bennison J Mar 01 '23 at 18:04
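
The per-user map discussed in the comments above, along with the memory concern, could be sketched roughly as follows. This is plain TypeScript with no RxJS, and `UserEventHub`, `subscribe`, `emit`, and `trackedUsers` are hypothetical names, not anything from NestJS. The idea is that a user's map entry is dropped as soon as their last listener unsubscribes, so idle users don't keep occupying memory.

```typescript
type Listener<T> = (event: T) => void;

// Hypothetical in-memory hub: one listener set per user id.
class UserEventHub<T> {
  private readonly listeners = new Map<string, Set<Listener<T>>>();

  // Registers a listener and returns a function that removes it again.
  subscribe(userId: string, listener: Listener<T>): () => void {
    let set = this.listeners.get(userId);
    if (!set) {
      set = new Set<Listener<T>>();
      this.listeners.set(userId, set);
    }
    set.add(listener);
    return () => {
      const current = this.listeners.get(userId);
      if (!current) return;
      current.delete(listener);
      // Drop the map entry once the last connection for this user is gone.
      if (current.size === 0) this.listeners.delete(userId);
    };
  }

  // Delivers an event only to listeners registered for this user.
  emit(userId: string, event: T): void {
    this.listeners.get(userId)?.forEach((listener) => listener(event));
  }

  // Number of users that currently have at least one listener.
  get trackedUsers(): number {
    return this.listeners.size;
  }
}

const hub = new UserEventHub<string>();
const received: string[] = [];
const unsubscribe = hub.subscribe("user-1", (e) => received.push(`user-1:${e}`));
hub.subscribe("user-2", (e) => received.push(`user-2:${e}`));

hub.emit("user-1", "status-changed"); // only the user-1 listener is called
unsubscribe();
hub.emit("user-1", "ignored"); // nobody is listening for user-1 any more
```

In a NestJS handler such as @Sse('/status/update/:userId'), one way to connect this (an assumption, not tested here) would be to return `new Observable(subscriber => hub.subscribe(userId, e => subscriber.next({ data: e })))`, so that the returned unsubscribe function acts as the observable's teardown.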
15

There is a better way to handle events with SSE in NestJS.

Please see this repo with code example:

https://github.com/ningacoding/nest-sse-bug/tree/main/src

Where basically you have a service:

import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";

@Injectable()
export class EventsService {

    private readonly emitter = new EventEmitter();

    subscribe(channel: string) {
        return fromEvent(this.emitter, channel);
    }

    emit(channel: string, data?: object) {
        this.emitter.emit(channel, {data});
    }

}

Obviously, channel can be any string; as a recommendation, use a path style.

For example: "events/for/<user_id>". Users subscribed to that channel will receive only the events for that channel, and only when they are fired ;)

Fully compatible with @UseGuards, etc. :)

Additional note: Don't inject any service inside EventsService, because of a known bug.

NingaCodingTRV
0

Yes, this is possible. Instead of using an interval, we can use an event emitter: whenever an event is emitted, we send it to the client.

notification.controller.ts

import { Public } from 'src/decorators';
import { Observable } from 'rxjs';
import { FastifyReply } from 'fastify';
import { NotificationService } from './notification.service';

import { Sse, Controller, Res } from '@nestjs/common';

@Public()
@Controller()
export class NotificationController {
  constructor(private notificationService: NotificationService) {}
  @Sse('notifications')
  async sendNotification(@Res() reply: FastifyReply): Promise<Observable<any>> {
    return await this.notificationService.handleConnection();
  }
}

notification.service.ts

import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';

@Injectable()
export class NotificationService {
  notificationEvent: Subject<any> = new Subject();
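  async handleConnection() {
    // Demo only: the timer stands in for a real event source.
    // Note that every call (i.e. every new connection) starts another timer that is never cleared.
    setInterval(() => {
      this.notificationEvent.next({ data: { message: 'Hello World' } });
    }, 1000);
    return this.notificationEvent.asObservable();
  }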
}
  • Here I used setInterval just to emit a new event. You can emit events from anywhere in your code, without setInterval.
  • I found a problem with this approach and shared it in the linked question; check it as well: sse in nestJs
Bennison J
-1
  @Sse('sse-endpoint')
  sse(): Observable<any> {
    // data to stream
    const arr = ['d1', 'd2', 'd3'];
    return new Observable((subscriber) => {
      // Arrays expose `length`, not `len`; pop() takes items from the end.
      while (arr.length) {
        subscriber.next(arr.pop()); // emit one item per chunk
      }
      subscriber.complete(); // nothing left to send, so complete the stream
    });
  }
  • Answers should include an explanation of why a code snippet is a solution, please edit this answer to add some context – hardillb May 13 '22 at 10:00