I am implementing an ExecutorService on my own to learn about the Java internals. While the ExecutorService works when called upon in a single thread, I am not sure as to how implement it such that multiple threads can submit tasks to the same executor service concurrently. How do I implement this functionality into my existing code ?
The code I have written is as follows.
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
public class MyExecutorService implements ExecutorService {
private final MyWorkerThread[] workerThreads;
private final LinkedList<MyTask> taskQueue;
private final LinkedList<Object> completed;
private boolean terminated;
private boolean hasBeenShutDown;
private final Object shutDownLock;
public MyExecutorService(int numThreads) {
workerThreads = new MyWorkerThread[numThreads];
completed = new LinkedList<>();
taskQueue = new LinkedList<>();
shutDownLock = new Object();
for (int i = 0; i < numThreads; i++) {
workerThreads[i] = new MyWorkerThread();
workerThreads[i].start();
}
terminated = false;
hasBeenShutDown = false;
}
@Override
public void shutdown() {
hasBeenShutDown = true;
for (int i = 0; i < workerThreads.length; i++) {
submitToStop();
}
for (MyWorkerThread worker : workerThreads) {
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized (shutDownLock) {
shutDownLock.notifyAll();
}
}
@Override
public List<Runnable> shutdownNow() {
hasBeenShutDown = true;
for (MyWorkerThread workerThread : workerThreads) {
workerThread.interrupt();
}
LinkedList<Runnable> pendingTasks = new LinkedList<>();
for (MyTask task : taskQueue) {
pendingTasks.add(task.getFutureTask());
}
return pendingTasks;
}
@Override
public boolean isTerminated() {
return terminated;
}
@Override
public boolean isShutdown() {
return hasBeenShutDown;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
System.out.println("Awaiting Termination");
synchronized (shutDownLock) {
try {
shutDownLock.wait(unit.convert(timeout, unit));
terminated = true;
} catch (InterruptedException e) {
// ignore
}
}
return hasBeenShutDown;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> futureTask = new FutureTask<>(task);
synchronized (taskQueue) {
taskQueue.add(new MyTask(false, futureTask));
taskQueue.notifyAll();
}
return futureTask;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
FutureTask<T> futureTask = new FutureTask<>(task, result);
synchronized (taskQueue) {
taskQueue.add(new MyTask(false, futureTask));
taskQueue.notifyAll();
}
return futureTask;
}
@Override
public Future<?> submit(Runnable task) {
FutureTask<?> futureTask = new FutureTask<>(task, "RESULT");
synchronized (taskQueue) {
taskQueue.add(new MyTask(false, futureTask));
taskQueue.notifyAll();
}
return futureTask;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
List<Future<T>> futures = new ArrayList<>();
for (Callable<T> task : tasks) {
futures.add(submit(task));
}
return futures;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
FutureTask<List<Future<T>>> futureWorkerTask = new FutureTask<>(() -> invokeAll(tasks));
try {
return futureWorkerTask.get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return new ArrayList<>();
}
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException {
invokeAll(tasks);
synchronized (completed) {
while (completed.isEmpty()) {
// System.out.println("Waiting");
completed.wait();
}
}
return (T) completed.removeFirst();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
FutureTask<T> futureWorkerTask = new FutureTask<>(() -> invokeAny(tasks));
try {
return (T) futureWorkerTask.get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return null;
}
}
@Override
public void execute(Runnable task) {
FutureTask<String> futureTask = new FutureTask<>(task, "RESULT");
synchronized (taskQueue) {
taskQueue.add(new MyTask(false, futureTask));
taskQueue.notifyAll();
}
}
private void submitToStop() {
synchronized (taskQueue) {
// System.out.println("added to queue");
taskQueue.add(new MyTask(true, null));
taskQueue.notify();
}
}
private class MyWorkerThread extends Thread {
public void run() {
while (true) {
MyTask myTask;
try {
synchronized (taskQueue) {
while (taskQueue.isEmpty())
taskQueue.wait();
myTask = taskQueue.removeFirst();
}
if (myTask.toStop)
break;
FutureTask futureTask = myTask.getFutureTask();
futureTask.run();
// System.out.println("Running");
synchronized (completed) {
completed.add(futureTask.get());
completed.notifyAll();
}
} catch (Exception e) {
break;
}
}
}
}
private static class MyTask {
private FutureTask task;
private final boolean toStop;
MyTask(boolean toStop, FutureTask task) {
this.task = task;
this.toStop = toStop;
}
public FutureTask getFutureTask() {
return task;
}
}
}