4

This is the first time I've ever created a multi-threaded application in Java that that will run continuously until canceled and I'm having trouble shutting down/interrupting my threads.

I have some threads that communicate with a Mediator which encapsulates a TransferQueue, an ExecutorService, and facilitates communication between producing and consuming threads.

I'm using this Mediator instead of Future because TransferQueue is a lot more block-friendly when it comes to a single consumer handling multiple producers (producer threads can mediator.put(E e) any time they want, and consumer threads can just wait on E e = mediator.take() for something to be available) and I do not want to waste CPU cycles polling.

The design is very clean, fast, and effective, but I'm having trouble interrupting blocking on queue.take(), serverSocket.accept(), and interrupting the threads as a whole.


The producers:

public class SocketProducer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);

        while (runnable) {
            Socket socket = serverSocket.accept(); // blocks until connection

            mediator.putIntoQueue(socket);
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        serverSocket.close();
        // ?
    }
}

and the consumer:

private class SocketConsumer implements Colleague<Socket> {
    private Mediator<Socket> mediator;
    private volatile boolean runnable = true;

    public SomeConsumer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Void call() throws Exception {
        while (runnable) {
            Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        }

        return null;
    }

    public void interrupt() {
        // ?
        runnable = false;
        // ?
    }
}

The Colleague interface just extends Callable, to give some additional capability to the Mediator in managing its producer/consumer colleagues (ie: calling for:each colleague.interrupt()).

I've tried a lot of methods, throwing InterruptedException in various places, catching InterruptedException in various places, letting threads return an instance of their Thread to the mediator for interruption. Everything I've tried has been so ineffective that it feels like I'm missing some crucial piece to this puzzle.

So far the most effective method I've seen is the poison pill (which would be great if the queues didn't throw NPE on a null insertion), and all the methods I've tried of introducing a poison generic have failed because of ClassCastException (trying to cast Object to Socket, trying to instantiate a generic Socket, etc.).

I'm really not sure where to go from here. I'd really like to be able to cleanly terminate these threads on demand.


Completed solutions:

public class SocketProducer implements Colleague<Socket> {
    private static final Logger logger = LogManager.getLogger(SocketProducer.class.getName());
    private Mediator<Socket> mediator;
    private ServerSocket serverSocket;
    private Integer listeningPort;

    private volatile boolean runnable = true;

    public SocketProducer(Mediator<Socket> mediator) {
        this.mediator = mediator;
    }

    public Colleague<Socket> setListeningPort(Integer listeningPort) {
        this.listeningPort = listeningPort;
        return this;
    }

    public Void call() throws Exception {
        serverSocket = new ServerSocket(listeningPort, 10);
        logger.info("Listening on port " + listeningPort);

        while (runnable) {
            try {
                Socket socket = serverSocket.accept();
                logger.info("Connected on port " + socket.getLocalPort());
                mediator.putIntoQueue(socket);
            } catch (SocketException e) {
                logger.info("Stopped listening on port " + listeningPort);
            }
        }

        return null;
    }

    public void interrupt() {
        try {
            runnable = false;
            serverSocket.close();
        } catch (IOException e) {
            logger.error(e);
        }
    }

}

public class SocketConsumer implements Colleague<Socket> {
    private static final Logger logger = getLogger(SocketConsumer.class.getName());
    private Mediator<Socket> socketMediator;

    public SocketConsumer(Mediator<Socket> mediator) {
        this.socketMediator = mediator;
    }

    public Void call() throws Exception {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Socket socket = socketMediator.takeFromQueue();
                logger.info("Received socket on port: " + socket.getLocalPort());
            } catch (InterruptedException e) {
                logger.info("Interrupted.");
                Thread.currentThread().interrupt();
            }
        }
        return null;
    }

    public void interrupt() {
        Thread.currentThread().interrupt();
    }

}
Community
  • 1
  • 1
CodeShaman
  • 2,131
  • 2
  • 19
  • 34
  • 3
    The first obvious thing I can see right now is that you're missing `volatile` next to `runnable` field. Since you're trying to interrupt a thread from another thread, you need to ensure visibility of field changes. I'd also make it plain `boolean`, not `Boolean`. – Andrew Logvinov Sep 01 '14 at 20:13
  • Maybe I can wrap Socket and ServerSocket into their own classes, which would allow me to create unique poison pill instances of them? – CodeShaman Sep 01 '14 at 20:23
  • If you call `ServerSocket#close()` then the blocking `accept()` call will throw an exception... does that help at all? – Alex Lockwood Sep 01 '14 at 20:30
  • Terminating the entire JVM is not an option? – meriton Sep 01 '14 at 21:19

2 Answers2

2

I think poison pills will only make things more complicated, so I'd keep it simple.

As for the ServerSocket, this answer suggests that calling close() should be enough to interrupt it.

As for BlockingQueue, consumer can look like this:

// you can use volatile flag instead if you like
while (!Thread.currentThread.isInterrupted()) {
    try {
        Object item = queue.take();
        // do something with item 
    } catch (InterruptedException e) {
        log.error("Consumer interrupted", e);
        Thread.currentThread().interrupt(); // restore flag
    }
}

Then in your Mediator you can just call interrupt() on a consumer thread.

Community
  • 1
  • 1
Andrew Logvinov
  • 21,181
  • 6
  • 52
  • 54
  • A closing of the `ServerSocket` would yield nothing IMO as the [TranferQueue.transfer(E)-method](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TransferQueue.html#transfer%28E%29) seems to be the blocking sequence. Closing the socket would just close the stream the socket would provide but would not remove the socket from the queue. Therefore, either the mediator (queue) needs to be interrupted or instead of a blocking statement a polling/blocking with timeout method could be used. – Roman Vottner Sep 01 '14 at 20:45
  • @RomanVottner There are actually two blocking operations (socket and queue) in producer and both need to be handled. – Andrew Logvinov Sep 01 '14 at 20:49
  • I swear I've tried this before, but suddenly it's working wonderfully. I guess I was hoping for one solution that worked for both cases, but thankfully I can work around it. The consumer interrupts cleanly, but I have to use a runnable flag and close the sockets on the producer. It completely ignores thread interruption. – CodeShaman Sep 01 '14 at 20:54
  • @AndrewLogvinov ah, I missed somehow the blocking of `serverSocket.accept()` therefore you are of course right :) – Roman Vottner Sep 01 '14 at 21:00
1

A poison pill is straight forward.

private static Socket poisonPill = new Socket();

public Void call() throws Exception {
    while (runnable) {
        Socket socket = mediator.takeFromQueue(); // blocks until element is in queue
        if (socket == poisonPill) { 
            // quit the thread... 
        }
    }

    return null;
}

Note the socket == poisonPill. This is an equality check that they're the exact same instance, so that's how the poisonPill works yet still being type safe.

Will Hartung
  • 115,893
  • 19
  • 128
  • 203