I have a producer and consumer that share a buffer. I want to be able to run, pause, continue running and stop the threads.
What I tried is to keep an Enum flag to indicate the state & every time I producer a new item, I check the state in if-else. If the state is running, I keep running, if it's waiting I make the thread waits. This works well giving the fact that the producer and consumer are always working (i.e the producer can put items into the buffer and consumer always can find items in the buffer). However once I get into the issue where one of the threads is waiting for each other because the buffer being full or empty, then the entire logic of the program gets screwed and I'm totally not able to solve it. I've been working on this for 4 days and still no hope. I would be really thankful if someone could help me through this. Thanks!
To run, pause, continue running and stopping the threads, I'm using a GUI. It puts the threads into waiting by pause() ... then once I want to run them again I wakeup() the threads and notify them. Same with the die().
EDIT: the problem with the logic so far is that when I click the button to continue, the inner states of the threads remain "WAITING" while they should be "RUNNING". That's why the GUI blocks.
Here's my code for the Producer:
public class GProducer2 implements Runnable {
private volatile ThreadState state;
private volatile ThreadState innerState;
private BlockingQueue<Integer> buffer;
private static Queue<Integer> stream = new LinkedList<Integer>();
static {
for ( int i = 0; i <= 1000; i++ ) {
stream.add(i);
}
}
public GProducer2( BlockingQueue<Integer> buffer ) {
this.buffer = buffer;
state = ThreadState.RUNNING;
// innerState = ThreadState.RUNNING;
}
@Override
public void run() {
/*
* The first while loop is to keep getting items from the stream
*/
while( state != ThreadState.DYING ) {
if ( !stream.isEmpty() ) {
int item = stream.poll();
/*
* The second while loop is to not lose items if the
* thread has to wait, so it process the item when the thread
* is running again.
*/
while( state != ThreadState.DYING ) {
if ( state == ThreadState.RUNNING ) {
//Check to see if buffer has free space
boolean freeBuffer = false;
synchronized (buffer) {
freeBuffer = buffer.offer(item);
}
while ( (!freeBuffer) && (state == ThreadState.RUNNING)) {
//if it doesn't, then wait...
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait(100);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
//check to see if the buffer has free space now
synchronized (buffer) {
freeBuffer = buffer.offer(item);
}
}
if ( (freeBuffer) && (state == ThreadState.RUNNING) ) {
synchronized (this) {
innerState = ThreadState.RUNNING;
}
//... continue with the stuff if you need
//...
//System.out.println(item);
//..then break
break;
}
}
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
//innerState = ThreadState.RUNNING;
}
}
}
}
}//when the stream is done.
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
if ( state == ThreadState.WAITING )
innerState = ThreadState.RUNNING;
else
innerState = ThreadState.DYING;
}
}
}
}
synchronized (this) {
innerState = ThreadState.DYING;
}
}
public void pause() {
synchronized (this) {
state = ThreadState.WAITING;
}
}
public void die() {
synchronized (this) {
state = ThreadState.DYING;
}
}
public void wakeup() {
synchronized (this) {
state = ThreadState.RUNNING;
}
}
public ThreadState getState() {
return state;
}
public ThreadState getInnerState() {
return innerState;
}
public boolean isSynched() {
synchronized (this) {
if ( state == innerState )
return true;
else
return false;
}
}
}
Here's the code of my Consumer:
public class GConsumer implements Runnable {
private volatile ThreadState state;
private volatile ThreadState innerState;
private BlockingQueue<Integer> buffer;
private List<Integer> holder;
public GConsumer( BlockingQueue<Integer> buffer ) {
this.buffer = buffer;
state = ThreadState.RUNNING;
holder = new LinkedList<Integer>();
}
@Override
public void run() {
/*
* The first while loop is to keep getting items from the buffer
*/
while( state != ThreadState.DYING ) {
if ( state == ThreadState.RUNNING ) {
//if the buffer has items then process them
boolean emptyBuffer = true;
synchronized (buffer) {
emptyBuffer = buffer.isEmpty();
}
if ( !emptyBuffer ) {
//Start doing your stuff
innerState = ThreadState.RUNNING;
int item;
synchronized (buffer) {
item = buffer.poll();
}
holder.add(item);
}
//otherwise the thread waits for the buffer to get items
else {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait(100);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
synchronized (this) {
innerState = ThreadState.DYING;
}
}
public void pause() {
synchronized (this) {
state = ThreadState.WAITING;
}
}
public void die() {
synchronized (this) {
state = ThreadState.DYING;
}
}
public void wakeup() {
synchronized (this) {
state = ThreadState.RUNNING;
}
}
public ThreadState getState() {
return state;
}
public synchronized List<Integer> getHolder() {
return holder;
}
public ThreadState getInnerState() {
return innerState;
}
public boolean isSynched() {
synchronized (this) {
if ( state == innerState )
return true;
else
return false;
}
}
}
Here's the code of my GUI:
public class GController implements ActionListener, ItemListener {
ExecutorService executor = Executors.newCachedThreadPool();
private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(10);
private volatile AppState appState = AppState.CLEAN_START;
private GProducer2 producer;
private GConsumer consumer;
//GUI stuff
static JToggleButton startBtn;
static JButton stopBtn;
static JButton showBtn;
public static void main(String[] args) {
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
createAndShowGUI();
}
});
}
public GController() {
// producer = new GProducer2(buffer);
// consumer = new GConsumer(buffer);
}
private static void createAndShowGUI() {
GController gController = new GController();
JFrame frame = new JFrame("GUI Concurrency");
frame.setPreferredSize( new Dimension(400, 200));
frame.setLayout( new FlowLayout() );
frame.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
startBtn = new JToggleButton("Start");
startBtn.addItemListener(gController);
stopBtn = new JButton("Cancel");
stopBtn.setEnabled(false);
stopBtn.setActionCommand("Cancel");
stopBtn.addActionListener(gController);
showBtn = new JButton("Show");
showBtn.setActionCommand("Show");
showBtn.addActionListener(gController);
frame.getContentPane().add(startBtn);
frame.getContentPane().add(stopBtn);
frame.getContentPane().add(showBtn);
frame.pack();
frame.setVisible(true);
}
@Override
public void actionPerformed(ActionEvent e) {
String command = e.getActionCommand();
System.out.println(command + " is clicked");
if ( command.equals("Cancel") ) {
startBtn.setText("Start");
appState = AppState.CLEAN_START;
producer.die();
consumer.die();
synchronized (producer) {
producer.notify();
}
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
synchronized (consumer) {
consumer.notify();
}
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
//Block here until they are both dead;
consumer.getHolder().clear();
executor.shutdown();
}
else if ( command.equals("Show") ) {
for ( int i : consumer.getHolder() ) {
System.out.println("[" + i + "]");
}
System.out.println();
}
}
@Override
public void itemStateChanged(ItemEvent e) {
if ( e.getStateChange() == ItemEvent.SELECTED ) {
if ( appState == AppState.CLEAN_START) {
System.out.println("Start");
startBtn.setText("Pause");
appState = AppState.RUNNING;
executor = Executors.newCachedThreadPool();
producer = new GProducer2(buffer);
consumer = new GConsumer(buffer);
executor.execute( producer );
executor.execute( consumer );
executor.shutdown();
stopBtn.setEnabled(false);
}
//Now continue execution
else if ( appState == AppState.PAUSED ) {
System.out.println("Continue");
appState = AppState.RUNNING;
producer.wakeup();
synchronized (producer) {
producer.notify();
}
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
consumer.wakeup();
synchronized (consumer) {
consumer.notify();
}
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
//block the app here until they are really running
// while( !producer.isSynched() ) {
// }
while( !producer.isSynched() | !consumer.isSynched() ) {
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
}
startBtn.setText("Pause");
stopBtn.setEnabled(false);
}
}
else {
System.out.println("Pause");
startBtn.setText("Continue");
appState = AppState.PAUSED;
System.out.println("Before:");
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
producer.pause();
consumer.pause();
//Block the app here until they are really waiting
System.out.println("After:");
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
while( !producer.isSynched() | !consumer.isSynched() ) {
}
stopBtn.setEnabled(true);
}
}
}