2

I need the following scenario:

Run all ScheduledFutures within a cycle and call every time the method tasksCompleted() after all tasks finished its execution. The next scheduling cycle must not wait while calling tasksCompleted() after the actual scheduling cycle.

In short: Call a method after the completion of the actual scheduling-cycle and do not stop the next scheduling-cycle

The following code creates tasks and the scheduling works. However, I am not able call tasksCompleted() when all tasks within a cycle completed.

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Scheduler {

    public static void main(String[] args) {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        System.out.println("- [" + LocalTime.now() + "] run parent-task...");

        // create 3 tasks: each task needs 7 seconds.
        var tasks = createTasks("child", 3, 7);
        List<ScheduledFuture<?>> futures = new ArrayList<>();
        tasks.forEach(t ->
        {
            ScheduledFuture<?> future = ses.scheduleWithFixedDelay(t, 0, 2, TimeUnit.SECONDS);
            futures.add(future);
        });
        // this does not work..
        var scheduleCycleCompleted = futures.stream().allMatch(f -> f.isDone());
        System.out.println("scheduleCycleCompleted: " + scheduleCycleCompleted);

        // maybe a solution with CompletableFuture?
        CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    }

    static void tasksCompleted() {
        System.out.println("schedule cycle completed");
    }

    static List<Runnable> createTasks(String group, int numbersOfTasks, long taskDuration) {
        var tasks = new ArrayList<Runnable>();
        for (var i = 0; i < numbersOfTasks; i++) {
            int taskNr = i;
            Runnable task = () ->
            {
                System.out.println("- [" + LocalTime.now() + "] Running " + group + "-task" + taskNr + "...[needs "
                        + taskDuration + " seconds]");
                try {
                    TimeUnit.SECONDS.sleep(taskDuration);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            };
            tasks.add(task);
        }
        return tasks;
    }

}
nimo23
  • 5,170
  • 10
  • 46
  • 75
  • Probably a [duplicate question](https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future). – Amit Mendapara Feb 03 '20 at 11:58
  • Does this answer your question? [Waiting on a list of Future](https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future) – daniu Feb 03 '20 at 12:31
  • I simplified the use case a little bit: I do not want to stop the scheduling cycle until all previous taks has been finished. It should only call a method `tasksCompleted()` after one cylce completed. – nimo23 Feb 03 '20 at 12:31
  • Why don't you just schedule a single task that then runs the individual ones in parallel? – daniu Feb 03 '20 at 12:33
  • @daniu I dont want to do that, because then all its tasks must wait until the new schedule can be made. Each task should run with its own scheduler and should not depend on the finishing of other tasks. – nimo23 Feb 03 '20 at 12:38
  • I don't really see a context in which this makes a lot of sense. It's fine if the tasks all take about the same amount of time, but if you have one task which takes 7 seconds and one which takes 20, you'll be lagging behind with the "completed cycles" soon. – daniu Feb 03 '20 at 12:54
  • @daniu the 7 sec task is only an example. The use case is the following: Give me a signal when one cylce is done. Nothing more. If t1 needs 7 sec and t2 needs 20 sec. then the next cycle for t1 will begin sooner than t2. This is intended. But after one cycle is finished (after t1 and t2 finishes), then the scheduler should give me a signal. Nothing more. – nimo23 Feb 03 '20 at 13:11

2 Answers2

2

Updated

I hope it will work.

CountDownLatch will solve the problem here.

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Scheduler {

    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);

        System.out.println("- [" + LocalTime.now() + "] run parent-task...");

        int noOfTask=3;
        CountDownLatch countDownLatch = new CountDownLatch(noOfTask);

        TaskComplete taskComplete=new TaskCompleteImpl(noOfTask,countDownLatch);

        // create 3 tasks: each task needs 7 seconds.
        List<Runnable> tasks = createTasks("child", noOfTask, 7,countDownLatch,taskComplete);

        List<ScheduledFuture<?>> futures = new ArrayList<>();
        tasks.forEach(t ->
        {
            ScheduledFuture<?> future = ses.scheduleWithFixedDelay(t, 0, 2, TimeUnit.SECONDS);
            futures.add(future);
        });
        // this does not work..





    }

    interface TaskComplete{
        void tasksCompleted();
    }

    static class TaskCompleteImpl implements TaskComplete {

        int totalTask=0;
        int index=0;
        CountDownLatch countDownLatch;
        public TaskCompleteImpl(int totalTask){

        }

        public TaskCompleteImpl(int noOfTask, CountDownLatch countDownLatch) {
            this.totalTask=noOfTask;
            this.countDownLatch=countDownLatch;
        }

        @Override
        public synchronized void tasksCompleted() {
            index=index+1;
            if(index==totalTask){
                System.out.println("schedule cycle completed");
                index=0;
                countDownLatch=new CountDownLatch(totalTask);
            }

        }
    }


    static List<Runnable> createTasks(String group, int numbersOfTasks, long taskDuration, CountDownLatch countDownLatch, TaskComplete taskComplete) {
        List tasks = new ArrayList<Runnable>();
        for (int i = 0; i < numbersOfTasks; i++) {
            int taskNr = i;
            Runnable task = () ->
            {
                System.out.println("- [" + LocalTime.now() + "] Running " + group + "-task" + taskNr + "...[needs "
                        + taskDuration + " seconds]");
                try {
                    TimeUnit.SECONDS.sleep(taskDuration);
                    countDownLatch.countDown();
                    countDownLatch.await();
                    taskComplete.tasksCompleted();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            };
            tasks.add(task);
        }
        return tasks;
    }

}
  • thanks, but this does not work: the futures list is empty at that time because it is filled by another asynchronous thread (The scheduledExecutor). And using future.get(); within the task-loop would stop the execution of the next task until it is done even within the same scheduling cycle. – nimo23 Feb 03 '20 at 11:49
  • I don't see how his proposal is wrong, could you elaborate with some sample? If the blocking during 'get' affects your code, you can always provide a 0 timeout in that call. – Mr. tk Feb 03 '20 at 12:00
  • Thanks. But I do not want to stop the `ses`. It should run indefinitly. It should only call a method `tasksCompleted()` after one cylce completed. Nothing more. – nimo23 Feb 03 '20 at 12:33
  • Use task completion as a listener service, in that case, you can get the output in a single place. – Gaurav Bhardwaj Feb 03 '20 at 13:03
  • thanks, but countdownlatch does not work in this scenario because it only works for the first scheduling cycle. – nimo23 Feb 03 '20 at 13:14
  • 1
    It will work because after every cycle it will reinitialize, Just check the class TaskCompleteImpl – Gaurav Bhardwaj Feb 03 '20 at 13:19
  • @nimo23 It will not work if there is a difference between the running time of tasks. As an extreme example, if you have one task running 5 and one running 20 seconds, the `CountDownLatch` will trigger after task 1 has completed twice, but it's not the same scheduling cycle. – daniu Feb 04 '20 at 07:22
1

To handle the case of different runtimes within a scheduling cycle, what you need is a way to identify which task belongs to which group. You can do that by giving them an identifying counter, so that each time a task is executed, the counter is used to denominate the group it's running in.

interface GroupedRunnable extends Runnable {
    String getGroup();
}

class CountingRunnable implements GroupedRunnable {
    private AtomicInteger counter = new AtomicInteger();
    private final Runnable delegate;
    private final String taskName;

    CountingRunnable(Runnable delegate, String taskName) {
        this.delegate = delegate;
        this.taskName = taskName;
    }
    public void run() {
        System.out.printf("[%s] - Running task %s in group %s%n", LocalTime.now(), taskName, getGroup());
        delegate.run();
        counter.incrementAndGet();
        System.out.printf("[%s] - Running task %s in group %s finished%n", LocalTime.now(), taskName, getGroup());
    }

    @Override
    public String getGroup() {
        return counter.toString();
    }
}

Now, you can have a watchdog class that keeps track on which members of which group have already executed. Since you know how many members a group has, a simple counter is sufficient.

class GroupMonitoringService {
    // key: group, value: tasks
    Map<String, AtomicInteger> finishedTasks = new HashMap<>();
    private final Runnable finisher;
    private final int groupSize;

    GroupMonitoringService(Runnable finisher, int groupSize) {
        this.finisher = finisher;
        this.groupSize = groupSize;
    }

    public synchronized void taskFinished(String group) {
        var finishedInGroup = finishedTasks.computeIfAbsent(group, k -> new AtomicInteger());
        if (finishedInGroup.incrementAndGet() >= groupSize) {
            // scheduling group complete
            System.out.printf("Group %s finished executing%n", group);
            finisher.run();
            finishedTasks.remove(group);
        }
    }
}

Now all you have to do is wrap your original task into a grouped task, and make sure the monitoring service is notified when it's finished, so wrap again.

private static List<Runnable> createTasks() {
    List<Runnable> result = new ArrayList<>();
    for (int i = 0; i < GROUP_SIZE; i++) {
        RandomWaitTask originalTask = new RandomWaitTask();
        CountingRunnable groupedTask = new CountingRunnable(originalTask, "Task " + i);
        Runnable notifyingRunnable = () -> {
            groupedTask.run();
            MONITORING_SERVICE.taskFinished(groupedTask.getGroup());
        };
        result.add(notifyingRunnable);
    }
    return result;
}

So now you can just schedule those.

You can see the entire code here (although it doesn't actually run properly on that site because of the resource limit it imposes, but if you copy it into your IDE, it works).

daniu
  • 14,137
  • 4
  • 32
  • 53
  • Thanks! That is really a nice solution. In my case the code with CountDownLatch also works, However, I improved it by using java `Phaser` instead (with this, I do not need to know in advance how many parties are involved). – nimo23 Feb 04 '20 at 08:07