2

I'm working on an application using Spring Boot 1.5.13.RELEASE and Spring Integration 4.3.16.RELEASE.

I'm pretty new to Spring Integration and I have encountered a problem.

So the basic idea is that on some external triggers (could be and HTTP call) I need to create an IntegrationFlow which will consume messages from a rabbitMQ queue, do some work with 'em and then (maybe) produce to another rabbitMQ endpoint.

Now this is supposed to happen a lot of times so I will have to create multiple IntegrationFlows.

I am using IntegrationFlowContext to register each one of the IntegrationFlows like this:

IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();

I have to clarify that this can be happening concurrently, creating multiple IntegrationFlows at the same time.

So each time I'm trying to create an IntegrationFlow, my "source" is a Gateway that looks like this:

MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();

It's not a @Bean (yet) but I expect it to get registered when each IntegrationFlow is registered.

My "target" is an AmqpOutBoundAdapter that looks like this:

@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}

Now this one IS a bean already and is injected each time I'm trying to create a flow.

And my flow(s) looks like this:

public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {

                String uuid = payload.get("uuid");

                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);

                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}

My problem is that while the application starts fine and creates the first IntegrationFlows,etc. later on, I'm getting this kind of exceptions:

java.lang.IllegalStateException: Could not register object [org.springframework.integration.transformer.MessageTransformingHandler#872] under bean name 'org.springframework.integration.transformer.MessageTransformingHandler#872': there is already object [org.springframework.integration.transformer.MessageTransformingHandler#872] bound

I even tried setting an id to each of the components used, which is supposed to be used as beanName , like this:

.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))

But, even though bean name problems with components like .filter were resolved, I still get the same exception about a MessageTransformingHandler.


UPDATE

I didn't mention the fact that once each IntegrationFlow is done with its work, it is getting removed using the IntegrationFlowContext like this:

flowContext.remove(flowId);

So what seems to have (kind of) worked is synchronizing both the flow registration block and the flow removing block by using the same object as a lock.

So my class responsible for registering and removing flows looks like this:

...
private final Object lockA = new Object();
...

public void appendNewFlow(String callUUID){
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);

    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId){

    synchronized (lockA) {
        flowContext.remove(flowId); 
    }

}
...

My problem now is this kind of lock is kinda heavy for the application, since I'm getting quite a lot:

...Waiting for workers to finish.
...
...Successfully waited for workers to finish.

which doesn't happen as fast as I'd like to.

But I guess that is expected since each time a thread acquires the lock, it will take some time to either register the flow and all its components or deregister the flow and all its components.

Alexreve
  • 46
  • 1
  • 5

1 Answers1

0

You also have there this one:

.transform(Transformers.toJson(jsonObjectMapper))

How does it work if you add an .id() there as well?

On the other hand, since you say that this happens concurrently, I wonder if you can make some piece of your code synchonized, e.g. wrap that flowContext.registration(integrationFlow).id(callUUID).register();.

The bean definition and registration process is really not thread-safe and intended to be used only from the one, initializing thread in the beginning of application lifecycle.

We probably really need to make an IntegrationFlowContext as thread-safe in its register(IntegrationFlowRegistrationBuilder builder) function or, at least, its registerBean(Object bean, String beanName, String parentName) since this is exactly a place where we generate bean name and register it.

Feel free to raise a JIRA on the matter.

Unfortunately, Spring Integration Java DSL extension project is already out of support and we can add a fix only to the current 5.x generation. Nevertheless I believe that synchonized workaround should work here, therefore no need to back port it into the Spring Integration Java DSL extension.

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Thanks for your reply! I already tried adding `.id()` to all components including the other transformer but to no avail. I also tried using `synchronized` to my method responsible for registering the flows but that didn't work either (to my surprise also). Maybe I'm missing something. Will play around some more, and will make sure to write back in case I find a solution. – Alexreve May 17 '18 at 15:43
  • Maybe you can share more stack trace on the matter? I mean after your fixes for ids and with the `synchronized` – Artem Bilan May 17 '18 at 15:45
  • Sure! I will do it tomorrow as I'm currently unable to do so! – Alexreve May 17 '18 at 15:49
  • @Alexreve, the fix is here: https://github.com/spring-projects/spring-integration/pull/2448. No so simple, therefore I don't see how we could backport it. Would be great to have your code synchronized on the matter. – Artem Bilan May 17 '18 at 21:30
  • That's great! Though I'm not sure we can move to version 5.x.x right away. – Alexreve May 18 '18 at 10:23
  • OK. I think I'll revise my fix today to make it lock-free. At least when the `flow id` is provided. For you I'd suggest to revise your flows and place ids for all the endpoints you declare there to avoid the original conflict and leave your code lock-free as well. – Artem Bilan May 18 '18 at 13:34
  • Already tried that. Also tried setting **different locks** for `flowContext.registration(...).id(...).register();` and `flowContext.remove(...); ` but then I'm getting `java.util.ConcurrentModificationException: null` (I guess they both modify the same Collection or sth) – Alexreve May 18 '18 at 14:56
  • That's correct. I see what you mean. We need to use there a `ConcurrentMap`. That's one more race condition, indeed. Thanks! – Artem Bilan May 18 '18 at 14:58
  • Faced the same issue on a project that uses version 4.3.20.RELEASE. Only able to solve it by locking the registration process. – sikrip Sep 17 '20 at 06:56