I have data that comes in from several multicast addresses. In my task consumer thread, I have an array of reentrantlocks, each index corresponding to a multicast address. I will lock fairly on an address. because I want to ensure that each data must be processed sequentially/in correct order
task = TASK_QUEUE.take(); //arrayblockingqueue
lock = task.getUdpChannel().getLock();
try {
while (!lock.tryLock(TIMEOUT, TimeUnit.MILLISECONDS)){} //busy spin for lock
runTask(appenderIndex);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
LOGGER.info("{} {} {} {} {} |", () -> ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Asia/Hong_Kong")) ,
() -> remoteAdd.getHostName(), () -> remotePort, () -> ni,
() -> remaining);
}
Checking the logs, it seems that some threads are "chattier" than others. I currently have 3 threads, and if it were to interact with the 12 datagram packets sent from 239.2.1.1 55000 as seen below, I believe that the task consumers will behave as if it was single threaded for that amount of time, since all the other threads will be blocked waiting for their turn (only one thread is allowed to lock)
I am considering to use multiple threads, more than the amount of cores that I have - 12 task consumer threads vs 8 cores (excluding the main thread and producer threads). This should mean that the extra threads can process tasks that are being queued up without waiting for these series of tasks from that specific address to finish completely.
Is it reasonable to have more threads than cores? Would it be better to find out how to have a backlog of tasks (other threads pass tasks from 239.2.1.1 55000 to the thread that currently has the corresponding lock)
I did read this, which gives me the idea that having more threads is better, but how many more threads would it be reasonable to have?
I have wrote some sample code as follows, The following is a working version that is infinitely looping, and has a working task dispatcher.
public class Main {
private final ArrayBlockingQueue<Task> taskPool;
private final ArrayBlockingQueue<Task> taskQueue;
private static int capacity = 2000;
private static int threads = 3;
private Thread[] producers;
private Thread[] consumers;
public Main() {
this.taskPool = new ArrayBlockingQueue<>(capacity);
this.taskQueue = new ArrayBlockingQueue<>(capacity, true);
this.producers = new Thread[threads];
consumers = new Thread[threads];
ReentrantLock[] locks = new ReentrantLock[4];
TaskConsumer[] cArr = new TaskConsumer[threads];
TaskDispatch dispatch = new TaskDispatch(this.taskQueue, cArr, threads);
for (int i=0; i< locks.length; i++) {
locks[i] = new ReentrantLock();
}
for (int i=0; i< threads; i++) {
this.producers[i] = new Thread(new TaskProducer(this.taskPool, this.taskQueue), "producer"+i);
this.producers[i].start();
cArr[i] = new TaskConsumer(this.taskPool, locks, capacity, dispatch);
this.consumers[i] = new Thread(cArr[i], "consumer"+i);
this.consumers[i].start();
}
Thread dThread = new Thread(dispatch, "dispatch");
dThread.start();
}
public void fillPool() throws InterruptedException {
for (int i=0; i< capacity; i++) {
taskPool.put(new Task());
}
}
}
public class TaskProducer implements Runnable{
private final static String[] randomStrings = {"random", "payload", "to", "simulate"};
private ArrayBlockingQueue<Task> TASK_POOL;
private ArrayBlockingQueue<Task> TASK_QUEUE;
public TaskProducer(ArrayBlockingQueue<Task> pool, ArrayBlockingQueue<Task> queue) {
// TODO Auto-generated constructor stub
this.TASK_POOL = pool;
this.TASK_QUEUE = queue;
}
@Override
public void run() {
// TODO Auto-generated method stub
Task task = null;
Random rand = new Random();
while (true) {
if (Thread.currentThread().isInterrupted()) {
break;
}
if (task == null) {
try {
task = TASK_POOL.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// System.out.println("iterating"+task);
if (task != null) {
int idx = rand.nextInt(0, 4);
task.setId(idx);
task.setData(Instant.now().toString());
try {
TASK_QUEUE.put(task);
task = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
public class TaskConsumer implements Runnable {
private ArrayBlockingQueue<Task> TASK_POOL;
private ReentrantLock[] locks;
private ArrayBlockingQueue<Task> internalQueue;
private TaskDispatch dispatch;
public TaskConsumer(ArrayBlockingQueue<Task> pool, ReentrantLock[] locks, int capacity, TaskDispatch dispatch) {
// TODO Auto-generated constructor stub
this.TASK_POOL = pool;
this.locks = locks;
this.internalQueue = new ArrayBlockingQueue<>(capacity, true);
this.dispatch = dispatch;
}
@Override
public void run() {
// TODO Auto-generated method stub
Task task = null;
while (true) {
// System.out.println("iterating "+Thread.currentThread().getName());
if (Thread.currentThread().isInterrupted()) {
break;
}
if (task == null) {
try {
task = internalQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (task != null) {
//process
int id = task.getId();
System.out.println(Thread.currentThread().getName()+" "+task.getId()+" "+task.getData()+" "+Instant.now().toString());
//return
try {
if (internalQueue.peek() == null) {
synchronized (dispatch) {
if (internalQueue.peek() == null) {
dispatch.signalFreeConsumer(task.getId(), this);
}
}
}
TASK_POOL.put(task);
task = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public void putInternalQueue(Task t) throws InterruptedException {
this.internalQueue.put(t);
}
}
public class TaskDispatch implements Runnable {
private ConcurrentHashMap<Integer , TaskConsumer> mapping;
private ArrayBlockingQueue<TaskConsumer> freeConsumers;
private ArrayBlockingQueue<Task> TASK_QUEUE;
private TaskConsumer[] consumers;
public TaskDispatch( ArrayBlockingQueue<Task> queue,TaskConsumer[] consumers, int threads) {
mapping = new ConcurrentHashMap<>(threads);
freeConsumers = new ArrayBlockingQueue<>(threads);
this.TASK_QUEUE = queue;
this.consumers = consumers;
}
@Override
public void run() {
// TODO Auto-generated method stub
for (int i=0; i< consumers.length; i++) {
try {
freeConsumers.put(consumers[i]);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Task task = null;
TaskConsumer free = null;
while (true) {
if (Thread.currentThread().isInterrupted()) {
break;
}
if (task == null) {
try {
task = TASK_QUEUE.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (task != null) {
if (mapping.containsKey(task.getId())) {
synchronized (this) {
if (mapping.containsKey(task.getId())) {
try {
mapping.get(task.getId()).putInternalQueue(task);
task = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}else {
continue;
}
}
}else {
try {
if (free == null) {
free = freeConsumers.poll(5000, TimeUnit.MILLISECONDS);
}
if (free != null) {
synchronized (this) {
mapping.putIfAbsent(task.getId(), free);
free.putInternalQueue(task);
task = null;
free = null;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
public void signalFreeConsumer(Integer id, TaskConsumer consumer) throws InterruptedException {
synchronized (this) {
// System.out.println("removing "+id+" "+Thread.currentThread().getName());
mapping.remove(id, consumer);
freeConsumers.put(consumer);
// System.out.println("notify "+Arrays.asList(freeConsumers.toArray()));
}
}
}
public class Runner {
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
Main m = new Main();
m.fillPool();
}
}
public class Task {
private int id;
private String data;
public Task () {}
public Task(int id, String dat) {
// TODO Auto-generated constructor stub
this.id = id;
this.data = dat;
}
public int getId() {
return this.id;
}
public String getData() {
return this.data;
}
public void setId(int id) {
this.id = id;
}
public void setData(String data) {
this.data = data;
}
public String toString() {
return ""+id+" "+data;
}
}