2

I have a springboot with   springdata  mongodb application where I am connecting to mongo change stream to save the changes to a audit collection.  My application is running multiple instances (2 instances) and will be scaled up to n number instances when the load increased.   When records are created in the original collection (“my collection”), the listeners will be triggered in all running instances and creates duplicate records.  Following is my setup

build.gradle

…
// spring data mingodb version 3.1.5
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
…

Listener config

@Configuration
@Slf4j
public class MongoChangeStreamListenerConfig {

  @Bean
  MessageListenerContainer changeStreamListenerContainer(
      MongoTemplate template,
      PartyConsentAuditListener consentAuditListener,
      ErrorHandler errorHandler) {

    MessageListenerContainer messageListenerContainer =
        new MongoStreamListenerContainer(template, errorHandler);

    ChangeStreamRequest<PartyConsentEntity> request =
        ChangeStreamRequest.builder(consentAuditListener)
            .collection("my-collection")
            .filter(newAggregation(match(where("operationType").in("insert", "update", "replace"))))
            .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
            .build();
    messageListenerContainer.register(request, MyEntity.class, errorHandler);
    log.info("mongo stream listener is registered");
    return messageListenerContainer;
  }

  @Bean
  ErrorHandler getLoggingErrorHandler() {
    return new ErrorHandler() {
      @Override
      public void handleError(Throwable throwable) {
        log.error("error in creating audit records {}", throwable);
      }
    };
  }
}

Listener container

public class MongoStreamListenerContainer extends DefaultMessageListenerContainer {

  public MongoStreamListenerContainer(MongoTemplate template, ErrorHandler errorHandler) {
    super(template, Executors.newFixedThreadPool(15), errorHandler);
  }

  @Override
  public boolean isAutoStartup() {
    return true;
  }
}

ChangeListener

@Component
@Slf4j
@RequiredArgsConstructor
/**
 * This class will listen to mongodb change stream and process changes. The onMessage will triggered
 * when a record added, updated or replaced in the mongo db.
 */
public class MyEntityAuditListener
    implements MessageListener<ChangeStreamDocument<Document>, MyEntity> {


  @Override
  public void onMessage(Message<ChangeStreamDocument<Document>, MyEntity > message) {
    var update = message.getBody();
    log.info("db change event received");
    if (update != null) {
      log.info("creating audit entries for id {}", update.getId());
      // This will execute in all the instances and creating duplicating records

    }
  }
}

Is there a way to control the execution on one instance at a given time and share the load between nodes?. It would be really nice to know if there is a config from spring data mongodb to control the flow.

Also, I have checked the following post in stack overflow and I am not sure how to use this with spring data. Mongo Change Streams running multiple times (kind of): Node app running multiple instances

Any help or tip to resolve this issue is highly appreciated. Thank you very much in advance.

dmca
  • 31
  • 3

0 Answers0