
I have a producer and consumer that share a buffer. I want to be able to run, pause, continue running and stop the threads.

What I tried is to keep an enum flag that indicates the state; every time I produce a new item, I check the state in an if-else. If the state is running, I keep running; if it's waiting, I make the thread wait. This works well as long as the producer and consumer are always working (i.e. the producer can always put items into the buffer and the consumer can always find items in the buffer). However, once one of the threads has to wait for the other because the buffer is full or empty, the entire logic of the program breaks down and I'm not able to fix it. I've been working on this for 4 days and still have no solution. I would be really thankful if someone could help me through this. Thanks!

To run, pause, continue and stop the threads, I'm using a GUI. It puts the threads into waiting with pause(); when I want to run them again, I call wakeup() on the threads and notify them. The same goes for die().

EDIT: the problem with the logic so far is that when I click the button to continue, the inner states of the threads remain "WAITING" while they should be "RUNNING". That's why the GUI blocks.

Here's my code for the Producer:

public class GProducer2 implements Runnable {


    private volatile ThreadState state;
    private volatile ThreadState innerState;
    private BlockingQueue<Integer> buffer;

    private static Queue<Integer> stream = new LinkedList<Integer>();
    static {
        for ( int i = 0; i <= 1000; i++ ) {
            stream.add(i);
        }
    }

    public GProducer2( BlockingQueue<Integer> buffer ) {
        this.buffer = buffer;
        state = ThreadState.RUNNING;
//      innerState = ThreadState.RUNNING;
    }

    @Override
    public void run() {
        /*
         * The first while loop is to keep getting items from the stream
         */
        while( state != ThreadState.DYING ) {
            if ( !stream.isEmpty() ) {
                int item = stream.poll();
                /*
                 * The second while loop is to not lose items if the 
                 * thread has to wait, so it process the item when the thread
                 * is running again.
                 */
                while( state != ThreadState.DYING ) {

                    if ( state == ThreadState.RUNNING ) {
                        //Check to see if buffer has free space
                        boolean freeBuffer = false;
                        synchronized (buffer) {
                            freeBuffer = buffer.offer(item);
                        }

                        while ( (!freeBuffer) && (state == ThreadState.RUNNING)) {
                            //if it doesn't, then wait...
                            synchronized (this) {
                                try {
                                    innerState = ThreadState.WAITING;
                                    wait(100);
                                } catch (InterruptedException e) {
                                    //e.printStackTrace();
                                }
                            }
                            //check to see if the buffer has free space now
                            synchronized (buffer) {
                                freeBuffer = buffer.offer(item);
                            }
                        }

                        if ( (freeBuffer) && (state == ThreadState.RUNNING) ) {
                            synchronized (this) {
                                innerState = ThreadState.RUNNING;
                            }
                            //... continue with the stuff if you need
                            //...
                            //System.out.println(item);
                            //..then break
                            break;
                        }

                    }
                    else if ( state == ThreadState.WAITING ) {
                        synchronized (this) {
                            try {
                                innerState = ThreadState.WAITING;
                                wait();
                            } catch (InterruptedException e) {
                                //e.printStackTrace();
                                //innerState = ThreadState.RUNNING;
                            }
                        }
                    }
                }
            }//when the stream is done.
            else if ( state == ThreadState.WAITING ) {
                synchronized (this) {
                    try {
                        innerState = ThreadState.WAITING;
                        wait();
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                        if ( state == ThreadState.WAITING )
                            innerState = ThreadState.RUNNING;
                        else
                            innerState = ThreadState.DYING;
                    }
                }
            }
        }

        synchronized (this) {
            innerState = ThreadState.DYING;
        }
    }

    public void pause() {
        synchronized (this) {
            state = ThreadState.WAITING;
        }
    }

    public void die() {
        synchronized (this) {
            state = ThreadState.DYING;
        }
    }

    public void wakeup() {
        synchronized (this) {
            state = ThreadState.RUNNING;
        }
    }

    public ThreadState getState() {
        return state;
    }


    public ThreadState getInnerState() {
        return innerState;
    }

    public boolean isSynched() {
        synchronized (this) {
            if ( state == innerState )
                return true;
            else 
                return false;
        }
    }

}

Here's the code of my Consumer:

public class GConsumer implements Runnable {

    private volatile ThreadState state;
    private volatile ThreadState innerState;
    private BlockingQueue<Integer> buffer;
    private List<Integer> holder;

    public GConsumer( BlockingQueue<Integer> buffer ) {
        this.buffer = buffer;
        state = ThreadState.RUNNING;
        holder = new LinkedList<Integer>();
    }

    @Override
    public void run() {
        /*
         * The first while loop is to keep getting items from the buffer
         */
        while( state != ThreadState.DYING ) {
            if ( state == ThreadState.RUNNING ) {
                //if the buffer has items then process them
                boolean emptyBuffer = true;
                synchronized (buffer) {
                    emptyBuffer = buffer.isEmpty();
                }

                if ( !emptyBuffer ) {
                    //Start doing your stuff
                    innerState = ThreadState.RUNNING;
                    int item;
                    synchronized (buffer) {
                        item = buffer.poll();
                    }

                    holder.add(item);

                }
                //otherwise the thread waits for the buffer to get items
                else {
                    synchronized (this) {
                        try {
                            innerState = ThreadState.WAITING;
                            wait(100);
                        } catch (InterruptedException e) {
                            //e.printStackTrace();
                        }
                    }
                }

            }
            else if ( state == ThreadState.WAITING ) {
                synchronized (this) {
                    try {
                        innerState = ThreadState.WAITING;
                        wait();
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                    }
                }
            }
        }

        synchronized (this) {
            innerState = ThreadState.DYING;
        }

    }

    public void pause() {
        synchronized (this) {
            state = ThreadState.WAITING;
        }
    }

    public void die() {
        synchronized (this) {
            state = ThreadState.DYING;
        }
    }

    public void wakeup() {
        synchronized (this) {
            state = ThreadState.RUNNING;
        }
    }

    public ThreadState getState() {
        return state;
    }

    public synchronized List<Integer> getHolder() {
        return holder;
    }

    public ThreadState getInnerState() {
        return innerState;
    }

    public boolean isSynched() {
        synchronized (this) {
            if ( state == innerState )
                return true;
            else 
                return false;
        }
    }
}

Here's the code of my GUI:

public class GController implements ActionListener, ItemListener {

    ExecutorService executor = Executors.newCachedThreadPool();
    private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(10);
    private volatile AppState appState = AppState.CLEAN_START;

    private GProducer2 producer;
    private GConsumer consumer;

    //GUI stuff
    static JToggleButton startBtn;
    static JButton stopBtn;
    static JButton showBtn;

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                createAndShowGUI();
            }
        });
    }

    public GController() {
//      producer = new GProducer2(buffer);
//      consumer = new GConsumer(buffer);
    }

    private static void createAndShowGUI() {
        GController gController = new GController();
        JFrame frame = new JFrame("GUI Concurrency");
        frame.setPreferredSize( new Dimension(400, 200));
        frame.setLayout( new FlowLayout() );
        frame.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );

        startBtn = new JToggleButton("Start");
        startBtn.addItemListener(gController);

        stopBtn = new JButton("Cancel");
        stopBtn.setEnabled(false);
        stopBtn.setActionCommand("Cancel");
        stopBtn.addActionListener(gController);

        showBtn = new JButton("Show");
        showBtn.setActionCommand("Show");
        showBtn.addActionListener(gController);

        frame.getContentPane().add(startBtn);
        frame.getContentPane().add(stopBtn);
        frame.getContentPane().add(showBtn);

        frame.pack();
        frame.setVisible(true);
    }


    @Override
    public void actionPerformed(ActionEvent e) {
        String command = e.getActionCommand();
        System.out.println(command + " is clicked");

        if ( command.equals("Cancel") ) {
            startBtn.setText("Start");
            appState = AppState.CLEAN_START;

            producer.die();
            consumer.die();

            synchronized (producer) {
                producer.notify();
            }
            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );

            synchronized (consumer) {
                consumer.notify();
            }
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );

            //Block here until they are both dead;

            consumer.getHolder().clear();
            executor.shutdown();
        }
        else if ( command.equals("Show") ) {
            for ( int i : consumer.getHolder() ) {
                System.out.println("[" + i + "]");
            }
            System.out.println();
        }

    }

    @Override
    public void itemStateChanged(ItemEvent e) {
        if ( e.getStateChange() == ItemEvent.SELECTED ) {

            if ( appState == AppState.CLEAN_START) {
                System.out.println("Start");
                startBtn.setText("Pause");
                appState = AppState.RUNNING;
                executor = Executors.newCachedThreadPool();
                producer = new GProducer2(buffer);
                consumer = new GConsumer(buffer);
                executor.execute( producer );
                executor.execute( consumer );
                executor.shutdown();
                stopBtn.setEnabled(false);
            }
            //Now continue execution
            else if ( appState == AppState.PAUSED ) {
                System.out.println("Continue");
                appState = AppState.RUNNING;
                producer.wakeup();
                synchronized (producer) {
                    producer.notify();
                }

                System.out.println( "P:" + producer.getState()  );
                System.out.println( "P inner:" + producer.getInnerState()  );

                consumer.wakeup();
                synchronized (consumer) {
                    consumer.notify();
                }

                System.out.println( "C:" + consumer.getState()  );
                System.out.println( "C inner:" + consumer.getInnerState()  );

                //block the app here until they are really running
//              while( !producer.isSynched() ) {
//              }

                while( !producer.isSynched() | !consumer.isSynched() ) {
                    System.out.println( "P:" + producer.getState()  );
                    System.out.println( "P inner:" + producer.getInnerState()  );

                    System.out.println( "C:" + consumer.getState()  );
                    System.out.println( "C inner:" + consumer.getInnerState()  );
                }

                startBtn.setText("Pause");
                stopBtn.setEnabled(false);
            }
        }
        else {
            System.out.println("Pause");
            startBtn.setText("Continue");
            appState = AppState.PAUSED;

            System.out.println("Before:");

            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );

            producer.pause();
            consumer.pause();

            //Block the app here until they are really waiting
            System.out.println("After:");
            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );
            while( !producer.isSynched() | !consumer.isSynched() ) {
            }
            stopBtn.setEnabled(true);
        }
    }

}
Jack Twain

1 Answer


There is way too much code here, but I can see the synchronization primitives are definitely not used correctly (like not checking the condition after wakeup, declaring a field volatile and also guarding it with synchronized etc).
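To make the "check the condition after wakeup" point concrete, here is a minimal sketch of a guarded wait. `PauseGate` and its method names are hypothetical (they are not from the question's code), and I'm assuming each worker would call `awaitRunning()` once per loop iteration. The flag is guarded only by the object's monitor, so it needs no `volatile`, and `wait()` sits inside a `while` loop so the condition is re-checked after every wakeup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a gate that worker threads pass through between units of work. */
public class PauseGate {
    // Guarded only by this object's monitor, so no volatile is needed.
    private boolean paused = false;

    public synchronized void pause() {
        paused = true;
    }

    public synchronized void resume() {
        paused = false;
        notifyAll(); // wake every thread blocked in awaitRunning()
    }

    /** Blocks while the gate is paused; interruption is propagated to the caller. */
    public synchronized void awaitRunning() throws InterruptedException {
        // Re-check after every wakeup: wait() may return spuriously,
        // or the gate may have been paused again in the meantime.
        while (paused) {
            wait();
        }
    }

    public synchronized boolean isPaused() {
        return paused;
    }

    public static void main(String[] args) throws Exception {
        PauseGate gate = new PauseGate();
        gate.pause();
        CountDownLatch passed = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                gate.awaitRunning();
                passed.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treat interruption as a stop request
            }
        });
        worker.start();
        // While the gate is paused, the worker should still be blocked.
        System.out.println("passed while paused: " + passed.await(200, TimeUnit.MILLISECONDS));
        gate.resume();
        System.out.println("passed after resume: " + passed.await(2, TimeUnit.SECONDS));
        worker.join();
    }
}
```

With a gate like this there is a single flag and a single lock, so there is no second "inner" state for the GUI to poll.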

Synchronization primitives are notoriously difficult to use properly, and if I were you, I'd use higher level constructs rather than trying to fix the current approach.

I'm not sure what you are trying to accomplish, but I gather you want to "run", "pause" and "stop" the processing. To achieve "run", use a simple, standard producer consumer construct with a bounded queue (see this example). To achieve "pause", have your tasks reference a shared Semaphore and inside your task call acquire on this Semaphore like so:

final Semaphore signal = new Semaphore(1);
final ExecutorService service = // create one with a bounded queue
                                // and a CallerRuns policy

<T> Future<T> submit(final Callable<T> work) {
    Callable<T> wrapped = new Callable<T>() {
        public T call() throws Exception {
            signal.acquire();  // blocks while no permit is available (paused)
            signal.release();  // hand the permit back so later tasks can pass
            return work.call();
        }
    };
    return service.submit(wrapped);
}

*: The code is for illustrative purposes and isn't meant to compile, but you'll get the idea.

Then you can pause/restart processing by calling drainPermits/release on the signal. In order to "stop", simply call shutdown on the ExecutorService.
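For reference, a self-contained sketch of the whole run/pause/stop cycle might look like the following. All names here (`PausablePipeline`, `passGate`, `consumedCount`) are hypothetical, and the sketch makes two choices that are my assumptions rather than anything required: the controller pauses with a blocking `acquire` instead of `drainPermits`, so a worker that is halfway through the gate cannot hand the permit back after the pause, and "stop" uses `shutdownNow` so that workers blocked in `put`/`take` are interrupted.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; none of these names come from the question. */
public class PausablePipeline {
    private final BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(10);
    // One permit means "running", zero means "paused". Fair, so pause() is not starved.
    private final Semaphore gate = new Semaphore(1, true);
    private final List<Integer> consumed = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final int itemCount;

    public PausablePipeline(int itemCount) {
        this.itemCount = itemCount;
    }

    /** Blocks while paused; otherwise takes the permit and hands it straight back. */
    private void passGate() throws InterruptedException {
        gate.acquire();
        gate.release();
    }

    public void start() {
        executor.execute(() -> { // producer
            try {
                for (int i = 0; i < itemCount; i++) {
                    passGate();
                    buffer.put(i); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop was requested
            }
        });
        executor.execute(() -> { // consumer
            try {
                while (true) {
                    passGate();
                    consumed.add(buffer.take()); // blocks while the buffer is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop was requested
            }
        });
    }

    // pause() and resume() must be called alternately, from one controlling thread.
    public void pause() { gate.acquireUninterruptibly(); }
    public void resume() { gate.release(); }

    /** Interrupts both workers and reports whether they exited within two seconds. */
    public boolean stop() {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public int consumedCount() { return consumed.size(); }

    public static void main(String[] args) throws InterruptedException {
        PausablePipeline pipeline = new PausablePipeline(1000);
        pipeline.start();
        pipeline.pause();
        Thread.sleep(200); // let operations already past the gate finish
        int before = pipeline.consumedCount();
        Thread.sleep(200);
        System.out.println("items consumed while paused: " + (pipeline.consumedCount() - before));
        pipeline.resume();
        while (pipeline.consumedCount() < 1000) {
            Thread.sleep(10);
        }
        System.out.println("total consumed: " + pipeline.consumedCount());
        System.out.println("stopped cleanly: " + pipeline.stop());
    }
}
```

Note that a worker which has already passed the gate may still move one more item after `pause()` returns; if that matters for your GUI, treat the pause as "no new work is started" rather than "everything is frozen".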

Enno Shioji
  • thank you *so much*, but I really just need a quick fix here. Because learning other stuff now could take more time to re-write everything. – Jack Twain Oct 05 '13 at 14:58
  • @AlexTwain: I hope I won't come across like a dick, but I really think your current code is beyond repair. Having a "state" and "innerState", a method called "isSynced", mutating state on InterruptedException & continuing, mixing volatile and synchronized , not checking condition on wakeup are all quite serious code smell/bugs. Badly written threaded code is really hard to test and debug and can continue to cause pain after deploy. But of course it's your call... – Enno Shioji Oct 05 '13 at 15:32