25

From my main I start two threads, called producer and consumer. Both contain a while(true) loop. The producer loop is a UDP server, so it does not need a sleep. My problem is with the consumer loop. The consumer loop removes objects from the linked queue and passes them to a function for further processing. From what I have researched, it is not good practice to use Thread.sleep in a loop, because at times the O/S will not release the thread at the end of the set time. If I remove the Thread.sleep, then when the application is idle it drags the CPU to 20 to 30%.

class Producer implements Runnable {
    private DatagramSocket dsocket;
    FError fer = new FError();

    int port =1548;
    ConcurrentLinkedQueue<String> queue;

    Producer(ConcurrentLinkedQueue<String> queue){
        this.queue = queue; 
    }

    @Override
    public void run() {

        try {

            // Create a socket to listen on the port.
            dsocket = new DatagramSocket(port);
            // Create a buffer to read datagrams into.
            byte[] buffer = new byte[30000];
            // Create a packet to receive data into the buffer
            DatagramPacket packet = new DatagramPacket(buffer,
            buffer.length);

            while (true) {
                try {

                    // Wait to receive a datagram.
                    dsocket.receive(packet);
                    // Convert the contents to a string.
                    String msg = new String(buffer, 0, packet.getLength());

                    int ltr = msg.length();
                    // System.out.println("MSG =" + msg);

                    if (ltr > 4) {
                        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // dd/MM/yyyy

                        Date now = new Date();
                        String strDate = sdfDate.format(now);

                        // System.out.println(strDate);

                        queue.add(msg + "&" + strDate);

                        // System.out.println("MSG =" + msg);
                    }

                    // Reset the length of the packet before reusing it.
                    packet.setLength(buffer.length);

                } catch (IOException e) {
                    fer.felog("svr class", "producer", "producer thread",e.getClass().getName() + ": " + e.getMessage());
                    dsocket.close();
                    break; 
                }
            }

        } catch (SocketException e) {
          fer.felog("svr class", "producer","Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage()); 

        }

    }

}

class Consumer implements Runnable {

    String str;  
    ConcurrentLinkedQueue<String> queue;

    Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;  
    }

    @Override
    public void run() {

        while (true) {
            try {

                while ((str = queue.poll()) != null) {
                    call(str);  // do further processing
                }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {

                ferpt.felog("svr class", "consumer","sleep", ex.getClass().getName() + ": " + ex.getMessage());
            }

        }

    }

}
Svetlin Zarev
Jro
  • It might be good to read up on the various classes available to you in the java.util.concurrent package. – Michael Krause Jan 28 '19 at 00:11
  • A traditional producer/consumer pattern would make use of a monitor lock to block the consumer until the producer has something to offer; you could use a blocking queue to achieve the same result. – MadProgrammer Jan 28 '19 at 00:17
  • Why not use [BlockingQueue](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html) – Scary Wombat Jan 28 '19 at 00:20
  • Thanks for all your answers. In my situation I cannot use a blocking queue. The producer can receive a message on the UDP port at any time. The consumer has to remove the messages one by one and do further processing on each, which takes some time, hence I cannot use blocking. – Jro Jan 28 '19 at 01:17
  • Related: [NetBeans / Java / New hint: Thread.sleep called in loop](https://stackoverflow.com/questions/3535754/netbeans-java-new-hint-thread-sleep-called-in-loop) – Ole V.V. Jan 28 '19 at 21:30

3 Answers

27

Instead of making Consumer implement Runnable, you could change your code to use a ScheduledExecutorService, which polls the queue every half a second instead of making the thread sleep. An example of this would be:

public void schedule() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(() -> {
        String str;
        try {
            while ((str = queue.poll()) != null) {
                call(str);  // do further processing
            }
        } catch (IOException e) {
            ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
        }
    }, 0, 500, TimeUnit.MILLISECONDS);
}
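
For experimenting outside the original application, the same idea can be written as a self-contained sketch. The class name `ScheduledDrainDemo`, the `drain()` helper, the `processed` list and the sample messages are all hypothetical stand-ins for the question's `queue` and `call(str)`. One detail worth knowing: if an exception escapes a task passed to `scheduleAtFixedRate`, later runs are suppressed, so the sketch catches runtime exceptions inside the task.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledDrainDemo {
    // Hypothetical stand-ins for the question's queue and call(str).
    static final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    static final List<String> processed = new CopyOnWriteArrayList<>();

    // Empties the queue once and returns how many messages were handled.
    static int drain() {
        int count = 0;
        String str;
        while ((str = queue.poll()) != null) {
            processed.add(str); // placeholder for call(str)
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                drain();
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress all later runs.
                System.err.println("drain failed: " + e);
            }
        }, 0, 500, TimeUnit.MILLISECONDS);

        queue.add("msg-1");
        queue.add("msg-2");
        Thread.sleep(1200); // give the task time to fire a few times

        executor.shutdown(); // stop scheduling further runs
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("Processed: " + processed);
    }
}
```

Note that a message can wait up to one full period (500 ms here) before it is picked up, and nothing in this approach limits how large the queue can grow.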
SizableShrimp
  • Thanks for the prompt reply. I am going to try this and let you know how it went. – Jro Jan 28 '19 at 00:17
11

The proper solution to your problem is to use a blocking queue. It gives you several advantages:

  • it does not waste CPU on busy waiting
  • it can have a limited capacity: imagine you have a fast producer but a slow consumer; if the queue is not limited in size, your application can easily run into an OutOfMemoryError

Here is a small demo, which you can play with:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProdConsTest {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        final Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    System.out.println("Producing: " + i);
                    queue.put(i);

                    //Adjust production speed by modifying the sleep time
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
            }
        };

        final Runnable consumer = () -> {
            while (true) {
                final Integer integer;
                try {
                    //Uncomment to simulate slow consumer:
                    //Thread.sleep(1000);

                    integer = queue.take();
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
                System.out.println("Consumed: " + integer);
            }
        };


        final Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        final Thread producerThread = new Thread(producer);
        producerThread.start();

        producerThread.join();
        consumerThread.interrupt();
        consumerThread.join();
    }
}

Now uncomment the sleep() in the consumer and observe what happens with the application: once the bounded queue is full, the producer blocks in put() until the consumer catches up. If you were using a timer-based solution such as the proposed ScheduledExecutorService, or you were busy waiting, then with a fast producer the queue would grow uncontrollably and eventually crash your application.
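
If the concern is that a receiver thread (such as the UDP producer in the question) must never stall, one possible arrangement is to let only the consumer block. The sketch below is an illustration under assumptions, not the asker's code: the class `OfferTakeDemo`, the helpers `enqueue` and `consume`, the capacity of 1000 and the `packet-N` strings are all made up. The producer side calls `offer()`, which returns `false` immediately when the queue is full, while the consumer side parks in `take()` and uses no CPU while idle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OfferTakeDemo {
    // Producer side: offer() never blocks; false means the queue was full.
    static boolean enqueue(BlockingQueue<String> queue, String msg) {
        return queue.offer(msg);
    }

    // Consumer side: take() parks the thread until a message is available.
    static List<String> consume(BlockingQueue<String> queue, int expected) {
        List<String> out = new ArrayList<>();
        try {
            while (out.size() < expected) {
                out.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        // The capacity is an arbitrary assumption for this sketch.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                if (!enqueue(queue, "packet-" + i)) {
                    System.err.println("queue full, dropped packet-" + i);
                }
            }
        });
        producer.start();

        System.out.println(consume(queue, 5));
        producer.join();
    }
}
```

Dropping a message when the queue is full is only one policy; whether that is acceptable depends on the application.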

Svetlin Zarev
  • Thanks for your answer. I implemented the ScheduledExecutorService and it is working fine. As I mentioned, the producer is listening on a UDP port and can receive packets at any time, so I cannot use the blocking queue. The consumer is much faster than the producer. The call method hands the object over to a multi-threaded class. Anyway, I will try your method as well. – Jro Apr 08 '19 at 01:57
1

Let the consumer wait() on an object that both threads have access to, and let the producer notify() listeners on this object when there are new messages. The consumer should then remove all messages, not just a single one as in the example.
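
A minimal sketch of that idea follows. The class name `Mailbox` and its methods `post` and `takeAll` are invented for illustration. The consumer waits inside a loop, because `wait()` may return without a matching notification, and then takes every queued message in one batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class Mailbox {
    private final Object lock = new Object();
    private final Queue<String> messages = new ArrayDeque<>();

    // Producer: add a message and wake up any waiting consumer.
    public void post(String msg) {
        synchronized (lock) {
            messages.add(msg);
            lock.notifyAll();
        }
    }

    // Consumer: block until at least one message exists, then take them all.
    // If the thread is interrupted while waiting, an empty batch is returned.
    public List<String> takeAll() {
        synchronized (lock) {
            try {
                while (messages.isEmpty()) { // loop guards against spurious wakeups
                    lock.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            List<String> batch = new ArrayList<>(messages);
            messages.clear();
            return batch;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Mailbox box = new Mailbox();
        Thread producer = new Thread(() -> {
            box.post("msg-1");
            box.post("msg-2");
        });
        producer.start();

        int received = 0;
        while (received < 2) {
            List<String> batch = box.takeAll();
            received += batch.size();
            System.out.println("Batch: " + batch);
        }
        producer.join();
    }
}
```

This is essentially a hand-written blocking queue, so in practice a class from java.util.concurrent is usually the simpler choice.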

ctrl-d
  • This amounts to the same thing as a blocking queue (which is totally preferable to the answers advising to use a scheduled task). No point reinventing it. – Nathan Hughes Jan 28 '19 at 00:40