0

I have integrated AWS SQS to run a batch job using Spring Batch 5 remote partitioning, with LocalStack providing the SQS service. The worker configuration works fine, but the manager application exits without any errors. I have referred to the resources below.

My manager configuration is as follows.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.aws.outbound.SqsMessageHandler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageHandler;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

/**
 * Spring Integration wiring for the manager side of the remote-partitioning job.
 * <p>
 * The beans below are registered only when the {@code manager} profile is active.
 * Messages placed on the requests channel are handed to an AWS SQS handler targeting
 * the {@code requests.fifo} queue; messages received from the {@code replies.fifo}
 * queue are routed to the replies channel.
 */
@Configuration
@Profile("manager")
public class ManagerIntegrationFlowConfiguration {

    /**
     * Outbound side (requests going to workers): the channel that feeds the outbound flow.
     *
     * @return a new {@link DirectChannel} for outbound requests
     */
    @Bean
    public DirectChannel managerRequestsMessageChannel() {
        final DirectChannel requestsChannel = MessageChannels.direct().getObject();
        return requestsChannel;
    }

    /**
     * Builds the flow that takes every message from the requests channel and passes it
     * to the AWS SQS message handler.
     *
     * @param requestsMessageHandler        the handler that sends messages to AWS SQS
     * @param managerRequestsMessageChannel the channel the flow reads outbound messages from
     * @return the outbound {@link IntegrationFlow}
     */
    @Bean
    public IntegrationFlow outboundFlow(MessageHandler requestsMessageHandler, DirectChannel managerRequestsMessageChannel) {
        final var flowDefinition = IntegrationFlow
                .from(managerRequestsMessageChannel)
                .handle(requestsMessageHandler);
        return flowDefinition.get();
    }

    /**
     * Builds the SQS-backed handler that publishes to the {@code requests.fifo} queue.
     * <p>
     * NOTE(review): the queue name suggests a FIFO queue, and SQS FIFO queues require a
     * message group id on every send - confirm that one is supplied for these messages.
     *
     * @param sqsAsyncClient the AWS SQS client used for sending messages
     * @return the configured {@link MessageHandler}
     */
    @Bean
    public MessageHandler requestsMessageHandler(SqsAsyncClient sqsAsyncClient) {
        final var handler = new SqsMessageHandler(sqsAsyncClient);
        handler.setQueue("requests.fifo");
        return handler;
    }

    /**
     * Inbound side (replies coming from workers): the channel that the inbound flow
     * delivers to.
     *
     * @return a new {@link DirectChannel} for inbound replies
     */
    @Bean
    public DirectChannel managerRepliesMessageChannel() {
        final DirectChannel repliesChannel = MessageChannels.direct().getObject();
        return repliesChannel;
    }

    /**
     * Builds the flow that forwards every message produced by the SQS channel adapter
     * to the replies channel.
     *
     * @param sqsMessageDrivenChannelAdapter the adapter that receives messages from the AWS SQS queue
     * @param managerRepliesMessageChannel   the channel the received messages are sent to
     * @return the inbound {@link IntegrationFlow}
     */
    @Bean
    public IntegrationFlow inboundFlow(SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter, DirectChannel managerRepliesMessageChannel) {
        final var flowDefinition = IntegrationFlow
                .from(sqsMessageDrivenChannelAdapter)
                .channel(managerRepliesMessageChannel);
        return flowDefinition.get();
    }

    /**
     * Creates the message-driven adapter bound to the {@code replies.fifo} queue.
     *
     * @param sqsAsyncClient the AWS SQS client used to connect to the queue
     * @return the {@link SqsMessageDrivenChannelAdapter} for the replies queue
     */
    @Bean
    public SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter(SqsAsyncClient sqsAsyncClient) {
        final String repliesQueue = "replies.fifo";
        return new SqsMessageDrivenChannelAdapter(sqsAsyncClient, repliesQueue);
    }
}

Manager Job Configuration

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.integration.partition.RemotePartitioningManagerStepBuilderFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.integration.channel.DirectChannel;

import java.util.Objects;

/**
 * Defines the manager step of the remote-partitioning job.
 * <p>
 * Registered only when the {@code manager} profile is active. Dependencies are supplied
 * through the Lombok-generated constructor.
 */
@Configuration
@Profile("manager")
@RequiredArgsConstructor
public class ManagerJobConfiguration {

    /** Factory used to build the remote-partitioning manager step. */
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

    /** Injected environment; not read by the code shown in this class. */
    private final Environment env;


    /**
     * Builds the {@code managerStep}, which partitions work for the step named
     * {@code workerStep}, writes its requests to the output channel and reads replies
     * from the input channel.
     *
     * @param partitioner                   the partitioner that splits the work
     * @param managerRequestsMessageChannel the channel the manager sends requests on
     * @param managerRepliesMessageChannel  the channel the manager receives replies on
     * @return the configured manager {@link Step}
     */
    @Bean
    public Step managerStep(MultiResourcePartitioner partitioner, DirectChannel managerRequestsMessageChannel, DirectChannel managerRepliesMessageChannel) {
        final var stepBuilder = managerStepBuilderFactory.get("managerStep");

        return stepBuilder
                .partitioner("workerStep", partitioner)
                // Grid size of 5; per the original author's note this equals the worker node count.
                .gridSize(5)
                .outputChannel(managerRequestsMessageChannel)
                .inputChannel(managerRepliesMessageChannel)
                .build();
    }
}

Job launcher

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

/**
 * Declares the {@code remotePartitioningJob} bean for the manager application.
 * <p>
 * Registered only when the {@code manager} profile is active. Despite the class name,
 * the code here only defines the job; it does not launch it.
 */
@Configuration
@Profile("manager")
public class ManagerJobLauncher {

    /**
     * Builds a job named {@code remotePartitioningJob} that starts with the manager step
     * and uses a {@link RunIdIncrementer} as its parameters incrementer.
     *
     * @param jobRepository the repository the job is registered against
     * @param managerStep   the first (and only) step of the job
     * @return the assembled {@link Job}
     */
    @Bean
    public Job remotePartitioningJob(JobRepository jobRepository, Step managerStep) {
        final var runIdIncrementer = new RunIdIncrementer();

        return new JobBuilder("remotePartitioningJob", jobRepository)
                .incrementer(runIdIncrementer)
                .start(managerStep)
                .build();
    }
}

Partitioner Configuration

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.Resource;

/**
 * Supplies the partitioner used by the manager step.
 * <p>
 * Registered only when the {@code manager} profile is active.
 */
@Configuration
@Profile("manager")
public class ManagerResourcePartitioner {

    /**
     * Builds a step-scoped {@link MultiResourcePartitioner} over the given resources,
     * using {@code file} as the key name.
     * <p>
     * The resources are resolved from the {@code inputFiles} job parameter with
     * {@code /*} appended to its value.
     *
     * @param resources the resources to be partitioned
     * @return the configured {@link MultiResourcePartitioner}
     */
    @Bean
    @StepScope
    public MultiResourcePartitioner partitioner(@Value("#{jobParameters['inputFiles']}/*") Resource[] resources) {
        final var filePartitioner = new MultiResourcePartitioner();
        filePartitioner.setResources(resources);
        filePartitioner.setKeyName("file");
        return filePartitioner;
    }

}

The application logs are as follows.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.1.2)

2023-08-09T15:55:15.571+05:30  INFO 23252 --- [  restartedMain] c.b.BatchRemotePartitioningApplication   : Starting BatchRemotePartitioningApplication using Java 17.0.7 with PID 23252
2023-08-09T15:55:15.575+05:30  INFO 23252 --- [  restartedMain] c.b.BatchRemotePartitioningApplication   : The following 1 profile is active: "master"
2023-08-09T15:55:15.659+05:30  INFO 23252 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2023-08-09T15:55:15.687+05:30  INFO 23252 --- [  restartedMain] .s.b.d.c.l.DockerComposeLifecycleManager : Using Docker Compose file 'D:\Personal\spring-batch\batch-remote-partitioning\compose.yaml'
2023-08-09T15:55:18.193+05:30  INFO 23252 --- [  restartedMain] o.s.b.c.c.annotation.BatchRegistrar      : Finished Spring Batch infrastructure beans configuration in 3 ms.
2023-08-09T15:55:18.497+05:30  INFO 23252 --- [  restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-08-09T15:55:18.504+05:30  INFO 23252 --- [  restartedMain] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-08-09T15:55:19.372+05:30  INFO 23252 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2023-08-09T15:55:19.523+05:30  INFO 23252 --- [  restartedMain] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@33fa9ed2
2023-08-09T15:55:19.525+05:30  INFO 23252 --- [  restartedMain] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2023-08-09T15:55:19.564+05:30  INFO 23252 --- [  restartedMain] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: POSTGRES
2023-08-09T15:55:19.609+05:30  INFO 23252 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2023-08-09T15:55:20.070+05:30  INFO 23252 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2023-08-09T15:55:20.256+05:30  INFO 23252 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-08-09T15:55:20.257+05:30  INFO 23252 --- [  restartedMain] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-08-09T15:55:20.258+05:30  INFO 23252 --- [  restartedMain] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-08-09T15:55:20.258+05:30 DEBUG 23252 --- [  restartedMain] a.c.s.l.DefaultListenerContainerRegistry : Starting DefaultListenerContainerRegistry
2023-08-09T15:55:20.261+05:30 DEBUG 23252 --- [  restartedMain] a.c.s.l.DefaultListenerContainerRegistry : DefaultListenerContainerRegistry started
2023-08-09T15:55:20.289+05:30  INFO 23252 --- [  restartedMain] c.b.BatchRemotePartitioningApplication   : Started BatchRemotePartitioningApplication in 5.334 seconds (process running for 6.496)
2023-08-09T15:55:20.298+05:30 DEBUG 23252 --- [ionShutdownHook] a.c.s.l.DefaultListenerContainerRegistry : Stopping DefaultListenerContainerRegistry
2023-08-09T15:55:20.299+05:30  INFO 23252 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-08-09T15:55:20.299+05:30  INFO 23252 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-08-09T15:55:20.300+05:30  INFO 23252 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2023-08-09T15:55:20.338+05:30  INFO 23252 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-09T15:55:20.342+05:30  INFO 23252 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

pom.xml File

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits build defaults and dependency management from Spring Boot 3.1.2 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.batch</groupId>
    <artifactId>batch-remote-partitioning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>batch-remote-partitioning</name>
    <description>batch-remote-partitioning</description>

    <!-- Build properties: Java 17 and SnakeYAML 2.0 -->
    <properties>
        <java.version>17</java.version>
        <snakeyaml.version>2.0</snakeyaml.version>
    </properties>

    <dependencies>
        <!-- Spring Boot starters plus Spring Batch / Spring Integration modules; no versions are declared here -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>
        <!-- AWS support (Spring Cloud AWS and spring-integration-aws); each pins version 3.0.1 explicitly -->
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-starter</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-sqs</artifactId>
            <version>3.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-aws</artifactId>
            <version>3.0.1</version>
        </dependency>

        <!-- Development-time, runtime-only and optional build-time dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-docker-compose</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Test-scoped dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): RabbitMQ test support, while no RabbitMQ runtime dependency is declared in this POM; confirm it is still needed -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>batch-remote-partitioning</finalName>
        <plugins>
            <plugin>
                <groupId>org.graalvm.buildtools</groupId>
                <artifactId>native-maven-plugin</artifactId>
            </plugin>
            <!-- Spring Boot plugin; its configuration lists Lombok under excludes -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

When the master application starts, it shuts down immediately without processing any data. However, the same configuration works when I use RabbitMQ instead. I would appreciate any advice from the experts.

praneeth
  • 203
  • 1
  • 4
  • 14

0 Answers0