First, the environment:

  • A service that publishes a high volume of messages to a queue. It is a Spring Boot application running in a container, and I have access to it (so I can modify it).
  • Another service that consumes this queue, processes each message to create an XML report, and sends the report to a REST API endpoint. It is also a Spring Boot application running in a container, and I have access to it as well.
  • The endpoint sometimes responds very slowly. I know nothing about its internals and have no access to it.

My consumer looked something like this:

@Component
public class MyConsumer {

   private MyReportCreator reportCreator;
   private MyReportSender reportSender;

   @Autowired
   public MyConsumer(MyReportCreator reportCreator, MyReportSender reportSender) {
      this.reportCreator = reportCreator;
      this.reportSender = reportSender;
   }

   public void consume(MyMessage message) {
      try {
         String report = reportCreator.create(message);
         reportSender.send(report);
      } catch(Exception e) {
         logger.error("Error occurred while reporting", e);
      }
   } 
}

The problem is that the consumer processes the queue one message at a time while the publisher pushes thousands of messages every second, so the queue backs up very quickly. To solve this, I added the @EnableAsync annotation to the application and the @Async annotation to the MyReportSender.send method, and configured a TaskExecutor like this:

@Component
public class MyReportSender {

   private RestTemplate restTemplate;

   @Autowired
   public MyReportSender(RestTemplate restTemplate) {
      this.restTemplate = restTemplate;
   }

   @Async
   public void send(String report) {
      // sends the report using restTemplate.exchange, ignores result
   }

}

@SpringBootApplication
@EnableAsync
public class MyApp {

    public static void main(String[] args) {
        SpringApplication.run(MyApp.class, args);
    }

    @Bean   
    public Executor taskExecutor() {    
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
        executor.setCorePoolSize(100);  
        executor.setMaxPoolSize(100);   
        executor.setQueueCapacity(200); 
        executor.initialize();  
        return executor;    
    }
}

As soon as I released this change, the queue drained immediately, but after some time I got this exception:

Platform exception message: Executor [java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$584/29229306@7dd254ed

Platform exception trace: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$584/29229306@7dd254ed
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:344)
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:290)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.personal.MyReportSender$$EnhancerBySpringCGLIB$$6f1f0df4.send(<generated>)
    at com.personal.MyReportSender.sendReport(MyReportSender.java:95)
    at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:246)
    at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:494)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.personal.MyReportSender$$EnhancerBySpringCGLIB$$604ed475.send(<generated>)
    at com.personal.MyConsumer.consume(MyConsumer.java:50)
    at com.personal.CommonMessageProcessor.process(CommonMessageProcessor.java:44)
    at com.personal.CommonMessageReceiver.receiveMessage(CommonMessageReceiver.java:52)
    at com.personal.CommonMessageReceiver.receiveMessage(CommonMessageReceiver.java:43)
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:280)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:363)
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:292)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3cc54c9a rejected from java.util.concurrent.ThreadPoolExecutor@1a9718ae[Running, pool size = 100, active threads = 100, queued tasks = 200, completed tasks = 3705]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:341)
    ... 37 more

So the task executor rejects a new message when it already has 100 messages being processed and 200 more waiting in its internal queue. Not good.
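To make the arithmetic behind that rejection concrete, here is a minimal plain-JDK sketch (no Spring, no RabbitMQ). It assumes the executor behaves like a `java.util.concurrent.ThreadPoolExecutor` with equal core and max sizes, a bounded queue, and the default `AbortPolicy`, which is what the stack trace suggests. The class `RejectionDemo`, the method `countRejections`, and the latch standing in for the slow endpoint are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    // Submits `tasks` tasks that all block (like calls to a slow endpoint)
    // and returns how many of them the pool rejected.
    static int countRejections(int threads, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity)); // default handler: AbortPolicy
        CountDownLatch endpointResponds = new CountDownLatch(1); // hypothetical slow endpoint
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(endpointResponds));
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and the queue is full
                }
            }
        } finally {
            endpointResponds.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(countRejections(100, 200, 300)); // 100 running + 200 queued: 0 rejected
        System.out.println(countRejections(100, 200, 301)); // one task too many: 1 rejected
    }
}
```

The pool can hold at most threads + queueCapacity unfinished tasks, so anything beyond 300 is rejected while the endpoint is stalled. The JDK also ships `ThreadPoolExecutor.CallerRunsPolicy`, which runs the overflow task on the submitting thread instead of throwing, and `ThreadPoolTaskExecutor` accepts a handler through `setRejectedExecutionHandler`; whether that fits here is something I have not verified.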

What I basically want is to consume this queue about 100 messages at a time. It doesn't have to be this @Async solution. Any ideas, comments, or help are appreciated...
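One way to picture "about 100 at a time" is a semaphore that makes the consuming thread wait once the limit is reached, instead of letting the executor throw. The following is only a plain-JDK sketch of that idea, not a tested Spring AMQP setup. `BoundedSender`, `submit`, `runDemo`, and the 5 ms sleep that stands in for the REST call are all hypothetical names and values.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedSender {
    private final ExecutorService pool;
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peakInFlight = new AtomicInteger();

    BoundedSender(int maxInFlight) {
        this.pool = Executors.newFixedThreadPool(maxInFlight);
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller while maxInFlight sends are running, then hands the send off.
    void submit(Runnable send) {
        permits.acquireUninterruptibly();
        try {
            pool.execute(() -> {
                // Track the highest number of simultaneous sends seen so far.
                peakInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    send.run();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release(); // frees a slot for the next message
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // hand-off failed, give the permit back
            throw e;
        }
    }

    // Waits for outstanding sends and returns the peak concurrency observed.
    int awaitCompletionAndGetPeak() {
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peakInFlight.get();
    }

    // Returns {peak in-flight sends, completed sends} for a simulated run.
    static int[] runDemo(int maxInFlight, int messages) {
        BoundedSender sender = new BoundedSender(maxInFlight);
        AtomicInteger sent = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            sender.submit(() -> {
                try {
                    Thread.sleep(5); // stand-in for the slow REST call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                sent.incrementAndGet();
            });
        }
        int peak = sender.awaitCompletionAndGetPeak();
        return new int[] {peak, sent.get()};
    }

    public static void main(String[] args) {
        int[] result = runDemo(4, 50);
        System.out.println("peak in flight = " + result[0] + ", sent = " + result[1]);
    }
}
```

With this shape the consuming thread waits in `submit` when all slots are taken, so nothing is rejected and the backlog stays in the broker rather than in an in-memory queue. One caveat I am aware of: the listener returns before the report is actually sent, so with automatic acknowledgement a failed send would not put the message back on the queue.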

ibrahimb
  • Possible duplicate of [What could be the cause of RejectedExecutionException](https://stackoverflow.com/questions/8183205/what-could-be-the-cause-of-rejectedexecutionexception) – Simon Martinelli Jun 18 '19 at 13:25
  • To my understanding, this is a use case for reactive programming, where spring-webflux should help. This will help to create 'back-pressure' on the producer so that it only produces data which the consumer can consume. – Guru Jun 18 '19 at 14:50
  • The reason why this fails makes sense, because now you're happily fetching new messages from the RabbitMQ queue, only to build up the internal `ThreadPoolTaskExecutor` queue until 200 items are reached. I'm not really familiar with RabbitMQ, but shouldn't you just increase the amount of concurrent listeners through `spring.rabbitmq.listener.simple.concurrency`? – g00glen00b Jun 18 '19 at 16:13

0 Answers