I want to define a flow that writes to MongoDB and, only on success, writes the IDs to Kafka. I'm using the Java DSL, and I'd like to have a FlowBuilder class that defines my pipeline at a high level. I'm looking for a feature that would let me write a flow such as:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .process(writeToMongo) // <-- Searching for this kind of function
      .handle(writeToKafka)
      .get();
}

I've seen that Apache Camel works exactly like this, and I wonder whether Spring Integration also has a simple, good solution to this basic problem.

user7551211

1 Answer

What you are looking for is publishSubscribeChannel(), which can have several subscribers. By default, when no executor is configured on the channel, subscribers are called sequentially: the next subscriber is called only after the previous one has returned, and only if that one succeeded.

It may look similar to what you expressed with process():

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .publishSubscribeChannel(c -> c
            // First subscriber: write to MongoDB.
            .subscribe(sf -> sf
                  .handle(MongoDb.reactiveOutboundChannelAdapter())))
      // Second subscriber: called only if the MongoDB subscriber did not fail.
      .handle(writeToKafka)
      .get();
}
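
To make the ordering rule concrete, here is a minimal plain-Java model of sequential publish-subscribe dispatch. It is not Spring Integration code and does not claim to mirror its internals; the class `SequentialPubSubSketch`, the methods `publish` and `run`, and the fake "Mongo" and "Kafka" steps are all hypothetical names invented for illustration. It only shows why a failure in the first subscriber keeps the second one from being called when everything runs on the caller's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model only (hypothetical names); this is NOT Spring Integration code.
class SequentialPubSubSketch {

    // Calls each subscriber in order on the caller's thread.
    // An exception from one subscriber propagates, so later subscribers never run.
    static <T> void publish(T message, List<Consumer<T>> subscribers) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Runs the two-step pipeline for every ID and returns the IDs
    // that reached the stand-in "Kafka" step.
    static List<String> run(List<String> ids, String failingId) {
        List<String> sentToKafka = new ArrayList<>();

        // Stand-in for the MongoDB write: fails for exactly one ID.
        Consumer<String> writeToMongo = id -> {
            if (id.equals(failingId)) {
                throw new IllegalStateException("Mongo write failed for " + id);
            }
        };
        // Stand-in for the Kafka write: records the ID.
        Consumer<String> writeToKafka = sentToKafka::add;

        List<Consumer<String>> subscribers = List.of(writeToMongo, writeToKafka);
        for (String id : ids) {
            try {
                publish(id, subscribers);
            } catch (IllegalStateException e) {
                // A real flow would route this to error handling; the sketch just skips the ID.
            }
        }
        return sentToKafka;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"), "b")); // prints [a, c]
    }
}
```

In this model the ID "b" never reaches the second step because the first step threw for it, which is the behaviour described above for a channel without an executor.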

Another option would be a gateway(), but then you need to return something from it to continue. In Spring Integration, if there is no reply, the flow simply stops there. There is no concept of reusing the input as the output when a step produces no output.
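
The "no reply, no continuation" rule can also be pictured with a small plain-Java model. Again, this is only a sketch under assumptions: `NullReplySketch` and `runChain` are hypothetical names, and the steps are ordinary functions standing in for endpoints, not Spring Integration handlers.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;

// Plain-Java model only (hypothetical names); this is NOT Spring Integration code.
class NullReplySketch {

    // Feeds each step's reply into the next step. A null reply ends the chain:
    // the step's input is never reused as a substitute output.
    static Optional<String> runChain(String payload, List<UnaryOperator<String>> steps) {
        String current = payload;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
            if (current == null) {
                return Optional.empty(); // no reply, so nothing is sent downstream
            }
        }
        return Optional.of(current);
    }

    public static void main(String[] args) {
        UnaryOperator<String> upper = String::toUpperCase;
        UnaryOperator<String> noReply = s -> null;
        UnaryOperator<String> exclaim = s -> s + "!";

        System.out.println(runChain("id-1", List.of(upper, exclaim)));          // prints Optional[ID-1!]
        System.out.println(runChain("id-1", List.of(upper, noReply, exclaim))); // prints Optional.empty
    }
}
```

In the second call the last step never runs, because the step before it produced no reply.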

Artem Bilan
  • Thank you. I've seen [in the docs](https://docs.spring.io/spring-integration/reference/html/gateway.html#reactor-mono) that there is a reference for Reactor when `gateway()` is explained, but without any `IntegrationFlows` usage. Would you recommend building a Reactive flow in such a way? Or shall I keep on the same way that you wrote here? – user7551211 Jan 21 '22 at 12:52
  • And if I may I'd like to ask another question: [I've seen here](https://stackoverflow.com/questions/50393906/spring-integration-java-dsl-dynamically-create-integrationflows?rq=1) that you wrote that the java DSL in Spring Integration is out of support. Is it true? I've seen that the [Java DSL project](https://github.com/spring-projects/spring-integration-java-dsl) is really inactive – user7551211 Jan 21 '22 at 12:56
  • I don't think Reactor is relevant to your question. You asked about an equivalent of Camel's process, and I explained how that is possible with Spring Integration. I think gateway with Reactor deserves its own SO question. – Artem Bilan Jan 21 '22 at 14:44
  • Yeah... Looks like my message was a bit unclear. See here: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference. The point is that the Java DSL is now fully a part of `spring-integration-core`. In the past it was a separate dependency to include. – Artem Bilan Jan 21 '22 at 14:47
  • I'm glad to see that (as a Java DSL user). I've opened another question [for my architectural concerns](https://stackoverflow.com/questions/70803881/main-processing-flow-programmatic-approach-when-using-spring-integration-with-pr). – user7551211 Jan 21 '22 at 15:40
  • I'm reading your answer to the second question and it makes me wonder: wouldn't `.handle(writeToMongo)` be the perfect solution for this question? It seems like the equivalent of `.process`. Or will `.handle` run even if an earlier part of the flow fails? – user7551211 Jan 21 '22 at 21:01
  • No, it won't. If processing fails for the current message, that is an error. Think of it as sequential method calls in your own logic. Yes, you can treat `handle()` as Camel's process, with the difference that Spring Integration doesn't go to the next endpoint if you return `null` from the current one. – Artem Bilan Jan 21 '22 at 21:14