0

I'm trying to implement a Producer Consumer problem in java. I'm using a circular buffer (circular array) to for the Producer to insert items into the buffer. Following is my code:

import java.util.*;
import java.io.*;

public class Buffer
{
    String a[];
    int front, rear;

    public Buffer(int size)
    {
            a = new String[size];
            front = rear = -1;
    }

    public boolean insert(String dataitem)
    {
            int p;
            p = (rear+1) % a.length;

            if(p==front)
            {
                    System.out.println("Buffer full");
                    return false;
            }
            else
            {       rear = p;
                    a[rear] = dataitem;
                    if(front == -1)
                            front = 0;
                    return true;
            }
    }

    public boolean empty()
    {
            if(front == -1)
                    return true;
            else
                    return false;
    }

    public String delete()
    {
            String result = a[front];
            if(front == rear)
                    front = rear = -1;
            else
                    front = (front +1)%a.length;
            return result;
    }

    public void display()
    {
            if(front == -1)
                    System.out.println("Buffer empty");
else
            {
                    System.out.println("Buffer elements are:");
                    int i= front;

                    while(i!= rear)
                    {
                            System.out.println(a[i]);
                            i = (i+1)%a.length;
                    }
                    System.out.println(a[i]);
            }
    }


    public static void main(String[] args)
    {
            int size = Integer.parseInt(args[0]);

            Buffer b = new Buffer(size);
            int ch;
            String dataitem, msg;

            Thread prod = new Thread(new Producer(b, size));
            Thread cons = new Thread(new Consumer(b, size));

            prod.start();
            cons.start();
    }

}

class Producer extends Thread
{
    Buffer b;
    int size;

    public Producer(Buffer b, int size)
    {
            this.b = b;
            this.size = size;
    }

    public void run()
    {
            while(true)
            {
                    synchronized(b)
                    {
                    for(int i = 1; i <= size; i++)
                    {
                            try
                            {       String dataitem = Thread.currentThread().getId()+"_"+i;
                                    boolean bool = b.insert(dataitem);
                                    //b.notifyAll();
if(bool)
                                            System.out.println("Successfully inserted "+dataitem);
                                    b.notifyAll();
                                    Thread.sleep(2000);
                            }
                            catch(Exception e)
                            {       e.printStackTrace();
                            }
                    }

            }
            }
    }
}

class Consumer extends Thread
{
    Buffer b;
    int size;

    public Consumer(Buffer b, int size)
    {
            this.b = b;
            this.size = size;
    }

    public void run()
    {
            while(b.empty())
            {
                    synchronized(b)
                    {
                            try
                            {
                                    System.out.println("Buffer empty");
                                    b.wait();
                            }
                            catch(Exception e)
                            {       e.printStackTrace();
                            }
                    }
            }

            synchronized(b)
            {
                    b.notifyAll();
                    String dataitem = b.delete();
                    System.out.println("Removed "+dataitem);
            }

    }
}

The producer is inserting dataitems into the buffer successfully. But they aren't being consumed by the consumer.

I get the following output when I execute the program.

Successfully inserted 11_1
Successfully inserted 11_2
Buffer full
Buffer full
Buffer full
Buffer full
Buffer full
Buffer full

My question is how do I get the consumer to consume items from the buffer?

halfer
  • 19,824
  • 17
  • 99
  • 186
user2201650
  • 527
  • 7
  • 13
  • 28

1 Answers1

1

The major problem is that the synchronized block in your Producer is too wide. It is never letting the Consumer acquire the lock

Start by narrowing the scope, for example...

while (true) {
    for (int i = 1; i <= size; i++) {
        try {
            String dataitem = Thread.currentThread().getId() + "_" + i;
            boolean bool = b.insert(dataitem);
            //b.notifyAll();
            if (bool) {
                System.out.println("Successfully inserted " + dataitem);
            }
            synchronized (b) {
                b.notifyAll();
            }
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

You may also consider synchronizing the ing insert and delete methods themselves. I personally would be tempted to use a internal lock, but you could simply synchronize the methods themselves, for example...

public synchronized boolean insert(String dataitem) {...}

public synchronized String delete() {...}

As it stands, your Consumer will only ever read a single value from the buffer, but I'll let you figure that one out ;)

As a side note, I might put the wait and notify functionality directly within the Buffer, so that whenever you try and delete a value, it will wait, within the delete method for the Buffer to be not empty and allow the insert method to make the notifications itself...but that's me ;)

Equally, I might consider blocking the insert method until there is more room, but that will come down to how you want to implement it :P

Updated

Very basically, this will start giving the results you are looking for...

public class ProducerConsumer {

    public static void main(String[] args) {
        new ProducerConsumer();
    }

    public ProducerConsumer() {
        int size = 5;

        Buffer b = new Buffer(size);

        Thread prod = new Thread(new Producer(b, size));
        Thread cons = new Thread(new Consumer(b, size));

        prod.start();
        cons.start();
    }

    public class Buffer {

        String a[];
        int front, rear;

        public Buffer(int size) {
            a = new String[size];
            front = rear = -1;
        }

        public synchronized boolean insert(String dataitem) {
            int p;
            p = (rear + 1) % a.length;

            if (p == front) {
                System.out.println("Buffer full");
                return false;
            } else {
                rear = p;
                a[rear] = dataitem;
                if (front == -1) {
                    front = 0;
                }
                return true;
            }
        }

        public boolean empty() {
            return front == -1;
        }

        public synchronized String delete() {
            String result = a[front];
            if (front == rear) {
                front = rear = -1;
            } else {
                front = (front + 1) % a.length;
            }
            return result;
        }

        public void display() {
            if (front == -1) {
                System.out.println("Buffer empty");
            } else {
                System.out.println("Buffer elements are:");
                int i = front;

                while (i != rear) {
                    System.out.println(a[i]);
                    i = (i + 1) % a.length;
                }
                System.out.println(a[i]);
            }
        }

    }

    class Producer extends Thread {

        Buffer b;
        int size;

        public Producer(Buffer b, int size) {
            this.b = b;
            this.size = size;
        }

        public void run() {
            int i = 0;
            while (true) {
                try {
                    String dataitem = Thread.currentThread().getId() + "_" + ++i;
                    boolean bool = b.insert(dataitem);
                    if (bool) {
                        System.out.println("Successfully inserted " + dataitem);
                    }
                    synchronized (b) {
                        b.notifyAll();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }

    class Consumer extends Thread {

        Buffer b;
        int size;

        public Consumer(Buffer b, int size) {
            this.b = b;
            this.size = size;
        }

        public void run() {

            while (true) {

                while (b.empty()) {
                    synchronized (b) {
                        try {
                            System.out.println("Buffer empty");
                            b.wait();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }

                String dataitem = null;
                synchronized (b) {
                    dataitem = b.delete();
                }
                    System.out.println("Removed " + dataitem);
            }
        }
    }
}
MadProgrammer
  • 343,457
  • 22
  • 230
  • 366
  • After synchronizing the methods and that block in the Producer class, it works fine. But when I enter the Buffer size as 2, I get the following output: " Successfully inserted 11_1 Removed 11_1 Successfully inserted 11_2 Successfully inserted 11_1 Buffer full Buffer full " – user2201650 Nov 06 '13 at 02:11
  • The `Producer` has two loops, a `while(true)` and `for-loop`. The `while` loop is forcing the `for-loop` to repeat indefinitely. You may consider removing the outer `while-loop` which will stop the thread after it has completed adding the required number of elements – MadProgrammer Nov 06 '13 at 02:19
  • But I'm supposed to continuously add elements into the buffer and simultaneously read from the buffer. – user2201650 Nov 06 '13 at 02:24
  • If I remove the `while(true)`, i get the following output: ` Buffer empty Successfully inserted 11_1 Removed 11_1 Successfully inserted 11_2 ` – user2201650 Nov 06 '13 at 02:25
  • Then start by `dropping` the `for-loop`, the `while-loop` will take care of it (in the `Produce`) and add a `while(true) {...}` around the contents of the `Consumer` – MadProgrammer Nov 06 '13 at 02:25
  • But I have to increment each data item. It should be like 11_1, 11_2, 11_3 and so on in the buffer. That's why I used the `for-loop` in the `Producer`. – user2201650 Nov 06 '13 at 02:31
  • So, keep a counter in the `while` loop and increment on each loop. The `size`, as I understand, is the size of the `Buffer`, not the loops... – MadProgrammer Nov 06 '13 at 02:33
  • Yup, the `size` is the size of the `Buffer`. What do you mean by keeping a counter in the `while` loop and incrementing it on each loop? – user2201650 Nov 06 '13 at 02:34
  • `int counter = 0; while(true) { count++;...}` – MadProgrammer Nov 06 '13 at 02:35
  • I still keep getting the `Buffer full` as output. The consumer is not consuming the data items. :( – user2201650 Nov 06 '13 at 03:11
  • I took the `Thread.sleep` out of the `Producer`, you could put one back in after the `b.notifyAll` (outside of the synchronized block) which should give the consumer time to run). You could also use `Thread.yield();`, but `Thread.sleep` might give you better results – MadProgrammer Nov 06 '13 at 03:20
  • call the b.insert also inside the synchronized(b) block. The update to the front variables should be synchronized between the insert and delete calls. – Raji Nov 06 '13 at 03:20
  • @Raji That's why the method is `synchronized`, but that may give better results...I'd prefer it if the `Buffer` was more thread safe :P – MadProgrammer Nov 06 '13 at 03:22
  • @MadProgrammer check this http://stackoverflow.com/questions/3047564/java-synchronized-method-lock-on-object-or-method ... "Summary : Locking is only at synchronized method level and object's instance variables can be accessed by other thread" – Raji Nov 06 '13 at 03:27
  • @Raji As I understand it the method synchronisation is locking on the object instance, so that with `remove` and `insert` being `synchronised`, when one thread is in `remove`, no other thread can gain access to `remove` or `insert` - yes, there are other methods that aren't in this example, but this is simple start... – MadProgrammer Nov 06 '13 at 03:31