0

We have a usecase where we need to read data from some paginated API and write to some downstream Kafka topic.

We have been able to implement the solution via spring batch integration remote partitioning where the manager takes care of partitioning the task by creating executionContext which containes the page number and offset to read data for. The manager creates this executionContext and puts them on the messagingChannel(I can use rabbitMQ and Kafka topic whichever provides a solution). The workers(more than 1) pick that executionContext from the messagingChannel and complete the task of reading the data from the API and writing it to the required Kafka topic.

The above implementation works just fine. This also works fine if I run the same job for different clients one after another. The challenge comes when we want to run the same job for multiple clients in parallel. For example, we launch the jobs for 2 clients in parallel. It creates 1 manager and 2 workers for each client. Now the issue comes when both of the managers pushed the executionContext on the same messagingChannel and workers don't know which one to pick and execute. Also, both the jobs share the same database spring batch tables, so I suspect it would create problems at that level as well.

Any input or references on how to implement running multiple spring batch reporter partitioning jobs in parallel.

Update[18 Jan 2022]

I tried adding @StepScoped to MessageChannelPartitionHandler at here and below is the error I am getting:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'partitioningMessageHandler': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:101) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1821) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getObjectForBeanInstance(AbstractAutowireCapableBeanFactory.java:1266) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.resolveTargetBeanFromMethodWithBeanAnnotation(AbstractMethodAnnotationPostProcessor.java:536) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:154) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:912) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at spring.batch.integration.Manager.main(Manager.java:11) ~[main/:na]
Caused by: java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
    at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.validateFallbackMethods(MessagingMethodInvokerHelper.java:751) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:740) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:294) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.<init>(MethodInvokingMessageListProcessor.java:63) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.<init>(MethodInvokingMessageGroupProcessor.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:211) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:198) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:186) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:60) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:171) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 20 common frames omitted
GauravKumar
  • 46
  • 1
  • 8

1 Answers1

1

In such a setup, the MessageChannelPartitionHandler should be step-scoped. There is a note about that in the Javadoc:

Note: The reply channel for this is instance based.
Sharing this component across multiple step instances may result in the
crossing of messages. It's recommended that this component be step or job scoped.

Making this bean step-scoped should fix the issue.

Mahmoud Ben Hassine
  • 28,519
  • 3
  • 32
  • 50
  • We are not using the `MessageChannelPartitionHandler` explicitly using remotePartitioningManagerStepBuilderFactory.build() method to create one. Also, instead of `MessageChannel` we are using `DirectChannel`. Do you see an issue with this as well. If possible, can you refer me to any example which supports running multiple jobs in parallel with remote partioning? – GauravKumar Jul 07 '21 at 11:13
  • Also, the Javadoc you mentioned talks about the reply channel but won't we have an issue with 2 manager instances sending the request for client A and client B on the same queue? The workers are also specific to clients and do not understand how to process executionContext from another client request. – GauravKumar Jul 07 '21 at 11:23
  • Please share your code and show how you run two jobs in parallel to cause the issue and I will try to help. – Mahmoud Ben Hassine Jul 07 '21 at 11:41
  • The latest code is available at [here](https://github.com/gkumarpatel/spring-batch-parallel-jobs/blob/master/spring-batch-parallel-jobs/manager/src/main/java/spring/batch/integration/configuration/ManagerConfiguration.java) where I tried adding @StepScope to `MessageChannelPartitionHandler`. But it is not able to assign the partitionHandler to `aggregatorFactoryBean.setProcessorBean(partitionHandler)` because the bean is step scoped as is not available at this point. – GauravKumar Jan 18 '22 at 08:13
  • could you please have a look and help me out configuring MessageChannelPartitionHandler with @StepScope? – GauravKumar Jan 19 '22 at 05:11