I am testing out using Spring Integration to tie together disperate modules within the same Spring-Boot application, for now, and services into a unified flow starting with a single-entry point.
I am looking for the following clarifications with Spring Integration if possible:
- Is the below code the right way to structure flows using the DSL?
- In "C" below, can i bubble up the result to the "B" flow?
- Is using the DSL vs. the XML the better approach?
- I am confused as to how to correctly "terminate" a flow?
Flow Overview
In the code below, I am just publishing a page to a destination. The overall flow goes like this.
- Publisher flow listens for the payload and splits it into parts.
- Content flow filters out pages and splits them into parts.
- AWS flow subscribes and handles the part.
- File flow subscribes and handles the part.
Eventually, there may be additional and very different types of consumers to the Publisher flow which are not content which is why I split the publisher from the content.
A) Publish Flow (publisher.jar):
This is my "main" flow initiated through a gateway. The intent, is that this serves as the entry point to begin trigger all publishing flows.
- Receive the message
- Preprocess the message and save it.
- Split the payload into individual entries contained in it.
- Enrich each of the entries with the rest of the data
- Put each entry on the output channel.
Below is the code:
@Bean
IntegrationFlow flowPublish()
{
return f -> f
.channel(this.publishingInputChannel())
//Prepare the payload
.<Package>handle((p, h) -> this.save(p))
//Split the artifact resolved items
.split(Package.class, Package::getItems)
//Find the artifact associated to each item (if available)
.enrich(
e -> e.<PackageEntry>requestPayload(
m ->
{
final PackageEntry item = m.getPayload();
final Publishable publishable = this.findPublishable(item);
item.setPublishable(publishable);
return item;
}))
//Send the results to the output channel
.channel(this.publishingOutputChannel());
}
B) Content Flow (content.jar)
This module's responsibility is to handle incoming "content" payloads (i.e. Page in this case) and split/route them to the appropriate subscriber(s).
- Listen on the publisher output channel
- Filter the entries by Page type only
- Add the original payload to the header for later
- Transform the payload into the actual type
- Split the page into its individual elements (blocks)
- Route each element to the appropriate PubSub channel.
At least for now, the subscribed flows do not return any response - they should just fire and forget but i would like to know how to bubble up the result when using the pub-sub channel.
Below is the code:
@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
return MessageChannels.publishSubscribe("assetPublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
return MessageChannels.publishSubscribe("pagePublisherChannel").get();
//return MessageChannels.queue(10).get();
}
@Bean
IntegrationFlow flowPublishContent()
{
return flow -> flow
.channel(this.publishingChannel)
//Filter for root pages (which contain elements)
.filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
//Put the publishable details in the header
.enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
//Transform the item to a Page
.transform(PackageEntry.class, PackageEntry::getPublishable)
//Split page into components and put the type in the header
.split(Page.class, this::splitPageElements)
//Route content based on type to the subscriber
.<PageContent, String>route(PageContent::getType, mapping -> mapping
.resolutionRequired(false)
.subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
.subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
.defaultOutputToParentFlow())
.channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}
C) AWS Content (aws-content.jar)
This module is one of many potential subscribers to the content specific flows. It handles each element individually based off of the routed channel published to above.
- Subscribe to the appropriate channel.
- Handle the action appropriately.
There can be multiple modules with flows that subscribe to the above routed output channels, this is just one of them.
As an example, the the "contentPageChannel" could invoke the below flowPageToS3 (in aws module) and also a flowPageToFile (in another module).
Below is the code:
@Bean
IntegrationFlow flowAssetToS3()
{
return flow -> flow
.channel(this.assetChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<PageContent>handle((p, h) ->
{
return this.publishS3Asset(p);
})));
}
@Bean
IntegrationFlow flowPageToS3()
{
return flow -> flow
.channel(this.pageChannel)
.publishSubscribeChannel(c -> c
.subscribe(s -> s
.<Page>handle((p, h) -> this.publishS3Page(p))
.enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
.handle(this.s3MessageHandler())));
}