We are developing a Spring Integration flow using the Java DSL. The application reads a remote file and inserts its data into MongoDB. We stream the file line by line, and we need to bulk-insert the data into MongoDB. From my understanding of the Spring Integration documentation and samples, there is no built-in bulk option for this, and I can't figure out how to implement the expected behaviour. We tried using an aggregator, but we didn't find a suitable solution for a fixed batch size (see the update at the bottom).
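To make the goal concrete, what we want is one insertMany per batch of parsed rows instead of one insertOne per row, roughly like the following plain MongoTemplate sketch (a sketch only: the bulkInsert method, the myPojos batch and the "mypojo" collection name are hypothetical, not part of the flow below):

// Sketch only: the kind of write we are aiming for, assuming a batch of already-parsed rows.
private void bulkInsert(MongoTemplate mongoTemplate, List<MyPojo> myPojos) {
    List<Document> documents = myPojos.stream()
            .map(pojo -> {
                Document document = new Document();
                mongoTemplate.getConverter().write(pojo, document);
                return document;
            })
            .toList();
    // single round trip for the whole batch instead of one insertOne per row
    mongoTemplate.getCollection("mypojo").insertMany(documents);
}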
A sample of the involved beans:
@Configuration
public class SampleConfiguration {
    ...

    @Bean
    MessagingTemplate messagingTemplate(ApplicationContext context) {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setBeanFactory(context);
        return messagingTemplate;
    }
    @Bean
    IntegrationFlow sftpSource() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
        return IntegrationFlow
                // stream each new CSV file from the remote "upload" directory
                .from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
                                .remoteDirectory("upload")
                                .patternFilter("*.csv")
                                .maxFetchSize(1),
                        spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
                                .autoStartup(true))
                // split the stream into one message per line; the first line is kept in the "fileHeader" header
                .split(Files
                        .splitter()
                        .markers()
                        .charset(StandardCharsets.UTF_8)
                        .firstLineAsHeader("fileHeader")
                        .applySequence(true))
                // drop the start/end file markers
                .filter(payload -> !(payload instanceof FileSplitter.FileMarker))
                .enrichHeaders(h -> h.errorChannel("errorChannel"))
                // re-attach the header line and parse the single CSV row into a MyPojo
                .handle((String payload, MessageHeaders headers) -> {
                    String header = headers.get("fileHeader", String.class);
                    String rowWithHeader = header + "\n" + payload;
                    try (StringReader reader = new StringReader(rowWithHeader)) {
                        CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
                                .withType(MyPojo.class)
                                .withSeparator(';')
                                .build();
                        return beanReader.iterator().next();
                    }
                })
                // current sink: one insertOne per line; this is the step we want to turn into a bulk insert
                .handle(MongoDb
                        .outboundGateway(mongoTemplate)
                        .entityClass(MyPojo.class)
                        .collectionNameFunction(m -> "mypojo")
                        .collectionCallback(
                                (collection, message) -> {
                                    MyPojo myPojo = (MyPojo) message.getPayload();
                                    Document document = new Document();
                                    mongoTemplate.getConverter().write(myPojo, document);
                                    return collection.insertOne(document);
                                }))
                .channel("logChannel")
                .get();
    }
    @Bean
    IntegrationFlow logFiles() {
        return IntegrationFlow
                .from("logChannel")
                .handle(message -> log.info("message received: {}", message))
                .get();
    }

    @Bean
    IntegrationFlow logErrors() {
        return IntegrationFlow
                .from("errorChannel")
                .handle(message -> {
                    MessagingException exception = (MessagingException) message.getPayload();
                    log.error("error message received: {} for message {}", exception.getMessage(),
                            exception.getFailedMessage());
                })
                .get();
    }

    ...
}
Update - the aggregation step we tried, inserted right before the MongoDB handler; the sketch after this snippet shows how we assume the downstream gateway would then have to change:
.aggregate(aggregatorSpec -> aggregatorSpec
        .correlationStrategy(message -> message.getHeaders().get(FileHeaders.REMOTE_FILE))
        .releaseStrategy(group -> group.size() >= 5)
        .groupTimeout(200L)
        .sendPartialResultOnExpiry(true))
.handle(MongoDb...)
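Downstream of that aggregator the payload would be the whole group as a List<MyPojo>, so our assumption (not verified end to end) is that the gateway callback would have to switch from insertOne to insertMany along these lines:

// Assumed follow-up to the aggregator: the payload is now a List<MyPojo>,
// so the callback converts every element and writes them with a single insertMany.
.handle(MongoDb
        .outboundGateway(mongoTemplate)
        .entityClass(MyPojo.class)
        .collectionNameFunction(m -> "mypojo")
        .collectionCallback((collection, message) -> {
            List<MyPojo> batch = (List<MyPojo>) message.getPayload(); // unchecked cast from the aggregated group
            List<Document> documents = batch.stream()
                    .map(pojo -> {
                        Document document = new Document();
                        mongoTemplate.getConverter().write(pojo, document);
                        return document;
                    })
                    .toList();
            return collection.insertMany(documents);
        }))

What we can't get right is the aggregator itself: with the release strategy and group timeout above, the batches are not released at a fixed size the way we expected.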