
I am trying to implement a multithreaded approach using the Executor framework, submitting multiple tasks to a thread pool from my main class:

class Main
{
    private static final int NTHREADS = 10;

    public static void main(String[] args)
    {
        .........
        String str = createThreads(document);
        .............
    }


    public String createThreads(String docString)
    {

        ........
        .......
        Map<String,String> iTextRecords = new LinkedHashMap<String, String>();
        if(!iText.matches(""))
        {
            String[] tokenizedItext = iText.split("\\^");
            ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
            for(int index = 0 ;index < tokenizedItext.length;index++)
            {
                Callable<Map<String,String>> worker = null;
                Future<Map<String,String>> map = null;
                if(tokenizedItext[index].matches("^[0-9.<>+-= ]+$") || tokenizedItext[index].matches("^\\s+$"))
                {
                    iTextRecords.put(tokenizedItext[index],tokenizedItext[index]);
                }
                else
                {
                    worker = new MultipleDatabaseCallable(tokenizedItext[index],language);
                    map = executor.submit(worker);
                    try
                    {
                        iTextRecords.putAll(map.get());
                    }
                    catch(InterruptedException ex)
                    {
                        ex.printStackTrace(System.out);
                    }
                    catch(ExecutionException ex)
                    {
                        ex.printStackTrace(System.out);
                    }
                }

            }

            executor.shutdown();
            // Wait until all threads are finish
            while (!executor.isTerminated())
            {

            }

        }
    }
}

The Callable class is as follows:

class MultipleDatabaseCallable implements Callable<Map<String,String>> 
{
    @Override
    public Map<String, String> call() throws Exception {

        System.out.println("Entering: "+Thread.currentThread().getName());
        Map<String,String> map = new HashMap<String,String>();
        for(int i =0;i<50000;i++)
        {
            for(int i1 = 0 ;i1<5000;i1++)
            {
                for(int i2 =0;i2 <500;i2++)
                {

                }
            }
        }
        System.out.println("Exiting: "+Thread.currentThread().getName());
        return map;
    }
}

The output I am getting is:

Entering: pool-1-thread-1
Exiting: pool-1-thread-1
Entering: pool-1-thread-2
Exiting: pool-1-thread-2
Entering: pool-1-thread-3
Exiting: pool-1-thread-3
Entering: pool-1-thread-4
Exiting: pool-1-thread-4
Entering: pool-1-thread-5
Exiting: pool-1-thread-5
Entering: pool-1-thread-6
Exiting: pool-1-thread-6

Looking at the output, it seems that only one thread enters the call() method at a time, and the next thread enters only after the previous one exits. I expected multiple threads to enter and execute call() concurrently. Also, when I run the same program with NTHREADS = 1, it takes the same time as with NTHREADS = 10.

So the application seems to run no faster than a single-threaded one. Please suggest what I am doing wrong in this implementation.

Thanks

Nishit Jain
  • I guess the workload is too low. Your for-loops will be optimized away if you do not do anything in them. Put a Thread.sleep(1) into the innermost loop and test again. – Fildor Sep 27 '13 at 11:44
  • I tried that too, and the program halts until thread 1 exits. – Nishit Jain Sep 27 '13 at 11:48
  • `iTextRecords.putAll(map.get());` acts like a join: it blocks until the data is available. – Ortwin Angermeier Sep 27 '13 at 11:58
  • A side note: you should avoid `while (!executor.isTerminated()){}` because it is busy waiting. Have a look here: http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish – subsub Sep 27 '13 at 14:57
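
The busy-wait loop mentioned in the last comment can be replaced by a blocking wait. The following is a minimal sketch, assuming trivial counter tasks in place of the real work; the class name `AwaitTerminationDemo`, the pool size of 4, and the timeout are illustrative choices, not part of the original code. It uses `ExecutorService.awaitTermination`, which parks the calling thread instead of spinning.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {

    // Runs `tasks` trivial jobs and returns how many completed before the timeout.
    static int runAndWait(int tasks, long timeoutSeconds) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            // Placeholder work (an assumption): each task just bumps a counter.
            executor.execute(completed::incrementAndGet);
        }

        // Stop accepting new tasks; already submitted tasks still run.
        executor.shutdown();

        // Block without burning CPU until all tasks finish or the timeout expires.
        if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            // Timed out: ask the still-running tasks to stop.
            executor.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed tasks: " + runAndWait(20, 5));
    }
}
```

Unlike the empty `while` loop, the waiting thread here does not occupy a core that the worker threads could otherwise use.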

2 Answers


When you call

                map = executor.submit(worker);

the returned value, map in this case, is a Future, meaning that it does not hold a value until the callable has returned one. Now when you call

                    iTextRecords.putAll(map.get());

the current thread blocks (inside map.get()), waiting for the callable to return (in the other thread).

Since you always wait for a callable to finish (via map.get()) before submitting a new one (via executor.submit()), you enforce the sequential execution you observe.

In order to execute the tasks in parallel, you have to start them all before calling get() for the first time. You could, for instance, create an ArrayList<Future<Map<String,String>>> futures = ... and then do

  futures.add(executor.submit(worker)); 

to submit the tasks (no need for the map variable), and then create a second loop (after the for(int index ...) loop):

 for(Future<Map<String,String>> f: futures) {
     iTextRecords.putAll(f.get()); // still needs the try/catch from the original loop
 }
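
Put together, the two-phase approach might look like the sketch below. It is a self-contained illustration under stated assumptions: the `lookup` helper is a hypothetical stand-in for `MultipleDatabaseCallable` that sleeps for 200 ms to simulate a slow database call, and the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitThenCollect {
    private static final int NTHREADS = 10;

    // Hypothetical stand-in for MultipleDatabaseCallable: sleeps to simulate slow work.
    static Callable<Map<String, String>> lookup(String token) {
        return () -> {
            Thread.sleep(200);
            Map<String, String> result = new LinkedHashMap<>();
            result.put(token, token.toUpperCase());
            return result;
        };
    }

    static Map<String, String> process(String[] tokens)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
        try {
            // Phase 1: submit everything. submit() returns immediately with a Future.
            List<Future<Map<String, String>>> futures = new ArrayList<>();
            for (String token : tokens) {
                futures.add(executor.submit(lookup(token)));
            }

            // Phase 2: only now block on the results, in submission order.
            Map<String, String> records = new LinkedHashMap<>();
            for (Future<Map<String, String>> f : futures) {
                records.putAll(f.get());
            }
            return records;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        Map<String, String> records = process(new String[] {"a", "b", "c", "d", "e"});
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(records + " in " + elapsedMs + " ms");
    }
}
```

With five tokens and a pool of ten threads, the total time should be close to one task's duration rather than five times that. Because the futures are read in submission order, the LinkedHashMap keeps the same key order as the sequential version.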
subsub
  • Missed the get ... +1 – Fildor Sep 27 '13 at 11:50
  • So is there any way to execute multiple threads when you want to get the return values? – Nishit Jain Sep 27 '13 at 12:02
  • I've updated the answer to explain how to collect the return values. – subsub Sep 27 '13 at 12:21
  • Thanks, that worked. But in my case the number of iterations will go up to several thousand, which grows the ArrayList enough to cause an OutOfMemoryError and a GC overhead limit error. – Nishit Jain Sep 27 '13 at 13:21
  • True. In this case: add to the iTextRecords (using ConcurrentHashMap instead of the LinkedHashMap) inside the Callables (you can use Runnables instead since you don't need the return value). – subsub Sep 27 '13 at 14:43
  • On the other hand, since the FixedThreadPool uses an unbounded queue, the waiting Runnables can also potentially use up all memory. So you'll either need some method of "controlling" throughput or rethink your design to limit the number of runnables. – subsub Sep 27 '13 at 14:50
  • Actually I can't use Runnable because I want each thread to return a value (map) – Nishit Jain Sep 28 '13 at 13:36
  • I understood that. What I meant was using runnables would be possible if instead of returning you store the data into iTextRecords directly. – subsub Sep 28 '13 at 14:23
  • But how can I make the values computed in a Runnable available to iTextRecords in the caller class? – Nishit Jain Sep 28 '13 at 16:30
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/38244/discussion-between-subsub-and-nishit-jain) – subsub Sep 28 '13 at 18:28
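
The idea from the comments above (tasks write into a shared ConcurrentHashMap instead of returning a map, and the number of queued tasks is limited) could be sketched as follows. This is only one possible design, under assumptions: the class name, the queue capacity of 100, and the upper-casing placeholder work are invented for illustration. It uses a `ThreadPoolExecutor` with a bounded `ArrayBlockingQueue` and `CallerRunsPolicy`, so that when the queue is full the submitting thread runs the task itself, which throttles submission.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SharedMapDemo {
    private static final int NTHREADS = 10;
    private static final int QUEUE_CAPACITY = 100; // assumed bound, tune as needed

    static Map<String, String> process(String[] tokens) throws InterruptedException {
        // Thread-safe map shared by all tasks; the caller keeps the reference.
        Map<String, String> records = new ConcurrentHashMap<>();

        // Fixed-size pool with a bounded queue. When the queue is full,
        // CallerRunsPolicy makes the submitting thread run the task itself.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                NTHREADS, NTHREADS, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (String token : tokens) {
            // Placeholder work (an assumption): store the upper-cased token.
            executor.execute(() -> records.put(token, token.toUpperCase()));
        }

        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return records;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(new String[] {"a", "b", "c"}));
    }
}
```

Because each task captures the `records` reference, no Future list is needed and at most `QUEUE_CAPACITY` tasks wait in memory at once. One trade-off to be aware of: ConcurrentHashMap does not preserve insertion order, unlike the LinkedHashMap in the question.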

You must collect your futures while you submit the callables. Call get() on your futures only after you finish submitting.
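
A related option, not mentioned in the original answer, is `ExecutorService.invokeAll`, which submits a whole collection of callables and returns their futures once all of them have completed. A minimal sketch, assuming placeholder tasks that just upper-case a token (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    static Map<String, String> process(List<String> tokens)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            // Build all tasks first; nothing runs yet.
            List<Callable<Map<String, String>>> tasks = new ArrayList<>();
            for (String token : tokens) {
                // Placeholder work (an assumption) standing in for the database lookup.
                tasks.add(() -> Collections.singletonMap(token, token.toUpperCase()));
            }

            // invokeAll runs the tasks on the pool and blocks until all are done.
            // The returned futures are in the same order as the task list.
            Map<String, String> records = new LinkedHashMap<>();
            for (Future<Map<String, String>> f : executor.invokeAll(tasks)) {
                records.putAll(f.get());
            }
            return records;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(Arrays.asList("a", "b", "c")));
    }
}
```

This keeps the "submit everything, then get" structure in a single call, at the cost of holding all tasks and results in memory at once.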

Ralf H