0

I am working on a project in which I will be having different Bundles. Let's take an example, Suppose I have 5 Bundles and each of those bundles will have a method name process.

Now currently, I am calling the process method of all those 5 bundles sequentially, one by one and then I am writing to the database. But that's what I don't want.

Below are the things that I am looking for-

  1. I need to call all those 5 Bundles process method in parallel using multithreaded code and then write to the database. I am not sure what is the right way to do that? Should I have five thread? One thread for each bundle? But what will happen in that scenario, suppose if I have 50 bundles, then I will have 50 threads?
  2. And also, I want to have timeout feature as well. If any bundles is taking lot of time than the threshold setup by us, then it should get timeout and log as an error that this bundle has taken lot of time.

I hope the question is clear enough.

Below is the code I have so far which is calling process method of each bundles sequentially one by one.

public void callBundles(final Map<String, Object> eventData) {

    final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {

        // calling the process method of a bundle
        final Map<String, String> response = entry.getPlugin().process(outputs);

        // then write to the database.
        System.out.println(response);
    }
}

I am not sure what is the best and efficient way to do this? And I don't want to write sequentially. Because, in future, it might be possible that I will have more than 5 bundles.

Can anyone provide me an example of how can I do this? I have tried doing it but somehow it is not the way I am looking for.

Any help will be appreciated on this. Thanks.

Update:-

This is what I came up with-

public void callBundles(final Map<String, Object> eventData) {

    // Three threads: one thread for the database writer, five threads for the plugin processors
    final ExecutorService executor = Executors.newFixedThreadPool(5);

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();

    @SuppressWarnings("unchecked")
    final Map<String, String> outputs = (Map<String, String>)eventData.get(Constants.EVENT_HOLDER);

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        executor.submit(new Runnable () {
            public void run() {
                final Map<String, String> response = entry.getPlugin().process(outputs);
                // put the response map in the queue for the database to read
                queue.offer(response);
            }
        });
    }

    Future<?> future = executor.submit(new Runnable () {
        public void run() {
            Map<String, String> map;
            try {
                while(true) {
                    // blocks until a map is available in the queue, or until interrupted
                    map = queue.take();
                    // write map to database
                    System.out.println(map);
                }
            } catch (InterruptedException ex) {
                // IF we're catching InterruptedException then this means that future.cancel(true)
                // was called, which means that the plugin processors are finished;
                // process the rest of the queue and then exit
                while((map = queue.poll()) != null) {
                    // write map to database
                    System.out.println(map);
                }
            }
        }
    });

    // this interrupts the database thread, which sends it into its catch block
    // where it processes the rest of the queue and exits
    future.cancel(true); // interrupt database thread

    // wait for the threads to finish
    try {
        executor.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        //log error here
    }
}

But I was not able to add any timeout feature into this yet.. And also If I am run my above code as it is, then also it is not running.. I am missing anything?

Can anybody help me with this?

arsenal
  • 23,366
  • 85
  • 225
  • 331
  • For the first part, I would recommend that you take a look at [Executors](http://docs.oracle.com/javase/tutorial/essential/concurrency/executors.html). I believe some way has already posted a solution for timing out a task managed by executors. You might like to do some more research and see what you find – MadProgrammer Sep 05 '13 at 00:27
  • Thanks MadProgrammer for the suggestion. But somehow I am not able to put a piece of code together that will do the same thing as I mentioned above. It will be great for me to understand if you can provide a simple example of how to do that? Thanks.. – arsenal Sep 05 '13 at 00:37
  • Threading is a complex subject, 10 mins is hardly enough time to experiment with the concept, let alone a brand new API... – MadProgrammer Sep 05 '13 at 00:43

1 Answers1

0

This is BASIC example, partially based on the solution presented in ExecutorService that interrupts tasks after a timeout.

You will have to figure out the best way to get this implemented into your own code. Use it only as a guide!

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorExample {

    // This is used to "expire" long running tasks
    protected static final ScheduledExecutorService EXPIRE_SERVICE = Executors.newScheduledThreadPool(1);
    // This is used to manage the bundles and process them as required
    protected static final ExecutorService BUNDLES_SERVICE = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {

        // A list of the future tasks created by the BUNDLES_SERVICE.
        // We need this so we can monitor the progress of the output
        List<Future<String>> futureTasks = new ArrayList<>(100);
        // This is a list of all the tasks that have either completed
        // or begin canceled...we want these so we can determine
        // the results...
        List<Future<String>> completedTasks = new ArrayList<>(100);

        // Add all the Bundles to the BUNDLES_SERVICE
        for (int index = 0; index < 100; index++) {

            Bundle bundle = new Bundle();
            // We need a reference to the future so we can cancel it if we
            // need to
            Future<String> futureBundle = BUNDLES_SERVICE.submit(bundle);
            // Set this bundles future, see Bundle for details
            bundle.setFuture(futureBundle);
            // Add it to our monitor queue...
            futureTasks.add(futureBundle);

        }

        // Basically we are going to move all completed/canceled bundles
        // from the "active" to the completed list and wait until there
        // are no more "active" tasks
        while (futureTasks.size() > 0) {

            try {
                // Little bit of a pressure release...
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
            }

            // Check all the bundles...
            for (Future<String> future : futureTasks) {
                // If it has completed or was cancelled, move it to the completed
                // list.  AKAIK, isDone will return true is isCancelled is true as well,
                // but this illustrates the point
                if (future.isCancelled() || future.isDone()) {
                    completedTasks.add(future);
                }
            }

            // Remove all the completed tasks from the future tasks lists
            futureTasks.removeAll(completedTasks);
            // Some idea of progress...
            System.out.println("Still have " + futureTasks.size() + " outstanding tasks...");

        }

        // Dump the results...
        int index = 0;
        for (Future<String> future : completedTasks) {

            index++;
            System.out.print("Task " + index);
            if (future.isCancelled()) {

                System.out.println(" was canceled");

            } else if (future.isDone()) {

                try {
                    System.out.println(" completed with " + future.get());
                } catch (Exception ex) {
                    System.out.println(" failed because of " + ex.getMessage());
                }

            }

        }

        System.exit(0);

    }

    public static class ExpireBundle implements Runnable {

        private final Future futureBundle;

        public ExpireBundle(Future futureBundle) {
            this.futureBundle = futureBundle;
        }

        @Override
        public void run() {
            futureBundle.cancel(true);
        }

    }

    public static class Bundle implements Callable<String> {

        private volatile Future<String> future;

        @Override
        public String call() throws Exception {

            // This is the tricky bit.  In order to cancel a task, we
            // need to wait until it runs, but we also need it's future...
            // We could use another, single threaded queue to do the job
            // but that's getting messy again and it won't provide the information
            // we need back to the original calling thread that we are using
            // to schedule and monitor the threads...

            // We need to have a valid future before we can continue...
            while (future == null) {
                Thread.sleep(250);
            }

            // Schedule an expiry call for 5 seconds from NOW...this is important
            // I original thought about doing this when I schedule the original
            // bundle, but that precluded the fact that some tasks would not
            // have started yet...
            EXPIRE_SERVICE.schedule(new ExpireBundle(future), 5, TimeUnit.SECONDS);

            // Sleep for a random amount of time from 1-10 seconds
            Thread.sleep((long) (Math.random() * 9000) + 1000);

            return "Happy";

        }

        protected void setFuture(Future<String> future) {
            this.future = future;
        }
    }
}

Also. I had thought of using ExecutorService#invokeAll to wait for the tasks to complete, but this precluded the ability to timeout tasks. I don't like having to feed the Future into the Callable, but no other solution seemed to come to mind that would allow me to get the results from the submitted Future.

Community
  • 1
  • 1
MadProgrammer
  • 343,457
  • 22
  • 230
  • 366
  • Thanks for the suggestion. I updated the question with the code that I have got for now. But somehow, when I am running the above code, it is not printing out anything onto the console and also I was not able to add any timeout feature into that yet..Could you take a look into that? And let me know what wrong I am doing? – arsenal Sep 05 '13 at 01:59
  • I don't "think" you need the second runnable, which seems to be trying to take items out of the blocking queue. You could use `ExecutorService#invokeAll` which will block until all the tasks have completed (or failed) instead... – MadProgrammer Sep 05 '13 at 02:11