
I'm using a service that reads messages from Kafka and pushes them into Cassandra.

I'm using a multithreaded architecture for this.

There are, say, k threads consuming from a Kafka topic. They write into a queue declared as:

public static BlockingQueue<>

Now there are a number of threads, say n, which write into Cassandra. Here is the code that does that:

public void run(){
    LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
    while (!Thread.currentThread().isInterrupted()) {
        Thread.yield();
        if (!content.isEmpty()) {
            try {
                JSONObject msg = content.remove();
                // JSON
                for(String tableName : tableList){
                    CassandraConnector.getSession().execute(createQuery(tableName, msg));
                }
            } catch (Exception e) {

            }
        }
    }
}

content is the BlockingQueue that the Kafka threads write to and the Cassandra threads read from.

My worker classes extend Thread, and there is a fixed number of threads that keep running until they are interrupted.

The problem is that this uses too much CPU. Here are the header and the process line from top:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
46232 vishran+  20   0 3010804 188052  14280 S 137.8  3.3   5663:24 java

Here is the output of strace on a thread of this process:

strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
....and so on

I am using Thread.yield() because of this.

If you want any other information for debugging, please let me know.

Now the question is, how can CPU utilization be minimized?

vish4071

3 Answers


The whole purpose of a BlockingQueue is that it blocks when it is empty. So the consumer threads (the ones writing into Cassandra) don't have to check manually whether it is empty. You can just call take(): if the queue is empty, the call blocks until an element becomes available or the thread is interrupted.

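When a thread is blocked, the scheduler can run some other thread in its place, which saves you from calling yield() and so on. Remember also that yield() is only a hint that the scheduler is free to ignore; even when it is honored, the thread only gives way to another runnable thread (typically one of equal or higher priority) and is immediately eligible to run again. A loop around yield() therefore still keeps a core busy, which is what the flood of sched_yield() calls in your strace output shows.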

public void run() {
    LOGGER.log(Level.INFO, "Thread Created: " + Thread.currentThread().getName());
    try {
        JSONObject msg = content.take();
        // JSON
        for (String tableName : tableList) {
            CassandraConnector.getSession().execute(createQuery(tableName, msg));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
MS Srikkanth
  • Can you please add a code snippet for this? I'll try the block in @Robert's answer. – vish4071 Apr 18 '16 at 11:42
  • Thanks..One more thing...Will `.take()` pop the element? – vish4071 Apr 18 '16 at 11:52
  • One last thing. `.yield` is not required now? If not, can you tell me the reason? And why are we not using the `isInterrupted()` check? – vish4071 Apr 18 '16 at 11:56
  • Yes, yield() is not required anymore, as mentioned in my answer above. take() is a blocking call that parks the thread (Thread.State.WAITING) until an element arrives, so it won't consume CPU while it waits. The scheduler takes care of scheduling some other thread that is available to run. – MS Srikkanth Apr 18 '16 at 11:59
  • And what about `isInterrupted()` check? This will run once and execute a query...and this thread will die...won't it? – vish4071 Apr 18 '16 at 12:01
  • Yes, it will die after executing a single query. If you need it to run continuously, add a while loop as you did in the original. My idea was just to show you how take() must be used, so I picked out only the relevant code. But remember that the while loop will never stop if you catch InterruptedException and do nothing about it: by the time InterruptedException is thrown, the interrupt status has already been cleared, so you have to set it again manually, as I am doing. – MS Srikkanth Apr 18 '16 at 12:04
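Combining the answer with the while loop discussed in the comments gives a consumer that runs continuously, blocks in take() while the queue is empty, and exits once interrupted. The following is a minimal, self-contained sketch under stated assumptions: plain String messages replace JSONObject and an in-memory list stands in for Cassandra, and the names TakeConsumerDemo, Result and runDemo are hypothetical, chosen only so the example runs without Kafka or a database. It also records the consumer's state while idle, to show it parks instead of spinning.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakeConsumerDemo {

    /** What the demo observed: the idle consumer's state and whether it stopped on interrupt. */
    public static final class Result {
        public final Thread.State idleState;
        public final boolean terminated;

        Result(Thread.State idleState, boolean terminated) {
            this.idleState = idleState;
            this.terminated = terminated;
        }
    }

    /** Continuous consumer: blocks in take() while the queue is empty, exits once interrupted. */
    static Thread startConsumer(BlockingQueue<String> queue, List<String> sink) {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String msg = queue.take(); // removes the head, waiting if the queue is empty
                    sink.add(msg);             // hypothetical stand-in for the Cassandra writes
                } catch (InterruptedException e) {
                    // The flag is cleared when take() throws; restore it so the while condition ends the loop
                    Thread.currentThread().interrupt();
                }
            }
        }, "consumer");
        t.start();
        return t;
    }

    /** Feeds messages to one consumer, records its state once idle, then interrupts it. */
    public static Result runDemo(List<String> messages, List<String> sink) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread consumer = startConsumer(queue, sink);
        queue.addAll(messages);
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        Thread.State idle = consumer.getState();
        // Demo-only check: wait until everything is consumed and the consumer has parked in take().
        while ((sink.size() < messages.size() || idle != Thread.State.WAITING)
                && System.nanoTime() < deadline) {
            pause(10);
            idle = consumer.getState();
        }
        consumer.interrupt();
        try {
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(idle, !consumer.isAlive());
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> sink = new CopyOnWriteArrayList<>();
        Result r = runDemo(List.of("a", "b", "c"), sink);
        System.out.println(sink + ", idle state " + r.idleState + ", stopped " + r.terminated);
    }
}
```

While the queue is empty the consumer sits in WAITING rather than RUNNABLE, which is why it no longer shows up in top.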

From the looks of your code, your consumer threads are constantly checking whether content is available. Your threads are therefore always running and never idle (waiting for someone to notify them), so your CPU is always doing something, even though the current thread keeps yielding.

while (!Thread.currentThread().isInterrupted()) {
    Thread.yield();
    if (!content.isEmpty()) {

You are clearly trying to solve the producer-consumer problem that many of us have faced at some point in our programming careers.

What you're currently doing is having the consumer continually poll to check whether it has something to consume.

The easiest and least CPU-intensive way of solving it is:

  1. Have the producer signal the consumer that it has produced something.

Check out this example, as it shows the simplest way to do it. You may also want to revisit Java Concurrency in Practice for more in-depth help.
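A minimal sketch of the "producer signals the consumer" idea, using an intrinsic lock with wait() and notifyAll(). SignallingBuffer and transferAndSum are hypothetical names made up for this illustration. BlockingQueue implementations such as LinkedBlockingQueue already do this kind of signalling internally, so in the question's code calling take() remains the practical choice; this only shows what happens underneath.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical bounded buffer where producers and consumers signal each other. */
public class SignallingBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    public SignallingBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Producer side: waits while full, then adds and signals waiting consumers. */
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();                   // releases the lock until another thread calls notifyAll()
        }
        items.addLast(item);
        notifyAll();                  // signal: "something was produced"
    }

    /** Consumer side: sleeps without using CPU until signalled, then removes the head. */
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {     // loop guards against spurious wake-ups
            wait();
        }
        T item = items.removeFirst();
        notifyAll();                  // signal: "space is available"
        return item;
    }

    /** Demo: one producer thread passes 1..n through a small buffer; returns the sum consumed. */
    public static long transferAndSum(int n, int capacity) {
        SignallingBuffer<Integer> buf = new SignallingBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buf.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buf.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
        return sum;
    }
}
```

For example, transferAndSum(100, 4) moves 100 items through a buffer of capacity 4; both threads spend their idle time inside wait() instead of spinning.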

João Rebelo
  • @vish4071 You should use the `BlockingQueue` as it is designed to be used: use one of the blocking methods: [`take()`](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#take--) or [`poll(timeout, unit)`](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#poll-long-java.util.concurrent.TimeUnit-) – Mark Rotteveel Apr 18 '16 at 11:22
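The poll(timeout, unit) variant from the comment above is handy when a consumer must wake up periodically, for example to check a shutdown flag: it waits up to the timeout without using CPU and returns null if nothing arrived. A minimal sketch, assuming a hypothetical running flag and a hypothetical handle callback standing in for the Cassandra writes; PollingConsumer and demo are illustrative names, not part of the question's code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PollingConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> handle;                          // hypothetical stand-in for the Cassandra writes
    private final AtomicBoolean running = new AtomicBoolean(true);  // hypothetical shutdown flag

    public PollingConsumer(BlockingQueue<String> queue, Consumer<String> handle) {
        this.queue = queue;
        this.handle = handle;
    }

    public void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        while (running.get() && !Thread.currentThread().isInterrupted()) {
            try {
                // Waits up to 500 ms without using CPU; returns null on timeout.
                String msg = queue.poll(500, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    handle.accept(msg);
                }
                // On timeout, simply loop and re-check the running flag.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Demo: drains input through one polling consumer, then stops it via the flag. */
    public static List<String> demo(List<String> input) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>(input);
        List<String> out = new CopyOnWriteArrayList<>();
        PollingConsumer consumer = new PollingConsumer(q, out::add);
        Thread t = new Thread(consumer, "polling-consumer");
        t.start();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (out.size() < input.size() && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            consumer.stop();
            t.join(2000);   // the flag is seen within one poll timeout (500 ms)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (t.isAlive()) {
            throw new IllegalStateException("consumer did not stop");
        }
        return out;
    }
}
```

Compared with take(), the trade-off is a bounded wake-up every 500 ms in exchange for a way to stop the worker without interrupting it.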

As already described in the other answers, you are busy-waiting instead of using the central feature of your content BlockingQueue: waiting for the next entry and removing it from the queue. This is done with the take() method:

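while (!Thread.currentThread().isInterrupted()) {
    try {
        JSONObject msg = content.take(); // blocks until an element is available
        for (String tableName : tableList) {
            CassandraConnector.getSession().execute(createQuery(tableName, msg));
        }
    } catch (InterruptedException e) {
        // take() clears the interrupt flag when it throws; restore it so the loop exits
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        // don't silently swallow failed writes
        LOGGER.log(Level.WARNING, "Failed to write message to Cassandra", e);
    }
}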
Robert
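Since the question runs a fixed number of long-lived consumer threads that stop when interrupted, an ExecutorService can own them: Executors.newFixedThreadPool(n) creates the n workers, and shutdownNow() interrupts every one of them, which makes a pending take() throw InterruptedException. A minimal sketch, assuming String messages and a concurrent Set standing in for Cassandra; ConsumerPool and run are hypothetical names used only for this illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConsumerPool {

    /** Starts n consumers, feeds them `messages` items, then shuts the pool down; returns items written. */
    public static int run(int n, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> written = ConcurrentHashMap.newKeySet(); // hypothetical stand-in for Cassandra
        ExecutorService pool = Executors.newFixedThreadPool(n);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        String msg = queue.take();  // parks while the queue is empty
                        written.add(msg);           // e.g. execute the per-table inserts here
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the flag so the loop exits
                    }
                }
            });
        }
        for (int i = 0; i < messages; i++) {
            queue.add("msg-" + i);
        }
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
            while (written.size() < messages && System.nanoTime() < deadline) {
                TimeUnit.MILLISECONDS.sleep(10);    // demo-only wait for the queue to drain
            }
            pool.shutdownNow();                     // interrupts every worker blocked in take()
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not stop");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return written.size();
    }
}
```

With this layout the worker threads no longer need to extend Thread; the pool creates them, and a single shutdownNow() call replaces interrupting each thread by hand.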