
So I'm attending a course in multithreaded development and am currently learning about semaphores. In our latest assignment we are supposed to use three threads and two queues. The writer thread writes chars to the first queue, then an "encryptor" thread reads the chars from that queue, encrypts each char and adds it to the second queue. Finally, a reader thread reads from the second queue. To handle synchronization we are supposed to use semaphores and a mutex, but I managed without any:

public class Buffer {
private Queue<Character> qPlain = new LinkedList<Character>();
private Queue<Character> qEncrypt = new LinkedList<Character>();
private final int CAPACITY = 3;

public Buffer() {
    System.out.println("New Buffer!");
}

public synchronized void addPlain(char c) {
    while (qPlain.size() == CAPACITY) {
        try {
            wait();
            System.out.println("addPlain is waiting to add Data");
        } catch (InterruptedException e) {
        }
    }
    qPlain.add(c);
    notifyAll();
    System.out.println("addPlain Adding Data-" + c);
}

public synchronized char removePlain() {
    while (qPlain.size() == 0) {
        try {
            wait();
            System.out.println("----------removePlain is waiting to return Data.");
        } catch (InterruptedException e) {
        }
    }
    notifyAll();
    char c = qPlain.remove();
    System.out.println("---------------removePlain Returning Data-" + c);
    return c;
}

public synchronized void addEncrypt(char c) {
    while (qEncrypt.size() == CAPACITY) {
        try {
            wait();
            System.out.println("addEncrypt is waiting to add Data");
        } catch (InterruptedException e) {
        }
    }

    qEncrypt.add(c);
    notifyAll();
    System.out.println("addEncrypt Adding Data-" + c);
}

public synchronized char removeEncrypt() {
    while (qEncrypt.size() == 0) {
        try {
            wait();
            System.out.println("----------------removeEncrypt is waiting to return Data.");
        } catch (InterruptedException e) {
        }
    }
    notifyAll();
    char c = qEncrypt.remove();
    System.out.println("--------------removeEncrypt Returning Data-" + c);
    return c;
}

}

So this works fine, but I'm not going to pass as I haven't used any semaphores. I do understand the concept, but I just don't see the point of using one in this case. I have two queues and just one reader and one writer for each.

EDIT: Updated with semaphores instead. It almost works; the problem arises when the removePlain() method gets called while the queue is empty. I'm pretty sure I should block it, but I'm lost here. Could I not just use a mutex here instead?

public class Buffer {
private Semaphore encryptedSem = new Semaphore(0);
private Semaphore decryptedSem = new Semaphore(0);
private final Queue<Character> qPlain = new LinkedList<Character>();
private final Queue<Character> qEncrypt = new LinkedList<Character>();
private final int CAPACITY = 3;
private boolean startedWrite = false;
private boolean startedRead = false;

/**
 * Adds a character to the queue containing non encrypted chars.
 * 
 * @param c
 */
public void addPlain(char c) {

    // Makes sure that this writer executes first.
    if (!startedWrite) {
        startedWrite = true;
        encryptedSem = new Semaphore(1);
    }

    if (qPlain.size() < CAPACITY) {
        aquireLock(encryptedSem);
        System.out.println("addPlain has lock");
        qPlain.add(c);
        realeseLock(encryptedSem);
    }
}

/**
 * Removes and returns the next char in the non encrypted queue.
 * 
 * @return
 */
public char removePlain() {
    // TODO Need to fix what happens when the queue is 0. Right now it just
    // returns a char that is 0. This needs to be blocked somehow.

    char c = 0;

    if (qPlain.size() > 0) {
        aquireLock(encryptedSem);
        System.out.println("removePlain has lock");
        c = qPlain.remove();
        realeseLock(encryptedSem);
    } else {
        System.out.println("REMOVEPLAIN CALLED WHEN qPlain IS EMPTY");
    }
    return c;
}

/**
 * Adds a character to the queue containing the encrypted chars.
 * 
 * @param c
 */
public void addEncrypt(char c) {
    if (!startedRead) {
        startedRead = true;
        decryptedSem = new Semaphore(1);
    }

    if (qEncrypt.size() < CAPACITY) {
        aquireLock(decryptedSem);
        System.out.println("addEncrypt has lock");
        qEncrypt.add(c);
        realeseLock(decryptedSem);
    }

}

/**
 * Removes and returns the next char in the encrypted queue.
 * 
 * @return
 */
public char removeEncrypt() {
    char c = 0;
    if (qEncrypt.size() > 0) {
        aquireLock(decryptedSem);
        System.out.println("removeEncrypt has lock");
        c = qEncrypt.remove();
        realeseLock(decryptedSem);

    }
    return c;
}

/**
 * Aquries lock on the given semaphore.
 * 
 * @param sem
 */
private void aquireLock(Semaphore sem) {
    try {
        sem.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

/**
 * Realeses lock on the given semaphore.
 * 
 * @param sem
 */
private void realeseLock(Semaphore sem) {
    sem.release();
}

}

kallnall
  • There are always many ways of doing "stuff". E.g. I would ditch any synchronization of my own here and rely on blocking queues. But the point is, I guess, to find a way to use semaphores to protect access to the queues (and in the process you'll gain a finer-grained locking model: here, your first reader cannot put into queue1 while your final reader reads from queue2, a limitation that could be overcome). Please also make your instance variables (especially the queues) final, otherwise you may have subtle bugs depending on the order of instantiation of your Buffer class vs. the threads. – GPI Nov 29 '16 at 12:25
  • Take a look at the Javadoc of java.util.concurrent.locks.ReentrantReadWriteLock: it has a good example and basically you don't have to change that much. – brummfondel Nov 29 '16 at 12:41
  • I updated my post where I tried out semaphores. – kallnall Nov 29 '16 at 15:57

1 Answer

OK, so trying to address your concerns, without doing your homework :-)

About your first sample

At first sight, this is a working sample. You are using a form of mutual exclusion through the synchronized keyword, which allows you to use this.wait/notify correctly. It also provides safeguards, since every thread synchronizes on the same monitor, which gives adequate happens-before safety.

In other words, thanks to this single monitor, you are assured that anything under the synchronized methods is executed exclusively and that these methods side-effects are visible inside the other methods.

The only minor gripe is that your queues are not final, which, according to safe object publication guidelines and depending on how your whole system/threads are bootstrapped, might lead to visibility issues. Rule of thumb in multithreaded code (and maybe even generally): whatever can be made final should be.

The real problem with your code is that it does not fulfill your requirements : use semaphores.

About your second sample

Unsafe boolean mutation

This one has real issues. First, your startedWrite/startedRead booleans: you mutate them (change their true/false value) outside of any synchronization (lock, semaphore, synchronized, ... nothing at all). This is unsafe: under the Java memory model it is legal for a thread that has not performed the mutation to never see the mutated value. Put another way, the first writer could set startedWrite to true, and the other threads might never see that true value.

Some discussions on this:

  • https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html
  • Java's happens-before and synchronization

So anything that relies on these booleans is inherently flawed in your sample. That is, your Semaphore assignments, for one thing.

Several ways to correct this :

  • Always mutate shared state under a synchronization tool of some sort (in your first sample it was the synchronized keyword, and here it could be your semaphores), and make sure that the same tool is used by all threads mutating or accessing the variable
  • Or use a concurrency-safe type, like AtomicBoolean in this case, which guarantees that any mutation is made visible to other threads
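
As a minimal sketch of the second option, assuming a hypothetical StartFlag helper (it is not part of the original Buffer class): AtomicBoolean.compareAndSet flips the flag atomically, so exactly one thread observes the false-to-true transition and every later reader sees the new value.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the original Buffer class.
class StartFlag {
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Returns true for exactly one caller: the one that flips the flag from false to true. */
    boolean markStarted() {
        return started.compareAndSet(false, true);
    }

    boolean isStarted() {
        return started.get();
    }

    /** Calls markStarted() from several threads and counts how many of them won the flip. */
    static int countWinners(int threadCount) {
        StartFlag flag = new StartFlag();
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                if (flag.markStarted()) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(8));
    }
}
```

With a plain boolean field, several threads could each believe they were the first one.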

Race conditions

Another issue with your second code sample is that you check the sizes of your queues before taking a lock and modifying them, that is:

if (qPlain.size() > 0) {
    aquireLock(encryptedSem);
    ...
    c = qPlain.remove();
    realeseLock(encryptedSem);
} else {
    System.out.println("REMOVEPLAIN CALLED WHEN qPlain IS EMPTY");
}

Two concurrent threads could perform the check at the first line at the same time, and behave wrongly. Typical scenario is :

  1. qPlain has a size of 1
  2. Thread 1 arrives at the if and checks that qPlain is not empty; the check succeeds, then thread 1 is paused by the OS scheduler right there
  3. Thread 2 arrives at the same if and the same check succeeds for the same reason
  4. Thread 1 and thread 2 resume from there; both think they are allowed to take one element out of qPlain, which is wrong, because qPlain only holds one element.

It will fail. You need mutual exclusion of some sort. You cannot (rule of thumb again) perform a check outside a lock and then perform the mutation under the lock. Both the check and the mutation should happen, broadly speaking, under the same lock. (Or you are a very advanced multithreading kind of guy and you know optimistic locking and stuff like that really well.)
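
A small sketch of that rule, using a hypothetical CheckThenAct class rather than the original Buffer: the emptiness check and the removal sit in the same synchronized block, so two racing threads can never both take the single element.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not the original Buffer class.
class CheckThenAct {
    private final Queue<Character> queue = new LinkedList<>();
    private final Object lock = new Object();

    void add(char c) {
        synchronized (lock) {
            queue.add(c);
        }
    }

    /** Returns the next char, or null if the queue is empty. Check and removal share one lock. */
    Character takeOrNull() {
        synchronized (lock) {
            if (queue.isEmpty()) {
                return null;
            }
            return queue.remove();
        }
    }

    /** Two threads race for a single element; returns how many of them got one. */
    static int raceForSingleElement() {
        CheckThenAct buffer = new CheckThenAct();
        buffer.add('x');
        AtomicInteger successes = new AtomicInteger();
        Runnable taker = () -> {
            if (buffer.takeOrNull() != null) {
                successes.incrementAndGet();
            }
        };
        Thread a = new Thread(taker);
        Thread b = new Thread(taker);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println("successful takes: " + raceForSingleElement());
    }
}
```

This version returns null instead of blocking on an empty queue; blocking is what the semaphores discussed further down are for.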

Possible deadlock

Another rule of thumb: any time you acquire and release a lock and/or a resource at the same call site, you should have a try/finally pattern.
That is, no matter how it is done, your code should always look like

acquireSemaphore();
try { 
    // do something
} finally {
    releaseSemaphore();
}

Same goes for locks, input or output streams, sockets, ... Failure to do so may lead to your semaphore being acquired but never released, especially in case of an uncaught exception. So do use try and finally around your resources.
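
Applied to java.util.concurrent.Semaphore, the pattern could look like the sketch below. GuardedSection and runExclusively are made-up names for illustration. Note that acquire() sits outside the try/finally, so the permit is only released if it was actually obtained.

```java
import java.util.concurrent.Semaphore;

// Hypothetical wrapper for illustration.
class GuardedSection {
    private final Semaphore permit = new Semaphore(1);

    /** Runs the action while holding the single permit; returns false if interrupted while waiting. */
    boolean runExclusively(Runnable action) {
        try {
            permit.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // the permit was never acquired, so there is nothing to release
        }
        try {
            action.run();
        } finally {
            permit.release(); // runs even if action.run() throws
        }
        return true;
    }

    int availablePermits() {
        return permit.availablePermits();
    }

    public static void main(String[] args) {
        GuardedSection section = new GuardedSection();
        try {
            section.runExclusively(() -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException e) {
            System.out.println("action failed, permits left: " + section.availablePermits());
        }
    }
}
```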

Conclusions

With such serious flaws, I did not really read your code to see if the "spirit" of it works. Maybe it does, but at this point, it's not worth it to check it out further.

Going forward with your assignment

You are asked to use two tools: semaphores and mutual exclusion (e.g. synchronized, or Lock I guess). They are not exactly the same thing!

You probably get mutual exclusion, as your first sample showed. Probably not semaphores yet. The point of a semaphore is that it (safely) manages a number of "permits". One (a thread) can ask for a permit, and if the semaphore has one available and grants it, one can proceed with one's work. Otherwise, one is put in a "holding pattern" (a wait) until a permit is available. At some point, one* is expected to give the permit back to the semaphore, for others to use.

(*Please note: for a semaphore to work, it is not mandatory that the thread acquiring a permit is the one that releases it. It is part of what makes a lock and a semaphore so different, and it's a good thing.)

Let's start simple : a Semaphore that only has one permit can be used as a mutual exclusion. Bonus point : it can be released by another thread than the one that acquired it. That makes it ideal for message passing between threads : they can exchange permits.

What does that remind us of? wait/notify, of course!
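
To make the permit exchange concrete, here is a small sketch with a hypothetical Handoff class. The semaphore starts with zero permits; the worker thread releases a permit it never acquired, and the calling thread blocks in acquire until that happens, much like a wait/notify pair.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration of one thread signalling another through a permit.
class Handoff {
    /** Blocks until the worker thread has published its message, then returns it. */
    static String receiveFromWorker() {
        Semaphore ready = new Semaphore(0);   // no permits yet: acquire will block
        String[] box = new String[1];
        Thread worker = new Thread(() -> {
            box[0] = "hello";
            ready.release();                  // the worker never acquired this permit
        });
        worker.start();
        ready.acquireUninterruptibly();       // waits for the worker's release
        return box[0];                        // visible: release happens-before the acquire
    }

    public static void main(String[] args) {
        System.out.println(receiveFromWorker());
    }
}
```

A ReentrantLock would throw IllegalMonitorStateException if a thread that does not hold it called unlock, which is why this kind of signalling is a job for a semaphore rather than a lock.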

A possible path to a solution

So we have a semaphore, and it has a number of permits. What could the meaning of this number be ? A natural candidate is : have a Semaphore hold the number of elements inside the queues. At first, this could be zero.

  • Each time somebody puts an element in the queue, it raises the number of permits by one.
  • Each time somebody takes an element off the queue, it lowers the number of permits.
  • Then : trying to take an element off an empty queue means trying to acquire a permit from an empty semaphore, it will automatically block the caller. This seems to be what you want.

But!

  1. We have yet to define what "putting an element into a full queue" means. That is because semaphores are not bounded in permits. One can start with an empty semaphore, call "release" a thousand times, and end up with 1000 permits available. We will blow past our maximum capacity without any kind of bound.
  2. Let's say we have a workaround for that; we're still not done: we have not made sure at this point that readers and writers do not modify the queue at the same time. And this is crucial for correctness!
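
Both halves of the idea can be checked directly against java.util.concurrent.Semaphore. In this sketch (the PermitCounting class name is made up), releasing on a semaphore created with zero permits simply keeps adding permits, while a non-blocking tryAcquire on an empty semaphore fails.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class.
class PermitCounting {
    /** Starts from zero permits and releases repeatedly; nothing caps the permit count. */
    static int permitsAfterReleases(int releases) {
        Semaphore items = new Semaphore(0);
        for (int i = 0; i < releases; i++) {
            items.release();
        }
        return items.availablePermits();
    }

    /** tryAcquire does not block; on an empty semaphore it just reports failure. */
    static boolean canTakeFromEmpty() {
        return new Semaphore(0).tryAcquire();
    }

    public static void main(String[] args) {
        System.out.println("permits after 1000 releases: " + permitsAfterReleases(1000));
        System.out.println("tryAcquire on empty semaphore: " + canTakeFromEmpty());
    }
}
```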

So we need other ideas.

Well, issue #2 is easy: we are allowed to use exclusive locks for this exercise, so we'll use them. Just make sure that any manipulation of the queue itself happens in a synchronized block using the same monitor.

Issue number one... Well, we have a semaphore representing a "not empty" condition. That's one of the two wait/notify pairs you had in your first sample. OK cool, let's make another semaphore representing a "not full" condition, the other wait/notify pair of your code sample!

So, recap: use a semaphore for each wait/notify pair in your original sample. Keep a mutual exclusion to actually modify the contents of the queue object. And be very careful about how the mutual exclusion part interacts with the semaphores; it is the crux of the matter.

And I'll stop there to let you walk down this path if you want.

Bonus point

You should not have to code the same thing twice here. In your samples, you wrote the same logic twice (once for the "clear text", once for the "encrypted"): basically, wait for (at least) one free spot before adding a char, and wait for the presence of (at least) one char before removing it.

This should be one and the same code / methods. Do it once, and you'll get it right (or wrong of course) at all times. Write it twice, you double the chance of mistakes.
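
For readers who want something to compare their own attempt against, one possible shape of such a reusable class is sketched below. The BoundedBuffer name, the generic type and the roundTrip demo are assumptions for illustration, not the assignment's expected solution. It combines the two semaphores ("not full" and "not empty") with a synchronized block around the queue, and the semaphores are acquired before entering the monitor so a waiting thread never blocks while holding it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical reusable buffer: one instance per queue (plain text, encrypted).
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final Semaphore notFull;                      // permits = free slots
    private final Semaphore notEmpty = new Semaphore(0);  // permits = stored elements

    BoundedBuffer(int capacity) {
        notFull = new Semaphore(capacity);
    }

    void put(T item) throws InterruptedException {
        notFull.acquire();          // blocks while the buffer is full
        synchronized (items) {      // mutual exclusion only around the queue itself
            items.add(item);
        }
        notEmpty.release();         // signal: one more element available
    }

    T take() throws InterruptedException {
        notEmpty.acquire();         // blocks while the buffer is empty
        T item;
        synchronized (items) {
            item = items.remove();
        }
        notFull.release();          // signal: one more free slot
        return item;
    }

    /** Demo: a producer thread pushes the chars of text, the caller reads them back in order. */
    static String roundTrip(String text, int capacity) {
        BoundedBuffer<Character> buffer = new BoundedBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (char c : text.toCharArray()) {
                    buffer.put(c);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 0; i < text.length(); i++) {
                out.append(buffer.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello world", 3));
    }
}
```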

Future thoughts

  1. This is all still very complex for something that could be done using a BlockingQueue, but then again, homework does have another purpose :-).
  2. A bit more complex, but this message-passing pattern, where one thread waits for a "notEmpty" signal while the other waits for a "notFull" signal, is the exact use case of the JDK Condition object, which mimics the use of wait/notify.
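
For comparison, a rough sketch of the whole three-thread pipeline on top of java.util.concurrent.ArrayBlockingQueue, whose put and take already block on full and empty queues. The class name and the shift-by-one "encryption" are placeholders chosen for illustration; the capacity of 3 mirrors the CAPACITY constant in the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical pipeline for illustration; the "encryption" is a placeholder.
class BlockingQueuePipeline {
    static String run(String text) {
        BlockingQueue<Character> plain = new ArrayBlockingQueue<>(3);
        BlockingQueue<Character> encrypted = new ArrayBlockingQueue<>(3);
        int n = text.length();

        Thread writer = new Thread(() -> {
            try {
                for (char c : text.toCharArray()) {
                    plain.put(c);                      // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread encryptor = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    char c = plain.take();             // blocks while the queue is empty
                    encrypted.put((char) (c + 1));     // placeholder: shift each char by one
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        encryptor.start();

        // The calling thread plays the role of the reader.
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 0; i < n; i++) {
                out.append(encrypted.take().charValue());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run("abc"));
    }
}
```
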
GPI