
I'm working on a problem that was supposed to be VERY simple to solve, but I am not getting it done so easily.

The problem is quite simple: I have a Java program running on Linux/x86 that can perform two basic functionalities, F1 and F2. I would like F1 to have a higher priority, but it is a MUST that F2 executes from time to time, i.e., having F1 requests on the queue cannot keep F2 requests waiting forever.

My first thought was to have separate queues with a thread pool for each functionality, so I gave the F1 pool 8 threads while the F2 pool got only 2 threads.

My expectation was that Linux would give a fair time share to each thread, so F1 would get 8 quanta while F2 would get just 2. If there were no F1 requests, the F2 pool could get every quantum to itself, and the same should be true for F1 in case F2 has no requests.

However, the program is not behaving that way: if I get a burst of F2 requests and just a couple of F1 requests, the latter takes a long time to get its turn.

Does that make sense given Oracle HotSpot/Linux scheduling? Or should it not be happening, which would point to an implementation error on my part?

PS: I've read about Linux scheduling, and it seems that SCHED_OTHER (TS) gives a time share to each task, but every time a ready task is not executed it gets a bigger quantum; if that is happening to the F2 pool, it might explain the behavior mentioned above.

Thanks and Regards.

Below is some sample source code.

package test;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Created by giscardff on 08/07/18.
 */
public class TestThread {

    // Test Program
    public static void main(String args[]) throws Exception {

        // queues containing jobs to be done
        ArrayBlockingQueue<MyDTO> queueA = new ArrayBlockingQueue<>(100);
        ArrayBlockingQueue<MyDTO> queueB = new ArrayBlockingQueue<>(100);

        // create pool for functionality A
        for(int i = 1; i <= 8; i++){
            MyThread thread = new MyThread("ThreadA" + i, queueA);
            thread.start();
        }

        // create pool for functionality B
        for(int i = 1; i <= 2; i++){
            MyThread thread = new MyThread("ThreadB" + i, queueB);
            thread.start();
        }

        // create producer for A
        // it will take 100ms between requests
        Producer producerA = new Producer(queueA, 100);
        producerA.start();

        // create producer for B
        // it will take 0ms between requests
        Producer producerB = new Producer(queueB, 0);
        producerB.start();

    }

}

/**
 * Just put a request into a queue
 */
class Producer extends Thread {

    private ArrayBlockingQueue<MyDTO> queue;
    private long sleep;

    public Producer(ArrayBlockingQueue<MyDTO> queue, long sleep){
        this.queue = queue;
        this.sleep = sleep;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if(sleep > 0)Thread.sleep(sleep);
                queue.put(new MyDTO());
            }
        }catch(Exception ex){}
    }
}

/**
 * Retrieve requests from a queue; accumulate how long each request took to be
 * received and print the total every 1,000 requests
 */
class MyThread extends Thread {

    private ArrayBlockingQueue<MyDTO> queue;
    private long delay = 0;
    private int count = 0;

    public MyThread(String name, ArrayBlockingQueue<MyDTO> queue){
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                MyDTO input = queue.take();
                delay += System.currentTimeMillis() - Long.parseLong(input.getTime());
                if(++count % 1000 == 0){
                    System.out.printf("%s: %d\n", getName(), delay / 10);
                    count = 0;
                }
            }
        }catch(Exception ex){ex.printStackTrace();}
    }
}

/**
 * Just a DTO representing a request
 * NOTE: The time is stored as a String to force the CPU to do something more than just math operations
 */
class MyDTO {
    private String time;
    public MyDTO(){
        this.time = "" + System.currentTimeMillis();
    }

    public String getTime() {
        return time;
    }
}
giscard.faria
  • I think you're at the mercy of your OS scheduler and how the JVM interacts with it. I'm not aware of any Java APIs that allow you to precisely set the amount of CPU time two threads are allocated. – markspace Jul 07 '18 at 22:18
  • Can you post the code that is managing your threading? Are the tasks IO bound or CPU bound? How are you measuring them? – Krease Jul 07 '18 at 22:22
  • How many cores are there in the machine? – Andreas Jul 07 '18 at 22:23
  • I think the best you'll get for this type of problem is a [PriorityQueue](https://docs.oracle.com/javase/7/docs/api/java/util/PriorityQueue.html), or perhaps a [ForkJoinPool](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html) for its work-stealing algorithm. – MeetTitan Jul 07 '18 at 22:24
  • Also, how many OTHER PROCESSES (not JVM threads, but other stuff like daemons or just plain other applications) are running? What is their process priority? Have you tried playing with the `nice` command? What values did you use? – markspace Jul 07 '18 at 22:24
  • @Krease in order to test the concept I created some very simple code: just create two pools of threads, each one with its own queue, and use main to fill both queues up. Both pools do basically the same thing, they go to the database and execute a statement; the higher-priority functionality performs a SELECT and the lower-priority one does an INSERT. – giscard.faria Jul 07 '18 at 22:32
  • @Andreas it is running in a 2vCPU, however it could be deployed on a 4vCPU EC2 as well – giscard.faria Jul 07 '18 at 22:32
  • @markspace just the OS ones and my process (inside a Docker container). I did try setting thread priorities, with no changes, and I believe that is because TS does not use them. I will give nice a try, however setting nice every time the container restarts or the app is updated would not be good for sysops – giscard.faria Jul 07 '18 at 22:34
  • BTW I am thinking about using chrt to change the process scheduling to RR; that way I am quite sure every thread will get the same "quantum", so if a functionality has more threads it will for sure get more time. I just posted the question to see if the results I am getting make sense or should not be happening. @markspace, leaving all the work to the OS is what I would like; if it is possible to get that from the OS I won't touch a single line of code to get it :P – giscard.faria Jul 07 '18 at 22:42
  • Your code is running SQL statements? Then you're not utilizing the CPUs, because they spend most of their time waiting on the database server. Your performance test is meaningless, since you're not testing the CPU scheduling of your application server at all. – Andreas Jul 07 '18 at 22:42
  • @Andreas it is not just performing SELECT/INSERT; it has to deserialize a JSON object, build the SQL, and execute it. CPU is getting over 90%, so for sure it is being used a lot. Anyway, the main point I am trying to confirm is that having more threads for a functionality doesn't guarantee it will get more time from the scheduler – giscard.faria Jul 07 '18 at 22:45
  • It seems to me that using a single `ThreadPoolExecutor` with a `PriorityBlockingQueue` or similar might be your best plan. – Dawood ibn Kareem Jul 08 '18 at 02:28
  • @DawoodibnKareem I believe that PriorityBlockingQueue will always put the higher-priority tasks ahead of the other ones; it does not allow a behavior where the priority tasks just get a bigger share without blocking the other ones – giscard.faria Jul 08 '18 at 14:53
  • Try doing the same measurements without going to the database (ie: make it purely CPU bound). Also please provide the code for how you’re setting up your thread pools – Krease Jul 08 '18 at 15:37
  • Well, yes, but you can implement your own logic to calculate the priority of each task, depending on what's already in the queue. – Dawood ibn Kareem Jul 08 '18 at 18:41
  • I've put test source code in the original question. After running it I got the result below; from the output it is possible to note that the ThreadB pool got almost twice the requests compared to the A pool. ThreadB2 3810 ThreadB1 2930 ThreadA1 692 ThreadA3 682 ThreadA5 558 ThreadA6 500 ThreadA4 430 ThreadA2 406 ThreadA7 234 ThreadA8 211 – giscard.faria Jul 08 '18 at 22:16
  • 90% CPU for database bound code sounds high. Have you measured GC? – ewramner Jul 09 '18 at 07:09

1 Answer


It looks like you've got a few issues. I'll try to summarize them and provide a starting point for a path forward:

Thread Contention

Using a BlockingQueue comes with a cost - every operation (put and take) contends for the queue's lock between producers and consumers. Your "A pool" has 9 threads contending over the lock for queueA (1 producer, 8 consumers), while your "B pool" has 3 threads contending over the lock for queueB (1 producer, 2 consumers).

This related answer provides a bit more detail about contention. The simplest ways around this are to use fewer threads or to use "lock-free" mechanisms to eliminate the contention.
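For example, here is a minimal sketch of the lock-free route, assuming java.util.concurrent.ConcurrentLinkedQueue is swapped in for the ArrayBlockingQueue from your code (note it is unbounded, unlike the original queue). poll() returns null instead of blocking, so the consumer has to handle the empty-queue case itself; the yield-based back-off is purely illustrative:

import java.util.concurrent.ConcurrentLinkedQueue;

class LockFreeConsumerSketch {

    // Lock-free queue: offer/poll use CAS instead of a shared lock,
    // so producers and consumers do not contend on every operation
    private static final ConcurrentLinkedQueue<MyDTO> queueA = new ConcurrentLinkedQueue<>();

    static void consumeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            MyDTO input = queueA.poll();   // returns null instead of blocking
            if (input == null) {
                Thread.yield();            // queue is empty: back off briefly
                continue;
            }
            // ... process the request (as MyThread does in the question) ...
        }
    }
}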

Thread Scheduling

As mentioned in the comments, you're at the mercy of how the JVM is scheduling your threads.

If java thread scheduling used perfectly fair time shares on the CPU, you'd probably see consumption counts on each thread in the same pool extremely close to each other. You've probably noticed they're not - my runs of your (slightly modified) code occasionally give me a count spread of 300K or more across the threads.

You can often improve this when there are enough CPU cores for each CPU-bound thread (you've got 12 in your sample code), but it's far from ideal in many cases, especially in the face of thread contention.

What can you do?

  1. Build your own fairness logic - don't rely on the JVM thread scheduler to be fair, because it won't be.
    • A simple idea for your case would be to keep both queues, but use a single pool to process both - use either round-robin or Math.random() (e.g. if (rand < 0.8) { queueA.poll(); }) to determine which queue to poll from; see the sketch after this list. Note - Use poll so you can easily handle the case when a queue is empty without blocking.
  2. Experiment with number of CPU-bound threads running on your hardware. With my suggestion for (1) above, you could even have just one worker thread fairly processing both queues. Remember, too many threads contending over the same resources will slow down your processing.
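As a concrete illustration of point 1, here is a minimal sketch of a single worker draining both of the original queues with a weighted preference. The WeightedWorker name, the 0.8 weight, and the sleep-based back-off are illustrative assumptions, not a definitive implementation:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

class WeightedWorker extends Thread {

    private final ArrayBlockingQueue<MyDTO> queueA; // high-priority (F1)
    private final ArrayBlockingQueue<MyDTO> queueB; // low-priority (F2)

    WeightedWorker(ArrayBlockingQueue<MyDTO> queueA, ArrayBlockingQueue<MyDTO> queueB) {
        this.queueA = queueA;
        this.queueB = queueB;
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                // ~80% of attempts favour queueA, but queueB is never starved:
                // whichever queue is tried first, fall back to the other if it is empty
                boolean preferA = ThreadLocalRandom.current().nextDouble() < 0.8;
                MyDTO job = preferA ? queueA.poll() : queueB.poll();
                if (job == null) {
                    job = preferA ? queueB.poll() : queueA.poll();
                }
                if (job == null) {
                    Thread.sleep(1); // both queues empty: back off briefly
                    continue;
                }
                process(job);
            }
        } catch (InterruptedException ex) {
            // interrupted while backing off: exit the loop
        }
    }

    private void process(MyDTO job) {
        // ... handle the request (F1 or F2) here ...
    }
}

Several such workers can share the same two queues, and tuning the weight (or switching to strict round-robin) controls the F1/F2 ratio directly instead of relying on the OS scheduler.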

Isn't threading fun? :)

Krease
  • I did a test with a LinkedBlockingQueue hoping for better results, just to get the same, and I see in the source code it has a single lock for both head and tail :P Back to ArrayBlockingQueue... Same test using PoolB=2/PoolA=5: PoolB got 2.6M / PoolA got 880K (almost 3 times more). Same test with PoolB=2/PoolA=5: PoolB got 5.2M / PoolA got 3M (73% more). Same test with PoolB=2/PoolA=2: PoolB got 11M / PoolA got 41M, wtf? Yeah, this time PoolA got almost 3 times more. So, I believe the answer to the original question for sure is that more threads doesn't (for sure) mean more timeshare – giscard.faria Jul 09 '18 at 23:52
  • I will go with a yield() for every N processed messages on the lower-priority pool, and I will also reduce the number of threads; it was clear that too many threads worsened performance, thanks. – giscard.faria Jul 09 '18 at 23:56