-1

I need to create a dynamic number(I get it from command line) of threads concurrently that will be Consumers and another thread that will be Producer.

Those threads have shared buffer that the Producer writes to it random numbers 10 times, but I must be sure that in the next time that he will write, it will be only if, all the consumers are read the data that the producer wrote and any consumer read it only once.

The consumers report the current value that they read.

I used a boolean array at size the number I get, so for first time that consumer read then I change his value in the array to true.

When I run it I get an java.lang.IllegalMonitorStateException and I don't understand why...

The producer don't writes to the buffer at all.

Thanks.

Buffer.java

public interface Buffer
{
    public void set(int value) throws InterruptedException;

    public int get() throws InterruptedException;

    public boolean areAllTrue();

    public void markedRead(int index);

    public boolean checkInedx(int index);

    public void setAllFalse();
}

SynchronizedBuffer.java

import java.util.Arrays;

// Fig. 23.18: SynchronizedBuffer.java
// Synchronizing access to shared data using Object 
// methods wait and notify.
public class SynchronizedBuffer implements Buffer
{
    private int buffer = -1; // shared by producer and consumer threads
    private boolean occupied = false; // whether the buffer is occupied
    private boolean[] flags;

    public SynchronizedBuffer(int n)
    {
        this.flags = new boolean[n];
    }

    public synchronized void markedRead(int index)
    {
        this.flags[index] = true;
    }

    public synchronized boolean checkInedx(int index)
    {
        return flags[index];
    }

    public synchronized boolean areAllTrue()
    {
        for (boolean b : flags)
            if (!b)
                return false;
        return true;
    }

    public synchronized void setAllFalse()
    {
        Arrays.fill(flags, false);
    }

    // place value into buffer
    public synchronized void set(int value) throws InterruptedException
    {
        // while there are no empty locations, place thread in waiting state
        while (occupied)
        {
            // output thread information and buffer information, then wait
            System.out.println("Producer tries to write.");
            displayState("Buffer full. Producer waits.");
            wait();
        } // end while

        buffer = value; // set new buffer value

        // indicate producer cannot store another value
        // until consumer retrieves current buffer value
        occupied = true;

        displayState("Producer writes " + buffer);

        notifyAll(); // tell waiting thread(s) to enter runnable state
    } // end method set; releases lock on SynchronizedBuffer

    // return value from buffer
    public synchronized int get() throws InterruptedException
    {
        // while no data to read, place thread in waiting state
        while (!occupied)
        {
            // output thread information and buffer information, then wait
            System.out.println("Consumer tries to read.");
            displayState("Buffer empty. Consumer waits.");
            wait();
        } // end while
            // indicate that producer can store another value
            // because consumer just retrieved buffer value
        occupied = false;

        displayState("Consumer reads " + buffer);

        notifyAll(); // tell waiting thread(s) to enter runnable state

        return buffer;
    } // end method get; releases lock on SynchronizedBuffer

    // display current operation and buffer state
    public void displayState(String operation)
    {
        System.out.printf("%-40s%d\t\t%b\n\n", operation, buffer, occupied);
    } // end method displayState
} // end class SynchronizedBuffer

Producer.java

// Fig. 23.12: Producer.java
// Producer with a run method that inserts the values 1 to 10 in buffer.
//import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable
{
    // private final static Random generator = new Random();
    private final Buffer sharedLocation; // reference to shared object
    private int count;

    // constructor
    public Producer(Buffer shared)
    {
        sharedLocation = shared;
    } // end Producer constructor

    // store randomly values from 1 to 100 in sharedLocation
    public void run()
    {
        int sum = 0;
        try
        {
            while (!sharedLocation.areAllTrue())
            {
                wait();
            }
            Thread.sleep(ThreadLocalRandom.current().nextLong(3000l));
            count = ThreadLocalRandom.current().nextInt(1, 101);
            sharedLocation.set(count); // set value in buffer
            sharedLocation.setAllFalse();
            notifyAll();
            sum += count; // increment sum of values
            System.out.printf("\t%2d\n", sum);
        } // end try


        catch (InterruptedException exception)
        {
            exception.printStackTrace();
        } // end catch

        System.out.println("Producer done producing\nTerminating Producer");
    } // end method run
} // end class Producer

Consumer.java

import java.util.concurrent.ThreadLocalRandom;

// Fig. 23.13: Consumer.java
// Consumer with a run method that loops, reading 10 values from buffer.

public class Consumer implements Runnable
{
    private final Buffer sharedLocation; // reference to shared object
    private final int index;

    // constructor
    public Consumer(Buffer shared, int index)
    {
        sharedLocation = shared;
        this.index = index;
    } // end Consumer constructor

    // read sharedLocation's value 10 times and sum the values
    public void run()
    {
        int sum = 0;
        for (int count = 1; count <= 10; count++)
        {
            // sleep 0 to 3 seconds, read value from buffer and add to sum
            try
            {
                while (sharedLocation.checkInedx(index))
                {
                    wait(); // wait until 
                }
                Thread.sleep(ThreadLocalRandom.current().nextLong(3000l));
                sum += sharedLocation.get();
                sharedLocation.markedRead(this.index);
                System.out.printf("\t\t\t%2d\n", sum);
                notifyAll();
            } // end try

            catch (InterruptedException exception)
            {
                exception.printStackTrace();
            } // end catch
        } // end for

        System.out.printf("\n%s %d\n%s\n", "Consumer read values totaling", sum, "Terminating Consumer");
    } // end method run
} // end class Consumer

ConsumerNProducers.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerNProducers
{

    public static void main(String[] args) throws InterruptedException
    {
        // create a newCachedThreadPool
        ExecutorService application = Executors.newCachedThreadPool();

        // create SynchronizedBuffer to store ints
        Buffer sharedLocation = new SynchronizedBuffer(Integer.parseInt(args[0]));

        System.out.printf("%-40s%s\t\t%s\n%-40s%s\n\n", "Operation", "Buffer", "Occupied", "---------", "------\t\t--------");

        // execute the Producer and Consumer tasks
        application.execute(new Producer(sharedLocation));
//      Thread.sleep(5000);
        for (int i = 0; i < Integer.parseInt(args[0]); i++)
        {
            application.execute(new Consumer(sharedLocation, i));
        }
        application.shutdown();

    } // end main

}
Bugs
  • 4,491
  • 9
  • 32
  • 41
Asaf
  • 107
  • 1
  • 12

1 Answers1

0

Your producers are waiting for this condition to produce:

while (!sharedLocation.areAllTrue())

Thus, if all of your producers are waiting from the start, not all entries in sharedLocation are true and it seems like they are never initialized. I suggest changing your constructor so that you initialize all your values:

public SynchronizedBuffer(int n)
{
    this.flags = new boolean[n];
    Arrays.fill(flags, true);
}
influjensbahr
  • 3,978
  • 1
  • 12
  • 12