0

I am using RabbitMQ to run several Spring Batch jobs. Execution takes a long time, up to 10 minutes per job. After all the work is performed, acknowledgement in @RabbitListner does not work out in the listener and the work starts again. If you reduce the working time, then everything works well. How can this be fixed? Important: Jobs is completed correctly and without exceptions!

Configuration

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.queue-name}")
    private String queueName;

    @Value("${spring.rabbitmq.exchange-name}")
    private String exchangeName;

    @Value("${spring.rabbitmq.routing-key}")
    private String routingKey;


    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(routingKey);
    }


    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter(jsonMapper());
    }

    @Bean
    public JsonMapper jsonMapper() {
        JsonMapper jsonMapper = new JsonMapper();
        jsonMapper.registerModule(new JavaTimeModule());
        jsonMapper.setDateFormat(new StdDateFormat())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return jsonMapper;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        return connectionFactory;
    }

    @Bean
    public AmqpTemplate rabbitTemp() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }


}

@RabbitListner, for example if from = to.minusDays(6) working well

 @RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD)
    public void processQueue(FieldDto newField) {
        PolygonDto polygonDto = mapper.map(newField);
        PolygonDto save = polygonService.save(polygonDto);

        LocalDateTime to = DateUtil.now();
        LocalDateTime from = to.minusYears(6);

        evalScriptTypeList.forEach(type -> {
            try {
                startJob(save, to, from, type);
            } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
                logger.error(e.getMessage(), e.getCause());
            }
        });

    }

    private void startJob(PolygonDto save, LocalDateTime to, LocalDateTime from, EvalScriptType type) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {

      JobParameters jobParameters = new JobParametersBuilder()
                .addString("t", to.toString())
                .addString(JOB_PARAMETER_DATE_TO, to.toString())
                .addString(JOB_PARAMETER_DATE_FROM, from.toString())
                .addString(JOB_PARAMETER_NEW_POLYGON_ID, save.getId())
                .addString(JOB_PARAMETER_EVAL_SCRIPT_TYPE, type.toString())
                .addString(JOB_PARAMETER_VERSION_SCRIPT, type.getVersion())
                .toJobParameters();

        jobLauncher.run(uploadSatelliteImageJob, jobParameters);
    }

I do not know mb will work manually acknowledgement, but mb is there a way to fix the automatic acknowledgement?

Mahmoud Ben Hassine
  • 28,519
  • 3
  • 32
  • 50
  • See if `@RabbitListener(ackMode = "NONE")` helps you. IT might be great if you share with us some error you got in your case. IT also might be possible that RabbitMQ has a limitation in timing for message processing before its ack is expired... – Artem Bilan Mar 31 '23 at 14:21
  • See more info here: https://stackoverflow.com/questions/68952297/rabbitmq-delivery-acknowledgement-timeout. Looks it is indeed there is something like `consumer-timeout` on the broker. – Artem Bilan Mar 31 '23 at 14:24
  • 1
    Yes, you're right. Rabbitmq has a timeout for confirmation by default it is 30 minutes. I found two solutions either to increase the timeout or to confirm the messages at the beginning of processing and in case of a problem solve this problem myself – Денис Дергачев Apr 01 '23 at 16:36
  • Good. You can place that as answer to your own question. – Artem Bilan Apr 02 '23 at 17:51

1 Answers1

0

I used manual commit

    @RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD, ackMode = "MANUAL")
    public void processQueue(FieldDto newField, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {

            channel.basicAck(tag, false);

            //your code

        } catch (Exception e) {
            channel.basicReject(tag, true);
            logger.error(e.getMessage(), e.getCause());
        }
    }
HereAndBeyond
  • 794
  • 1
  • 8
  • 17