-1

I am trying to create a basic Semaphore implementation using Queue. The idea is, there is a database, and there are 10 writers. Writers can only write to the database in mutual exclusion. I am using Queue because I want to implement First In First Out and Last In First Out.

Using Semaphore, I can't notify a specific thread to wake up. So my idea is what I am doing is for every Writer, I create an object and tell the Writer to wait on that object. Puts that object in a queue. Then remove the object from the queue and notify the Thread that is waiting on that object. In this way, I think I can make a FIFO or LIFO implementation.

I need help on the actual code implementation: 1. I run the code below, it gave me a lot of IllegalMonitorStateException. 2. FIFO and LIFO code (my FIFO code seems incorrect, while for LIFO code, I'm thinking to use Stack instead of Queue).

public class Test {
  public static void main(String [] args) {
    Database db = new Database();

    for (int i = 0; i < 10; i++) 
      (new Thread(new Writer(db))).start();
  }
}

public class Writer implements Runnable {

  private Database database;

  public Writer(Database database) {
    this.database = database;
  }

  public void run() {
    this.database.acquireWriteLock();

    this.database.write();

    this.database.releaseWriteLock();
  }
}

public class Database {

  private Semaphore lockQueue;

  public Database() {
    this.lockQueue = new Semaphore();
  }

  public void write() {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {}
  }

  public void acquireWriteLock() {
    lockQueue.acquire();
  }

  public void releaseWriteLock() {
    lockQueue.release();
  }
}

import java.util.Queue;
import java.util.LinkedList;

public class Semaphore {
  private Queue<Object> queue;

  public Semaphore() {
    this.queue = new LinkedList<Object>();
  }

  public synchronized void acquire() {
    Object object = new Object();

    try {
      if (this.queue.size() > 0) {
        object.wait();
        this.queue.add(object);
      }
    } catch (InterruptedException ie) {}

    this.queue.add(object);
  }

  public synchronized void release() {
    Object object = this.queue.remove();
    object.notify();
  }
}
Christian Sakai
  • 929
  • 3
  • 9
  • 25

1 Answers1

1

You need to acquire the lock of the object before you can use wait() and notify(). Try to check if the following code will work:

public class Semaphore {
    private Queue<Object> queue;
    private int state;

    public Semaphore() {
        this.queue = new LinkedList<Object>();
    }

    public void acquire() {
        Object object = new Object();

        synchronized (object) {
            try {
                if (this.state > 0) {
                    this.queue.add(object);
                    object.wait();                  
                }  else {
                    state++;
                }

            } catch (InterruptedException ie) {
            }

        }
    }

    public void release() {
        Object object = this.queue.poll();

        state--;

        if(null == object) {
            return;
        }

        synchronized (object) {
            object.notify();
        }
    }
}
  • What is the difference between this code and the one I wrote above? I'm using `object.wait()` instead of `wait()` because I want to wait on that object, so I can use it as a queue. If I just use `wait()`, wouldn't that means that I'm waiting on this instance of Semaphore, and when I do `notify()`, I cannot notify a specific thread to wake up – Christian Sakai Nov 22 '16 at 04:25
  • I edited the code based from the following reasons: 1st, IllegalMonitorStateException is thrown since the Semaphore instance is locked instead of the object's instance. 2nd, state integer is used instead of queue's size to determine if there are working threads. If Queue's size is used, and the object is added, the thread might remove the same object, resulting to deadlock. – Jessie Brian Revil Nov 22 '16 at 05:33
  • Thank you, this is exactly what I need. Can you elaborate more on why it is a synchronized block on the acquire and release instead of synchronized methods – Christian Sakai Nov 23 '16 at 02:40
  • `synchronized` method is basically the same as `synchronized(this)` block. Since invocation of `wait()` and `notify()` must be in `synchronized` block of that object (see: [http://stackoverflow.com/questions/2779484/why-must-wait-always-be-in-synchronized-block](http://stackoverflow.com/questions/2779484/why-must-wait-always-be-in-synchronized-block)), `synchronized(object)` is used. – Jessie Brian Revil Nov 23 '16 at 02:52
  • Thank you so much! – Christian Sakai Nov 23 '16 at 03:06
  • I think there is a bug in the above solution. If two threads tried to acquire permits from semaphore, they might pass, even if they should not Race condition: When two threads synchronize on two separate objects at the same time, they see a preferable state and go ahead with state increment. Here I have implemented a fixed version of the above code. https://github.com/pixels89/Practice/blob/master/Algo/src/in/barmans/practice/threading/SemaphoreSimple.java – Mukesh Barman May 29 '19 at 06:57