
I want a thread that does some I/O work when the main thread interrupts it, and then goes back to sleeping/waiting until it is interrupted again.

So I came up with an implementation, but it does not seem to work. The code snippet is below.

Note: flag is a public variable that the thread class, which is nested in the main class, can access.

// in the main function this is how I am calling it
if(!flag) {
    thread.interrupt();
}

//this is how my thread class is implemented
class IOworkthread extends Thread {
    @Override
    public void run() {
        while (true) {
            try {
                flag = false;
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                flag = true;
                try {
                    // doing my I/O work
                } catch (Exception e1) {
                    // print the exception message
                }
            }
        }
    }
}

In the above snippet, the second (inner) try-catch block also catches an InterruptedException. This means that both the first and the second try-catch blocks are being interrupted, although I only intended interrupt() to be called while the thread is in the first try block.

Can you please help me with this?

EDIT If you feel that there can be another solution for my objective, I will be happy to know about it :)

unkemptArc99
  • That's why "monitors" has been devised. https://stackoverflow.com/questions/3362303/whats-a-monitor-in-java – Soner from The Ottoman Empire Jun 02 '19 at 06:41
  • @snr But I am not changing the ``flag`` in my main thread, so I thought I do not need to use monitors in this case. I am a bit of noob in the programming world and I might be wrong, please correct me if I am. – unkemptArc99 Jun 02 '19 at 06:45
  • Aren't you polling the "flag" in main? If not, where? – Soner from The Ottoman Empire Jun 02 '19 at 06:48
  • @snr Yes, I am polling the ``flag``, but not changing it. I mean, I don't see any critical section to use monitors. Basically, the ``flag`` is such an indicator which indicates that the thread is doing the intended I/O work and there is no requirement to poll it yet again. – unkemptArc99 Jun 02 '19 at 06:50
  • Are you calling `interrupt()` to wake up the thread? That's the opposite of what it's intended for. To give you a proper alternative solution, you'll have to give some more info, like where the waking up is coming from and what condition it's based on. – daniu Jun 02 '19 at 07:43
  • @daniu Yes, I am calling ``interrupt()`` to wake the thread from sleep. I am sorry if it is not supposed to be used this way. The I/O thread is being given the wake up call from the main thread which has created the I/O thread. The only condition is that if it is already doing the I/O work, there is no need to ``interrupt()`` it at that time. But this is precisely what is actually happening. Basically, the I/O thread is supposed to do some I/O task and give a result, while the main thread can do its computation work. – unkemptArc99 Jun 02 '19 at 08:51
  • I mean, what triggers the main thread to even query whether the IO is already running, or to start it or not? Where is the result supposed to go? This looks like an XY problem to me. – daniu Jun 02 '19 at 09:00
  • @daniu This is a code for a distributed algo. There is no trigger. It is part of the algo, which makes the IO process necessary after a certain computation step. To increase the performance, I sought making a thread to do the IO work for me. The IO work does not overlap with the computation. The result of the IO process is kept in a set which the computation part can access later on using monitors, i.e., the ``synchronized`` method. – unkemptArc99 Jun 03 '19 at 04:49
  • So there _is_ a trigger, namely the "completion of a certain computation step". What data does this computation provide that makes the io possible/necessary? Or is it just a query for more external data? What kind of data does the io provide to the computation? Will the created `Set` be consumed, or will its elements remain in there? Right now, it sounds like it might be a classical producer/consumer problem with the io providing input for the computation which will then work on it. – daniu Jun 03 '19 at 07:13
  • @daniu The IO is just a query for more external data. The ``set`` contains some integer data that IO provides to the computation part, and the computation part consumes the set's elements. – unkemptArc99 Jun 03 '19 at 17:34

2 Answers


If it's important to respond fast to the flag you could try the following:

class IOworkthread extends Thread { // implementing Runnable would be better here, but that's another story
    @Override
    public void run() {
        while(true) {
            try {
                flag = false;
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                flag = true;
            }
            //after the catch block the interrupted state of the thread should be reset and there should be no exceptions here
            try {
                // doing I/O work
            }
            catch (Exception e1) {
                // print the exception message
                // here of course other exceptions could appear but if there is no Thread.sleep() used here there should be no InterruptedException in this block
            }
        }
    }
}

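This should behave differently because the thread's interrupted status is already cleared by the time the InterruptedException is caught (Thread.sleep() clears it when it throws), so the I/O work after the catch block starts with a clear status. Note that a further interrupt() call arriving while the I/O work is running would set the status again, and that this version also performs the I/O work whenever the one-second sleep simply times out.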
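
The behavior of the interrupted status around Thread.sleep() can be checked in isolation. The following is a minimal sketch, not part of the original answer; the class and method names are made up for illustration. The thread interrupts itself, lets sleep() throw, and then reports the status seen inside the catch block.

```java
class InterruptStatusDemo {
    /** Returns the interrupt status observed right after sleep() threw. */
    static boolean statusAfterInterruptedSleep() {
        Thread.currentThread().interrupt();   // set our own interrupt status
        try {
            Thread.sleep(1000);               // throws at once because the status is set
            throw new IllegalStateException("sleep was expected to be interrupted");
        } catch (InterruptedException e) {
            // sleep() cleared the status when it threw the exception
            return Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        // prints "interrupted inside catch: false"
        System.out.println("interrupted inside catch: " + statusAfterInterruptedSleep());
    }
}
```

So an InterruptedException seen later, during the I/O work, points to a second interrupt() call rather than to a leftover status from the first one.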

Tobias

It does sound like a producer/consumer construct. You seem to kind of have it the wrong way around, the IO should be driving the algorithm. Since you stay very abstract in what your code actually does, I'll need to stick to that.

So let's say your "distributed algorithm" works on data of type T; that means that it can be described as a Consumer<T> (the method name in this interface is accept(T value)). Since it can run concurrently, you want to create several instances of that; this is usually done using an ExecutorService. The Executors class provides a nice set of factory methods for creating one, let's use Executors.newFixedThreadPool(parallelism).

Your "IO" thread runs to create input for the algorithm, meaning it is a Supplier<T>. We can run it in an Executors.newSingleThreadExecutor().

We connect these two using a BlockingQueue<T>; this is a FIFO collection. The IO thread puts elements in, and the algorithm instances take out the next one that becomes available.
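
As a small illustration of the FIFO order and the capacity limit, here is a sketch with a hypothetical class name and a capacity of 2 picked only for the demo. It uses the non-blocking offer() and poll() so that it can run on a single thread; put() and take() would block instead of returning false or null.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    /** Fills a queue of capacity 2 and reports what happened. */
    static String demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        boolean first = queue.offer(1);   // accepted
        boolean second = queue.offer(2);  // accepted, the queue is now full
        boolean third = queue.offer(3);   // rejected: offer() does not block, put() would
        Integer head = queue.poll();      // FIFO: the oldest element comes out first
        return first + " " + second + " " + third + " " + head;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true true false 1"
    }
}
```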

This makes the whole setup look something like this:

void run() throws InterruptedException {
    int parallelism = 4; // or whatever
    ExecutorService algorithmExecutor = Executors.newFixedThreadPool(parallelism);
    ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

    // this queue will accept up to 4 elements
    // this might need to be changed depending on performance of each
    BlockingQueue<T> queue = new ArrayBlockingQueue<T>(parallelism);
    // MyIoExecutor is a placeholder name for your concrete IoExecutor<T> subclass
    Future<?> ioTask = ioExecutor.submit(new MyIoExecutor(queue));

    // take element from queue
    T nextElement = getNextElement(queue, ioTask);
    while (nextElement != null) {
        final T element = nextElement; // a lambda may only capture effectively final variables
        algorithmExecutor.submit(() -> new AlgorithmInstance().accept(element));
        nextElement = getNextElement(queue, ioTask);
    }
    // stop accepting new tasks, then wait until the algorithms have finished running
    algorithmExecutor.shutdown();
    algorithmExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    ioExecutor.shutdown(); // the io thread should have terminated by now already
}

// returns the next element, or null once the IO task has finished and the queue is empty
T getNextElement(BlockingQueue<T> queue, Future<?> ioTask) throws InterruptedException {
    int timeOut = 1; // adjust depending on your IO
    while (true) {
        // poll() returns null on timeout (it does not throw a TimeoutException)
        T result = queue.poll(timeOut, TimeUnit.SECONDS);
        if (result != null) return result;
        // IO has finished: nothing more will arrive, so hand out what is left (or null)
        if (ioTask.isDone()) return queue.poll();
        // otherwise retry, we will get a value eventually
    }
}

Now this doesn't actually answer your question because you wanted to know how the IO thread can be notified when it can continue reading data.

This is achieved by the capacity limit of the BlockingQueue<>: once the limit has been reached, put() blocks until a consumer takes an element out, meaning the IO thread can just keep reading and try to put in elements.

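abstract class IoExecutor<T> implements Runnable {
    private final BlockingQueue<T> queue;
    public IoExecutor(BlockingQueue<T> q) { queue = q; }
    @Override
    public void run() {
        try {
            while (hasMoreData()) {
                T data = readData();
                // this will block if the queue is full, so IO will pause
                queue.put(data);
            }
            // no null is put into the queue: BlockingQueue rejects null elements,
            // so the end of the data is signalled by this method returning
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status and stop
        }
    }
    protected abstract boolean hasMoreData();
    protected abstract T readData();
}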

As a result, during runtime you should at all times have 4 threads of the algorithm running, as well as (up to) 4 items in the queue waiting for one of the algorithm threads to finish and pick them up.
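
To try the idea end to end, here is a self-contained sketch under stated assumptions: the "IO" is faked by a loop producing the integers 0..count-1, the "algorithm" just squares each value, and the class and method names are invented for the example. The end of the data is detected through the Future of the IO task instead of a null element, because BlockingQueue does not accept null.

```java
import java.util.Set;
import java.util.concurrent.*;

class PipelineSketch {
    /** Runs a fake IO producer and squaring workers; returns the set of results. */
    static Set<Integer> runPipeline(int count) throws InterruptedException {
        int parallelism = 4;
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(parallelism);
        Set<Integer> processed = ConcurrentHashMap.newKeySet();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        ExecutorService algorithmExecutor = Executors.newFixedThreadPool(parallelism);

        // producer: put() blocks while the queue is full, which pauses the fake IO
        Future<?> ioTask = ioExecutor.submit(() -> {
            for (int i = 0; i < count; i++) {
                queue.put(i);
            }
            return null; // makes this a Callable, so put() may throw InterruptedException
        });

        // consumer side: dispatch elements until the IO is done and the queue is drained
        while (true) {
            Integer next = queue.poll(100, TimeUnit.MILLISECONDS);
            if (next == null) {
                if (!ioTask.isDone()) continue; // IO is just slow, keep waiting
                next = queue.poll();            // IO finished: take what is left
                if (next == null) break;        // nothing left, we are done
            }
            final int element = next;
            algorithmExecutor.execute(() -> processed.add(element * element));
        }

        algorithmExecutor.shutdown();                            // no new tasks
        algorithmExecutor.awaitTermination(1, TimeUnit.MINUTES); // wait for running ones
        ioExecutor.shutdown();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(20).size()); // 20 distinct squares expected
    }
}
```

The second, non-blocking poll() after isDone() matters: an element may arrive between the timed poll() giving up and the IO task finishing, and that final check picks it up.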

daniu