2

I have a thread in Java that is connecting to a socket and sending information to another thread, which is processing that information.

Now, if the "producer" thread fails for any reason, I want the whole program to stop, so some sort of notification must happen.

Here's my program (very simplified):

public class Main {
  // Static so that the static main method can use it
  private static final Queue<String> q = new ConcurrentLinkedQueue<>();

  public static void main(String[] args) throws IOException {
    new Thread(new Producer(q)).start();
    new Thread(new Consumer(q)).start();

    // Catch any error in the producer side, then stop both consumer and producer, and do some extra work to notify me that there's an error...
  }
}

Main code just creates a shared queue, and starts both producer and consumer. So far, I guess it's ok? Now the Producer code is like this:

public class Producer implements Runnable {
  private Queue<String> q;

  public Producer(Queue<String> q) {
    this.q = q;
  }

  public void run() {
    try {
      connectToSocket();
      while(true) {
        String data = readFromSocket();
        q.offer(data);
      }
    } catch (Exception e) {
      // Something really bad happened, notify the parent thread so he stops the program...
    }
  }
}

The producer connects to the socket, reads the string data, and puts it on the queue. The consumer:

public class Consumer implements Runnable {
  private Queue<String> q;

  public Consumer(Queue<String> q) {
    this.q = q;
  }

  public void run() {
    while(true) {
      String dataFromSocket = q.poll(); // returns null when the queue is empty
      if (dataFromSocket != null) {
        saveData(dataFromSocket);
      }
    }
  }
}

My code does a lot more than that, but I think it's now clear what I'm trying to do. I've read about wait() and notify(), but I don't think they would work here: I don't want my main thread to block while waiting for an exception, I want to deal with it in a better way. What are the alternatives?

In general, does my code look reasonable? Would using ExecutorService help here at all?

Thanks a lot!

Will
  • Have you considered the Observer pattern for this? The Consumer can add an Observer on the Producer to monitor its state. – Einar Dec 04 '16 at 20:07
  • If you don't want to `wait` for an exception in the main thread, you could also implement a timer/loop to handle other tasks while your main thread/program is running, one of them being a check whether an error was thrown (which can be as simple as checking a boolean value that is set when an error is thrown). But before we can help you in more detail, could you describe a bit more what exact behavior you are looking for? – n247s Dec 04 '16 at 20:09
  • @Einar I don't know anything about the observer pattern, I'm not a developer. I'll read about it, thanks a lot. – Will Dec 04 '16 at 20:13
  • @n247s `You could also implement a timer/loop` in the main thread? `checking a boolean value` that means a shared variable between thread Main and Producer, right? The behaviour I'd want is: if there's an exception in the Producer thread (for any reason) I want the Main thread to catch it, stop the Producer/Consumer thread, and do some extra work (I was thinking notify me via email that the process is broken and I should do something about it). Then exit the program, that's it. – Will Dec 04 '16 at 20:16
  • Use your queue to signal the consumer. You need to use a type other than String, though. Maybe an interface Data implemented by StringData and by MetaData, where MetaData carries information about the producer (for example, that it crashed) – Bastien Aracil Dec 04 '16 at 20:19
  • @BastienAracil ok I signal the consumer, but I want to end the program, so I guess my aim is to notify the main thread, right? – Will Dec 04 '16 at 20:21
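
To make the queue-signalling idea from the comment above concrete, here is a rough sketch. The names `QueueSignalSketch`, `Message`, `StringData`, `ProducerFailed` and `consumeUntilFailure` are made up for illustration, and it assumes the queue is switched to a `BlockingQueue` so the consumer can block in `take()` instead of spinning on `poll()`. The failure is simulated with a thrown exception rather than a real socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueSignalSketch {

    // Hypothetical message types: either a payload or a "producer crashed" notice.
    interface Message {}

    static final class StringData implements Message {
        final String value;
        StringData(String value) { this.value = value; }
    }

    static final class ProducerFailed implements Message {
        final Throwable cause;
        ProducerFailed(Throwable cause) { this.cause = cause; }
    }

    // Consumes payloads until a ProducerFailed message arrives and
    // returns everything that was received before the failure.
    static List<String> consumeUntilFailure(BlockingQueue<Message> q) {
        List<String> saved = new ArrayList<>();
        try {
            while (true) {
                Message m = q.take(); // blocks until a message is available
                if (m instanceof ProducerFailed) {
                    return saved; // the producer is gone: stop consuming
                }
                saved.add(((StringData) m).value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return saved;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Message> q = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                q.put(new StringData("line 1"));
                q.put(new StringData("line 2"));
                throw new IllegalStateException("socket closed"); // simulated failure
            } catch (Exception e) {
                q.offer(new ProducerFailed(e)); // last message tells the consumer to stop
            }
        }, "producer");
        producer.start();
        System.out.println("Saved before failure: " + consumeUntilFailure(q));
    }
}
```

This only tells the consumer that the producer died. If the main thread also has to react, it would still need its own signal, for example by joining the consumer thread.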

4 Answers

2

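You can use Thread's UncaughtExceptionHandler:

Thread.setDefaultUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
    // Called only for exceptions that escape run();
    // exceptions caught inside run() never reach this handler.
    @Override
    public void uncaughtException(Thread th, Throwable exception) {
        System.out.println("Exception from Thread " + th + ". Exception: " + exception);
    }
});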

Java docs http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.UncaughtExceptionHandler.html
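
As a rough, self-contained sketch of the handler in action (the class name `UncaughtHandlerSketch`, the method `runFailingProducer` and the thread name "producer" are made up for illustration, and the socket failure is simulated with a thrown exception): note that the producer in the question catches every exception inside run(), so it would have to rethrow, or not catch at all, for the handler to be called. This sketch uses the per-thread `setUncaughtExceptionHandler` rather than the default handler, so only the producer thread is monitored.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class UncaughtHandlerSketch {

    // Starts a named thread whose task throws, and returns the name of the
    // thread that the handler reported (or null if nothing was reported in time).
    static String runFailingProducer() {
        AtomicReference<String> failedThread = new AtomicReference<>();
        CountDownLatch reported = new CountDownLatch(1);

        Thread producer = new Thread(() -> {
            // Stand-in for the socket loop. The exception must escape run();
            // if run() catches it, the handler is never called.
            throw new IllegalStateException("socket closed");
        }, "producer");

        // Per-thread handler; it runs on the failing thread just before it dies.
        producer.setUncaughtExceptionHandler((thread, error) -> {
            failedThread.set(thread.getName());
            reported.countDown();
        });

        producer.start();
        try {
            reported.await(5, TimeUnit.SECONDS); // wait for the handler to run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return failedThread.get();
    }

    public static void main(String[] args) {
        System.out.println("Failed thread: " + runFailingProducer());
    }
}
```

In a real program the handler body is where you would interrupt the consumer and send the notification, instead of just recording the thread name.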

Manish
  • Thanks. Where do I define this? In the main thread? Also, how do I know there if the thread is the producer or consumer thread? – Will Dec 04 '16 at 20:18
  • Yes, in the main method, before starting the producer and consumer threads. You can give each thread a name, and if there is an exception you can get the thread name in the `uncaughtException` method to choose how you would like to handle each scenario. – Manish Dec 04 '16 at 20:22
0

The simplest solution given your current code would be to wait for the producer thread to finish and then interrupt the consumer:

Thread producerThread = new Thread(new Producer(q));
producerThread.start();     
Thread consumerThread = new Thread(new Consumer(q));
consumerThread.start();
try {
    producerThread.join();
} finally {
    consumerThread.interrupt();
}

As you mention, an executor would give you a more general-purpose way to shut everything down when you need to exit (for example, when the process is interrupted in the terminal with Ctrl-C).

ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
Producer producer = new Producer(q);
Consumer consumer = new Consumer(q);
executor.submit(producer::run);
executor.submit(consumer::run);
Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdownNow));

Note that your cleanup would have to be more comprehensive than just shutting down the executor. You would have to close the socket beforehand to allow the threads to be interrupted.

Here is a more complete example that handles shutdown from both sides. You can test it by starting a test server with nc -l 1234. Killing either process (nc or the java client) will result in a clean exit of the other.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    private ExecutorService executor;
    private Socket socket;
    private AtomicBoolean running = new AtomicBoolean(true);

    public static void main(String[] args) throws IOException {
        Main main = new Main();
        main.run();
    }

    private Main() throws IOException {
        executor = Executors.newCachedThreadPool();
        socket = new Socket("localhost", 1234);
    }

    private void run() throws IOException {
        BlockingQueue<String> q = new SynchronousQueue<>();
        Producer producer = new Producer(socket, q);
        Consumer consumer = new Consumer(q);

        // Start the producer. When it ends call stop
        CompletableFuture.runAsync(producer, executor).whenComplete((status, ex) -> stop());
        // Start the consumer.
        CompletableFuture.runAsync(consumer, executor);
        // Add a shutdown hook to stop everything on break
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    private void stop() {
        if (running.compareAndSet(true, false)) { // only once
            // Close the socket to unblock the producer
            try {
                socket.close();
            } catch (IOException e) {
                // ignore
            }

            // Interrupt tasks
            executor.shutdownNow();
            try {
                // Give tasks some time to clean up
                executor.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

    static class Producer implements Runnable {
        private BufferedReader in;
        private BlockingQueue<String> q;

        public Producer(Socket socket, BlockingQueue<String> q) throws IOException {
            this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            this.q = q;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String data = in.readLine();
                    if (data == null) {
                        break;
                    }
                    q.put(data);
                }
            } catch (InterruptedException | IOException e) {
                // Fall through
            }
            System.err.println("Producer done");
        }
    }

    static class Consumer implements Runnable {
        private BlockingQueue<String> q;

        public Consumer(BlockingQueue<String> q) {
            this.q = q;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    System.out.println(q.take());
                }
            } catch (InterruptedException e) {
                // done
            }
            System.err.println("Client done");
        }
    }
}
teppic
  • Thanks. The producer thread never finishes really, it's a `while(true)` loop. Would what you're saying work in that case? – Will Dec 04 '16 at 20:17
  • Also: `The simplest solution given your current code` can you think of a better way of doing this (simpler)? – Will Dec 04 '16 at 20:19
  • It _will_ exit when there is an exception, which can be forced by closing the socket. What would be the normal way that processing would finish? – teppic Dec 04 '16 at 20:22
  • Ok, reading this: http://stackoverflow.com/questions/15956231/what-does-this-thread-join-code-mean it looks like `join` is blocking the main thread, isn't it? Can I not make it signal in a different way, allowing the main thread to continue its journey and work meanwhile? – Will Dec 04 '16 at 20:30
  • In your last example with an executor, I don't see how you're handling the fact that the producer may end unexpectedly. Also, why `ScheduledExecutorService` and not (for example) a `ThreadPoolExecutor`? Just asking – Will Dec 04 '16 at 22:54
  • It doesn't handle an unexpected termination of the producer (I've added a more complete example). – teppic Dec 05 '16 at 02:25
  • Oh, and regarding the `ScheduledExecutorService`, `submit` returns a `Future` that can be used as a handle to monitor the completion of the process. I've used a `CompletableFuture` in the edited example to achieve the same effect. – teppic Dec 05 '16 at 02:27
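
For the `Future` approach mentioned in the last comment, a minimal sketch could look like the following. The class `FutureMonitorSketch` and the method `superviseProducer` are hypothetical names, and both tasks are stand-ins for the real socket and queue code. `Future.get()` rethrows a task failure wrapped in an `ExecutionException`, which is what lets the caller tell a crash apart from a normal exit.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureMonitorSketch {

    // Submits both tasks, waits for the producer to end, and returns "ok"
    // or a description of the producer's failure.
    static String superviseProducer(Runnable producer, Runnable consumer) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<?> producerResult = executor.submit(producer);
        executor.submit(consumer);
        try {
            producerResult.get(); // blocks until the producer ends
            return "ok";
        } catch (ExecutionException e) {
            // The producer threw; the original exception is the cause.
            return "producer failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        } finally {
            executor.shutdownNow(); // interrupts the consumer
        }
    }

    public static void main(String[] args) {
        Runnable producer = () -> {
            throw new IllegalStateException("connection lost"); // simulated failure
        };
        Runnable consumer = () -> {
            try {
                Thread.sleep(Long.MAX_VALUE); // stand-in for a blocking take()
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(): exit quietly
            }
        };
        System.out.println(superviseProducer(producer, consumer));
    }
}
```

Keep in mind that `get()` blocks the calling thread, much like `join()`. The `whenComplete` callback in the answer's full example avoids that by running the cleanup when the producer finishes.
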
-1

Start consumer thread as 'daemon' thread

Mark the consumer thread as 'daemon' and let the main thread end too:

From the Java API doc for Thread.setDaemon(boolean):

Marks this thread as either a daemon thread or a user thread. The Java Virtual Machine exits when the only threads running are all daemon threads.

public class Main {
    private static final Queue<String> q = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws IOException {
        Thread producerThread = new Thread(new Producer(q));
        // producerThread.setUncaughtExceptionHandler(...);
        producerThread.start();
        Thread consumerThread = new Thread(new Consumer(q));
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

}

This way, your application automatically stops when the main thread and the producer thread have terminated (successfully or by exception).

You could combine this with the UncaughtExceptionHandler as @Manish suggested, if the main thread needs to know about the producerThread failing...

Markus Mitterauer
  • What if the asker needs the `Consumer` to act like the `Producer`? Yes, you would still make the whole program stop, but the whole program is a pile of rubbish. – Dai Kaixian Dec 05 '16 at 12:20
  • @DaiKaixian Then he would have asked for it, but what he stated was: *"Now, if the "producer" thread fails for any reason, I want the whole program to stop, so some sort of notification must happen."* – Markus Mitterauer Dec 05 '16 at 12:31
  • Yeah. And where is the `Scalability` of your answer? In short, you didn't give a really perfect answer. And you said my answer is `dirty`. So I think you are impolite. NIZHEGEXIAOBIAOZI. – Dai Kaixian Dec 06 '16 at 01:28
  • Nobody is perfect. And scalability was no requirement. The OP stated a problem and a desired outcome. I use to follow lean principles, besides others, like [KISS](https://en.wikipedia.org/wiki/KISS_principle) or [YAGNI](https://en.wikipedia.org/wiki/You_aren't_gonna_need_it)... – Markus Mitterauer Dec 07 '16 at 08:05
-1

How about volatile?

public class Main {
  static volatile boolean isStopMain = false; // static so Producer can set Main.isStopMain
  private static final Queue<String> q = new ConcurrentLinkedQueue<>();

  public static void main(String[] args) throws IOException {
    new Thread(new Producer(q)).start();
    new Thread(new Consumer(q)).start();
    // Catch any error in the producer side, then stop both consumer and producer, and do some extra work to notify me that there's an error...
    while (true) {
      if (isStopMain) {
        System.exit(0); // or other operation to stop the main thread.
      }
    }
  }
}

And in Producer:

public class Producer implements Runnable {
  private Queue<String> q;

  public Producer(Queue<String> q) {
    this.q = q;
  }

  public void run() {
    try {
      connectToSocket();
      while(true) {
        String data = readFromSocket();
        q.offer(data);
      }
    } catch (Exception e) {
      // Something really bad happened, notify the parent thread so he stops the program...
      Main.isStopMain = true;
    }
  }
}

I also wonder whether you are trying to kill the parent thread from the child thread. If so, this may be useful: How do I terminate parent thread from child?
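
The `while (true)` loop above keeps a CPU core busy while it polls the flag. As a possible variation (a different technique from the volatile flag, not part of the answer's code), a `CountDownLatch` lets the main thread sleep until the producer reports a failure. The class `FailureLatchSketch` and its methods `report` and `awaitFailure` are hypothetical names, and the socket error is simulated.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class FailureLatchSketch {
    private final CountDownLatch failed = new CountDownLatch(1);
    private final AtomicReference<Throwable> cause = new AtomicReference<>();

    // Called by the producer from its catch block.
    void report(Throwable t) {
        cause.compareAndSet(null, t); // keep only the first failure
        failed.countDown();
    }

    // Called by the main thread: sleeps until report() is called, without spinning.
    // Returns null if the waiting thread is interrupted before any failure is reported.
    Throwable awaitFailure() {
        try {
            failed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return cause.get();
    }

    public static void main(String[] args) {
        FailureLatchSketch signal = new FailureLatchSketch();
        Thread producer = new Thread(() -> {
            try {
                throw new IllegalStateException("socket closed"); // simulated failure
            } catch (Exception e) {
                signal.report(e);
            }
        }, "producer");
        producer.start();

        Throwable error = signal.awaitFailure();
        System.out.println("Producer failed: " + error.getMessage());
        // Here: stop the consumer, send the notification, then exit.
    }
}
```

The producer still depends on a shared object, as with the volatile flag, but that object is passed in explicitly instead of living in a static field on Main.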

Dai Kaixian
  • Why introduce dirty and unnecessary cross-thread dependencies when you could achieve the very same result using the `UncaughtExceptionHandler` as @Manish suggested? – Markus Mitterauer Dec 05 '16 at 10:49
  • It's my freedom to answer the question and give the answer which I think may help to fix the problem. – Dai Kaixian Dec 05 '16 at 11:02
  • Of course it is. And it's my freedom to think that it's not a good solution, for the reason mentioned (as opposed to uncommented downvoting as revenge). – Markus Mitterauer Dec 05 '16 at 11:12
  • I am not fixing a bug just for the sake of fixing a bug. I am learning how to be more professional as a programmer. So knowing more concepts about `Concurrency` is not a bad thing. – Dai Kaixian Dec 05 '16 at 12:03
  • I apologize for my poor choice of words; it was not my intention to hurt your feelings or make you angry. But I still think it's not a clean solution if it introduces dependencies between objects that are not necessary. – Markus Mitterauer Dec 07 '16 at 08:00