0

Lets say I have n threads concurrently taking values from a shared queue:

public class WorkerThread implements Runnable{
        private BlockingQueue queue;
        private ArrayList<Integer> counts = new ArrayList<>();
        private int count=0;
        public void run(){
            while(true) {
                queue.pop();
                count++;
            }
        }
}

Then for each thread, I want to count every 5 seconds how many items it has dequeued, and then store it in its own list (counts) I've seen here Print "hello world" every X seconds how you can run some code every x seconds:

Timer t = new Timer();
t.scheduleAtFixedRate(new TimerTask(){
    @Override
    public void run(){
        counts.add(count);
        count = 0
    }
}, 0, 5000);

The problem with this is that I can't access count variable and the list of counts unless they are static. But I don't want them to be static because I don't want the different threads to share those variables.

Any ideas of how to handle this?

Mr. Liu
  • 337
  • 1
  • 3
  • 8
  • Putting `Timer t = ...` code block just above `while (true)` compiles (except that you need `.take` instead of `.pop`), so everything should be fine. But since timer task will execute in another thread, use `AtomicInteger count` and `getAndSet(0)` method. – Victor Sorokin Nov 07 '17 at 14:05
  • Do you need this information in runtime during task execution of after its finishing? – Mikita Harbacheuski Nov 07 '17 at 14:37
  • If I put Timer t=... above the while it would work if I am only using 1 thread. I want to use n threads. For example, I have thread 1 and thread 2. Then in 5 seconds thread 1 has taken 1000 values from the queue, and thread 2 has taken 1100. I need thread 1 to store 1000 in its own list, and thread 2 store 1100 in its own list too. I don't want to share the count variable between threads. I need to collect this information during task execution. I need it after finishing just to check if the work of the thread is more or less the same during its execution. – Mr. Liu Nov 07 '17 at 15:08
  • @Mr.Liu it will work exactly as you described, for several threads as well as for a single one -- each `Runnable` will have its own `count`. – Victor Sorokin Nov 07 '17 at 17:14

2 Answers2

1

I don't think it's possible to use scheduled execution for you case(neither Timer nor ScheduledExecutorService), because each new scheduled invocation will create a new tasks with while loop. So number of tasks will increase constantly.

If you don't need to access this list of counts in runtime i would suggest something like this one:

  static class Task implements Runnable {
    private final ThreadLocal<List<Integer>> counts = ThreadLocal.withInitial(ArrayList::new);
    private volatile List<Integer> result = new ArrayList<>();
    private BlockingQueue<Object> queue;

    public Task(BlockingQueue<Object> queue) {
      this.queue = queue;
    }

    @Override
    public void run() {
      int count = 0;
      long start = System.nanoTime();
      try {
        while (!Thread.currentThread().isInterrupted()) {
          queue.take();
          count++;
          long end = System.nanoTime();
          if ((end - start) >= TimeUnit.SECONDS.toNanos(1)) {
            counts.get().add(count);
            count = 0;
            start = end;
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      // the last value
      counts.get().add(count);
      // copy the result cause it's not possible
      // to access thread local variable outside of this thread
      result = counts.get();
    }

    public List<Integer> getCounts() {
      return result;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    BlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>();
    Task t1 = new Task(blockingQueue);
    Task t2 = new Task(blockingQueue);
    Task t3 = new Task(blockingQueue);
    executorService.submit(t1);
    executorService.submit(t2);
    executorService.submit(t3);

    for (int i = 0; i < 50; i++) {
      blockingQueue.add(new Object());
      Thread.sleep(100);
    }
    // unlike shutdown() interrupts running threads
    executorService.shutdownNow();
    executorService.awaitTermination(1, TimeUnit.SECONDS);

    System.out.println("t1 " + t1.getCounts());
    System.out.println("t2 " + t2.getCounts());
    System.out.println("t3 " + t3.getCounts());

    int total = Stream.concat(Stream.concat(t1.getCounts().stream(), t2.getCounts().stream()), t3.getCounts().stream())
        .reduce(0, (a, b) -> a + b);
    // 50 as expected
    System.out.println(total);
  }
Mikita Harbacheuski
  • 2,193
  • 8
  • 16
-1
  • Why not a static AtomicLong?
  • Or the WorkerThread(s) can publish that they poped to the TimerTask or somewhere else? And the TimerTask reads that info?
  • What would be the use of an AtomicLong? I don't want each thread to share their own counts. Can you explain a little bit how the WorkerThread(s) can publish their values to the TimerTask? – Mr. Liu Nov 07 '17 at 13:14
  • If you need clarifications or to ask the OP a question then use comments (if you can't, wait until you have enough reputation). Otherwise, turn your answer into a real answer and not a set of questions. – user1803551 Nov 07 '17 at 13:14
  • My answer is more a set of sugestions - not real questions. The WorkerThread could take the TimerTask as parameter and call a method. – Stefan LoKran Dotti Nov 07 '17 at 13:26