
How can I implement an internal event bus for async operations in a Spring WebFlux stack?

I want a service to emit an event:

@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
  }
}

And a different component, not known to the publisher service, should be able to decide to react to that event.

@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
  override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
    // do stuff
  }
}

In an MVC application I would use ApplicationEventPublisher to publish the event (publishEvent) and @EventListener + @Async on the handler (onDeleteEntry).

What is the equivalent in a reactive stack?

The other option I am considering is running an embedded message broker, since that would imply async semantics. But it feels like a lot of overhead for such a simple scenario.


I found these SO threads

but they don't answer this scenario, because they assume that the listener is known to the publisher. But I need loose coupling.
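To pin down the semantics I am after (the publisher knows only the bus, listeners register themselves, delivery is asynchronous), here is a minimal JDK-only sketch using java.util.concurrent.SubmissionPublisher. The event type mirrors my FeedEntryDeletedEvent above and is otherwise hypothetical; this is not a proposed solution, just the contract I want from whatever mechanism is suggested:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class BusSketch {
    // Hypothetical event, mirroring FeedEntryDeletedEvent from above.
    record FeedEntryDeletedEvent(long timestamp, long entryId) {}

    public static void main(String[] args) throws Exception {
        List<Long> seen = new CopyOnWriteArrayList<>();
        // The publishing side only ever sees this bus, never the listener.
        SubmissionPublisher<FeedEntryDeletedEvent> bus = new SubmissionPublisher<>();
        // A listener registers itself; delivery runs on the bus's executor,
        // i.e. asynchronously with respect to the publishing thread.
        CompletableFuture<Void> done = bus.consume(e -> seen.add(e.entryId()));
        bus.submit(new FeedEntryDeletedEvent(System.currentTimeMillis(), 42L));
        bus.close();   // complete the stream
        done.get();    // wait for all deliveries to finish
        System.out.println(seen); // [42]
    }
}
```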

I also found these spring issues

And specifically this promising comment, which suggests:

Mono.fromRunnable(() -> context.publishEvent(...))

From what I understand I could then just use @EventListener since I am totally fine with not propagating the reactive context.

But can someone please explain the implications for thread binding, and whether this is even legal in a reactive stack?


UPDATE

From testing it feels fine to do this:

@Service
class FeedServiceImpl(
  val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
  @EventListener
  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler started")
    runBlocking {
      // do stuff that takes some time
      delay(1000)
    }
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    entryRepository.deleteById(entryId)
    applicationEventPublisher.publishEvent(
      FeedEntryDeletedEvent(
        timestamp = time.utcMillis(),
        entryId = entryId,
      )
    )
    log.info("ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }
}

Note that handle is not a suspend function, because an @EventListener method must take a single argument and the compiler adds a continuation parameter to suspend functions behind the scenes. The handler then starts a new blocking coroutine scope, which is fine here because @Async has already moved it onto a different thread.

Output is:

2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl  : ThreadId: 38
2021-05-13 12:15:20.755  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler started
2021-05-13 12:15:20.755  INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl   : Publisher done
2021-05-13 12:15:21.758  INFO 20252 --- [         task-1] ...FeedServiceImpl   : ThreadId: 54
2021-05-13 12:15:21.759  INFO 20252 --- [         task-1] ...FeedServiceImpl   : Handler done

UPDATE 2

The other approach without using @Async would be this:

  @EventListener
//  @Async
  override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler start")
    log.info("Handler ThreadId: ${Thread.currentThread().id}")
    runBlocking {
      log.info("Handler block start")
      delay(1000)
      log.info("Handler block ThreadId: ${Thread.currentThread().id}")
      log.info("Handler block end")
    }
    log.info("Handler done")
  }

  override suspend fun deleteEntry(entryId: Long) {
    feedRepository.deleteById(entryId)
    Mono.fromRunnable<Unit> {
      applicationEventPublisher.publishEvent(
        FeedEntryDeletedEvent(
          timestamp = time.utcMillis(),
          entryId = entryId,
        )
      )
    }
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe()
    log.info("Publisher ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
  }

2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher ThreadId: 38
2021-05-13 13:06:54.503  INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl  : Publisher done
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler start
2021-05-13 13:06:54.504  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler ThreadId: 53
2021-05-13 13:06:54.505  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block start
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block ThreadId: 53
2021-05-13 13:06:55.539  INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl  : Handler block end
2021-05-13 13:06:55.540  INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl  : Handler done

However, I still don't understand the implications for the application under load and it feels wrong to mix reactive operations with handlers that do runBlocking { }.
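To make the load concern concrete without Reactor: boundedElastic is, conceptually, a bounded worker pool, and every handler that does runBlocking occupies one worker for its full duration. A JDK-only sketch (the pool size of 2 is an assumption for illustration; the real boundedElastic default caps at 10 × CPU cores):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {
    public static void main(String[] args) throws Exception {
        // Model boundedElastic with a deliberately tiny pool of 2 workers.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        Runnable blockingHandler = () -> {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(100); // stands in for runBlocking { delay(...) }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
        };
        for (int i = 0; i < 8; i++) {
            pool.submit(blockingHandler); // 8 published "events"
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // Concurrency never exceeds the pool size; the remaining events queue,
        // so under load latency grows instead of thread count.
        System.out.println(maxActive.get() <= 2); // true
    }
}
```

So the question becomes whether queueing blocking handlers on a bounded pool is acceptable back-pressure for these events.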

Stuck

1 Answer


Reactor offers sinks (Sinks). You can use a sink like an event bus. Have a look at the following example.

@Configuration
public class EventNotificationConfig {

    @Bean
    public Sinks.Many<EventNotification> eventNotifications() {
        return Sinks.many().replay().latest();
    }

}

You create a bean of a Sinks.Many in a @Configuration class. It can be used to emit new events, and it can be turned into a Flux for subscribers. Note that replay().latest() replays the most recent event to late subscribers; if listeners should only see events emitted after they subscribe, Sinks.many().multicast().onBackpressureBuffer() is an alternative.

@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {

    private final @NonNull Sinks.Many<EventNotification> eventNotifications;


    /**
     * Provide a flux with our notifications.
     *
     * @return a Flux
     */
    public Flux<EventNotification> getNotifications() {
        return eventNotifications.asFlux();
    }

    /**
     * Emit a new event to the sink.
     *
     * @param eventId
     * @param status
     * @param payload
     */
    public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
        eventNotifications.tryEmitNext(EventNotification.builder()
          .eventId(eventId)
          .status(status)
          .payload(payload).build());
    }

}

A single Sink instance is enough for the whole application: subscribing to different kinds of events can be achieved with filters that the various subscribers apply to the Flux.


@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {

    private final @NonNull NotificationUsecase notificationUsecase;


    /**
     * Start listening to events as soon as class EventListener
     * has been constructed.
     *
     * Listening will continue until the Flux emits a 'completed'
     * signal.
     */
    @PostConstruct
    public void init() {

        this.listenToPings()
                .subscribe();
        this.listenToDataFetched()
                .subscribe();
    }


    public Flux<EventNotification> listenToPings() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
                .doOnNext(notification -> log.info("received PING: {}", notification));
    }

    public Flux<EventNotification> listenToDataFetched() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {}", notification));
    }
}

    
A listener that also persists what it receives (added in response to the comments below) can look like this; note that it filters first, saves via the repository, and then passes the original notification downstream:

    public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .flatMap(notification -> reactiveMongoRepository
                    .saveAndReturnNewObject(notification)
                    .doOnNext(saved -> log.info("I just saved something and returned an instance of NewObject!"))
                    .thenReturn(notification))
                .doOnNext(notification -> log.info("received data: {} - saved", notification));
    }

Emitting new events is equally simple. Just call the emit method:

notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);

Erunafailaro
    How can a component implement a listener for a specific event and another component a listener for a different event? How can components push their different events to the bus? Also I would be interested in how this integrates with messaging e.g. spring cloud stream? – Stuck Sep 03 '21 at 06:02
    I added some examples on how to subscribe to different kinds of notifications and on how to emit new ones. Could you please be more specific with your question concerning Spring Cloud? My answer basically makes use of Flux, which is a very basic Spring Flux api component. How to use it together with cloud stream is explained in their documentation. But if you have a special use case, please ask. – Erunafailaro Sep 03 '21 at 06:42
  • Thanks for the examples. How does a `@Component` call `listenToPings` to trigger listening? Regarding cloud integration I am fine with considering this out of scope for the question. But I am interested in how this solution relates to the threading model compared to the `@EventListener` + `runBlocking`? In my shown approach the publisher shifts off to the `boundedElastic` scheduler, meaning that the publisher is not blocked and there is a thread for each published event, but multiple handlers for the same event share the same thread. How does it behave in your proposed approach? – Stuck Sep 03 '21 at 07:08
  • I expanded the listener example. One way to trigger listening is to use @PostConstruct. As long as no "complete" signal is emitted to the sink, listening will continue for as long as the application runs. You can choose other ways of course, but this one fits a lot of use cases, I think. – Erunafailaro Sep 03 '21 at 07:33
  • Threading is always an issue you should pay particular attention to. WebFlux offers a lot of ways to tackle this question. Here, I just added .subscribeOn() to the listener implementation to make it use a thread pool. But also the subscribers might use their own thread pools. So my answer to the question of how to handle threading is: try the tools that WebFlux offers. They can be very helpful. – Erunafailaro Sep 03 '21 at 07:33
    Thanks. In the last comment you mean "also the [~~subscribers~~ -> publishers] might use their own thread pools", right? – Stuck Sep 03 '21 at 07:36
  • Yes, I meant the publishers. – Erunafailaro Sep 03 '21 at 08:46
    Great example @Erunafailaro. Thank you. I want to ask a follow up question on your example. What if the listener is charged with taking the data it received and persisting some new objects. The result of the persisting of new objects is a Flux. How do you fit that in with the current context of Flux? I mean Flux is not connected to anything? – Bjorn Harvold Feb 21 '22 at 02:30
  • @BjornHarvold , see above the method `listenToDataFetchedAndWriteToDatabase()`- if I got you right, that should do the trick. – Erunafailaro Feb 21 '22 at 16:51