2

I have around 400k employee IDs. The API accepts only one employee ID per call.

How can I call the API in parallel using threads to improve performance? I need some pointers.

The saveInDatabase() method saves the object in a database table. I have marked this method as synchronized.

private void callApi(List<Long> employeeList, HttpEntity<String> requestEntity) {
        Long employeeId;
        for (Long i : employeeList) {

            employeeId = i;// url

            String url = "http://dummy.restapiexample.com/api/v1/employee/" + employeeId;

            ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, requestEntity,
                    String.class);

            saveInDatabase(responseEntity);

        }
    }
Ryuzaki L
Raj R

2 Answers

3

Using the Thread API directly is error-prone because it is low-level.
parallelStream() may look attractive, but it can also be a problem: the stream runs on a shared pool sized to the available CPU cores, so processing it could monopolize the cores available to your application.
That means other HTTP requests handled by your application could be served with a long delay.
Note also that the number of threads used by parallelStream() is a JVM implementation detail and is not part of the public API.
The ExecutorService API, which lets you specify the number of threads available in the pool, looks like a better and more robust alternative.
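
As a rough illustration of the ExecutorService approach, here is a minimal sketch using only the JDK. The class name BoundedPoolSketch, the method fetchAll and the fetchOne function are hypothetical names introduced for this example; in the question's code, fetchOne would stand in for the restTemplate.exchange(...) call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of the original code.
class BoundedPoolSketch {

    // Applies fetchOne to every id on a pool of poolSize threads and returns
    // the results in the same order as the ids.
    static <R> List<R> fetchAll(List<Long> ids, int poolSize, Function<Long, R> fetchOne) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit every task first so that they can run concurrently.
            List<Future<R>> futures = new ArrayList<>();
            for (Long id : ids) {
                Callable<R> task = () -> fetchOne.apply(id);
                futures.add(pool.submit(task));
            }
            // Only then block on the results, one by one.
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The lambda is a stand-in for the real HTTP call.
        List<String> bodies = fetchAll(Arrays.asList(1L, 2L, 3L), 2, id -> "employee-" + id);
        System.out.println(bodies); // prints [employee-1, employee-2, employee-3]
    }
}
```

The pool size is the knob that parallelStream() does not expose: for calls that mostly wait on the network, it can be tuned independently of the number of CPU cores.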

Spring Boot provides a built-in feature that wraps the ExecutorService API.
You could extract the individual task into a method such as:

@Async
public Future<ResponseEntity<String>> getEmployee(long employeeId,  HttpEntity<String> requestEntity) {

       String url = "http://dummy.restapiexample.com/api/v1/employee/" + employeeId;
       ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, requestEntity,
                String.class);
       return new AsyncResult<ResponseEntity<String>>(responseEntity);
   }

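Now call it. Note that with Spring's default proxy-based @Async, getEmployee() has to be invoked through another Spring bean; a call from within the same class bypasses the proxy and runs synchronously.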

private void callApi(List<Long> employeeList, HttpEntity<String> requestEntity) {

        // Do async calls and store futures in a List
        List<Future<ResponseEntity<String>>> futures = new ArrayList<>();
        for (Long id : employeeList) {
            futures.add(getEmployee(id, requestEntity));    
        }

        // Then process the list of futures
        for (Future<ResponseEntity<String>> future : futures) {
            try {
                saveInDatabase(future.get());
            } catch (Exception e) {
                // handle the exception
            }
        }
    }

As a side note, calling saveInDatabase() inside a loop is not the right approach.
Instead, you should batch the database insertions because you have many of them to do. Something like:

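private void callApi(List<Long> employeeList, HttpEntity<String> requestEntity) {

  // Submit every call first. Collecting into a list forces all tasks to be
  // started before any future.get() blocks (a single lazy stream would
  // submit and wait one element at a time).
  List<Future<ResponseEntity<String>>> futures =
        employeeList.stream()
                    .map(id -> getEmployee(id, requestEntity))
                    .collect(toList());

  List<ResponseEntity<String>> responseEntities =
        futures.stream()
               .map(future -> {
                   try {
                       return future.get();
                   } catch (Exception e) {
                       // handle the exception; rethrown here so the lambda compiles
                       throw new IllegalStateException(e);
                   }
               })
               .collect(toList());

  saveInDatabase(responseEntities);
}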
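
With 400k responses, a single huge insert may not be practical either, so the list can be saved in fixed-size chunks. The following is a minimal JDK-only sketch of that idea; BatchSaveSketch, saveInBatches and the saveBatch callback are hypothetical names, and saveBatch stands in for whatever batched insert the persistence layer offers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of the original code.
class BatchSaveSketch {

    // Splits items into consecutive chunks of at most batchSize elements,
    // passes each chunk to saveBatch, and returns the number of chunks.
    static <T> int saveInBatches(List<T> items, int batchSize, Consumer<List<T>> saveBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so the callback can keep the chunk safely.
            saveBatch.accept(new ArrayList<>(items.subList(from, to)));
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> responses = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        // The lambda is a stand-in for one batched database insert.
        int batches = saveInBatches(responses, 2, batch -> System.out.println("saving " + batch));
        System.out.println(batches + " batches"); // prints "3 batches"
    }
}
```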

To make the @Async feature work, you have to add @EnableAsync to a @Configuration class of your application.
Optionally, you can define an Executor bean with the pool/queue configuration that suits your needs. Beware: if you don't define an Executor bean, Spring will create a SimpleAsyncTaskExecutor and use that (it creates a new thread for each task and doesn't reuse them).

For example:

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("GithubLookup-");
        executor.initialize();
        return executor;
    }


}
davidxxx
  • Thanks for the suggestion, but where are you implementing Callable? – Raj R Jul 21 '19 at 13:41
  • in between your getEmployee() throwing compile error due to return type mismatch – Raj R Jul 21 '19 at 13:44
  • You don't need to. The code that you declare in the method annotated `@Async` is your Callable. The Spring framework add the glue and transmit it to the ExecutorService. – davidxxx Jul 21 '19 at 13:45
  • @Raj R I didn't write the code in an IDE but on the fly. Some typos may be present. Don't hesitate to solve them and to update my post with. I updated about the return. – davidxxx Jul 21 '19 at 13:50
  • `future.get()` is blocking. – Michael Jul 21 '19 at 14:58
  • If I am not mistaken, there is nothing running concurrently in your first `callApi` solution. It blocks on `future.get()`. – Michael Jul 21 '19 at 15:06
  • @Michael Indeed `future.get()` waits the returned. Good catch. Updated the first code consequently. – davidxxx Jul 21 '19 at 16:32
  • @davidxxx I am using your solution, I just removed unnecessary parameters https://pastebin.com/CGfMcKxT `apiCallAndSave.callApi(employeeList); //employeeList is of size 400k in production` I just want to know how many threads will be involved in parallel processing. Can I have control over it like with ExecutorService? Do I need to enable Async in my Spring Boot app explicitly? If yes, how? – Raj R Jul 21 '19 at 17:36
  • The code given by you runs but it runs in a serial manner not parallel.:( – Raj R Jul 21 '19 at 18:14
  • @RajR You need to enable it with `@EnableAsync`. – Michael Jul 21 '19 at 18:21
  • @Michael I did but the code is still running in a serial manner – Raj R Jul 21 '19 at 18:23
  • @Raj R It should not. You should increase the spring log and check that the async features is well enabled. Also try to set the Executor bean to check whether it changes something – davidxxx Jul 22 '19 at 19:28
  • this was the reason https://stackoverflow.com/questions/57135727/async-annotation-not-making-asychromous-call – Raj R Jul 22 '19 at 19:45
  • but to process just 8000 records with 8 threads it took 8 min – Raj R Jul 22 '19 at 19:46
  • You should use much more threads than CPU cores (maybe 2X but you should test different value ) because most of time is I/O to wait for the HTTP response. – davidxxx Jul 22 '19 at 20:12
1

You can use parallelStream() to make the API calls concurrently:

List<ResponseEntity<String>> result = employeeList.parallelStream()
            .map(id->restTemplate.exchange("http://dummy.restapiexample.com/api/v1/employee/"+id, HttpMethod.GET, requestEntity, String.class))
            .collect(Collectors.toList());

result.forEach(entity->saveInDatabase(entity));

But beware: parallelStream() may also starve the CPU cores available to your application. If the application does more than this one task and also has to serve other requests, that could be an issue.
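
One commonly used workaround is to start the parallel stream from inside a dedicated ForkJoinPool, so that its tasks run on that pool rather than on the shared common pool. This relies on observed behavior of the stream implementation, not on a documented guarantee, so treat the following as a sketch under that assumption. DedicatedPoolStreamSketch, mapInOwnPool and fetchOne are hypothetical names; fetchOne stands in for the restTemplate.exchange(...) call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the original code.
class DedicatedPoolStreamSketch {

    // Runs the parallel stream from a task submitted to its own ForkJoinPool.
    // Assumption: the stream's work then runs on that pool's threads, which is
    // how current JDKs behave but is not part of the documented API.
    static <R> List<R> mapInOwnPool(List<Long> ids, int parallelism, Function<Long, R> fetchOne) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<List<R>> task =
                    () -> ids.parallelStream().map(fetchOne).collect(Collectors.toList());
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The lambda is a stand-in for the real HTTP call.
        List<String> bodies = mapInOwnPool(Arrays.asList(1L, 2L, 3L), 2, id -> "employee-" + id);
        System.out.println(bodies); // prints [employee-1, employee-2, employee-3]
    }
}
```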

Also, as suggested by @davidxxx, use saveAll for a batch insert:

saveAll(result)
davidxxx
Ryuzaki L
  • Can I use threads, and do I have to mark saveInDatabase(entity) as synchronized when using a parallel stream too? – Raj R Jul 21 '19 at 13:21
  • while using `parallelStream` each task is executed asynchronously(depends upon CPU), i believe no need of any synchronization for saving data into database @RajR – Ryuzaki L Jul 21 '19 at 13:26
  • will this reduce time?currently my batch is taking 3 hours to do this – Raj R Jul 21 '19 at 13:27
  • definitely it will reduce time, take a look at this @RajR How many cores do you have? – Ryuzaki L Jul 21 '19 at 13:29
  • https://dzone.com/articles/think-twice-using-java-8 but parallelStream can cause issues? – Raj R Jul 21 '19 at 13:30
  • I don't know I will take a look tomorrow.In between if suppose 10 threads at same time trying to Update 10 different records in data base,will it cause any problem – Raj R Jul 21 '19 at 13:31
  • The database will handle that load. Everything depends on how many cores your DB has and how many cores your application is running on. If your DB has 4 cores, then only 4 threads can run in parallel at a time @RajR – Ryuzaki L Jul 21 '19 at 13:33
  • Ok thanks I will try this tomorrow at work.In between can you give me a pseudo code how can I achieve this using Callable? – Raj R Jul 21 '19 at 13:35
  • 1
    Parallel stream is not necessarily bad but a warning is needed. I added it. – davidxxx Jul 21 '19 at 14:00