I have integrated AWS SQS to run a batch job using spring batch 5 remote partitioning. I have used LocalStack for SQS service. My Worker configuration is working fine and Manager Configuration application is exiting without errors. I have referred below resources.
- https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotepartitioning/aggregating
- https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-partitioning
My Manager configuration as below.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.aws.outbound.SqsMessageHandler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageHandler;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
@Profile("manager")
@Configuration
public class ManagerIntegrationFlowConfiguration {
/**
* Configure outbound flow (requests going to workers)
* <p>
* Creates and returns a DirectChannel for managing outbound requests.
*
* @return the DirectChannel instance for managing outbound requests
*/
@Bean
public DirectChannel managerRequestsMessageChannel() {
return MessageChannels.direct().getObject();
}
/**
* Creates an IntegrationFlow for sending messages to an outbound channel using AMQP.
*
* @param requestsMessageHandler the AWS SQS MessageHandler used for sending messages
* @param managerRequestsMessageChannel the direct channel where the messages will be sent to
* @return the created IntegrationFlow
*/
@Bean
public IntegrationFlow outboundFlow(MessageHandler requestsMessageHandler, DirectChannel managerRequestsMessageChannel) {
return IntegrationFlow.from(managerRequestsMessageChannel)
.handle(requestsMessageHandler)
.get();
}
/**
* Creates a MessageHandler for sending messages to AWS SQS using the provided SqsAsyncClient.
*
* @param sqsAsyncClient the AWS SQS client used for sending messages
* @return the created MessageHandler
*/
@Bean
public MessageHandler requestsMessageHandler(SqsAsyncClient sqsAsyncClient) {
SqsMessageHandler sqsMessageHandler = new SqsMessageHandler(sqsAsyncClient);
sqsMessageHandler.setQueue("requests.fifo");
return sqsMessageHandler;
}
/**
* Configure inbound flow (requests coming from workers)
* <p>
* Creates and returns a DirectChannel for managing inbound requests.
*
* @return the DirectChannel instance for managing inbound requests
*/
@Bean
public DirectChannel managerRepliesMessageChannel() {
return MessageChannels.direct().getObject();
}
/**
* Creates an IntegrationFlow for receiving inbound messages from an AMQP (Advanced Message Queuing Protocol) channel.
*
* @param sqsMessageDrivenChannelAdapter the SqsMessageDrivenChannelAdapter to establish the connection with the AWS SQS queue
* @param managerRepliesMessageChannel a DirectChannel to which the inbound messages should be sent
* @return the created IntegrationFlow
*/
@Bean
public IntegrationFlow inboundFlow(SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter, DirectChannel managerRepliesMessageChannel) {
return IntegrationFlow
.from(sqsMessageDrivenChannelAdapter)
.channel(managerRepliesMessageChannel)
.get();
}
/**
* Creates an instance of SqsMessageDrivenChannelAdapter for establishing a connection with an AWS SQS queue.
*
* @param sqsAsyncClient the SqsAsyncClient used for the connection with the AWS SQS queue
* @return the created SqsMessageDrivenChannelAdapter instance
*/
@Bean
public SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter(SqsAsyncClient sqsAsyncClient) {
return new SqsMessageDrivenChannelAdapter(sqsAsyncClient, "replies.fifo");
}
}
Manager Job Configuration
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.integration.channel.DirectChannel;
import java.util.Objects;
@RequiredArgsConstructor
@Profile("manager")
@Configuration
public class ManagerJobConfiguration {
private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
private final Environment env;
/**
* Creates a step for the manager.
*
* @param partitioner the partitioner used for partitioning the step
* @param managerRequestsMessageChannel the channel for sending requests to the manager
* @param managerRepliesMessageChannel the channel for receiving replies from the manager
* @return the step created for the manager
*/
@Bean
public Step managerStep(MultiResourcePartitioner partitioner, DirectChannel managerRequestsMessageChannel, DirectChannel managerRepliesMessageChannel) {
return managerStepBuilderFactory.get("managerStep")
.partitioner("workerStep", partitioner)
.gridSize(5) // == Worker Nodes Count ==
.outputChannel(managerRequestsMessageChannel)
.inputChannel(managerRepliesMessageChannel)
.build();
}
}
Job launcher
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Profile("manager")
@Configuration
public class ManagerJobLauncher {
/**
* Creates a remote partitioning job.
*
* @param jobRepository the job repository used for managing the job
* @param managerStep the step used for managing the remote partitioning job
* @return the remote partitioning job created
*/
@Bean
public Job remotePartitioningJob(JobRepository jobRepository, Step managerStep) {
return new JobBuilder("remotePartitioningJob", jobRepository)
.start(managerStep)
.incrementer(new RunIdIncrementer())
.build();
}
}
Partitioner Configuration
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;
@Profile("manager")
@Configuration
public class ManagerResourcePartitioner {
/**
* Creates a {@link MultiResourcePartitioner} object with the provided resources.
*
* @param resources an array of resources to be partitioned
* @return a {@link MultiResourcePartitioner} object with the provided resources
*/
@Bean
@StepScope
public MultiResourcePartitioner partitioner(@Value("#{jobParameters['inputFiles']}/*") Resource[] resources) {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName("file");
partitioner.setResources(resources);
return partitioner;
}
}
Application Logs As follows.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.1.2)
2023-08-09T15:55:15.571+05:30 INFO 23252 --- [ restartedMain] c.b.BatchRemotePartitioningApplication : Starting BatchRemotePartitioningApplication using Java 17.0.7 with PID 23252
2023-08-09T15:55:15.575+05:30 INFO 23252 --- [ restartedMain] c.b.BatchRemotePartitioningApplication : The following 1 profile is active: "master"
2023-08-09T15:55:15.659+05:30 INFO 23252 --- [ restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-08-09T15:55:15.687+05:30 INFO 23252 --- [ restartedMain] .s.b.d.c.l.DockerComposeLifecycleManager : Using Docker Compose file 'D:\Personal\spring-batch\batch-remote-partitioning\compose.yaml'
2023-08-09T15:55:18.193+05:30 INFO 23252 --- [ restartedMain] o.s.b.c.c.annotation.BatchRegistrar : Finished Spring Batch infrastructure beans configuration in 3 ms.
2023-08-09T15:55:18.497+05:30 INFO 23252 --- [ restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-08-09T15:55:18.504+05:30 INFO 23252 --- [ restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-08-09T15:55:19.372+05:30 INFO 23252 --- [ restartedMain] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2023-08-09T15:55:19.523+05:30 INFO 23252 --- [ restartedMain] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@33fa9ed2
2023-08-09T15:55:19.525+05:30 INFO 23252 --- [ restartedMain] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2023-08-09T15:55:19.564+05:30 INFO 23252 --- [ restartedMain] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: POSTGRES
2023-08-09T15:55:19.609+05:30 INFO 23252 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2023-08-09T15:55:20.070+05:30 INFO 23252 --- [ restartedMain] o.s.b.d.a.OptionalLiveReloadServer : LiveReload server is running on port 35729
2023-08-09T15:55:20.256+05:30 INFO 23252 --- [ restartedMain] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-08-09T15:55:20.257+05:30 INFO 23252 --- [ restartedMain] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-08-09T15:55:20.258+05:30 INFO 23252 --- [ restartedMain] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-08-09T15:55:20.258+05:30 DEBUG 23252 --- [ restartedMain] a.c.s.l.DefaultListenerContainerRegistry : Starting DefaultListenerContainerRegistry
2023-08-09T15:55:20.261+05:30 DEBUG 23252 --- [ restartedMain] a.c.s.l.DefaultListenerContainerRegistry : DefaultListenerContainerRegistry started
2023-08-09T15:55:20.289+05:30 INFO 23252 --- [ restartedMain] c.b.BatchRemotePartitioningApplication : Started BatchRemotePartitioningApplication in 5.334 seconds (process running for 6.496)
2023-08-09T15:55:20.298+05:30 DEBUG 23252 --- [ionShutdownHook] a.c.s.l.DefaultListenerContainerRegistry : Stopping DefaultListenerContainerRegistry
2023-08-09T15:55:20.299+05:30 INFO 23252 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-08-09T15:55:20.299+05:30 INFO 23252 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-08-09T15:55:20.300+05:30 INFO 23252 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2023-08-09T15:55:20.338+05:30 INFO 23252 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-08-09T15:55:20.342+05:30 INFO 23252 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
pom.xml File
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.batch</groupId>
<artifactId>batch-remote-partitioning</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>batch-remote-partitioning</name>
<description>batch-remote-partitioning</description>
<properties>
<java.version>17</java.version>
<snakeyaml.version>2.0</snakeyaml.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-sqs</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-aws</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-docker-compose</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>batch-remote-partitioning</finalName>
<plugins>
<plugin>
<groupId>org.graalvm.buildtools</groupId>
<artifactId>native-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
When master Application starts it's gonna shutdown immediately. Without processing the data. But when I configure this same configuration with RabbitMQ this is working. Seeking advice from the experts.