
I'm learning about threads and synchronization, and I'm trying to set up a "phases" system: the first phase generates a message (an int, for convenience) and passes it on to phase 2, which modifies it (multiplies it by 2) and passes it on to the last phase, which modifies it further and prints it to the console. The problem is that the third phase never gets to run, even though it receives the messages.

In my example I set up an array of semaphores called "resources", in which semaphore A (index 0) has 4 permits, semaphore B (index 1) has 3, and semaphore C (index 2) has 2. All are set to be fair.

I tried setting the Semaphores to be fair, but that didn't fix my issue. I also tried changing the sleep timings but I had no luck.


class Fase1 extends Thread{
    private int i = 0;
    private Semaphore[] resources;
    private Fase2 recipient;

    public Fase1(Semaphore[] res, Fase2 fase2){
        recipient=fase2;
        resources=res;
    }

    @Override
    public void run(){
        try{
            while(true){
                resources[0].acquire(2);
                resources[1].acquire(2);
                recipient.receiveMessage(i);
                i++;
                sleep(200);
                resources[1].release(2);
                resources[0].release(2);
            }
        } catch (InterruptedException e){
        }
    }
}

class Fase2 extends Thread{
    private Semaphore[] resources;
    private Fase3 recipient;
    private boolean receivedMessage = false;
    private int message = 0;

    public Fase2(Semaphore[] res, Fase3 fase3){
        recipient=fase3;
        resources=res;
    }

    @Override
    public void run(){
        try{
            while(true){
                if(receivedMessage){
                    resources[0].acquire(2);
                    resources[1].acquire(2);
                    resources[2].acquire(2);
                    recipient.receiveMessage(message*2);
                    receivedMessage = false;
                    sleep(200);
                    resources[2].release(2);
                    resources[1].release(2);
                    resources[0].release(2);
                }
            }
        } catch (InterruptedException e){
        }
    }
    public void receiveMessage(int msg){
        message = msg;
        receivedMessage = true;
    }
}

class Fase3 extends Thread{
    private Semaphore[] resources;
    private boolean receivedMessage = false;
    private int message = 0;

    public Fase3(Semaphore[] res){
        resources=res;
    }

    @Override
    public void run(){
        try{
            while(true){
                if(receivedMessage){
                    resources[1].acquire(2);
                    resources[2].acquire(2);
                    System.out.println(message+1);
                    receivedMessage = false;
                    sleep(200);
                    resources[2].release(2);
                    resources[1].release(2);
                }
            }
        } catch (InterruptedException e){
        }
    }
    public void receiveMessage(int msg){
        message = msg;
        receivedMessage = true;
    }
}

I noticed that the permit counts are somehow getting screwed up. It's almost as if some of the threads are not releasing them properly, even though the code looks right to me.

Federico Chiesa
  • How are you initializing everything? Also, is there a reason you're using 3 semaphores and acquiring permits 2 by 2? You only have 2 resources to protect. – ttzn Jul 09 '19 at 14:18
  • I initialized the semaphores as I described, and then created one object for each class, initialized with the resources array and the object on which to call the receiveMessage function (so the phase 2 object goes to phase 1, and the phase 3 object goes to phase 2). – Federico Chiesa Jul 09 '19 at 14:25
  • My bad, somehow missed that part. – ttzn Jul 09 '19 at 14:42

1 Answer


There is a fundamental flaw in your design: you are synchronizing access to the message, but not to the receivedMessage flag. When thread #2 sets the flag to true, the JVM has no obligation to make that write visible to thread #3, because thread #3 performs no synchronization until it is inside the if block, which it may never enter. The same holds for communication between threads #1 and #2.

The acquire() and release() operations act as synchronization points: writes made by one thread before it calls release() are visible to another thread after that thread's subsequent successful acquire(). So you need to call acquire() before checking the flags. For instance, in Fase3:

    @Override
    public void run(){
        try{
            while(true){
                resources[1].acquire(2); // All writes by thread #2 are now visible
                if(receivedMessage){
                    resources[2].acquire(2);
                    System.out.println(message+1);
                    sleep(200);
                    receivedMessage = false;
                    resources[2].release(2);
                }
                resources[1].release(2);
            }
        } catch (InterruptedException e){
        }
    }
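
The same pattern applies to Fase2. Below is a minimal, bounded sketch of the whole chain under a few assumptions that are not in the question: the phases are renamed Phase2 and Phase3 and written as Runnables, the calling thread plays the role of phase 1 and sends a single message, the result goes into a hypothetical `printed` queue instead of System.out so the caller can read it, and the sleeps are dropped. An interrupt simply ends each loop without restoring permits, which is acceptable here only because the semaphores are thrown away afterwards.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class PhasePipelineSketch {

    // Phase 2: doubles the message and hands it to phase 3.
    static class Phase2 implements Runnable {
        private final Semaphore[] resources;
        private final Phase3 recipient;
        private boolean receivedMessage = false; // only touched while holding resources[1]
        private int message = 0;                 // only touched while holding resources[1]

        Phase2(Semaphore[] res, Phase3 phase3) { resources = res; recipient = phase3; }

        @Override
        public void run() {
            try {
                while (true) {
                    resources[0].acquire(2);
                    resources[1].acquire(2); // writes made by phase 1 are now visible
                    if (receivedMessage) {
                        resources[2].acquire(2);
                        recipient.receiveMessage(message * 2);
                        receivedMessage = false;
                        resources[2].release(2);
                    }
                    resources[1].release(2);
                    resources[0].release(2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop the loop, keep the interrupt status
            }
        }

        void receiveMessage(int msg) { message = msg; receivedMessage = true; }
    }

    // Phase 3: adds 1 and publishes the result (stand-in for println).
    static class Phase3 implements Runnable {
        private final Semaphore[] resources;
        private final BlockingQueue<Integer> printed; // hypothetical output channel
        private boolean receivedMessage = false;      // only touched while holding resources[1]
        private int message = 0;                      // only touched while holding resources[1]

        Phase3(Semaphore[] res, BlockingQueue<Integer> out) { resources = res; printed = out; }

        @Override
        public void run() {
            try {
                while (true) {
                    resources[1].acquire(2); // writes made by phase 2 are now visible
                    if (receivedMessage) {
                        resources[2].acquire(2);
                        printed.add(message + 1);
                        receivedMessage = false;
                        resources[2].release(2);
                    }
                    resources[1].release(2);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void receiveMessage(int msg) { message = msg; receivedMessage = true; }
    }

    // The caller acts as phase 1: sends one message and waits for the result.
    static Integer sendOne(int msg) throws InterruptedException {
        Semaphore[] res = {
            new Semaphore(4, true), new Semaphore(3, true), new Semaphore(2, true)
        };
        BlockingQueue<Integer> printed = new LinkedBlockingQueue<>();
        Phase3 phase3 = new Phase3(res, printed);
        Phase2 phase2 = new Phase2(res, phase3);
        Thread t2 = new Thread(phase2, "phase-2");
        Thread t3 = new Thread(phase3, "phase-3");
        t2.start();
        t3.start();

        res[0].acquire(2);
        res[1].acquire(2);
        phase2.receiveMessage(msg); // flag is written while holding resources[1]
        res[1].release(2);
        res[0].release(2);

        Integer result = printed.poll(5, TimeUnit.SECONDS); // null on timeout
        t2.interrupt();
        t3.interrupt();
        t2.join();
        t3.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendOne(5));
    }
}
```

Because semaphore B has 3 permits and every phase takes 2 of them, only one phase can hold B at a time, so each flag is always read and written under the same semaphore. Note that this sketch still has no handshake back to the sender: with a continuous producer, a new message could overwrite one that has not been consumed yet.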

Another solution is to make the receivedMessage flags volatile, but it's cleaner to use a single locking mechanism properly.
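
For comparison, here is a rough sketch of what the volatile variant could look like, reduced to a single consumer. The class and method names (`VolatileFlagDemo`, `Consumer`, `sendAndWait`, `lastSeen`) are made up for illustration, the semaphores are left out entirely, and `Integer.MIN_VALUE` is assumed never to be a real result.

```java
import java.util.concurrent.TimeUnit;

class VolatileFlagDemo {

    // Hypothetical stand-in for Fase3: polls a volatile flag instead of using a semaphore.
    static class Consumer implements Runnable {
        private volatile boolean receivedMessage = false;
        private int message = 0; // plain field, published by the volatile write below
        private volatile int lastSeen = Integer.MIN_VALUE; // sentinel: nothing processed yet

        void receiveMessage(int msg) {
            message = msg;          // ordinary write ...
            receivedMessage = true; // ... made visible to the reader by this volatile write
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                if (receivedMessage) {      // volatile read: also sees the write to message
                    lastSeen = message + 1;
                    receivedMessage = false;
                } else {
                    Thread.yield();         // busy-wait politely; still burns CPU
                }
            }
        }
    }

    // Sends one message and waits (up to 5 seconds) for the consumer to process it.
    static int sendAndWait(int msg) throws InterruptedException {
        Consumer consumer = new Consumer();
        Thread worker = new Thread(consumer, "consumer");
        worker.start();
        consumer.receiveMessage(msg);

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (consumer.lastSeen == Integer.MIN_VALUE && System.nanoTime() < deadline) {
            Thread.sleep(1);
        }
        worker.interrupt();
        worker.join();
        return consumer.lastSeen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendAndWait(20));
    }
}
```

The volatile flag only fixes visibility. It does not stop a sender from overwriting a message before the consumer has read it, which is one reason a single, consistently used lock is easier to reason about.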

As a side note, it's best to use Runnables instead of extending Thread.
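
A small sketch of that idea, with hypothetical names (`RunnablePhaseDemo`, `DoublingPhase`, `runOnce`): the phase is a plain task, and the thread that runs it is created separately, so the same task could just as well be handed to an executor.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class RunnablePhaseDemo {

    // Hypothetical phase written as a task rather than as a Thread subclass.
    static class DoublingPhase implements Runnable {
        private final Semaphore resource;
        private final int input;
        private final AtomicInteger output;

        DoublingPhase(Semaphore resource, int input, AtomicInteger output) {
            this.resource = resource;
            this.input = input;
            this.output = output;
        }

        @Override
        public void run() {
            try {
                resource.acquire(2);
                try {
                    output.set(input * 2);
                } finally {
                    resource.release(2); // always give the permits back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }

    // Runs the task once on its own thread and returns the doubled value.
    static int runOnce(int input) throws InterruptedException {
        Semaphore resource = new Semaphore(3, true);
        AtomicInteger output = new AtomicInteger();
        Thread worker = new Thread(new DoublingPhase(resource, input, output), "phase-2");
        worker.start();
        worker.join();
        return output.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce(21));
    }
}
```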

ttzn