
I am trying to write a program in Java using ExecutorService and its method invokeAll. My question is: does invokeAll run the tasks simultaneously? I mean, if I have two processors, will there be two workers at the same time? I can't make it scale correctly: it takes the same time to complete the problem whether I use newFixedThreadPool(2) or newFixedThreadPool(1).

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map is a class that implements Callable, and wp is a Vector of PartialSolution objects; PartialSolution is a class that holds the intermediate information at different stages of the computation.

Why doesn't it scale? What could be the problem?

This is the code for PartialSolution:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName; // the name of the file
    public int b, e;        // begin and end index of the fragment within the file
    public String info;     // the fragment itself
    public HashMap<String, Word> hm;          // here I keep the results for this fragment
    public HashMap<String, Vector<Word>> hmt; // used only for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if (ok)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
            hmt = null;
        }
    }
}

And this is the code for Map:

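import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Map implements Callable<PartialSolution>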
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    // strip the surrounding quotes: 'regex' -> regex
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}
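In call(), Pattern.compile runs again for every line of the fragment, even though each pattern depends only on the keyword. A minimal sketch (Java 8+) of helpers that keep the same counting logic but let a task compile its quoted patterns once, for example in its constructor; KeywordCounter and its method names are hypothetical, and the effect on the timings in the question is untested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class; the names are illustrative, not from the question.
final class KeywordCounter {

    private KeywordCounter() {}

    // Same non-overlapping indexOf loop as in call(); keyword must be non-empty.
    static int countLiteral(String line, String keyword) {
        int count = 0;
        int from = 0;
        int index;
        while ((index = line.indexOf(keyword, from)) != -1) {
            count++;
            from = index + keyword.length();
        }
        return count;
    }

    // Counts regex matches in one line, reusing a Pattern compiled earlier.
    static int countRegex(String line, Pattern pattern) {
        Matcher m = pattern.matcher(line);
        int count = 0;
        while (m.find()) {
            count++;
        }
        return count;
    }

    // Compiles each quoted keyword ('...') once per task instead of once per line.
    static List<Pattern> compileQuoted(List<String> keyWords) {
        List<Pattern> patterns = new ArrayList<>();
        for (String kw : keyWords) {
            if (kw.charAt(0) == '\'') {
                // strip the surrounding quotes: 'regex' -> regex
                patterns.add(Pattern.compile(kw.substring(1, kw.length() - 1)));
            }
        }
        return patterns;
    }
}
```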

So in Map I take every line of the fragment and, for every expression, count its number of appearances and also save the line numbers. After I process the whole fragment, I save this information in a hash map in the same PartialSolution and return it. In the next step I combine the PartialSolutions with the same fileName and pass them to a Callable class, Reduce, which is similar to Map: it performs other operations, but it also returns a PartialSolution.
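The grouping step described above, combining the PartialSolutions that share a fileName before handing each group to Reduce, could look roughly like this (Java 8+). Fragment is a hypothetical stand-in that keeps only the fields the grouping needs, since Word and Reduce are not shown in the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "group by fileName" step between Map and Reduce.
final class GroupByFile {

    // Stand-in for PartialSolution: only the fields used here.
    static final class Fragment {
        final String fileName;
        final int begin;

        Fragment(String fileName, int begin) {
            this.fileName = fileName;
            this.begin = begin;
        }
    }

    // Groups Map results by file name, keeping first-seen order,
    // so each group can be handed to one Reduce task.
    static Map<String, List<Fragment>> byFile(List<Fragment> results) {
        Map<String, List<Fragment>> groups = new LinkedHashMap<>();
        for (Fragment f : results) {
            groups.computeIfAbsent(f.fileName, k -> new ArrayList<>()).add(f);
        }
        return groups;
    }
}
```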

This is the code to run the Map tasks:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

In tasks I create the tasks of type Map, and in list I get their results. I don't know how to read a JVM thread dump. I hope the information I gave is enough. I work in NetBeans 7.0.1, if that helps.

Thank you, Alex

Ravindra babu
  • How many tasks do you have? And what do they do? Is there a lot of I/O? – Thilo Nov 29 '11 at 08:07
  • My tasks are those Callable classes that use PartialSolution: each holds some text, and the task counts how many times a word appears in that text and on which lines. A PartialSolution is actually a part of a text; I want to obtain this information for every part and then unite the parts with another Callable class called Reduce. I want to process those parts simultaneously, depending on the number of processors I have. I/O will happen at the end, when I unite all the tasks: from, say, 10 parts I will get just one with all the information about that document. It's the MapReduce approach that Google uses. – Stanciu Alexandru-Marian Nov 29 '11 at 08:41
  • What I want to know is: if I create the ExecutorService with 10 threads, will invokeAll solve 10 tasks at the same time, or one at a time? In Map I have a constructor, and I implement the method call(), which returns another PartialSolution, but this time with the proper information. Another question: if I call list.get(i).get(), will this return the PartialSolution after it has been solved, right? I really don't understand why the time doesn't improve if I use 2 threads instead of 1. Why doesn't it scale? – Stanciu Alexandru-Marian Nov 29 '11 at 09:36
  • You could have used the homework tag (and also hope that no one will copy your code). – Iulius Curt Dec 14 '11 at 17:35

3 Answers


What I want to know is: if I create the ExecutorService with 10 threads, will invokeAll solve 10 tasks at the same time, or one at a time?

If you submit ten tasks to an ExecutorService with ten threads, it will run them all concurrently. Whether they can proceed completely in parallel and independently of each other depends on what they are doing. But they will each have their own thread.
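One way to check this directly is to make every task wait on a shared CyclicBarrier, which only opens when all tasks are waiting on it at the same time. A minimal sketch (Java 8+; InvokeAllConcurrency is a hypothetical name): with as many threads as tasks it returns true; with fewer threads, the first task times out after one second, the barrier breaks, and the method returns false.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: checks whether invokeAll runs `tasks` callables at the same time.
final class InvokeAllConcurrency {

    static boolean allRanTogether(int threads, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // The barrier trips only when `tasks` threads are waiting on it simultaneously.
        CyclicBarrier barrier = new CyclicBarrier(tasks);
        List<Callable<Boolean>> work = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            work.add(() -> {
                try {
                    barrier.await(1, TimeUnit.SECONDS);
                    return true;   // every task was running at once
                } catch (TimeoutException | BrokenBarrierException e) {
                    return false;  // some task had no free thread to run on
                }
            });
        }
        try {
            boolean ok = true;
            for (Future<Boolean> f : pool.invokeAll(work)) {
                ok &= f.get();
            }
            return ok;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(allRanTogether(2, 2)); // 2 threads, 2 tasks
        System.out.println(allRanTogether(1, 2)); // 1 thread, 2 tasks
    }
}
```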

And another question: if I call list.get(i).get(), will this return the PartialSolution after it has been solved?

Yes, it will block until the computation is done (if not done already) and return its result.
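A small sketch of that blocking behavior with submit() and get() (Java 8+); FutureGetDemo is a hypothetical name and the value 42 is arbitrary. If call() throws instead, get() rethrows the exception wrapped in an ExecutionException rather than returning a result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: get() waits for call() to finish before returning its result.
final class FutureGetDemo {

    static int blockingGet(long sleepMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> slow = () -> {
                Thread.sleep(sleepMs); // simulate a long-running task
                return 42;
            };
            Future<Integer> future = pool.submit(slow);
            return future.get(); // blocks the caller until call() returns
        } finally {
            pool.shutdown();
        }
    }
}
```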

I really don't understand why the time doesn't improve if I use 2 threads instead of 1.

We need to see more code. Do they synchronize on some shared data? How long do these tasks take? If they are very short, you may not notice any difference. If they take longer, look at the JVM thread dump to verify that all of them are running.
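To see whether a pool scales on a given machine at all, it can help to time a batch of CPU-bound tasks that share nothing. A minimal sketch (Java 8+), assuming each task runs long enough to dwarf thread start-up costs; busyTask is a hypothetical stand-in for Map, and the first run only serves as a JIT warm-up. If Runtime.availableProcessors() reports 1, the two timings are expected to be about the same.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical scaling check: same batch of independent tasks on 1 vs 2 threads.
final class ScalingCheck {

    // CPU-bound stand-in task: no shared state, no locks.
    static Callable<Long> busyTask(long iterations) {
        return () -> {
            long acc = 0;
            for (long i = 0; i < iterations; i++) {
                acc += (i * i) % 7;
            }
            return acc;
        };
    }

    // Runs the batch on `threads` threads, prints wall-clock time, returns the combined result.
    static long runBatch(int threads, int taskCount, long iterations) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int t = 0; t < taskCount; t++) {
                tasks.add(busyTask(iterations));
            }
            long start = System.nanoTime();
            long total = 0;
            for (Future<Long> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println(threads + " thread(s): " + ms + " ms");
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        runBatch(1, 4, 200_000_000L); // warm-up
        runBatch(1, 4, 200_000_000L);
        runBatch(2, 4, 200_000_000L);
    }
}
```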

Thilo
  • +1. There is one error, though: invokeAll returns a list of completed futures. In other words, it only returns when all the tasks have completed. – JB Nizet Nov 29 '11 at 11:05
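The comment's point can be seen in a short sketch (Java 8+; InvokeAllDone is a hypothetical name): by the time invokeAll returns, isDone() is true for every future, and the list is in the same order as the submitted tasks, regardless of which task finished first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: every Future returned by invokeAll is already done.
final class InvokeAllDone {

    static List<Integer> resultsInSubmissionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> { Thread.sleep(300); return 1; }); // slower task submitted first
            tasks.add(() -> { Thread.sleep(50); return 2; });

            List<Future<Integer>> futures = pool.invokeAll(tasks); // returns after both finish
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                if (!f.isDone()) {
                    throw new IllegalStateException("invokeAll returned an unfinished future");
                }
                results.add(f.get()); // does not block here
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```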

Java 8 introduced one more factory method in Executors, newWorkStealingPool, which creates a work-stealing pool. You don't have to write RecursiveTask or RecursiveAction subclasses, but you still use a ForkJoinPool under the hood.

public static ExecutorService newWorkStealingPool()

Creates a work-stealing thread pool using all available processors as its target parallelism level.

By default, it uses the number of available processors as its parallelism level. If you have an 8-core CPU, you can have 8 threads processing the task queue.

Idle worker threads steal tasks from busy worker threads, which improves overall performance. Since the task queue is unbounded, this ForkJoinPool is recommended for tasks that execute in short time intervals.
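A minimal sketch of newWorkStealingPool used with invokeAll as a drop-in for newFixedThreadPool (Java 8+); the per-fragment task only computes a string length and is a hypothetical placeholder for real work such as Map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: one task per fragment on a work-stealing pool.
final class WorkStealingExample {

    static int sumOfLengths(List<String> fragments) throws Exception {
        // Parallelism defaults to the number of available processors.
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String fragment : fragments) {
                tasks.add(() -> fragment.length()); // placeholder work per fragment
            }
            int total = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```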

ExecutorService, ForkJoinPool, and ThreadPoolExecutor all perform well if your tasks don't share data, don't contend on shared locks (synchronization), and don't communicate with each other. If all tasks in the queue are independent of each other, performance will improve.

Use the ThreadPoolExecutor constructor to customize and control the workflow of tasks:

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler)
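A sketch of a fixed-size pool built with that seven-argument constructor, assuming a bounded queue and CallerRunsPolicy suit the workload; CustomPool and these particular choices are hypothetical. With corePoolSize equal to maximumPoolSize the pool never grows, and when the queue is full and all threads are busy, the submitting thread runs the task itself instead of it being rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical example of the full ThreadPoolExecutor constructor.
final class CustomPool {

    static ThreadPoolExecutor create(int threads, int queueCapacity) {
        ThreadFactory factory = Executors.defaultThreadFactory();
        return new ThreadPoolExecutor(
                threads,                                      // corePoolSize
                threads,                                      // maximumPoolSize: fixed-size pool
                0L, TimeUnit.MILLISECONDS,                    // keepAliveTime for threads above core (none here)
                new ArrayBlockingQueue<>(queueCapacity),      // bounded work queue
                factory,                                      // creates the worker threads
                new ThreadPoolExecutor.CallerRunsPolicy());   // queue full: caller runs the task
    }
}
```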

Have a look at related SE questions:

How to properly use Java Executor?

Java's Fork/Join vs ExecutorService - when to use which?

Ravindra babu

If you create the thread pool with two threads, then two tasks will be run at the same time.

There are two things I see that could make two threads take the same amount of time as one.

If one of your Map tasks takes most of the time, extra threads will not make that one task run faster: the whole batch cannot finish faster than the slowest task.

The other possibility is that your Map task reads from a shared Vector (keyWords) very often. Vector's methods are synchronized, so this could cause enough contention to cancel out the gain from having two threads.
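If contention on the shared Vector turns out to be the problem, one option is to copy the keywords once, before creating the tasks, into an unmodifiable list and pass that to every Map; ArrayList.get is not synchronized. A minimal sketch, not verified against the question's workload; KeywordSnapshot is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;

// Hypothetical helper: one read-only snapshot of the shared keyword Vector.
final class KeywordSnapshot {

    private KeywordSnapshot() {}

    static List<String> snapshot(Vector<String> shared) {
        // new ArrayList<>(shared) copies via Vector.toArray(), which is synchronized,
        // so the copy is consistent; later reads from the ArrayList take no locks.
        return Collections.unmodifiableList(new ArrayList<>(shared));
    }
}
```

In the question's loop this would mean calling snapshot(keyWords) once before the for loop and changing Map's field from Vector<String> to List<String> (and elementAt(i) to get(i)).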

You should open your program in jvisualvm to see what each thread is doing.

Michael Krussel
  • I have installed VisualVM, but I don't know how to use it: I don't know where to look or how to read the data. Some help, please. – Stanciu Alexandru-Marian Nov 29 '11 at 23:16
  • I have done these steps: Profiler -> CPU -> right-click, then Thread Dump... but I don't understand a thing. – Stanciu Alexandru-Marian Nov 29 '11 at 23:24
  • @StanciuAlexandru-Marian I would recommend naming your threads something meaningful using a ThreadFactory. Then find the threads in the list of threads. Then check how the state of each thread changes while the code is running. This will give you an indication of how much work each thread is doing. If one thread is waiting, you can then do a thread dump to see what it is waiting on. – Michael Krussel Nov 30 '11 at 14:51
  • I have solved my problem. It was my computer's fault: although I have an Intel Core 2 Duo, it seems to work very badly. I don't know why, or when that happened. I tested on my faculty's cluster, and there it runs very fast and scales. Thank you for all your help, and I hope I didn't cause you too much trouble. – Stanciu Alexandru-Marian Nov 30 '11 at 15:42
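Following the suggestion in the comments to give the threads meaningful names, a minimal ThreadFactory sketch (Java 8+); NamedThreadFactory and the "map-worker" prefix are hypothetical. Threads created this way appear as map-worker-1, map-worker-2, and so on in VisualVM's thread list and in thread dumps, which makes it easier to see whether both workers are actually running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ThreadFactory that names pool threads prefix-1, prefix-2, ...
final class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }

    // Convenience: a fixed pool whose threads carry readable names.
    static ExecutorService namedFixedPool(String prefix, int threads) {
        return Executors.newFixedThreadPool(threads, new NamedThreadFactory(prefix));
    }
}
```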