0

This question is an architectural problem that I have not been able to figure out.

I have a TaskScheduler that has operations such as start() and stop(). TaskScheduler is intended to be agnostic, I want to be able to pass into it any "Runnable", a "UID" and the "Interval" that the service should run for. This all gets added to a hashmap so that if you try to pass in an existing runnable with the same UID it will replace the previous runnable with the new information.

Extending the TaskScheduler is MyScheduler, which is specific to the request that I want to make. In this example, I am making multiple Profile requests every 60 seconds. To keep track of which profile request is which, I am using UID as a key.

I want to then bubble up the responses to the app level from MyScheduler. This is where I am having issues. I am only able to bubble up the response from the latest scheduler. So if I create Scheduler A and Scheduler B, I will receive updates only from Scheduler B. Simlarly, if I create Scheduler A-C, then I will receive updates only from Scheduler C.

I know why this is, MyScheduler uses the last request that was passed into it. However, I do not know a good pattern (methodology) to resolve this.

TaskScheduler class

public class TaskScheduler {

    private static Map<String, SchedulerModel> schedulerModels = new HashMap<>();

    TaskScheduler() {}

    private ScheduledFuture<?> start(@NotNull final SchedulerModel schedulerModel) {
        return schedulerModel.executorService.scheduleWithFixedDelay(schedulerModel.runnable, 0, schedulerModel.interval, TimeUnit.SECONDS);
    }

    /**
     * Method is used to onSchedulerStop executing tasks
     */
    private void shutdown(@NotNull SchedulerModel schedulerModel) {
        if (schedulerModel.executorService != null) {
            schedulerModel.executorService.shutdownNow();
            schedulerModel.executorService = null;
        }
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        SchedulerModel schedulerModel = new SchedulerModel();
        schedulerModel.executorService = Executors.newSingleThreadScheduledExecutor();
        schedulerModel.runnable = runnable;
        schedulerModel.interval = interval;
        schedulerModels.put(uid, schedulerModel);
    }

    public void stop(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            shutdown(schedulerModels.get(uid));
            schedulerModels.remove(uid);
        } else {
            // scheduler id not found
        }
    }


    public void start(@NotNull String uid) {
        if (schedulerModels.get(uid) != null) {
            start(schedulerModels.get(uid));
        } else {
            // scheduler id not found
        }
    }

}

MyScheduler (this name is temporary)

public class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 60; // seconds

    private ProfileRequest request;
    private ApiInterface apiInterface;
    private SchedulerInterface schedulerInterface;


    public MyScheduler() {}

    public void createScheduler(@NotNull ApiInterface apiInterface,
                               @NotNull ProfileRequest request,
                               @NotNull SchedulerInterface schedulerInterface) {

        this.apiInterface = apiInterface;
        this.request = request;
        this.schedulerInterface = schedulerInterface;
        super.setTask(new SchedulerRunnable(), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    public void start(@NotNull String uid) {
        start(uid); // start scheduler
        schedulerInterface.onSchedulerStart(uid); // send feedback to callback
    }

    public void stop(@NotNull String uid) {
        stop(uid); // stop scheduler
        schedulerInterface.onSchedulerStop(uid); // send feedback to callback
    }

    private class SchedulerRunnable implements Runnable {

        @Override
        public void run() {

            ApiClient.createBookmark(request, new Callback<Response>() {
                @Override
                public void onSuccess(@NotNull Response response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}

Trying to achieve this on app level

    mProfileScheduler.createScheduler(apiInterface, request, new SchedulerInterface {

    Override
    public void onSuccess(Response response) {
       // problem so far is that I only get response from latest scheduler
    }

    Override
    public void onFailure(Exception exception) {}

    Override
    public void onSchedulerStop(String uid) {
       // pass back uid so that I know which profile scheduler was stopped
    }

    Override
    public void onSchedulerStart(String uid) {}
       // pass back uid so that I know which profile scheduler was started
    }
});
portfoliobuilder
  • 7,556
  • 14
  • 76
  • 136

1 Answers1

1

You have this problem because schedulerInterface is a member of MyScheduler. Thus it's shared across all tasks and is overwritten after each new task is submitted.

Solution here is to make schedulerInterface a member of SchedulerRunnable:


private class SchedulerRunnable implements Runnable {
  private SchedulerInterface schedulerInterface;

  SchedulerRunnable(SchedulerInterface schedulerInterface) {
    this.schedulerInterface = schedulerInterface;
  }
}

In order to invoke onSchedulerStop() and onSchedulerStart() you can make start() and stop in TaskScheduler return Runnable. Then in MyTaskScheduler you would cast it to SchedulerRunnable to obtain a reference to the schedulerInterface.

If you don't want Runnable to be returned as part of public interface, you can create protected methods e.g. Runnable doStart() and Runnable doStop that can be overridden and are invoked by void start() and void stop().


Other issues

Concurrency

You are using HashMap for TaskScheduler schedulerModels. It is not thread-safe. This is OK if you do not intend to access it from more than one thread. Otherwise, you may encounter issues with race conditions and memory visibility.

You should use ConcurrentHashMap and its atomic operations like computeIfPresent() or computeIfAbsent() instead of put.

Resource management

  1. When you replace an existing task with a new one with same UID you neither stop its executor service nor cancel the currently running task. Thus you are going to leak threads and previous runnable will keep running.

  2. You create a new SingleThreadExecutorService for each task. That makes number of used threads potentially unbounded and makes it's hard to make any guarantees about application resource consumption. Normally you would use a thread pool with fixed number of threads that are reused between tasks.

Again, I suggest reading "Java Concurrency in Practice" book to learn about these problems and patterns to solve them.


Full Solution

After talking in chat this is my suggested solution. I've mocked all unspecified classes and interfaces.

import org.jetbrains.annotations.NotNull;

import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    private static class MySchedulerInterface implements SchedulerInterface {
        private final ProfileRequest request;

        public MySchedulerInterface(ProfileRequest req1) {
            this.request = req1;
        }

        @Override
        public void onSuccess(String response) {
            System.out.println("onSuccess:[" + request + "]" + response);
        }

        @Override
        public void onFailure(Exception exception) {
            System.out.println("onFailure:[" + request + "]" + exception);
        }

        @Override
        public void onSchedulerStop(String uid) {
            System.out.println("onSchedulerStop:[" + request + "] - " + uid);
        }

        @Override
        public void onSchedulerStart(String uid) {
            System.out.println("onSchedulerStart:[" + request + "] - " + uid);
        }
    }

    public static void main(String[] args) throws InterruptedException {

        ApiInterface api = new ApiInterface();

        ProfileRequest req1 = new ProfileRequest("1", "apple");
        ProfileRequest req2 = new ProfileRequest("2", "orange");
        ProfileRequest req3 = new ProfileRequest("3", "peach");
        ProfileRequest req11 = new ProfileRequest("1", "pineapple");

        MyScheduler scheduler = new MyScheduler();
        scheduler.createScheduler(api, req1, new MySchedulerInterface(req1));
        scheduler.createScheduler(api, req2, new MySchedulerInterface(req2));
        scheduler.createScheduler(api, req3, new MySchedulerInterface(req3));

        System.out.println("Created 3 tasks");
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Starting 3 tasks");
        scheduler.start("1");
        scheduler.start("2");
        scheduler.start("3");
        System.out.println("Started 3 tasks");

        TimeUnit.SECONDS.sleep(10);

        System.out.println("Replacing task 1...");
        scheduler.createScheduler(api, req11, new MySchedulerInterface(req11));
        System.out.println("Replaced task 1.");

        TimeUnit.SECONDS.sleep(10);
        System.out.println("Stopping 3 tasks...");
        scheduler.stop("1");
        scheduler.stop("2");
        scheduler.stop("3");
        System.out.println("The end.");
    }
}

class ProfileRequest {
    private final String uid;
    private final String value;

    public ProfileRequest(String uid, String value) {
        this.uid = uid;
        this.value = value;
    }

    public String getUid() {
        return uid;
    }

    public String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", ProfileRequest.class.getSimpleName() + "[", "]")
                .add("uid='" + uid + "'")
                .add("value='" + value + "'")
                .toString();
    }
}

class ApiInterface {
    public void createBookmark(ProfileRequest request, Callback<String> stringCallback) {
        stringCallback.onSuccess("SUCCESS: I'm done with " + request);
    }
}

interface SchedulerInterface {

    void onSuccess(String response);

    void onFailure(Exception exception);

    void onSchedulerStop(String uid);

    void onSchedulerStart(String uid);
}


interface Callback<T> {
    void onSuccess(@NotNull T response);

    void onFailure(@NotNull Exception exception);
}

class MyScheduler extends TaskScheduler {

    private final int DEFAULT_SCHEDULER_INTERVAL = 2; // seconds

    public MyScheduler() {
    }

    public void createScheduler(@NotNull ApiInterface apiInterface,
                                @NotNull ProfileRequest request,
                                @NotNull SchedulerInterface schedulerInterface) {
        super.setTask(new SchedulerRunnable(apiInterface, request, schedulerInterface), request.getUid(), DEFAULT_SCHEDULER_INTERVAL);
    }

    @Override
    public ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask task = super.doStart(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStart(uid);
        }
        return task;
    }

    @Override
    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = super.doStop(uid);
        if (task != null) {
            final SchedulerRunnable runnable = (SchedulerRunnable) task.runnable;
            runnable.schedulerInterface.onSchedulerStop(uid);
        }
        return task;
    }

    private class SchedulerRunnable implements Runnable {

        private final ApiInterface apiInterface;
        private final ProfileRequest request;
        private final SchedulerInterface schedulerInterface;

        SchedulerRunnable(ApiInterface apiInterface, ProfileRequest request, SchedulerInterface schedulerInterface) {
            this.apiInterface = apiInterface;
            this.request = request;
            this.schedulerInterface = schedulerInterface;
        }

        @Override
        public void run() {
            apiInterface.createBookmark(request, new Callback<String>() {
                @Override
                public void onSuccess(@NotNull String response) {
                    schedulerInterface.onSuccess(response);
                }

                @Override
                public void onFailure(@NotNull Exception exception) {
                    schedulerInterface.onFailure(exception);
                }
            });
        }
    }
}


class SchedulerModel {
    ScheduledExecutorService executorService;
    Runnable runnable;
    int interval;
}


class TaskScheduler {

    static class ScheduledTask {
        String uid;
        Runnable runnable;
        int interval;
        ScheduledFuture<?> future;

        ScheduledTask(String uid, Runnable runnable, int interval, ScheduledFuture<?> future) {
            this.uid = uid;
            this.runnable = runnable;
            this.interval = interval;
            this.future = future;
        }

        void dispose() {
            if (future != null) {
                future.cancel(true);
            }
        }

        boolean isScheduled() {
            return future != null;
        }
    }

    private ConcurrentMap<String, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>();
    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    TaskScheduler() {
    }

    /**
     * Method is used to initialize scheduler task and time delays
     *
     * @param runnable Represents a command that can be executed
     * @param interval The time interval for execution of code
     */
    void setTask(Runnable runnable, String uid, int interval) {
        AtomicBoolean requiresRestart = new AtomicBoolean(false);
        final ScheduledTask task = scheduledTasks.compute(uid, (id, oldTask) -> {
            ScheduledTask newTask = new ScheduledTask(uid, runnable, interval, null);
            if (oldTask != null) {
                oldTask.dispose();
                requiresRestart.set(oldTask.isScheduled());
            }
            return newTask;
        });

        if (requiresRestart.get()) {
            start(uid);
        }
    }

    public void start(@NotNull String uid) {
        doStart(uid);
    }

    public void stop(@NotNull String uid) {
        doStop(uid);
    }

    protected ScheduledTask doStart(@NotNull String uid) {
        final ScheduledTask scheduledTask = scheduledTasks.computeIfPresent(uid, (id, oldTask) -> {
            ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
                    oldTask.runnable, 0, oldTask.interval, TimeUnit.SECONDS);
            ScheduledTask newTask = new ScheduledTask(oldTask.uid, oldTask.runnable, oldTask.interval, future);
            return newTask;
        });
        return scheduledTask;
    }

    protected ScheduledTask doStop(@NotNull String uid) {
        final ScheduledTask task = scheduledTasks.remove(uid);
        task.dispose();
        return task;
    }


}

Devstr
  • 4,431
  • 1
  • 18
  • 30
  • 1
    There are also numerous concurrency and resource management issues with your code. I can expand on that if you like. I'd suggest reading the Java Concurrency in Practice book for general background on concurrency: http://jcip.net/ – Devstr Sep 04 '19 at 19:05
  • thanks for the example. And yes, please elaborate on the issues you see. – portfoliobuilder Sep 04 '19 at 19:12
  • #1 is a good point, thanks for that. But #2 is purposeful. There are x-number of threads that can be created. Ignore the nightmare that creates. But the way to do that, from my research, is to create a new single ThreadExecutorService for each task. I can then cancel (shutdown) each task using the UID. Which is working perfectly. – portfoliobuilder Sep 04 '19 at 19:45
  • 1
    Welcome! Can you please elaborate what is the reasoning behind #2? IMO the better way would be to keep a reference to the `Future` that you get from `executor.scheduleWithFixedDelay()` when submitting a task and call `cancel` on it if you want to stop it. Your runnable need to to handle `ThreadInterruptedException` properly, of course. – Devstr Sep 04 '19 at 19:50
  • The reason behind #2 is silly Product Managers. I have a question about the reference to the Future. While that helps me keep track of the threads, how am I keeping track of the responses being returned? Am I also keeping a list of ScheduledFuture> – portfoliobuilder Sep 04 '19 at 19:54
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/198969/discussion-between-portfoliobuilder-and-devstr). – portfoliobuilder Sep 04 '19 at 19:55