This is the first time I've ever created a multi-threaded application in Java that that will run continuously until canceled and I'm having trouble shutting down/interrupting my threads.
I have some threads that communicate with a Mediator which encapsulates a TransferQueue, an ExecutorService, and facilitates communication between producing and consuming threads.
I'm using this Mediator instead of Future because TransferQueue is a lot more block-friendly when it comes to a single consumer handling multiple producers (producer threads can mediator.put(E e) any time they want, and consumer threads can just wait on E e = mediator.take() for something to be available) and I do not want to waste CPU cycles polling.
The design is very clean, fast, and effective, but I'm having trouble interrupting blocking on queue.take(), serverSocket.accept(), and interrupting the threads as a whole.
The producers:
public class SocketProducer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
while (runnable) {
Socket socket = serverSocket.accept(); // blocks until connection
mediator.putIntoQueue(socket);
}
return null;
}
public void interrupt() {
// ?
runnable = false;
serverSocket.close();
// ?
}
}
and the consumer:
private class SocketConsumer implements Colleague<Socket> {
private Mediator<Socket> mediator;
private volatile boolean runnable = true;
public SomeConsumer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Void call() throws Exception {
while (runnable) {
Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
}
return null;
}
public void interrupt() {
// ?
runnable = false;
// ?
}
}
The Colleague interface just extends Callable, to give some additional capability to the Mediator in managing its producer/consumer colleagues (ie: calling for:each colleague.interrupt()).
I've tried a lot of methods, throwing InterruptedException in various places, catching InterruptedException in various places, letting threads return an instance of their Thread to the mediator for interruption. Everything I've tried has been so ineffective that it feels like I'm missing some crucial piece to this puzzle.
So far the most effective method I've seen is the poison pill (which would be great if the queues didn't throw NPE on a null insertion), and all the methods I've tried of introducing a poison generic have failed because of ClassCastException (trying to cast Object to Socket, trying to instantiate a generic Socket, etc.).
I'm really not sure where to go from here. I'd really like to be able to cleanly terminate these threads on demand.
Completed solutions:
public class SocketProducer implements Colleague<Socket> {
private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
private Mediator<Socket> mediator;
private ServerSocket serverSocket;
private Integer listeningPort;
private volatile boolean runnable = true;
public SocketProducer(Mediator<Socket> mediator) {
this.mediator = mediator;
}
public Colleague<Socket> setListeningPort(Integer listeningPort) {
this.listeningPort = listeningPort;
return this;
}
public Void call() throws Exception {
serverSocket = new ServerSocket(listeningPort, 10);
logger.info("Listening on port " + listeningPort);
while (runnable) {
try {
Socket socket = serverSocket.accept();
logger.info("Connected on port " + socket.getLocalPort());
mediator.putIntoQueue(socket);
} catch (SocketException e) {
logger.info("Stopped listening on port " + listeningPort);
}
}
return null;
}
public void interrupt() {
try {
runnable = false;
serverSocket.close();
} catch (IOException e) {
logger.error(e);
}
}
}
public class SocketConsumer implements Colleague<Socket> {
private static final Logger logger = getLogger(SocketConsumer.class.getName());
private Mediator<Socket> socketMediator;
public SocketConsumer(Mediator<Socket> mediator) {
this.socketMediator = mediator;
}
public Void call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
Socket socket = socketMediator.takeFromQueue();
logger.info("Received socket on port: " + socket.getLocalPort());
} catch (InterruptedException e) {
logger.info("Interrupted.");
Thread.currentThread().interrupt();
}
}
return null;
}
public void interrupt() {
Thread.currentThread().interrupt();
}
}