I have the following code:
    while (slowIterator.hasNext()) {
        performLengthTask(slowIterator.next());
    }
Because both the iterator and the task are slow, it makes sense to run them in separate threads. Here is a quick-and-dirty attempt at an Iterator wrapper:
    class AsyncIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

        private AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next()); // try/catch removed for brevity
                    }
                }
            }.start();
        }

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public T next() {
            return queue.take(); // try/catch removed for brevity
        }

        // ... remove() throws UnsupportedOperationException
    }
However, this implementation lacks support for hasNext(). It would of course be fine for hasNext() to block until it knows whether to return true. I could keep a peek object in my AsyncIterator: hasNext() would take an object from the queue and next() would return this peek. But then hasNext() would block indefinitely once the delegate iterator has reached its end, because nothing more is ever put into the queue.
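For illustration, here is a rough sketch of that peek variant. To get around the indefinite blocking it assumes the producer thread may put a private end marker into the queue after the last element. The class name PeekingAsyncIterator and the END marker are made up for this sketch. It also assumes a single consuming thread, no null elements (ArrayBlockingQueue rejects them), and a delegate that does not throw.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PeekingAsyncIterator<T> implements Iterator<T> {
    // Hypothetical end-of-stream marker; compared by identity and never returned to the caller.
    private static final Object END = new Object();

    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
    private Object peek; // null means nothing has been fetched yet

    PeekingAsyncIterator(final Iterator<T> delegate) {
        Thread producer = new Thread() {
            @Override
            public void run() {
                try {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next());
                    }
                    queue.put(END); // tell the consumer that no more elements will arrive
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        producer.setDaemon(true); // do not keep the JVM alive if the consumer stops early
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (peek == null) {
            try {
                peek = queue.take(); // blocks until an element or END is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peek != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = (T) peek; // safe: everything except END was produced by the delegate
        peek = null;
        return result;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

Once END has been taken it stays in peek, so repeated hasNext() calls keep returning false without touching the queue again. If the delegate throws, END is never queued and hasNext() would still hang, so this is not a complete answer.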
Instead of using the ArrayBlockingQueue, I could of course do the thread communication myself:
    private static class AsyncIterator<T> implements Iterator<T> {
        // Both fields are guarded by the monitor of this AsyncIterator.
        private final Queue<T> queue = new LinkedList<T>();
        private boolean delegateDone = false;

        private AsyncIterator(final Iterator<T> delegate) {
            new Thread() {
                @Override
                public void run() {
                    while (delegate.hasNext()) {
                        final T next = delegate.next();
                        synchronized (AsyncIterator.this) {
                            queue.add(next);
                            AsyncIterator.this.notify();
                        }
                    }
                    synchronized (AsyncIterator.this) {
                        delegateDone = true;
                        AsyncIterator.this.notify();
                    }
                }
            }.start();
        }

        @Override
        public boolean hasNext() {
            synchronized (this) {
                while (queue.size() == 0 && !delegateDone) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        throw new Error(e);
                    }
                }
                // Read the queue while still holding the lock.
                return queue.size() > 0;
            }
        }

        @Override
        public T next() {
            // The producer thread may be adding concurrently, so removal needs the lock too.
            synchronized (this) {
                return queue.remove();
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
However, all the extra synchronization, wait() and notify() calls don't make the code any more readable, and it is easy to hide a race condition somewhere.
Any better ideas?
Update
Yes, I do know about the common observer/observable patterns. However, the usual implementations don't foresee an end to the flow of data, and they are not iterators.
I specifically want an iterator here because the loop shown above actually lives in an external library, and that library expects an Iterator.