4
for (int i = 0; i < 100000; i++) {
    // REST API request
    restTemplate.exchange(url, HttpMethod.GET, request, String.class);
}

I have a situation where I have to request a resource for 100k users, and it takes 70 minutes to finish. I tried to clean up my code as much as possible, but that only reduced the time by 4 minutes.

Since each request is independent of the others, I would love to send the requests in parallel (maybe in chunks of 10, 100, or even 1000, each of which finishes quickly). I'm hoping I can reduce the time to 10 minutes or so. How do I work out which chunk size would get the job done quickest?

I have found the following approach, but I can't tell whether the program processes all 20 at a time, or 5 at a time, or 10 at a time.

IntStream.range(0, 20).parallel().forEach(i -> {
    // ... do something here
});

I appreciate your help. I am open to any suggestions or criticism!

UPDATE: I was able to use IntStream and the task finished in 28 minutes. But I am not sure this is the best I can do.

WowBow
  • *I open to any suggestions or critics!!* - I guess you are getting silent critics – Scary Wombat Dec 09 '16 at 01:17
  • The `IntStream` method will process a number of requests equal to the number of cores in your computer at a time. If you want more speed, use an **async** HTTP client, so that no thread blocks while waiting for a response (see the sketch after these comments). – Imesha Sudasingha Dec 09 '16 at 01:17
  • @ScaryWombat Lol. I second that! – WowBow Dec 09 '16 at 01:20
  • @ImeshaSudasingha Any working examples you can provide ? Thanks. – WowBow Dec 09 '16 at 01:21
  • @ScaryWombat I guess the silent critics got "scared" after they saw the "Wombat" commenting. lol – WowBow Dec 09 '16 at 01:27
  • @WowBow did you use a blocking client for requests? or an async client? – Imesha Sudasingha Dec 09 '16 at 01:28
  • I used the IntStream I provided above and I was able to get the job running in 28 minutes (good). So I guess the IntStream decides how many chunks to run. Right ? Not sure about that. – WowBow Dec 09 '16 at 01:30
  • Nope. What I'm asking is, just put the code which is used to call the REST api in the question. I just want to know whether you are using blocking requests in which case it is your bottleneck – Imesha Sudasingha Dec 09 '16 at 01:32
  • parallel stream uses the default ForkJoinPool. You can change it by using `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "YOUR_NUMBER")` – choasia Dec 09 '16 at 01:36
  • You could put your requests in a queue then use "consumer" threads to concurrently read from the queue, submit the requests, and process the results. Once you have it working you could change the number of consumer threads and measure the time it takes with differing numbers of threads. – D.B. Dec 09 '16 at 03:44
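
For reference, here is a rough sketch of the non-blocking approach suggested in the comments, using Spring's AsyncRestTemplate. The url and request variables are the ones from the question; the latch-based wait is just one way to block until everything finishes, and in practice you would also back the template with a genuinely asynchronous request factory and throttle how many requests are in flight at once:

AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
CountDownLatch latch = new CountDownLatch(100000);

for (int i = 0; i < 100000; i++) {
    ListenableFuture<ResponseEntity<String>> future =
            asyncRestTemplate.exchange(url, HttpMethod.GET, request, String.class);

    future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onSuccess(ResponseEntity<String> result) {
            // process the response
            latch.countDown();
        }

        @Override
        public void onFailure(Throwable ex) {
            ex.printStackTrace();
            latch.countDown();
        }
    });
}

latch.await(); // throws InterruptedException; blocks until all requests have completed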

4 Answers

12

I used the following code in Java 8 and it did the work. I was able to reduce the batch job from 28 minutes to 3 minutes 39 seconds.

IntStream.range(0, 100000).parallel().forEach(i -> {
    restTemplate.exchange(url, HttpMethod.GET, request, String.class);
});
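
If you want to control how many requests run at once instead of relying on the common pool's default (number of cores minus one), one commonly used trick is to run the parallel stream inside your own ForkJoinPool; the stream's tasks are then scheduled on that pool. This behaviour is not an officially documented guarantee, so treat the following only as a sketch, and the parallelism of 50 is an arbitrary number to tune:

ForkJoinPool pool = new ForkJoinPool(50); // tune this number for your workload
try {
    pool.submit(() ->
        IntStream.range(0, 100000).parallel().forEach(i ->
            restTemplate.exchange(url, HttpMethod.GET, request, String.class)))
        .get(); // wait for all requests to finish
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    pool.shutdown();
}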
WowBow
3

A plain call to parallel() runs the stream on the common ForkJoinPool, whose default parallelism is the number of cores your machine has available minus one.

If you want to control the parallelism yourself, you have two options:

  1. Change the parallelism of the common pool: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") (a sketch of this follows after the example below)
  2. Use your own pool:

Example:

int allRequestsCount = 20;   // assumed to be divisible by parallelism in this example
int parallelism = 4;         // vary on your own

ForkJoinPool forkJoinPool = new ForkJoinPool(parallelism);
IntStream.range(0, parallelism).forEach(i -> forkJoinPool.submit(() -> {
  int chunkSize = allRequestsCount / parallelism;
  IntStream.range(i * chunkSize, i * chunkSize + chunkSize)
           .forEach(num -> {

             // Simulate a long-running operation
             try {
               Thread.sleep(1000);
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // restore the interrupt flag
             }

             System.out.println(Thread.currentThread().getName() + ": " + num);
           });
}));

// Wait until the submitted tasks have finished, then release the pool's threads
forkJoinPool.awaitQuiescence(1, TimeUnit.MINUTES);
forkJoinPool.shutdown();

This implementation is just an example to give you an idea.
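
For completeness, a minimal sketch of option 1. Note that the property only takes effect if it is set before the common pool is first used, e.g. at the very start of main or via -D on the command line:

// Must happen before the first parallel stream / common pool usage in the JVM
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

IntStream.range(0, 20).parallel().forEach(i -> {
    // roughly 20 tasks run concurrently (the calling thread also participates)
});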

JDC
0

For your situation you can either use the fork/join framework or create an ExecutorService thread pool.

    ExecutorService service = Executors.newFixedThreadPool(8);
    try {
        for (int i = 0; i < 100000; i++) {
            service.submit(() -> {
                // do your task (the REST request)
            });
        }
    } finally {
        service.shutdown();
    }

    service.awaitTermination(1, TimeUnit.MINUTES); // declare or handle InterruptedException
    if (service.isTerminated())
        System.out.println("All threads have been finished");
    else
        System.out.println("At least one thread still running");

And using the fork/join framework:

    class RequestHandler extends RecursiveAction {

        int start;
        int end;

        public RequestHandler(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start <= 10) {
                for (int i = start; i < end; i++) {
                    // REST request, e.g.
                    // restTemplate.exchange(url, HttpMethod.GET, request, String.class);
                }
            } else {
                int middle = start + (end - start) / 2;
                invokeAll(new RequestHandler(start, middle), new RequestHandler(middle, end));
            }
        }
    }

    public class MainClass {
        public static void main(String[] args) {

            ForkJoinTask<?> task = new RequestHandler(0, 100000);
            ForkJoinPool pool = new ForkJoinPool();
            pool.invoke(task);
        }
    }
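
Note that pool.invoke(task) blocks until the whole task tree has finished, so no extra await call is needed. The no-argument ForkJoinPool() sizes itself to the number of available cores; since these are blocking HTTP calls, you will likely want to pass a larger parallelism explicitly, e.g. new ForkJoinPool(32) (the 32 is just a starting point to experiment with, not a recommendation).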
0xh3xa
  • His bottleneck is not in parallel processing. It is in blocking requests. Therefore, this won't do any good. – Imesha Sudasingha Dec 09 '16 at 01:27
  • I thought he was asking about how to split the requests in a parallel manner – 0xh3xa Dec 09 '16 at 01:29
  • @ImeshaSudasingha What do you mean by blocking requests ? I asked to split requests in parallel. – WowBow Dec 09 '16 at 01:50
  • What I'm asking is, put the code used to send the requests. There are 2 types of requests. synchronous and asynchronous. I want to know which one you were using because, as far as I see, the bottleneck is not with parallelism. – Imesha Sudasingha Dec 09 '16 at 01:55
  • @ImeshaSudasingha if the bottleneck is using synchronous rather than asynchronous requests then it absolutely has to do with parallelism. See [this answer](http://stackoverflow.com/a/748189/3284624) – D.B. Dec 09 '16 at 03:15
  • Yes, I know that parallelism matters. But my point is, since the requests are independent, he can use asynchronous requests to improve performance. Going for parallelism with synchronous requests is a premature optimization as I feel. – Imesha Sudasingha Dec 09 '16 at 04:59
0

I've written a short article about that. It contains a simple tool that allows you to control the pool size:

https://gt-dev.blogspot.com/2016/07/java-8-threads-parallel-stream-how-to.html

g-t