0

Regarding CompletableFuture functionality, if anyone has idea about it is not creating multiple threads dynamically. For that i have tried with executorService also in below code but executorService has a fixed thread pool so it goes to blocking state. Can you please help to implement CompletableFuture in multithreading dynamically as per below code?

private static CompletableFuture<Integer> createCompletableFuture(ByteArrayOutputStream baOS, int totalBytes, 
            List<FileUploadMultiLocator> fileUploadList) {
        CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync(
                () -> {
                    try {
                            // simulate long running task
                            for (FileUploadMultiLocator fileUploadMultiLocator : fileUploadList) {
                                System.out.println(Thread.currentThread().getName() + " secondary task is called");
                                fileUploadMultiLocator.baOS.write(baOS.toByteArray(), 0, totalBytes);
                                fileUploadMultiLocator.setTotalBytes(totalBytes);
                                new Thread(fileUploadMultiLocator).start();
                                try {
                                    Thread.sleep(5);
                                } catch (InterruptedException e) {
                                    // TODO Auto-generated catch block
                                    e.printStackTrace();
                                }
                            }
                        }
                    catch (Exception e) { }
                    return 20;
                });
        return futureCount;
    }

1 Answers1

1

I think you should make things simpler. Instead of creating N threads in a single asynchronous task (that's what a completable future is), I think you should loop over your list in the main thread, and for each element, create a completableFuture, giving it your "FileUploadMultiLocator" runnable. Each completable future is an asynchronous task in the Java common fork-join pool.

However, if you want control over the number of threads used for your tasks, you can use CompletableFuture.runAsync(Runnable r, Executor e). This way, you can prepare a thread pool with wanted number of Threads (using static methods in Executors class, and use it for your work.

Note that you have other tools at your disposal in java. You could also choose one of the 2 solutions below:

Java 8 Streams

if you can use Java 8, you should rather use Stream power. You can read more about it here.

In summary, the streams allow you parallel browsing/processing of a collection of objects. For your example, I would use the following approach:

        // Specify the wanted number of parallel tasks, and create a thread pool accordingly.
    final int threadNb = 4;
    final ForkJoinPool pool = new ForkJoinPool(threadNb);
    // We give the entire procedure to the thread pool, which will be in charge of managing executions
    pool.submit(
        // Ask to execute each runnable in the list, in a parallel way.
        () -> fileUploadList.parallelStream().forEach(r -> r.run())
    );

    // Below block is in charge of waiting complete execution.
    pool.shutdown();
    try {
        pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException ex) {
        // Do whatever cancelling logic you want, or let it propagate
    }

Executor service

However, you are not forced to use streams, and you can work directly with an executor service. Why you should use this approach instead of creating threads yourself is described in the following answer :

        final List<Runnable> fileUploadList = null;
    // Specify the wanted number of parallel tasks.
    final int threadNb = 4;
    ExecutorService threadPool = Executors.newFixedThreadPool(threadNb);
    for (final Runnable r : fileUploadList) {
        // ... Do your pre-computing here ...
        // Now, submit asynchronous part of your task for execution
        threadPool.submit(r);
    }

    // Below block is in charge of waiting complete execution.
    threadPool.shutdown();
    try {
        threadPool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException ex) {
        // Do whatever cancelling logic you want, or let it propagate
    }

Also, note that here, I create a on-shot executor service. But, the common thing to do is to create it somewhere in your app, and keep it alive, so you can re-use its threads multiple times. In this case, the code I give you is wrong for the 'waiting completion' part, and you should monitor each task future instead.

One last thing: my examples are really simplified use-cases. I manage neither error nor timeout by task, so it needs refining before use. However, I hope it helps !

amanin
  • 3,436
  • 13
  • 17
  • Thanks for your valuable help. But my purpose is to write to primary location as it is without threading. Parallelly i want to send my ByteArrayOutputStream and total bytes to secondary locations like as List fileUploadList For more details, you can see method parameters. So i think you get an idea about that i want to create seperate threads for secondary locations writing but it should not impact my primary location writing in main thread For that i think completableFuture approach is only an option. What do you suggest? – Harisingh Rajput Apr 17 '18 at 10:09
  • I am not sure. I will try to explain what I've understood, and you correct if I'm wrong: – amanin Apr 17 '18 at 11:39
  • I am not sure. I will try to explain what I've understood, and you correct if I'm wrong: You want all your processing in an asynchronus completable future, but it must: 1. Write synchronously the content of input byte array (baOS), 2. trigger one asynchronous job for each 'FileUploadMultiLocator' (which implements Runnable, right ?) in your input list. And, finally, you want the return to be the count os succeeded runnables ? Is it that ? – amanin Apr 17 '18 at 11:45
  • First of all i m trying to explain exact scenario what i m trying. – Harisingh Rajput Apr 17 '18 at 11:57
  • First of all, let me explain exact scenario what i m trying. I read a single file input stream. With reading, i am storing it in byte array (baOS) and passing that byte array(baOS) to my createCompletableFuture method with list of secondary locations to write it in multiple secondary locations. Back to reading part, parallelly i am writing it to primary one location which should execute in main thread. And that part createCompletableFuture method calling should be executed in background asynchronously. – Harisingh Rajput Apr 17 '18 at 12:06
  • This is code sample of FileUploadMultiLocator runnable class:- `@Override public void run() { // TODO Auto-generated method stub try { bFout.write(baOS.toByteArray(),0,totalBytes); bFout.flush(); baOS.reset(); totalBytes = 0; // totalBytes=0; // baOS.reset(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }` I think now it can be clear to you. – Harisingh Rajput Apr 17 '18 at 12:07