1

I am doing a sample program with wait() and notify(), but when notify() is called, more than one thread is wakes up instead of one.

The code is:

public class MyQueue<T> {

    Object[] entryArr;
    private volatile int addIndex;

    private volatile int pending = -1;
    private final Object lock = new Object();

    private volatile long notifiedThreadId;
    private int capacity;

    public MyQueue(int capacity) {
        entryArr = new Object[capacity];
        this.capacity = capacity;
    }

    public void add(T t) {
        synchronized (lock) {
            if (pending >= 0) {
                try {
                    pending++;
                    lock.wait();
                    System.out.println(notifiedThreadId + ":" + Thread.currentThread().getId());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else if (pending == -1) {
                pending++;
            }
        }

        if (addIndex == capacity) { // its ok to replace existing value
            addIndex = 0;
        }

        try {
            entryArr[addIndex] = t;
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("ARRAYException:" + Thread.currentThread().getId() + ":" + pending + ":" + addIndex);
            e.printStackTrace();
        }

        addIndex++;

        synchronized (lock) {
            if (pending > 0) {
                pending--;
                notifiedThreadId = Thread.currentThread().getId();
                lock.notify();
            } else if (pending == 0) {
                pending--;
            }
        }
    }

}

public class TestMyQueue {

    public static void main(String args[]) {
        final MyQueue<String> queue = new MyQueue<>(2);

        for (int i = 0; i < 200; i++) {
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < Integer.MAX_VALUE; i++) {
                        queue.add(Thread.currentThread().getName() + ":" + i);
                    }
                }
            };
            Thread t = new Thread(r);
            t.start();
        }
    }

}

After some time, I see two threads being wake up by single thread. The output looks like:

91:114
114:124
124:198
198:106
106:202
202:121
121:40
40:42
42:83
83:81
81:17
17:189
189:73
73:66
66:95
95:199
199:68
68:201
201:70
70:110
110:204
204:171
171:87
87:64
64:205
205:115

Here I see 115 thread notified two threads, and 84 thread notified two threads; because of this we are seeing the ArrayIndexOutOfBoundsException.

115:84

115:111

84:203

84:200

ARRAYException:200:199:3

ARRAYException:203:199:3

What is the issue in the program?

Gray
  • 115,027
  • 24
  • 293
  • 354
CodingJDev
  • 11
  • 1
  • 2
    It seems you missed the actual purpose of `synchronized`. It’s to guard the access to the shared resource, not for performing wait and notify while accessing the shared resource entirely unprotected. Further, you should read the documentation of [`Object.wait()`](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait--) really carefully, especially the “*…spurious wakeups are possible, and this method should always be used in a loop*” part. – Holger Feb 09 '17 at 15:03
  • Thanks for your quick reply. I know we can use concurrent Lock. but My task is to make a lock using wait() and notify(). So at any point of time only one thread should execute the code between the synchronized block. – CodingJDev Feb 09 '17 at 15:18
  • Where in my comment did I say anything about “concurrent Lock”? The `synchronized` block *must span the entire operation*, including every access to the shared data structure, not just the parts where you perform `wait` or `notify`. You are accessing the `entryArr` and `addIndex` outside of `synchronized` blocks and declaring `addIndex` as `volatile` doesn’t help as it doesn’t make updates atomic. – Holger Feb 09 '17 at 15:22

1 Answers1

1

What is the issue in the program?

You have a couple of problems with your code that may be causing this behavior. First, as @Holder commented on, there are a lot of code segments that can be run by multiple threads simultaneously that should be protected using synchronized blocks.

For example:

if (addIndex == capacity) {
    addIndex = 0;
}

If multiple threads run this then multiple threads might see addIndex == capacity and multiple would be overwriting the 0th index. Another example is:

addIndex++;

This is a classic race condition if 2 threads try to execute this statement at the same time. If addIndex was 0 beforehand, after the 2 threads execute this statement, the value of addIndex might be 1 or 2 depending on the race conditions.

Any statements that could be executed at the same time by multiple threads have to be properly locked within a synchronized block or otherwise protected. Even though you have volatile fields, there can still be race conditions because there are multiple operations being executed.

Also, a classic mistake is to use if statements when checking for over or under flows on your array. They should be while statements to make sure you don't have the class consumer producer race conditions. See my docs here or take a look at the associated SO question: Why does java.util.concurrent.ArrayBlockingQueue use 'while' loops instead of 'if' around calls to await()?

Community
  • 1
  • 1
Gray
  • 115,027
  • 24
  • 293
  • 354