Let's say we have two entities, Project and Release, in a one-to-many relationship.

When we consume an event from an SQS queue that carries a release id, we may have to create thousands of releases in our DB, and for each release we have to make a REST call to a third-party service to fetch its details.

That means we might have to make thousands of calls, in some cases more than 20k, just to retrieve the information for the different releases and store it in the DB.

Obviously this is not scalable, so I'm not sure what the right approach is in this scenario.

I know I could use a CompletableFuture, but I'm not sure how to use it with Spring.

The HTTP client I am using is WebClient.

Any ideas?

davidalayachew
Carlos Gonzalez
  • Could you give more details about the actual work that you are doing? The actual tasks you are performing with these projects and releases will decide the solution that is best. – davidalayachew Apr 16 '22 at 00:20
  • We receive an event with a project id, we make an http call to retrieve the project information (name, creation date, type etc) along with its release ids, for each release id we have to make an http call to retrieve the release info (name, release date, etc etc), we store in our DB the project info and the release info, that is, we create one project and say 20k releases in the DB. Ideally persisting the DB info should be transactional. – Carlos Gonzalez Apr 16 '22 at 00:44

1 Answer

To run the save queries of a method in a single transaction, add the @Transactional annotation above the method signature. The method should be public and called from another bean; with Spring's default proxy-based setup the annotation is otherwise ignored.

As for using CompletableFuture in Spring: you can make an HTTP call method asynchronous by adding the @Async annotation above its signature and declaring CompletableFuture as its return type. Inside the method, perform the HTTP call and return a completed future holding the response value, which you can create with CompletableFuture.completedFuture(yourValue). The caller immediately receives a future from Spring, and that future completes once the asynchronous method has finished executing its code block. For @Async to work you must also add the @EnableAsync annotation to one of your configuration classes. On top of that, the @Async annotated method must be public and must not be called from a method within the same class. If the method is private or is called from within the same class, the @Async annotation is ignored and the method runs in the same thread as its caller.
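As an alternative that does not depend on @Async, the same fan-out can be sketched with plain CompletableFuture.supplyAsync and a bounded thread pool. This is only a minimal sketch under assumptions: `fetchReleaseInfo` is a hypothetical stand-in for the blocking HTTP call (for example a WebClient request followed by `block()`), the `String` result stands in for a real ReleaseInfo type, and the release ids are assumed to be distinct.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ReleaseFetcher {

    // Hypothetical stand-in for the blocking HTTP call to the third-party service.
    static String fetchReleaseInfo(long releaseId) {
        return "release-" + releaseId;
    }

    // Runs one call per release id, with at most maxConcurrentCalls in flight at a time.
    static Map<Long, String> fetchAll(List<Long> releaseIds, int maxConcurrentCalls) {
        ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCalls);
        try {
            // Submit every call; the fixed pool queues whatever it cannot run yet.
            Map<Long, CompletableFuture<String>> futures = releaseIds.stream()
                    .collect(Collectors.toMap(
                            Function.identity(),
                            id -> CompletableFuture.supplyAsync(() -> fetchReleaseInfo(id), executor)));

            // Block until all calls have finished (join rethrows the first failure).
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();

            // Every future is complete at this point, so join() returns immediately.
            return futures.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()));
        } finally {
            // Release the pool threads whether or not the calls succeeded.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<Long, String> result = fetchAll(List.of(1L, 2L, 3L), 2);
        System.out.println(result);
    }
}
```

The pool size is the knob that limits how many requests hit the third-party service at once. Since the work is mostly waiting on the network, a value larger than the core count may be reasonable, subject to whatever rate limits that service imposes.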

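Besides an @Async annotated method, you could also use a parallelStream to run the HTTP calls in parallel. Note that a parallel stream runs on the shared common ForkJoinPool, whose default parallelism is roughly the number of CPU cores, so the 20K calls are spread over a handful of threads rather than all being in flight at once. For example: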

List<Long> releaseIds = new ArrayList<>();
// getReleaseInfo is assumed to be a blocking call that returns a ReleaseInfo.
Map<Long, ReleaseInfo> releaseInfo = releaseIds.parallelStream()
        .map(releaseId -> new AbstractMap.SimpleEntry<>(releaseId, webClient.getReleaseInfo(releaseId)))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
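To see how many calls a parallel stream could actually run at the same time on a given machine, you can inspect the common pool it uses. A small sketch (the class and method names are only illustrative):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Parallelism of the common ForkJoinPool that parallel streams run on by default.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("CPU cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

Because this pool is shared by every parallel stream in the JVM, long blocking HTTP calls on it can slow down unrelated parallel work, which is one reason a dedicated executor is often preferred for I/O.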

Lastly you could also use a ThreadPoolExecutor to execute the http calls in parallel. An example:

List<Long> releaseIds = new ArrayList<>();
// The pool size here equals the number of available CPU processors on the machine.
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// Submit one task per release id to the executor.
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream()
        .map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)))
        .collect(Collectors.toList());

// Wait for all futures to complete and collect the non-null values into a list.
List<ReleaseInfo> releaseInfo = releaseInfoFutures.stream()
        .map(this::getValueAfterFutureCompletion)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

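    // Returns the result of the future, or null if the call failed or the wait was interrupted.
    private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            // The HTTP call itself failed; log it and skip this release.
            e.printStackTrace();
            return null;
        }
    }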

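Make sure to call shutdown() on the executor once you're done with it (or shutdownNow() if you want to cancel outstanding tasks). Otherwise its threads stay alive, which leaks resources and can keep the JVM from exiting.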
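One way to make sure the pool is always released is to wrap its use in try/finally. The following is a rough sketch rather than a definitive implementation: `fetchReleaseInfo` is a hypothetical placeholder for the blocking HTTP call, and `String` stands in for ReleaseInfo. It uses invokeAll, which waits for every task and returns the futures in the same order as the submitted tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolLifecycle {

    // Hypothetical placeholder for the blocking HTTP call.
    static String fetchReleaseInfo(long releaseId) {
        return "release-" + releaseId;
    }

    static List<String> fetchAll(List<Long> releaseIds, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        List<String> results = new ArrayList<>();
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Long id : releaseIds) {
                tasks.add(() -> fetchReleaseInfo(id));
            }
            // invokeAll blocks until every task has completed or failed.
            for (Future<String> future : executor.invokeAll(tasks)) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    // One call failed; report it and continue with the rest.
                    System.err.println("Call failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return whatever was collected so far.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the pool threads, even if something above threw.
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of(5L, 6L), 2));
    }
}
```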

Maurice
  • Thanks for your answer. Really good one. I have a question on the following statement though, "One thread for every element in releaseIds", is this a good practice or healthy? I mean, we would be creating twenty thousand threads? – Carlos Gonzalez Apr 16 '22 at 12:23
  • @CarlosGonzalez You don't have to, it was just an example. The real number of threads should always be in proportion to the number of CPU cores that the computer your app runs on has. Too many threads wouldn't make sense because the majority of threads would just be waiting for other threads to finish using a CPU core for processing their task. [Here's a good SO question](https://stackoverflow.com/questions/1718465/optimal-number-of-threads-per-core) about what the optimal number of threads per core should be. – Maurice Apr 16 '22 at 13:11
  • @CarlosGonzalez I changed my example so that the number of threads is always equal to the number of available CPU processors on the machine. You can determine the number of available processors with the following method call: `Runtime.getRuntime().availableProcessors()` – Maurice Apr 19 '22 at 22:26