We are developing a Spring Integration flow using the Java DSL. The application reads a remote file and inserts its data into MongoDB. We stream the file line by line and need to bulk-insert the data into MongoDB. From my understanding of the Spring Integration documentation and samples, there is no bulk option for this, and I can't figure out how to implement the expected behaviour. We tried using an aggregator, but we didn't find a suitable solution for a fixed batch size.

Sample of the involved beans:

@Configuration
public class SampleConfiguration {

  ...

  @Bean
  MessagingTemplate messagingTemplate(ApplicationContext context) {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setBeanFactory(context);
    return messagingTemplate;
  }

  @Bean
  IntegrationFlow sftpSource() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost("localhost");
    factory.setPort(22);
    factory.setUser("foo");
    factory.setPassword("foo");
    factory.setAllowUnknownKeys(true);
    SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
    return IntegrationFlow
        .from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
                .remoteDirectory("upload")
                .patternFilter("*.csv")
                .maxFetchSize(1),
            spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
                .autoStartup(true))
        .split(Files
            .splitter()
            .markers()
            .charset(StandardCharsets.UTF_8)
            .firstLineAsHeader("fileHeader")
            .applySequence(true))
        .filter(payload -> !(payload instanceof FileSplitter.FileMarker))
        .enrichHeaders(h -> h.errorChannel("errorChannel"))
        .handle((String payload, MessageHeaders headers) -> {
          String header = headers.get("fileHeader", String.class);
          String rowWithHeader = header + "\n" + payload;
          try (StringReader reader = new StringReader(rowWithHeader)) {
            CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
                .withType(MyPojo.class)
                .withSeparator(';')
                .build();
            return beanReader.iterator().next();
          }
        })
        .handle(MongoDb
            .outboundGateway(mongoTemplate)
            .entityClass(MyPojo.class)
            .collectionNameFunction(m -> "mypojo")
            .collectionCallback(
                (collection, message) -> {
                  MyPojo myPojo = (MyPojo) message.getPayload();
                  Document document = new Document();
                  mongoTemplate.getConverter().write(myPojo, document);
                  return collection.insertOne(document);
                }))
        .channel("logChannel")
        .get();
  }

  @Bean
  IntegrationFlow logFiles() {
    return IntegrationFlow
        .from("logChannel")
        .handle(message -> log.info("message received: {}", message))
        .get();
  }

  @Bean
  IntegrationFlow logErrors() {
    return IntegrationFlow
        .from("errorChannel")
        .handle(message -> {
          MessagingException exception = (MessagingException) message.getPayload();
          log.error("error message received: {} for message {}", exception.getMessage(),
              exception.getFailedMessage());
        })
        .get();
  }
  ...
}

Update - aggregation step

.aggregate(aggregatorSpec -> aggregatorSpec
            .correlationStrategy(message -> message
                .getHeaders()
                .get(FileHeaders.REMOTE_FILE))
            .releaseStrategy(group -> group.size() >= 5)
            .groupTimeout(200L)
            .sendPartialResultOnExpiry(true))
.handle(MongoDb...)
tommaso.normani

1 Answer

I think you need to look into a FileAggregator, which is able to collect lines into a list according to the FileMarkerReleaseStrategy: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-aggregator.
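
For reference, a minimal sketch of how it could be plugged in, adapted from the file-aggregator sample in the docs (the channel names here are just placeholders, not taken from your flow). Note that the FileMarker messages have to reach the aggregator, e.g. via the filter's discard channel, because the FileMarkerReleaseStrategy releases the group on the END marker:

  @Bean
  IntegrationFlow fileAggregatorFlow() {
    return IntegrationFlow
        .from("splitLinesChannel")
        // keep the data lines flowing downstream, but route the FileMarker
        // messages to the aggregator channel instead of dropping them:
        // FileMarkerReleaseStrategy needs the END marker to release the group
        .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
            e -> e.discardChannel("aggregatorChannel"))
        // the per-line CSV-to-POJO conversion from your flow would go here
        .channel("aggregatorChannel")
        // FileAggregator (org.springframework.integration.file.aggregator) combines
        // a FILENAME-based correlation strategy, the FileMarkerReleaseStrategy and
        // a group processor that emits one List with all the lines of the file
        .aggregate(new FileAggregator())
        .channel("bulkInsertChannel")
        .get();
  }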

Then yes, you can cast the payload to a List in your collectionCallback() and perform a batch insert into MongoDB.
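
A sketch of what that writing step could look like, assuming the upstream aggregator emits a List<MyPojo> payload (insertMany() is the MongoCollection bulk-insert operation):

  .handle(MongoDb
      .outboundGateway(mongoTemplate)
      .collectionNameFunction(m -> "mypojo")
      .collectionCallback(
          (collection, message) -> {
            @SuppressWarnings("unchecked")
            List<MyPojo> myPojos = (List<MyPojo>) message.getPayload();
            // convert every POJO with the same converter used for the single inserts
            List<Document> documents = myPojos.stream()
                .map(myPojo -> {
                  Document document = new Document();
                  mongoTemplate.getConverter().write(myPojo, document);
                  return document;
                })
                .toList();
            // one bulk insert per released group instead of one insert per line
            return collection.insertMany(documents);
          }))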

UPDATE

To be able to chunk from the aggregator using the same correlation key (the file name), we need to remove the already-released groups from the store. See the AggregatorSpec.expireGroupsUponCompletion() option and the respective docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy
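
So the aggregation step from your update becomes something like this (the same chunk size and timeout you already use, plus the expireGroupsUponCompletion() option):

  .aggregate(aggregatorSpec -> aggregatorSpec
      .correlationStrategy(message -> message
          .getHeaders()
          .get(FileHeaders.REMOTE_FILE))
      // release a chunk as soon as 5 lines have been collected
      .releaseStrategy(group -> group.size() >= 5)
      // release whatever is left when no full chunk can be built any more
      .groupTimeout(200L)
      .sendPartialResultOnExpiry(true)
      // remove the released group from the store, so the next lines with the
      // same correlation key (the same remote file) start a fresh group
      .expireGroupsUponCompletion(true))
  .handle(MongoDb...)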

Artem Bilan
  • Thanks for the hint. I took a look at the link provided, but I am under the impression that this `FileAggregator` processes the file all at once. In our scenario we have to process potentially huge files, and we would like to create chunks and save those in MongoDB using bulk write operations. Is `FileAggregator` useful in this scenario? – tommaso.normani May 24 '23 at 15:37
  • OK. Then it is not. You can use a regular aggregator then. Probably the `FileHeaders.FILENAME` header could still be a good correlation key. Then you can aggregate the processed lines into chunks. If there are not enough items for the group to be fulfilled, you can use the `groupTimeout` option to release whatever you have so far: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#agg-and-group-to – Artem Bilan May 24 '23 at 15:47
  • I have updated the question by adding the `aggregation` before the writing step. It seems to me that after the first chunk is created, no more chunks are aggregated. Do I need a new group for every chunk (I think that would require a different `correlation key`, if I've understood it properly)? Thanks – tommaso.normani May 25 '23 at 14:06
  • See `AggregatorSpec.expireGroupsUponCompletion()` option. So, you don't need a new correlation key for each chunk: the released group will be removed and you are good to build a new one. See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy – Artem Bilan May 25 '23 at 14:17
  • Thank you, it works now. Could you edit your answer with the details of your last comment so I can mark it as Accepted? – tommaso.normani May 25 '23 at 14:50
  • Added an UPDATE into my answer. – Artem Bilan May 25 '23 at 15:01