I need to create a dynamic number(I get it from command line) of threads concurrently that will be Consumers and another thread that will be Producer.
Those threads have shared buffer that the Producer writes to it random numbers 10 times, but I must be sure that in the next time that he will write, it will be only if, all the consumers are read the data that the producer wrote and any consumer read it only once.
The consumers report the current value that they read.
I used a boolean array at size the number I get, so for first time that consumer read then I change his value in the array to true.
When I run it I get an java.lang.IllegalMonitorStateException
and I don't understand why...
The producer don't writes to the buffer at all.
Thanks.
Buffer.java
public interface Buffer
{
public void set(int value) throws InterruptedException;
public int get() throws InterruptedException;
public boolean areAllTrue();
public void markedRead(int index);
public boolean checkInedx(int index);
public void setAllFalse();
}
SynchronizedBuffer.java
import java.util.Arrays;
// Fig. 23.18: SynchronizedBuffer.java
// Synchronizing access to shared data using Object
// methods wait and notify.
public class SynchronizedBuffer implements Buffer
{
private int buffer = -1; // shared by producer and consumer threads
private boolean occupied = false; // whether the buffer is occupied
private boolean[] flags;
public SynchronizedBuffer(int n)
{
this.flags = new boolean[n];
}
public synchronized void markedRead(int index)
{
this.flags[index] = true;
}
public synchronized boolean checkInedx(int index)
{
return flags[index];
}
public synchronized boolean areAllTrue()
{
for (boolean b : flags)
if (!b)
return false;
return true;
}
public synchronized void setAllFalse()
{
Arrays.fill(flags, false);
}
// place value into buffer
public synchronized void set(int value) throws InterruptedException
{
// while there are no empty locations, place thread in waiting state
while (occupied)
{
// output thread information and buffer information, then wait
System.out.println("Producer tries to write.");
displayState("Buffer full. Producer waits.");
wait();
} // end while
buffer = value; // set new buffer value
// indicate producer cannot store another value
// until consumer retrieves current buffer value
occupied = true;
displayState("Producer writes " + buffer);
notifyAll(); // tell waiting thread(s) to enter runnable state
} // end method set; releases lock on SynchronizedBuffer
// return value from buffer
public synchronized int get() throws InterruptedException
{
// while no data to read, place thread in waiting state
while (!occupied)
{
// output thread information and buffer information, then wait
System.out.println("Consumer tries to read.");
displayState("Buffer empty. Consumer waits.");
wait();
} // end while
// indicate that producer can store another value
// because consumer just retrieved buffer value
occupied = false;
displayState("Consumer reads " + buffer);
notifyAll(); // tell waiting thread(s) to enter runnable state
return buffer;
} // end method get; releases lock on SynchronizedBuffer
// display current operation and buffer state
public void displayState(String operation)
{
System.out.printf("%-40s%d\t\t%b\n\n", operation, buffer, occupied);
} // end method displayState
} // end class SynchronizedBuffer
Producer.java
// Fig. 23.12: Producer.java
// Producer with a run method that inserts the values 1 to 10 in buffer.
//import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
public class Producer implements Runnable
{
// private final static Random generator = new Random();
private final Buffer sharedLocation; // reference to shared object
private int count;
// constructor
public Producer(Buffer shared)
{
sharedLocation = shared;
} // end Producer constructor
// store randomly values from 1 to 100 in sharedLocation
public void run()
{
int sum = 0;
try
{
while (!sharedLocation.areAllTrue())
{
wait();
}
Thread.sleep(ThreadLocalRandom.current().nextLong(3000l));
count = ThreadLocalRandom.current().nextInt(1, 101);
sharedLocation.set(count); // set value in buffer
sharedLocation.setAllFalse();
notifyAll();
sum += count; // increment sum of values
System.out.printf("\t%2d\n", sum);
} // end try
catch (InterruptedException exception)
{
exception.printStackTrace();
} // end catch
System.out.println("Producer done producing\nTerminating Producer");
} // end method run
} // end class Producer
Consumer.java
import java.util.concurrent.ThreadLocalRandom;
// Fig. 23.13: Consumer.java
// Consumer with a run method that loops, reading 10 values from buffer.
public class Consumer implements Runnable
{
private final Buffer sharedLocation; // reference to shared object
private final int index;
// constructor
public Consumer(Buffer shared, int index)
{
sharedLocation = shared;
this.index = index;
} // end Consumer constructor
// read sharedLocation's value 10 times and sum the values
public void run()
{
int sum = 0;
for (int count = 1; count <= 10; count++)
{
// sleep 0 to 3 seconds, read value from buffer and add to sum
try
{
while (sharedLocation.checkInedx(index))
{
wait(); // wait until
}
Thread.sleep(ThreadLocalRandom.current().nextLong(3000l));
sum += sharedLocation.get();
sharedLocation.markedRead(this.index);
System.out.printf("\t\t\t%2d\n", sum);
notifyAll();
} // end try
catch (InterruptedException exception)
{
exception.printStackTrace();
} // end catch
} // end for
System.out.printf("\n%s %d\n%s\n", "Consumer read values totaling", sum, "Terminating Consumer");
} // end method run
} // end class Consumer
ConsumerNProducers.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerNProducers
{
public static void main(String[] args) throws InterruptedException
{
// create a newCachedThreadPool
ExecutorService application = Executors.newCachedThreadPool();
// create SynchronizedBuffer to store ints
Buffer sharedLocation = new SynchronizedBuffer(Integer.parseInt(args[0]));
System.out.printf("%-40s%s\t\t%s\n%-40s%s\n\n", "Operation", "Buffer", "Occupied", "---------", "------\t\t--------");
// execute the Producer and Consumer tasks
application.execute(new Producer(sharedLocation));
// Thread.sleep(5000);
for (int i = 0; i < Integer.parseInt(args[0]); i++)
{
application.execute(new Consumer(sharedLocation, i));
}
application.shutdown();
} // end main
}