I am implementing an ExecutorService on my own to learn about Java internals. It works when used from a single thread, but I am not sure how to implement it so that multiple threads can submit tasks to the same executor service concurrently. How do I add this functionality to my existing code?

The code I have written is as follows.

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

public class MyExecutorService implements ExecutorService {
    private final MyWorkerThread[] workerThreads;
    private final LinkedList<MyTask> taskQueue;
    private final LinkedList<Object> completed;
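    // volatile: these flags are written and read by different threads without a common lock
    private volatile boolean terminated;
    private volatile boolean hasBeenShutDown;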
    private final Object shutDownLock;

    public MyExecutorService(int numThreads) {
        workerThreads = new MyWorkerThread[numThreads];
        completed = new LinkedList<>();
        taskQueue = new LinkedList<>();
        shutDownLock = new Object();
        for (int i = 0; i < numThreads; i++) {
            workerThreads[i] = new MyWorkerThread();
            workerThreads[i].start();
        }
        terminated = false;
        hasBeenShutDown = false;
    }

    @Override
    public void shutdown() {
        hasBeenShutDown = true;
        for (int i = 0; i < workerThreads.length; i++) {
            submitToStop();
        }
        for (MyWorkerThread worker : workerThreads) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        synchronized (shutDownLock) {
            // All workers have been joined at this point, so the service is terminated.
            terminated = true;
            shutDownLock.notifyAll();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        hasBeenShutDown = true;
        for (MyWorkerThread workerThread : workerThreads) {
            workerThread.interrupt();
        }
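        LinkedList<Runnable> pendingTasks = new LinkedList<>();
        // taskQueue is a plain LinkedList, so every access must hold its lock.
        synchronized (taskQueue) {
            for (MyTask task : taskQueue) {
                if (!task.toStop) {
                    pendingTasks.add(task.getFutureTask());
                }
            }
            taskQueue.clear(); // the returned tasks will never be run by this service
        }
        return pendingTasks;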
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean isShutdown() {
        return hasBeenShutDown;
    }


    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        System.out.println("Awaiting Termination");
        synchronized (shutDownLock) {
            try {
                // Object.wait(long) takes milliseconds; unit.convert(timeout, unit) returned the timeout unchanged.
                shutDownLock.wait(unit.toMillis(timeout));
                terminated = true;
            } catch (InterruptedException e) {
                // ignore
            }
        }
        return hasBeenShutDown;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
        return futureTask;
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        FutureTask<T> futureTask = new FutureTask<>(task, result);
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
        return futureTask;
    }

    @Override
    public Future<?> submit(Runnable task) {
        FutureTask<?> futureTask = new FutureTask<>(task, "RESULT");
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
        return futureTask;
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(submit(task));
        }
        return futures;
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
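        FutureTask<List<Future<T>>> futureWorkerTask = new FutureTask<>(() -> invokeAll(tasks));
        // A FutureTask does nothing until it is run; without this, get() below can only time out.
        new Thread(futureWorkerTask).start();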
        try {
            return futureWorkerTask.get(timeout, unit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return new ArrayList<>();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        invokeAll(tasks);
        synchronized (completed) {
            while (completed.isEmpty()) {
                completed.wait();
            }
            // Remove while still holding the lock: LinkedList is not thread-safe.
            @SuppressWarnings("unchecked")
            T result = (T) completed.removeFirst();
            return result;
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
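        FutureTask<T> futureWorkerTask = new FutureTask<>(() -> invokeAny(tasks));
        // A FutureTask does nothing until it is run; use a helper thread so no pool worker is blocked.
        new Thread(futureWorkerTask).start();
        try {
            return futureWorkerTask.get(timeout, unit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            futureWorkerTask.cancel(true); // interrupt the helper thread if it is still waiting
            return null;
        }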
    }

    @Override
    public void execute(Runnable task) {
        FutureTask<String> futureTask = new FutureTask<>(task, "RESULT");
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
    }

    private void submitToStop() {
        synchronized (taskQueue) {
//            System.out.println("added to queue");
            taskQueue.add(new MyTask(true, null));
            taskQueue.notify();
        }
    }

    private class MyWorkerThread extends Thread {
        public void run() {
            while (true) {
                MyTask myTask;
                try {
                    synchronized (taskQueue) {
                        while (taskQueue.isEmpty())
                            taskQueue.wait();
                        myTask = taskQueue.removeFirst();
                    }
                    if (myTask.toStop)
                        break;
                    FutureTask futureTask = myTask.getFutureTask();
                    futureTask.run();
//                    System.out.println("Running");
                    synchronized (completed) {
                        completed.add(futureTask.get());
                        completed.notifyAll();
                    }
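                } catch (InterruptedException e) {
                    // Interrupted, e.g. by shutdownNow(): stop this worker.
                    break;
                } catch (ExecutionException | CancellationException e) {
                    // The task failed or was cancelled. Its Future already records that,
                    // so keep this worker alive for the next task.
                }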
            }
        }
    }

    private static class MyTask {
        private FutureTask task;
        private final boolean toStop;

        MyTask(boolean toStop, FutureTask task) {
            this.task = task;
            this.toStop = toStop;
        }

        public FutureTask getFutureTask() {
            return task;
        }
    }
}

1 Answer

You have your synchronized queue in place, so it should work (I am only looking at your submit methods). Do verify this thoroughly, though: write proper multi-threaded tests for each public method. As an alternative to LinkedList there is also ConcurrentLinkedQueue, which handles the synchronization internally; have a look at "How to use ConcurrentLinkedQueue?". Note that it is non-blocking, so your workers would have to poll it rather than wait() on it; LinkedBlockingQueue is the blocking counterpart.
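
As a starting point for such a test, here is a minimal sketch. The names ConcurrentSubmitCheck and runCheck are made up for illustration, and Executors.newFixedThreadPool is only a stand-in so that the snippet compiles on its own; the idea is to pass your MyExecutorService instead. Several threads are released at the same moment, each submits a batch of tasks, and the check is simply that every submitted task ran exactly once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentSubmitCheck {

    /** Lets `submitters` threads each submit `tasksPerThread` tasks; returns how many tasks ran. */
    public static int runCheck(ExecutorService executor, int submitters, int tasksPerThread)
            throws Exception {
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch startGate = new CountDownLatch(1);
        List<Future<?>> futures = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < submitters; i++) {
            Thread t = new Thread(() -> {
                try {
                    startGate.await(); // hold all submitters back, then release them together
                    for (int j = 0; j < tasksPerThread; j++) {
                        futures.add(executor.submit(() -> { executed.incrementAndGet(); }));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }

        startGate.countDown();
        for (Thread t : threads) {
            t.join();
        }
        // A lost task would show up here as a TimeoutException.
        for (Future<?> f : futures) {
            f.get(10, TimeUnit.SECONDS);
        }
        return executed.get();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in executor (assumption); replace with new MyExecutorService(4).
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            int ran = runCheck(executor, 8, 1000);
            System.out.println("tasks run: " + ran + ", expected: " + (8 * 1000));
        } finally {
            executor.shutdown();
        }
    }
}
```

A test like this cannot prove the absence of races, but running it repeatedly with many submitters tends to expose missing synchronization quickly.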

Edit, in answer to your comment about the invokeAny() method:

The API description says that it is supposed to do the following:

/**
 * Executes the given tasks, returning the result
 * of one that has completed successfully (i.e., without throwing
 * an exception), if any do. Upon normal or exceptional return,
 * tasks that have not completed are cancelled.
 * The results of this method are undefined if the given
 * collection is modified while this operation is in progress.
 *
 * @param tasks the collection of tasks
 * @param <T> the type of the values returned from the tasks
 * @return the result returned by one of the tasks
 * @throws InterruptedException if interrupted while waiting
 * @throws NullPointerException if tasks or any element task
 *         subject to execution is {@code null}
 * @throws IllegalArgumentException if tasks is empty
 * @throws ExecutionException if no task successfully completes
 * @throws RejectedExecutionException if tasks cannot be scheduled
 *         for execution
 */
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

Looking at your code, this seems to be what you are doing.
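
For comparison, here is one way the contract quoted above could be sketched on top of ExecutorCompletionService. This is not how the JDK implements it, just a simplified illustration; the class name InvokeAnySketch is hypothetical. It submits every task, returns the first result that completes without an exception, and cancels whatever is still outstanding.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAnySketch {

    public static <T> T invokeAny(Executor executor, Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        if (tasks.isEmpty()) {
            throw new IllegalArgumentException("tasks is empty");
        }
        CompletionService<T> completion = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Callable<T> task : tasks) {
                futures.add(completion.submit(task));
            }
            ExecutionException lastFailure = null;
            for (int i = 0; i < futures.size(); i++) {
                try {
                    // take() hands back futures in completion order, not submission order
                    return completion.take().get();
                } catch (ExecutionException e) {
                    lastFailure = e; // this task failed; wait for the next one
                }
            }
            throw lastFailure; // no task completed successfully
        } finally {
            // "Upon normal or exceptional return, tasks that have not completed are cancelled."
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
            Callable<String> succeeding = () -> "ok";
            tasks.add(failing);
            tasks.add(succeeding);
            System.out.println(invokeAny(pool, tasks));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because each call owns its completion queue, two concurrent invokeAny calls cannot pick up each other's results, which is something a single shared "completed" list cannot guarantee.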

Then I peeked into Java's own basic implementation of this, the

java.util.concurrent.AbstractExecutorService

and it does, of course, a whole lot more (I guess your IDE, like my IntelliJ, includes a decompiler; have a peek yourself).
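
AbstractExecutorService can also serve as a base class: it supplies submit, invokeAll and invokeAny, so a subclass only needs execute and the lifecycle methods. The following is a rough sketch under that assumption; TinyPool and its POISON marker are invented names, and the shutdown handling is deliberately simplified (a task handed to execute at the same moment as shutdown may be queued but never run).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class TinyPool extends AbstractExecutorService {
    private static final Runnable POISON = () -> { }; // tells one worker to stop

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();
    private final CountDownLatch allDone;
    private volatile boolean shutdown;

    public TinyPool(int numThreads) {
        allDone = new CountDownLatch(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(this::workerLoop, "tiny-pool-" + i);
            workers.add(t);
            t.start();
        }
    }

    private void workerLoop() {
        try {
            while (true) {
                Runnable task = queue.take(); // blocks until a task is available
                if (task == POISON) {
                    break;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // a failing execute() task must not kill the worker
                }
            }
        } catch (InterruptedException e) {
            // interrupted by shutdownNow(): fall through and exit
        } finally {
            allDone.countDown();
        }
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        if (shutdown) throw new RejectedExecutionException("pool is shut down");
        queue.add(command);
    }

    @Override
    public synchronized void shutdown() {
        if (shutdown) return;
        shutdown = true;
        for (int i = 0; i < workers.size(); i++) queue.add(POISON); // queued tasks run first
    }

    @Override
    public synchronized List<Runnable> shutdownNow() {
        shutdown = true;
        List<Runnable> pending = new ArrayList<>();
        queue.drainTo(pending);
        pending.removeIf(r -> r == POISON);
        for (Thread t : workers) t.interrupt();
        for (int i = 0; i < workers.size(); i++) queue.add(POISON); // for tasks that swallow interrupts
        return pending;
    }

    @Override public boolean isShutdown() { return shutdown; }

    @Override public boolean isTerminated() { return allDone.getCount() == 0; }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return allDone.await(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        TinyPool pool = new TinyPool(2);
        Future<Integer> answer = pool.submit(() -> 6 * 7); // submit() is inherited
        System.out.println(answer.get());
        pool.shutdown();
        System.out.println(pool.awaitTermination(1, TimeUnit.SECONDS));
    }
}
```

The BlockingQueue replaces the hand-written wait/notify on a LinkedList, and the CountDownLatch gives awaitTermination a real timeout without blocking shutdown itself.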

As Java itself comes with 'industrial strength' variants of ExecutorService, it is not common to build one yourself. What you usually customize is one level up: what a worker thread is supposed to do, some means of task-ID handling, what to do in case of various exceptions, timeouts, and so on.

Still, it is surely a good exercise to see how Java itself implements this. You will see which aspects of concurrency always need special consideration when writing concurrent code.
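
To illustrate that kind of 'one-level-up' customization, here is a small sketch built on ThreadPoolExecutor. The class ObservedPool, the thread-name prefix and the failure counter are all invented for the example. It names the worker threads through a ThreadFactory and uses the afterExecute hook to notice failed tasks; for tasks passed to submit the exception is stored in the Future, so the hook has to look there.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ObservedPool extends ThreadPoolExecutor {
    private final AtomicInteger failures = new AtomicInteger();

    public ObservedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<>(), namedThreads("observed-worker-"));
    }

    /** Gives each worker a recognizable name, which helps when reading logs and thread dumps. */
    private static ThreadFactory namedThreads(String prefix) {
        AtomicInteger nextId = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + nextId.incrementAndGet());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submit(), t is null even on failure: the exception lives inside the Future.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (ExecutionException e) {
                t = e.getCause();
            } catch (CancellationException e) {
                t = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.incrementAndGet();
            System.err.println(Thread.currentThread().getName() + ": task failed: " + t);
        }
    }

    public int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) throws Exception {
        ObservedPool pool = new ObservedPool(2);
        Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
        Callable<String> succeeding = () -> "fine";
        pool.submit(failing);
        pool.submit(succeeding);
        pool.shutdown();
        // afterExecute runs on the worker threads, so wait for them before reading the counter.
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("failed tasks: " + pool.failureCount());
    }
}
```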

Ola Aronsson