
I have the following code:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

Because both the iterator and the task are slow, it makes sense to run them in separate threads. Here is a quick-and-dirty attempt at an Iterator wrapper:

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    private AsyncIterator(final Iterator<T> delegate) {
      new Thread() {
        @Override
        public void run() {
          while(delegate.hasNext()) {
            queue.put(delegate.next()); // try/catch removed for brevity
          }
        }
      }.start();
    }

    @Override
    public boolean hasNext() {
      return true;
    }

    @Override
    public T next() {
      return queue.take(); // try/catch removed for brevity
    }
    // ... remove() throws UnsupportedOperationException
}

However, this implementation lacks support for hasNext(). It would of course be fine for hasNext() to block until it knows whether to return true or false. I could keep a peeked element in my AsyncIterator: hasNext() would take an object from the queue and next() would return it. But then hasNext() would block indefinitely once the delegate iterator's end has been reached, because nothing more is ever put on the queue.

Instead of utilizing the ArrayBlockingQueue I could of course do thread communication myself:

private static class AsyncIterator<T> implements Iterator<T> {

  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify();
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

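  @Override
  public synchronized boolean hasNext() {
    // Wait until an element is available or the producer has finished.
    while (queue.isEmpty() && !delegateDone) {
      try {
        wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new IllegalStateException(e);
      }
    }
    // Read the queue while still holding the lock.
    return !queue.isEmpty();
  }

  @Override
  public synchronized T next() {
    // The producer thread adds to the same LinkedList, so removal needs the lock too.
    return queue.remove();
  }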

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

However, all the extra synchronization, waits, and notifies don't make the code any more readable, and it is easy to hide a race condition somewhere.

Any better ideas?

Update

Yes, I do know about the common observer/observable patterns. However, the usual implementations don't foresee an end to the flow of data, and they are not iterators.

I specifically want an iterator here, because the loop shown above actually lives in an external library, and that library expects an Iterator.

yankee
  • sounds like a classic Producer/Consumer problem http://stackoverflow.com/questions/2332537/producer-consumer-threads-using-a-queue except you want only one thread for each – Liviu Stirb Jan 15 '14 at 17:30
  • Just use the iterator normally, and dump the tasks into an `ExecutorService`. This shouldn't require reinventing abstractions. – Louis Wasserman Jan 15 '14 at 17:35
  • Consider using rxjava (https://github.com/Netflix/RxJava): It does exactly what you are trying to do. It is a library centered around an async iterable type called "Observable". It is fully fleshed out with a full suite of transformations, aggregations, and concurrency features. – user2684301 Jan 15 '14 at 18:34
  • @LouisWasserman: No, I specifically need an implementation of Iterator (updated my question). – yankee Jan 15 '14 at 19:14
  • @yankee Why don't you simply call the external library method that expects an iterator from another thread and pass a plain old iterator? Do you really get any benefit from iterating and processing the elements in two different threads? – isnot2bad Jan 15 '14 at 21:03
  • @isnot2bad: Yes exactly. The iterator creates lots of I/O load and the processing creates lots of CPU load. If I do this in sync my CPU and hard disk take turns in idling around with nothing to do while the other one is on stress. – yankee Jan 15 '14 at 21:25

2 Answers


This is a tricky one, but I think I got the right answer this time. (I deleted my first answer.)

The answer is to use a sentinel. I haven't tested this code, and I removed try/catches for clarity:

public class AsyncIterator<T> implements Iterator<T> {

    private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
    private T sentinel = (T) new Object();
    private T next;

    private AsyncIterator(final Iterator<T> delegate) {
        new Thread() {
            @Override
            public void run() {
                while (delegate.hasNext()) {
                    queue.put(delegate.next());
                }
                queue.put(sentinel);
            }
        }.start();
    }

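    @Override
    public boolean hasNext() {
        if (next == null) {
            next = queue.take(); // blocks if necessary
        }
        // The sentinel stays cached once taken, so repeated calls
        // at the end keep returning false instead of blocking.
        return next != sentinel;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T tmp = next;
        next = null;
        return tmp;
    }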

}

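The insight here is that hasNext() needs to block until the next item is ready. It also needs some kind of quit condition, and it can't use an empty queue or a boolean flag for that because of threading issues: the queue can be momentarily empty while the producer is still working. A sentinel solves the problem without any explicit locking or synchronization in the iterator itself; the BlockingQueue handles the thread handoff internally.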

Edit: cached "next" so hasNext() can be called more than once.
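
For readers who want something that compiles, the sentinel idea can be written out roughly as follows. This is only a sketch under a few assumptions: the class name `SentinelAsyncIterator` and the thread name are made up for illustration, the queue holds `Object` so the sentinel needs no unchecked cast at creation, the producer is a daemon thread (as suggested in the comments), and null elements are still not supported because `ArrayBlockingQueue` rejects them. Error propagation from the delegate is left out: if the delegate throws, the consumer simply sees the end of the data.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Sketch of a sentinel-terminated async iterator (hypothetical name). */
final class SentinelAsyncIterator<T> implements Iterator<T> {

    // Unique marker the producer enqueues once the delegate is exhausted.
    private static final Object SENTINEL = new Object();

    // Typed as Object so the sentinel can share the queue with real elements.
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);

    // Element (or sentinel) fetched by hasNext() and not yet handed out.
    private Object next;

    SentinelAsyncIterator(final Iterator<? extends T> delegate) {
        Thread producer = new Thread(() -> {
            try {
                try {
                    while (delegate.hasNext()) {
                        queue.put(delegate.next()); // blocks while the queue is full
                    }
                } finally {
                    queue.put(SENTINEL); // also reached if the delegate throws
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-iterator-producer");
        producer.setDaemon(true); // do not keep the JVM alive on shutdown
        producer.start();
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take(); // blocks until an element or the sentinel arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the next element", e);
            }
        }
        // The sentinel stays cached, so further calls keep returning false.
        return next != SENTINEL;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = (T) next; // safe: only elements of the delegate reach this point
        next = null;
        return result;
    }
}
```

It would be used like any other iterator, for example `Iterator<String> it = new SentinelAsyncIterator<>(slowIterator);` followed by the usual `while (it.hasNext())` loop. The iterator is meant to be consumed by a single thread.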

ccleve
  • That looks great! Actually I thought about a solution like that but I never tried it because I did not want to use `null` as sentinel (could be part of the list) and I thought `(T) new Object()` would surely cause a ClassCastException and thus never tried it. However of course you are right! Thinking about it, it makes perfect sense. – yankee Jan 17 '14 at 13:11
  • However there is one (solvable) problem here: `hasNext()` might be called more often than `next()`. So: `iterator.hasNext(); iterator.hasNext(); sysout(iterator.next());` should output the same as `iterator.hasNext(); sysout(iterator.next());` but here this is different. You need to cache the result of hasNext(). I propose you implement that and then I'll be glad to accept your answer :-). – yankee Jan 17 '14 at 13:14
  • @ccleve The background thread reading from the delegate should be a daemon thread, else the application can stay hanging when triggered to shutdown with a shutdown-hook. I tested your implementation a little bit and it appears to work OK. – vanOekel Jan 18 '14 at 12:51
  • @ccleve: This won't work for null elements from the iterator. It's not hard to change (I actually went for an additional boolean hasNextKnown property). But if there are no null elements, this would be fine. I'll accept your answer, however I suggest that you either fix the null issue or add a remark about the limitations for other people who find this code on google and try to use it naively. – yankee Jan 18 '14 at 20:18
  • @ccleve I have an improved (full) version of the AsyncIterator. Should I edit the code in the answer to show my version? – vanOekel Jan 19 '14 at 16:51
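
On the null-element limitation raised in the comments: `ArrayBlockingQueue` throws a `NullPointerException` when asked to store null, so a null from the delegate cannot pass through the queue as-is. One possible workaround, sketched here with a hypothetical helper class `NullMask` (not part of any answer above), is to swap null for a private stand-in object on the producer side and swap it back on the consumer side:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper that lets null elements travel through a BlockingQueue. */
final class NullMask {

    // Stand-in placed on the queue whenever the real element is null.
    private static final Object NULL = new Object();

    private NullMask() {
    }

    /** Producer side: replace null with the stand-in. */
    static Object mask(Object element) {
        return element == null ? NULL : element;
    }

    /** Consumer side: turn the stand-in back into null. */
    @SuppressWarnings("unchecked")
    static <T> T unmask(Object queued) {
        return queued == NULL ? null : (T) queued;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
        queue.put(mask("a"));
        queue.put(mask(null)); // would throw NullPointerException without mask()
        String first = unmask(queue.take());
        String second = unmask(queue.take());
        System.out.println(first + ", " + second); // prints "a, null"
    }
}
```

An iterator that uses this would also need a separate flag for "an element is cached", since a cached null can no longer mean "nothing cached"; that matches the `hasNextKnown` boolean mentioned in the comment above.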

Or save yourself the headache and use RxJava:

import java.util.Iterator;

import rx.Observable;
import rx.Scheduler;
import rx.observables.BlockingObservable;
import rx.schedulers.Schedulers;

public class RxAsyncIteratorExample {

    public static void main(String[] args) throws InterruptedException {
        final Iterator<Integer> slowIterator = new SlowIntegerIterator(3, 7300);

        // the scheduler you use here will depend on what behaviour you
        // want but io is probably what you want
        Iterator<Integer> async = asyncIterator(slowIterator, Schedulers.io());
        while (async.hasNext()) {
            performLengthTask(async.next());
        }
    }

    public static <T> Iterator<T> asyncIterator(
            final Iterator<T> slowIterator,
            Scheduler scheduler) {

        final Observable<T> tObservable = Observable.from(new Iterable<T>() {
            @Override
            public Iterator<T> iterator() {
                return slowIterator;
            }
        }).subscribeOn(scheduler);

        return BlockingObservable.from(tObservable).getIterator();
    }

    /**
     * Uninteresting implementations...
     */
    public static void performLengthTask(Integer integer)
            throws InterruptedException {
        log("Running task for " + integer);
        Thread.sleep(10000L);
        log("Finished task for " + integer);
    }

    private static class SlowIntegerIterator implements Iterator<Integer> {
        private int count;
        private final long delay;

        public SlowIntegerIterator(int count, long delay) {
            this.count = count;
            this.delay = delay;
        }

        @Override
        public boolean hasNext() {
            return count > 0;
        }

        @Override
        public Integer next() {
            try {
                log("Starting long production " + count);
                Thread.sleep(delay);
                log("Finished long production " + count);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return count--;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private static final long startTime = System.currentTimeMillis();

    private static void log(String s) {
        double time = ((System.currentTimeMillis() - startTime) / 1000d);
        System.out.println(time + ": " + s);
    }
}

Gives me:

0.031: Starting long production 3
7.332: Finished long production 3
7.332: Starting long production 2
7.333: Running task for 3
14.633: Finished long production 2
14.633: Starting long production 1
17.333: Finished task for 3
17.333: Running task for 2
21.934: Finished long production 1
27.334: Finished task for 2
27.334: Running task for 1
37.335: Finished task for 1
Duncan Irvine
  • Hi, I've seen that BlockingObservable is not recommended by the Rx docs. Is it the only way to turn a normal iterator of many items into an observable? That's what I'm trying to do. – WindRider May 04 '16 at 16:20