1

I wrote a simple Java program to help play with Threads and Thread pools to complete certain tasks. In my program, there are objects of class TheObject which have to have some kind of processing done to them (in this case, just a sleep delay and printing out their fields).

TheObject objects are placed in a queue, from which WorkerThreads draw them and process them.

I created a Manager class that initializes the TheObjectss and the WorkerThreads. I'm getting strange output when I run it; Instead of the Threads switching off the work, one thread is handling everything! Why is this happening? How do I get the threads to share the workload?

This is the output:

--------------------------------------------
alfred   a   0
Thread-0
--------------------------------------------
bob   b   1
Thread-0
--------------------------------------------
carl   c   2
Thread-0
--------------------------------------------
dave   d   3
Thread-0
--------------------------------------------
earl   e   4
Thread-0
--------------------------------------------
fred   f   5
Thread-0
--------------------------------------------
greg   g   6
Thread-0
--------------------------------------------
harry   h   7
Thread-0
--------------------------------------------
izzie   i   8
Thread-0
--------------------------------------------
jim   j   9
Thread-0
--------------------------------------------
kyle   k   0
Thread-0
--------------------------------------------
Larry   L   1
Thread-1
--------------------------------------------
Michael   M   2
Thread-1
--------------------------------------------
Ned   N   3
Thread-0
--------------------------------------------
Olaf   O   4
Thread-0
--------------------------------------------
Peter   P   5
Thread-0
--------------------------------------------
Quincy   Q   6
Thread-0
--------------------------------------------
Raphael   R   7
Thread-0
--------------------------------------------
Sam   S   8
Thread-0
--------------------------------------------
Trixie   T   9
Thread-0

The Code:

Classes:

  • Manager
  • TheObject
  • TheObjectQueue
  • WorkerThread

Manager

public class Manager {

    public static void main(String[] args) throws InterruptedException {
        TheObjectQueue queue = new TheObjectQueue();
        //Create Objects
        int numberOfInitialObjects = 10;
        String[] arrayOfNames = {"alfred", "bob", "carl", "dave", "earl", "fred", "greg", "harry", "izzie", "jim"};
        //char[] arrayOfChars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'};
        //int[] arrayOfNums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        for (int i = 0; i < numberOfInitialObjects; i++){
                TheObject anObject = new TheObject(arrayOfNames[i], arrayOfNames[i].charAt(0), i);
                queue.addToQueue(anObject);

        }

        int numberOfThreads = 2;
        for (int i = 0; i < numberOfThreads; i++){
            WorkerThread workerThread = new WorkerThread(queue);
            workerThread.start();
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


        String[] arrayOfNames2 = {"kyle", "Larry", "Michael", "Ned", "Olaf", "Peter", "Quincy", "Raphael", "Sam", "Trixie"};
        for (int i = 0; i < numberOfInitialObjects; i++){
            TheObject anObject = new TheObject(arrayOfNames2[i], arrayOfNames2[i].charAt(0), i);
            queue.addToQueue(anObject);
        }

    }
}

TheObject

public class TheObject {
    private String someName;
    private char someChar;
    private int someNum;

    public TheObject(String someName, char someChar, int someNum) {
        super();
        this.someName = someName;
        this.someChar = someChar;
        this.someNum = someNum;
    }

    public String getSomeName() {
        return someName;
    }
    public char getSomeChar() {
        return someChar;
    }
    public int getSomeNum() {
        return someNum;
    }
}

TheObjectQueue

import java.util.LinkedList;

public class TheObjectQueue {

    private LinkedList<TheObject> objectQueue = null;

    public TheObjectQueue(){
        objectQueue = new LinkedList<TheObject>();
    }
    public LinkedList<TheObject> getQueue(){
        return objectQueue;
    }
    public void addToQueue(TheObject obj){
        synchronized (this) {
            objectQueue.addFirst(obj);
            this.notify();
        }

    }
    public TheObject removeFromQueue(){
        synchronized (this) {
            TheObject obj = objectQueue.removeLast();
            return obj;
        } 
    }
    public int getSize(){
        return objectQueue.size();
    }
    public boolean isEmpty(){
        return objectQueue.isEmpty();
    }
}

WorkerThread

import java.util.Random;

public class WorkerThread extends Thread{
    private TheObjectQueue queue;

    public WorkerThread(TheObjectQueue queue){
        this.queue = queue;
    }
    public void process(TheObject obj){
        //Stall for a random amount of time
        Random random = new Random();
        int randomInt = random.nextInt(3)*1000;
        try {
            Thread.sleep(randomInt);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        //Begin Printing
        System.out.println("--------------------------------------------");
        System.out.print(obj.getSomeName());
        System.out.print("   ");
        System.out.print(obj.getSomeChar());
        System.out.print("   ");
        System.out.println(obj.getSomeNum());
        System.out.println(this.getName());
    }

    @Override
    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                TheObject objToDealWith = queue.removeFromQueue();
                process(objToDealWith);
                super.run();
            }
        }
    }
}
CodyBugstein
  • 21,984
  • 61
  • 207
  • 363
  • 3
    Why not use an ExecutorService? – Brett Okken Jun 22 '14 at 18:50
  • 1
    @Jarrod I don't see how this is a duplicate. Even if the tools exist to do this differently, the question isn't about their use. I'm re-opening. – Sotirios Delimanolis Jun 22 '14 at 19:04
  • One possible reason for this behavior is `isEmpty()` being not synchronized, so all other worker threads, except `thread-0` always see queue as empty. Oops, no, you call `isEmpty` holding lock on `queue`, so `isEmpty()` should works as expected. – Victor Sorokin Jun 22 '14 at 20:35

2 Answers2

2

The synchronization over the (shared) queue in WorkerThread.run only allows a single thread to process a task at a time - the effect is effectively a one-worker pool! In this case, Thread 0 "wins" most of the time on obtaining the lock; synchronized lock acquisition is not guaranteed to be fair.

A simple fix is to fetch the task from the queue, using the required thread-safe construct, and then process the task outside of the synchronized section. This allows the workers to process the tasks, which are assumed to be independent, concurrently.

// e.g.
@Override
public void run() {
    while(true){
        TheObject objToDealWith; 
        synchronized (queue) {
            // Only synchronized on queue when fetching task
            objToDealWith = getTask(queue);
        }

        // Process task; independent of queue
        process(objToDealWith);
    }
}

Since one of the threads will be busy processing a task, while the other is obtaining (or has obtained) a lock on the queue, the work distribution will "be fair".

Community
  • 1
  • 1
user2864740
  • 60,010
  • 15
  • 145
  • 220
-1

Why so difficult? Just use the ExecutorService provided by java.

If you write it by hand you are doing it wrong.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample {

    public static void main(String[] args) {
//      configure here how many threads and other properties, the queue here is actually build in.
        ExecutorService executor = Executors.newCachedThreadPool();
        String[] arrayOfNames = { "alfred", "bob", "carl", "dave", "earl",
                "fred", "greg", "harry", "izzie", "jim" };
        for (int i = 0; i < arrayOfNames.length; i++) {
            TheObject anObject = new TheObject(arrayOfNames[i], arrayOfNames[i].charAt(0), i);
            MyRunnable runnalbe = new MyRunnable(anObject);
            executor.execute(runnalbe);
        }
        executor.shutdown()
    }

    static class MyRunnable implements Runnable {

        final TheObject anObject;

        MyRunnable(TheObject theObject) {
            this.anObject = theObject;
        }

        @Override
        public void run() {
            //TODO do work with anObject
        }

    }

    static class TheObject {
        private String someName;
        private char someChar;
        private int someNum;

        public TheObject(String someName, char someChar, int someNum) {
            this.someName = someName;
            this.someChar = someChar;
            this.someNum = someNum;
        }

        public String getSomeName() {
            return someName;
        }

        public char getSomeChar() {
            return someChar;
        }

        public int getSomeNum() {
            return someNum;
        }
    }
}
Zarathustra
  • 2,853
  • 4
  • 33
  • 62
  • 1
    What part of the question are you answering? – Sotirios Delimanolis Jun 22 '14 at 19:01
  • At most, this should be a comment. – Sotirios Delimanolis Jun 22 '14 at 19:05
  • all. Reinventing a Queue? No way. Reinventing an ExecutorService/ThreadPool? No. Reinventing the Wheel? No. Reinventing a ThreadFactory? No. So my answer is: use the tools that are provided by java. => If you use these Classes there would be no problem. – Zarathustra Jun 22 '14 at 19:06
  • 2
    Then show us how you would use `ExecutorService` to solve OP's problems. Link only answers are [extremely _poor_](http://meta.stackexchange.com/questions/8231/are-answers-that-just-contain-links-elsewhere-really-good-answers). – Sotirios Delimanolis Jun 22 '14 at 19:08
  • @Zarathustra As the code stands and with including `user2864740`'s answer, is there anything missing that would be included if using `ExecutorService`? – CodyBugstein Jun 22 '14 at 19:17
  • @Imray I updated my answer and added some example code. – Zarathustra Jun 22 '14 at 19:18
  • One important reason for OP's described behavior is the code in the `run()` method and how the `TheObjectQueue` is used. You haven't addressed the problem. – Sotirios Delimanolis Jun 22 '14 at 19:20
  • @Zarathustra Thanks. So I'm guessing that when `executor.execute(runnalbe);` is called, the object is added to some sort of `Queue` structure... and Executor takes care of the handling. Right? – CodyBugstein Jun 22 '14 at 19:20
  • @Imray right, you can even decide what for a queue to use, there are many factory methods from the `Executors` class. – Zarathustra Jun 22 '14 at 19:22