
I have a problem with some threads.

My script:

1 - loads over 10 million lines from a text file into an array

2 - creates an ExecutorService with a fixed pool of 5 threads

3 - then iterates over that list and submits a task to the queue for each line

executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));

Now the number of active threads never exceeds the 5 fixed threads, which is good, but I observed that my processor goes to 100% load. I debugged a little and saw that the MyCustomThread constructor is being called for every line, which means that even though I declared 5 fixed threads, 10 million objects still get created.

The main question is: how do I prevent this? I just want tasks to be rejected if there is no room, not to create 10 million objects and run them one by one.

Second question: how do I get the number of currently active threads? I tried threadGroup.activeCount() but it always gives me 5 5 5 5 ...

THE CALLER CLASS :

System.out.println("Starting threads ...");
final ThreadGroup threadGroup = new ThreadGroup("workers");
//ExecutorService executor = Executors.newFixedThreadPool(howManyThreads);

ExecutorService executor = Executors.newFixedThreadPool(5,new ThreadFactory() {
    public Thread newThread(Runnable r) {
        return new Thread(threadGroup, r);
    }
});

int increment = 0;              
for(String line : arrayOfLines)
{
    if(increment > 10000)
    {
        //System.out.println("TOO MANY!!");
        //System.exit(0);
    }

    System.out.println(line);
    System.out.println(threadGroup.activeCount());

    if(threadGroup.activeCount() >= 5)
    {
        for(int i = 0; i < 10; i++)
        {
            System.out.println(threadGroup.activeCount());
            System.out.println(threadGroup.activeGroupCount());
            Thread.sleep(1000);
        }
    }


    try
    {
        executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));
    }
    catch(Exception ex)
    {
        continue;
        //System.exit(0);
    }

    increment++;
}

executor.shutdown(); // stop accepting new tasks; already-submitted tasks still run
executor.awaitTermination(10, TimeUnit.MILLISECONDS);

THREAD CLASS :

public class MyCustomThread extends Thread
{
    private String ip;
    private String threadName;
    private int threadTimeout = 10;

    public MyCustomThread(String ip)
    {
        this.ip = ip;
    }

    public MyCustomThread(String ip,int threadTimeout,String threadName)
    {

        this.ip = ip;
        this.threadTimeout = threadTimeout;
        this.threadName = threadName;

        System.out.println("MyCustomThread constructor has been called!");
    }

    @Override
    public void run()
    {
        // do some stuff that takes time ....
    }
}

Thank you.

Damian

2 Answers


You are doing it a bit wrong. The philosophy with executors is that you implement the work unit as a Runnable or a Callable (instead of a Thread). Each Runnable or Callable should do one atomic piece of work that is independent of the other Runnables or Callables.

Executor services internally use a pool of threads, so creating your own thread group and extending Thread is not doing any good.
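
On the second question (why threadGroup.activeCount() always shows 5): the pool's worker threads stay alive while idle, waiting on the queue, so a thread group keeps counting them whether or not they are running a task. ThreadPoolExecutor exposes task-level counters instead. The following is a minimal sketch, assuming the pool is built directly as a ThreadPoolExecutor with 5 threads and an unbounded queue (the shape Executors.newFixedThreadPool documents). ActiveCountDemo and the sleeping tasks are hypothetical stand-ins for the real work:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ActiveCountDemo {
    // Hypothetical demo: runs `tasks` short sleeping tasks on 5 threads
    // and returns how many tasks completed.
    static long runDemo(int tasks) throws InterruptedException {
        // Same shape as Executors.newFixedThreadPool(5): 5 threads, unbounded queue.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(50); // stand-in for "some stuff that takes time"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // getActiveCount(): approximate number of threads busy running a task right now.
        // getQueue().size(): tasks still waiting for a free thread.
        // getPoolSize(): live worker threads, busy or idle (what a ThreadGroup would count).
        System.out.println("active=" + executor.getActiveCount()
                + " queued=" + executor.getQueue().size()
                + " pool=" + executor.getPoolSize());

        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        return executor.getCompletedTaskCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed=" + runDemo(20));
    }
}
```

The Javadoc describes getActiveCount() as an approximation, so it is suited to monitoring rather than to deciding whether to submit.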

Try this simple piece:

ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(new MyRunnableWorker(line)); // line: one entry from arrayOfLines

public class MyRunnableWorker implements Runnable{
    private String ip;
    private String threadName;
    private int threadTimeout = 10;

    public MyRunnableWorker(String ip){
        this.ip = ip;
    }

    public MyRunnableWorker(String ip,int threadTimeout,String threadName){
        this.ip = ip;
        this.threadTimeout = threadTimeout;
        this.threadName = threadName;

        System.out.println("MyRunnableWorker constructor has been called!");
    }

    @Override
    public void run(){
        // do some stuff that takes time ....
    }
}

This should give you what you want. Also, try testing your threaded code with VisualVM to see how the threads are running and what the load distribution looks like.

Nazgul
  • First of all, I don't understand something: do I need only one class? Second, I already tried that, and it is the same thing... look (http://pastebin.com/dimVnLiF). And believe me, when I pass 10,000 rows in my txt file it works just fine; the problem is when I pass many things to the executor pool. – Damian Apr 28 '14 at 19:09
  • use execute instead of submit. – Nazgul Apr 29 '14 at 07:25
  • Sweet, it is working, but what is the difference between `execute` and `submit`? – Damian Apr 29 '14 at 15:33
  • Please do upvote the answer if it helped. submit is used for cases when you expect the worker to give you something back: it returns a Future object representing the pending completion of the task, on which you can call get to retrieve the result (or the exception the task threw). With execute you get nothing back; execute is more like 'do this for me and forget it, don't bother to report back'. That's why I said: break the main job into atomic work pieces that are mutually exclusive and do those pieces in Runnables. – Nazgul Apr 29 '14 at 15:49
  • Oh, thank you for the answer, but why would I need the thread to return something? The idea of a thread is just to do many things in parallel. If I want threads to communicate, I can just use a database or a static variable... not have them choke and go to 100% when there are 10,000,000 things to do, right? – Damian Apr 29 '14 at 15:53
  • Not the thread, a Callable. Sometimes you may need the work pieces to return a computation result, e.g.: read CSV lines and give 100 lines to each worker. Each worker is expected to count the occurrences of the word 'hiya' in its lines. Each worker keeps a local count, and when it finishes it returns that count to you. You keep a list or array of all the counts returned and add them up to get the grand total. I know you could also do it by keeping an atomic int in the parent class, but this is a better design, with 'mutual exclusion' of shared data. – Nazgul Apr 29 '14 at 15:57
  • Read about threads etc. at this link: http://walivi.wordpress.com/2013/08/24/concurrency-in-java-a-beginners-introduction/ – Nazgul Apr 29 '14 at 15:58
  • I see that there is A LOT to read. I guess I have to learn more, thank you so very much! – Damian Apr 29 '14 at 21:23
  • I have one more question. Couldn't this be done using `extends Thread`? – Damian Apr 29 '14 at 21:25
  • Not really. The idea of executors is: "you make the work tasks, I make the threads to execute those work tasks in parallel". So executors free you from managing the threads; you just need to concentrate on the work tasks. If you start making threads yourself, that defeats the purpose of the executor. – Nazgul Apr 30 '14 at 05:26
  • Thank you for your answer. The way I see it, the `Thread` class and the `Runnable` interface just help me make a `method` run in parallel/in the background, and the only difference I see is that `Runnable` doesn't create the object from the beginning like `Thread` does... am I right? – Damian Jun 02 '14 at 19:19
  • Right. Just a small correction: don't think in terms of a 'method' to be run in parallel. Think in terms of atomic jobs or tasks that should be run together as a single unit. All such jobs should go in a new Runnable or Thread. Jobs that don't depend on each other and don't share data can go in separate Runnables/Threads. – Nazgul Jun 03 '14 at 05:20
  • I understand it crystal clear, but when do you have to use the `Thread` class? When you want some response from the thread? If so, it can be done with `Runnable` as well: just declare a `private static ArrayList messagesFromThreads ...`, work with it, and have the main thread read this `locked static` variable and you are good to go... no need to even read the `Thread` class documentation again. What do you think about this? – Damian Jun 03 '14 at 16:16
  • Frankly, since the arrival of Executors, direct use of Threads is discouraged. Extending the Thread class directly is now more of a study concept used to teach threads. Production and big apps use executors now. – Nazgul Jun 04 '14 at 04:02
  • I understand now: the `Thread` class is for learning. I am OK with that, thank you very much for your answers. I will be glad to help you as well if I can :) – Damian Jun 04 '14 at 19:16
  • It's OK... no problem... sharing knowledge spreads the love of programming. – Nazgul Jun 05 '14 at 05:39
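
The split-and-sum pattern described in the comment about counting 'hiya' can be sketched with Callable and Future: each task keeps only a local count, and the caller adds up the values returned by Future.get(). This is a minimal illustration, not code from either poster; WordCountDemo, ChunkCounter, the chunk size and the whitespace tokenisation are all assumptions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WordCountDemo {
    // Hypothetical work unit: counts `word` in one chunk of lines using only local state.
    static class ChunkCounter implements Callable<Integer> {
        private final List<String> chunk;
        private final String word;

        ChunkCounter(List<String> chunk, String word) {
            this.chunk = chunk;
            this.word = word;
        }

        @Override
        public Integer call() {
            int count = 0;
            for (String line : chunk) {
                for (String token : line.split("\\s+")) {
                    if (token.equals(word)) {
                        count++;
                    }
                }
            }
            return count;
        }
    }

    static int countWord(List<String> lines, String word, int chunkSize)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int start = 0; start < lines.size(); start += chunkSize) {
                // subList is a read-only view here; no task modifies the list
                List<String> chunk = lines.subList(start, Math.min(start + chunkSize, lines.size()));
                futures.add(executor.submit(new ChunkCounter(chunk, word)));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                // get() blocks until that chunk is done; a task failure surfaces as ExecutionException
                total += f.get();
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = Arrays.asList("hiya there", "no match", "hiya hiya", "bye");
        System.out.println(countWord(lines, "hiya", 2)); // prints 3
    }
}
```

No counter is shared between tasks, so no locking or atomic variable is needed; the only coordination point is the Future each task returns.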

I think your biggest problem here is that MyCustomThread should implement Runnable, not extend Thread. When you use an ExecutorService, you let it handle the thread management (i.e. you don't need to create the threads yourself).

Here's an approximation of what I think you're trying to do. Hope this helps.

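import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FileProcessor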
{

    public static void main(String[] args)
    {

        List<String> lines = readFile();
        System.out.println("Starting threads ...");
        ExecutorService executor = Executors.newFixedThreadPool(5);

        for(String line : lines)
        {
            try
            {
                executor.submit(new MyCustomThread(line));
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
            }
        }

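        try
        {
            executor.shutdown(); // no new tasks; already-submitted tasks keep running
            // awaitTermination returns false (it does not throw) when the timeout elapses
            if (!executor.awaitTermination(10, TimeUnit.SECONDS))
            {
                System.out.println("A processor took longer than the await time to complete.");
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        executor.shutdownNow(); // interrupt anything still running and drop queued tasks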

    }

    protected static List<String> readFile()
    {
        List<String> lines = new ArrayList<String>();
        try
        {
            String filename = "/temp/data.dat";
            FileReader fileReader = new FileReader(filename );
            BufferedReader bufferedReader = new BufferedReader(fileReader);
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                lines.add(line);
            }
            bufferedReader.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        return lines;
    }
}

public class MyCustomThread implements Runnable
{

    String line;

    MyCustomThread(String line)
    {
        this.line = line;
    }

    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getName() + " processed line:" + line);

    }

}

EDIT: This implementation does NOT block on the ExecutorService submit. What I mean by this is that a new instance of MyCustomThread is created for every line in the file regardless of whether any previously submitted MyCustomThreads have completed. You could add a blocking / limiting worker queue to prevent this.

ExecutorService executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(10));
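
LimitedQueue is not a JDK class. A common shape for such a queue, given here as a hypothetical sketch that may differ from the linked example, is a bounded LinkedBlockingQueue whose offer() waits via put(). Since ThreadPoolExecutor.execute hands tasks to the queue through offer(), the submitting loop then blocks while the queue is full instead of creating every task up front. This assumes core and maximum pool size are equal (5 and 5 as above), so the pool never needs to grow past the queue:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical LimitedQueue: a bounded queue whose offer() waits for space.
class LimitedQueue<E> extends LinkedBlockingQueue<E> {
    LimitedQueue(int maxSize) {
        super(maxSize);
    }

    @Override
    public boolean offer(E e) {
        try {
            put(e); // blocks the submitting thread while the queue is full
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false; // the executor then falls back to rejecting the task
        }
    }
}

public class LimitedQueueDemo {
    static int process(int lineCount) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        // core == max, so blocking inside offer() never prevents the pool from growing
        ExecutorService executor = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(10));
        for (int i = 0; i < lineCount; i++) {
            // only about 5 running + 10 queued tasks exist at once; the loop waits for space
            executor.execute(processed::incrementAndGet);
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(1000)); // prints 1000
    }
}
```

Overriding offer() to block goes against the BlockingQueue contract (offer is meant to return immediately), so this relies on how ThreadPoolExecutor uses its queue rather than on a documented guarantee.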

An example of a blocking / limiting queue implementation can be found here:

MarkOfHall
  • Thank you for your answer, but trust me, it is the same thing: it still goes to 100%, and the objects are still being created. Why is that? – Damian Apr 28 '14 at 17:17
  • What's wrong with your CPU going to 100%? Have you tried processing a smaller dataset? – MarkOfHall Apr 28 '14 at 17:33
  • Yes, with 10,000 lines it is just fine; the problem is when the THREAD objects are being created. I see 100% on my CPU once I see `System.out.println("MyCustomThread constructor has been called!");` being executed. This is the problem, I am sure about it. I also found an example, but I have to read up on it a little: http://www.javacodegeeks.com/2013/01/java-thread-pool-example-using-executors-and-threadpoolexecutor.html – Damian Apr 28 '14 at 17:37
  • Thank you so much, I have given you a +1. It seems that I have to use `Runnable` and `execute` instead of `submit`. I have one more question. Couldn't this be done using `extends Thread`? – Damian Apr 29 '14 at 21:24
  • Yes, you "could" do this by extending Thread, but extending Thread and creating a new Thread instance for every line in the file would be bad. First, Threads are "expensive" objects to create and manage. Second, the rest of the code defines a thread-pool-backed ExecutorService, which manages the Thread/pool allocation for us. Since the java.util.concurrent package was introduced in Java 1.5, the preferred approach has been to use its higher-level constructs. – MarkOfHall Apr 30 '14 at 03:36
  • Then basically the `Thread` class is junk... excuse me if I am wrong, but in this case `Runnable` is what is meant for threading... For example, if you have 10,000,000 tasks, why would you use the `Thread` class? To create all the objects up front? To overwhelm your server? – Damian Jun 02 '14 at 19:22
  • If the `Thread` class always creates objects when you call it, no matter what you do to it, then it is just a toy for learning threads for the first time... – Damian Jun 02 '14 at 19:23
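
The constructor calls reported in these comments are expected: the Javadoc for Executors.newFixedThreadPool describes it as operating off a shared unbounded queue, so the submit loop never waits, and every task object is created and queued immediately. Only the number of worker threads is capped at 5. To reject tasks when there is no room, as the question asks, one option using only JDK classes is a ThreadPoolExecutor with a bounded ArrayBlockingQueue and the AbortPolicy rejection handler. This is a minimal sketch; BoundedPoolDemo, the queue capacity of 10 and the sleeping task body are assumptions:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {
    // Hypothetical demo; returns {accepted, rejected, processed}.
    static int[] run(int lineCount) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5,                                  // exactly 5 worker threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10),  // room for at most 10 waiting tasks
                new ThreadPoolExecutor.AbortPolicy()); // full queue -> RejectedExecutionException
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < lineCount; i++) {
            try {
                executor.execute(() -> {
                    try {
                        Thread.sleep(5); // stand-in for the real per-line work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    processed.incrementAndGet();
                });
                accepted++;
            } catch (RejectedExecutionException ex) {
                rejected++; // no room: this line is skipped
            }
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return new int[] {accepted, rejected, processed.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(1000);
        System.out.println("accepted=" + r[0] + " rejected=" + r[1] + " processed=" + r[2]);
    }
}
```

A rejected task object is still constructed, but it becomes garbage at once, so memory stays bounded. If lines must not be dropped, passing new ThreadPoolExecutor.CallerRunsPolicy() instead makes the submitting thread run the task itself when the queue is full, which slows the loop down rather than skipping work.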