I am trying to develop a spring boot app. I have written all core implementations in core java without spring framework. I am using that jar in this spring boot app. I would like to manage the concurrency of my rest controller. So, configured ThreadPoolTaskExecutor accordingly in the main class. Ideally, I want only 2 concurrent requests to get into the execute()
method, which I annotated Async
. I was testing for 2 concurrent requests at a time but I see in the log that my requests are entering execute()
all at once. All the tasks are memory intensive. So those are failing with heap memory issues. I am trying to figure out the ideal concurrency number. I would like to know if my configuration is correct or am I missing something? Thank you.
Here's my main class:
@SpringBootApplication
@EnableAsync
public class RestapiApplication implements AsyncConfigurer {
public static void main(String[] args) {
ApplicationContext ctx = SpringApplication.run(RestapiApplication.class, args);
System.out.println("Rightdata Middleware ready to accept requests:");
}
@Bean(name = "executor1")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(2);
taskExecutor.setCorePoolSize(2);
taskExecutor.setThreadNamePrefix("LULExecutor-");
taskExecutor.setQueueCapacity(100);
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
And here's my REST controller:
@RestController
@RequestMapping("/end2end")
public class End2EndRestController {
/**
* The log.
*/
private final Logger log = LoggerFactory.getLogger(this.getClass());
@RequestMapping(method = RequestMethod.POST)
public JSONObjectPOJO process(@RequestBody String end2EndScenarioString) throws InterruptedException, ExecutionException {
final JSONObjectPOJO jsonObjectPOJO = convertToJavaObject(end2EndScenarioString);
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
@Override
public void run() {
try {
execute(jsonObjectPOJO);
} catch (Exception e) {
e.getMessage();
}
}});
executor.shutdown();
return jsonObjectPOJO;
}
@Async("executor1")
private void execute(JSONObjectPOJO jsonObjectPOJO) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> futureTarget;
Future<?> futureSource;
futureSource = processSource(executorService);
futureTarget = processTarget(executorService);
manageSourceProcessingResults(futureSource);
manageTargetProcessingResults(futureTarget);
executorService.shutdown();
//Do rest of the tasks.
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected Future<?> processSource(executorService){
//Get appropriate class instance with call() - coreActionClass.
Future<?> futureSource = executorService.submit(coreActionClass);
return futureSource;
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected Future<?> processTarget(executorService){
//Get appropriate class instance with call() - coreActionClass.
Future<?> futureTarget = executorService.submit(coreActionClass); //callable method in core.
return futureTarget;
}
private void manageSourceProcessingResults(Future<?> futureSource) {
try{
futureSource.get();
} catch(Exception e){
e.printStackTrace();
}
}
private void manageTargetProcessingResults(Future<?> futureTarget) {
try{
futureTarget.get();
} catch(Exception e){
e.printStackTrace();
}
}
}
UPDATE- 1:
I have now changed the code to following:
@RestController
@RequestMapping("/end2end")
public class End2EndRestController {
/**
* The log.
*/
private final Logger log = LoggerFactory.getLogger(this.getClass());
@RequestMapping(method = RequestMethod.POST)
public JSONObjectPOJO process(@RequestBody String end2EndScenarioString) throws InterruptedException, ExecutionException {
final JSONObjectPOJO jsonObjectPOJO = convertToJavaObject(end2EndScenarioString);
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
@Override
public void run() {
try {
execute(jsonObjectPOJO);
} catch (Exception e) {
e.getMessage();
}
}});
executor.shutdown();
return jsonObjectPOJO;
}
}
And AsyncService class:
public class AsyncService {
@Async("executor1")
public void execute(JSONObjectPOJO jsonObjectPOJO) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> futureTarget;
Future<?> futureSource;
futureSource = processSource(executorService);
futureTarget = processTarget(executorService);
manageSourceProcessingResults(futureSource);
manageTargetProcessingResults(futureTarget);
executorService.shutdown();
//Do rest of the tasks.
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected Future<?> processSource(executorService){
//Get appropriate class instance with call() - coreActionClass.
Future<?> futureSource = executorService.submit(coreActionClass);
return futureSource;
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected Future<?> processTarget(executorService){
//Get appropriate class instance with call() - coreActionClass.
Future<?> futureTarget = executorService.submit(coreActionClass); //callable method in core.
return futureTarget;
}
private void manageSourceProcessingResults(Future<?> futureSource) {
try{
futureSource.get();
} catch(Exception e){
e.printStackTrace();
}
}
private void manageTargetProcessingResults(Future<?> futureTarget) {
try{
futureTarget.get();
} catch(Exception e){
e.printStackTrace();
}
}
}
- My understanding is that when I configure
maxpoolsize(2)
no more than 2 requests would be in the execute() method at one time. For a new request to enter, one of the earlier requests has to complete its execution. Is my understanding correct? Would theasync
apply to the inner executor service? - I am of the view that at one time only 2 requests are handled and each of those requests can spawn 2 different threads and complete its task. Please clarify.