
Let's say I have the following method that I want to refactor:

protected Stream<T> parseFile(File file, Consumer<File> cleanup) {
  try {
    return parser.parse(file); // returns a Stream<T>
  } catch (XmlParseException e) { // child of RuntimeException
    throw new CustomRuntimeException(e);
  } finally {
    if (file != null) {
      cleanup.accept(file);
    }
  }

  throw new IllegalStateException("Should not happen");
}

The purpose of this method is to act as a proxy that attaches error handling to the stream, rethrowing any parse error wrapped in a CustomRuntimeException. That way, when the stream is consumed later in the flow, I don't have to handle those exceptions everywhere, only CustomRuntimeException.

Upstream, I use that method as follows:

try {
  Stream<T> stream = parseFile(someFile);
  stream.map(t -> ...);
} catch (CustomRuntimeException e) {
  // do some stuff
}

And here's what the parser.parse method looks like:

public Stream<T> parse() {
  // ValueIterator<T> implements Iterator<T>, AutoCloseable
  XmlRootParser.ValueIterator<T> valueIterator = new XmlRootParser.ValueIterator<>(this.nodeConverter, this.reader, this.nodeLocalName, this.nodeName);
  // 1040 == Spliterator.ORDERED | Spliterator.IMMUTABLE
  Stream<T> stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(valueIterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
  // onClose returns the stream carrying the handler, so return that one
  return stream.onClose(valueIterator::close);
}
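Two details in parse deserve a note: onClose is an intermediate operation that returns a stream with the handler attached, so the safe pattern is to return that stream rather than the receiver; and the handler only runs when close() is called, typically by a try-with-resources statement. A minimal sketch, with a hypothetical Resource class standing in for ValueIterator. It assumes close() is declared without a checked exception, which is what lets a method reference like valueIterator::close compile as the Runnable that onClose expects:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class OnCloseDemo {
    // Hypothetical resource standing in for ValueIterator; close() declares no
    // checked exception, so resource::close is a valid Runnable.
    static final class Resource implements AutoCloseable {
        private final List<String> log;
        Resource(List<String> log) { this.log = log; }
        @Override public void close() { log.add("closed"); }
    }

    static Stream<Integer> open(List<String> log) {
        Resource resource = new Resource(log);
        // Return the stream produced by onClose instead of discarding it.
        return Stream.of(1, 2, 3).onClose(resource::close);
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        // try-with-resources calls close(), which runs the registered handler.
        try (Stream<Integer> s = open(log)) {
            log.add("sum=" + s.mapToInt(Integer::intValue).sum());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [sum=6, closed]
    }
}
```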

The exceptions I want to handle are thrown by the ValueIterator.hasNext method, which means they are not thrown when the Stream is created, only when it is consumed, i.e. when a terminal operation such as forEach, count or collect runs (intermediate operations like map are lazy and don't trigger iteration).
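This laziness is easy to observe in isolation. A minimal sketch with a hypothetical iterator whose hasNext() always throws, standing in for ValueIterator: building the stream and chaining map both succeed, and the exception only surfaces when the terminal count() runs.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyFailureDemo {
    // Hypothetical iterator that fails in hasNext(), like a parser hitting malformed XML.
    static Iterator<String> failingIterator() {
        return new Iterator<String>() {
            @Override public boolean hasNext() { throw new IllegalStateException("bad input"); }
            @Override public String next() { throw new AssertionError("unreachable"); }
        };
    }

    static Stream<String> create() {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(failingIterator(), Spliterator.ORDERED), false);
    }

    public static String run() {
        Stream<String> s = create();                  // no exception: nothing iterated yet
        Stream<String> mapped = s.map(String::trim);  // still lazy: map is intermediate
        try {
            mapped.count();                           // terminal operation calls hasNext()
            return "no exception";
        } catch (IllegalStateException e) {
            return "thrown at terminal op: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // thrown at terminal op: bad input
    }
}
```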

How can I cleanly attach error handling to the stream in parseFile without having to consume it? Is that even possible?

Obviously this code only works if the parser.parse method consumes its stream before returning it, which defeats the purpose of using streams.
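The same laziness explains why the try/catch/finally in parseFile cannot help: the return statement only builds the pipeline, so the catch block never sees a parse error, and the finally block runs the cleanup before a single element has been read. A small illustration under simplified assumptions (a log list replaces the real file handling; all names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class EarlyFinallyDemo {
    // Hypothetical parseFile: same try/catch/finally shape as in the question.
    static Stream<String> parseFile(List<String> log) {
        try {
            // Lazy pipeline: peek only runs once a terminal operation traverses it.
            return Stream.of("a", "b").peek(x -> log.add("read " + x));
        } catch (RuntimeException e) {
            log.add("caught"); // never reached: nothing has been read yet
            throw e;
        } finally {
            log.add("cleanup"); // runs as soon as the method returns
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream<String> s = parseFile(log);
        log.add("returned");
        s.forEach(x -> { }); // reading happens only now, after the cleanup
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [cleanup, returned, read a, read b]
    }
}
```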

Jeep87c
  • @RomanC What's the connection? – shmosel Feb 08 '18 at 03:11
  • You mean the zipWithIndex method, @RomanC? – Jeep87c Feb 08 '18 at 03:19
  • I don't think it's possible without getting into `parser.parse()`. – shmosel Feb 08 '18 at 04:30
  • @Holger I updated my question to answer yours. – Jeep87c Feb 08 '18 at 15:54
  • When the iterator’s `hasNext()` method throws an exception, the only place to encounter it is at the terminal operation. In your question’s example, there isn’t even a terminal operation. For the specified purpose, all these `catch` blocks are useless. So, all you want is to catch and translate exceptions thrown by the `hasNext()` method? – Holger Feb 08 '18 at 16:08
  • @Holger exactly! – Jeep87c Feb 08 '18 at 16:10
  • Some remaining questions: what is `T`? What’s the point of the `throw new IllegalStateException("Should not happen");` statement at a place where even the compiler will reject any statement as unreachable? Why is the caller supposed to provide a `Consumer cleanup` argument, when the caller is the one who knows when to perform the cleanup, i.e. when the stream has been consumed? Are you aware that the example caller lacks closing the stream (a `try(…) {}` statement)? How do you pass a `File` to a `parse` method that doesn’t declare any parameters? – Holger Feb 08 '18 at 16:16
  • `T` can be any object we're trying to read from a file (in our case, XML files containing a list of `T`s). You're correct that `throw new IllegalStateException("Should not happen");` will be rejected by the compiler; it's a leftover I missed when simplifying my actual code for the question. I also forgot the stream-closing logic in the caller example. – Jeep87c Feb 08 '18 at 16:51

2 Answers


The Stream’s backend, which provides the iteration logic, is the Spliterator.

So you can wrap the element processing using a wrapper Spliterator like this:

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

class Wrapper<T> implements Spliterator<T> {
    final Spliterator<T> source;
    public Wrapper(Spliterator<T> source) {
        this.source = source;
    }
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        try {
            return source.tryAdvance(action);
        }
        catch(XmlParseException ex) {
            throw new CustomRuntimeException(ex);
        }
    }
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        try {
            source.forEachRemaining(action);
        }
        catch(XmlParseException ex) {
            throw new CustomRuntimeException(ex);
        }
    }
    @Override public Spliterator<T> trySplit() {
        Spliterator<T> srcPrefix = source.trySplit();
        return srcPrefix == null? null: new Wrapper<>(srcPrefix);
    }
    @Override public long estimateSize() { return source.estimateSize(); }
    @Override public int characteristics() { return source.characteristics(); }
    @Override public Comparator<? super T> getComparator(){return source.getComparator();}
}

It retains all properties of the original Spliterator and only translates exceptions thrown during the iteration.
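To check the translation end to end, here is a self-contained sketch that reuses the same delegation logic. XmlParseException and CustomRuntimeException are declared as hypothetical stand-ins, and failingSource() imitates an iterator that yields two elements and then fails in hasNext():

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

public class WrapperDemo {
    // Hypothetical stand-ins for the question's exception types.
    static class XmlParseException extends RuntimeException {
        XmlParseException(String message) { super(message); }
    }
    static class CustomRuntimeException extends RuntimeException {
        CustomRuntimeException(Throwable cause) { super(cause); }
    }

    // Same delegation and exception translation as the Wrapper above.
    static class Wrapper<T> implements Spliterator<T> {
        final Spliterator<T> source;
        Wrapper(Spliterator<T> source) { this.source = source; }

        @Override public boolean tryAdvance(Consumer<? super T> action) {
            try { return source.tryAdvance(action); }
            catch (XmlParseException ex) { throw new CustomRuntimeException(ex); }
        }
        @Override public void forEachRemaining(Consumer<? super T> action) {
            try { source.forEachRemaining(action); }
            catch (XmlParseException ex) { throw new CustomRuntimeException(ex); }
        }
        @Override public Spliterator<T> trySplit() {
            Spliterator<T> prefix = source.trySplit();
            return prefix == null ? null : new Wrapper<>(prefix);
        }
        @Override public long estimateSize() { return source.estimateSize(); }
        @Override public int characteristics() { return source.characteristics(); }
        @Override public Comparator<? super T> getComparator() { return source.getComparator(); }
    }

    // Hypothetical source: yields "1" and "2", then fails in hasNext().
    static Iterator<String> failingSource() {
        return new Iterator<String>() {
            int produced = 0;
            @Override public boolean hasNext() {
                if (produced < 2) return true;
                throw new XmlParseException("unexpected end of document");
            }
            @Override public String next() { return String.valueOf(++produced); }
        };
    }

    public static String run() {
        Spliterator<String> raw =
                Spliterators.spliteratorUnknownSize(failingSource(), Spliterator.ORDERED);
        List<String> seen = new ArrayList<>();
        try {
            StreamSupport.stream(new Wrapper<>(raw), false).forEach(seen::add);
            return "completed";
        } catch (CustomRuntimeException e) {
            return seen + " then " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2] then XmlParseException
    }
}
```

Elements read before the failure are still delivered to the pipeline; only the type of the exception that reaches the caller changes.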

Then you can use it like this:

protected Stream<T> parseFile(File file) {
    Stream<T> s = parser.parse();
    return StreamSupport.stream(new Wrapper<>(s.spliterator()), s.isParallel())
                        .onClose(s::close);
}

And the caller should not forget to close the stream properly:

    ResultType result;
    try(Stream<T> s = parseFile(file)) {
        result = s.
         // other intermediate ops
         // terminal operation
    }

or

    ResultType result;
    try(Stream<T> s = parseFile(file)) {
        result = s.
         // other intermediate ops
         // terminal operation
    }
    finally {
        // other cleanup actions
    }
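The .onClose(s::close) in parseFile matters because a stream rebuilt from another stream's spliterator does not inherit that stream's close handlers. A small sketch of just that aspect (the exception-translating Wrapper is left out to keep it short; source() and rewrap() are hypothetical helpers):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ClosePropagationDemo {
    // Hypothetical source stream with a close handler, like the one parse() returns.
    static Stream<String> source(List<String> log) {
        return Stream.of("a", "b").onClose(() -> log.add("source closed"));
    }

    // Same shape as parseFile above, minus the Wrapper.
    static Stream<String> rewrap(Stream<String> s, boolean forwardClose) {
        Stream<String> result = StreamSupport.stream(s.spliterator(), s.isParallel());
        return forwardClose ? result.onClose(s::close) : result;
    }

    public static List<String> run(boolean forwardClose) {
        List<String> log = new ArrayList<>();
        try (Stream<String> s = rewrap(source(log), forwardClose)) {
            log.add("count=" + s.count());
        } // closing the rebuilt stream reaches the source only if forwarded
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [count=2, source closed]
        System.out.println(run(false)); // [count=2]
    }
}
```

With forwardClose set to false, the source's close handler is silently skipped, which for a real parser would leak the underlying reader.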
Holger
  • Since `spliterator()` is a terminal operation, the source stream will be consumed by this code. I may be asking for the impossible, but is there a way to do this without calling a terminal operation on the source stream? – Jeep87c Feb 14 '18 at 22:12
  • Every operation consumes the source stream; that’s why you have to use the returned stream when chaining an intermediate operation. But intermediate operations are lazy, they do not start a traversal, and while `spliterator()` is a terminal operation, it is also lazy. – Holger Feb 15 '18 at 07:17
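Holger's point that spliterator() is terminal yet lazy can be checked directly: a peek counter stays at zero after spliterator() returns and only moves once the spliterator is advanced. A small sketch (the counter exists purely for illustration):

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazySpliteratorDemo {
    public static int[] run() {
        AtomicInteger visited = new AtomicInteger();
        // peek records every element that actually flows through the pipeline.
        Stream<Integer> s = Stream.of(1, 2, 3).peek(x -> visited.incrementAndGet());
        Spliterator<Integer> sp = s.spliterator(); // terminal operation, but no traversal yet
        int afterSpliterator = visited.get();
        sp.tryAdvance(x -> { });                   // pulling an element starts the traversal
        int afterTryAdvance = visited.get();
        return new int[] { afterSpliterator, afterTryAdvance };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("after spliterator(): " + counts[0]); // 0
        System.out.println("after tryAdvance(): " + counts[1]);
    }
}
```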

You could use a helper class that takes care of stream initialization and catches any exception thrown there. Consider the following example:

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SafeInitializationStreamExample {

    public static void main(String[] args) {
        int sum = SafeInitializationStream.from(() -> Stream.of(1,2,3,4))
                .onInitializationError(t -> System.out.println(t.getMessage()))
                .mapToInt(it -> it)
                .sum();

        System.out.println(sum);

        List<Object> list = SafeInitializationStream.from(() -> parse("/tmp/test.log"))
                .onInitializationError(t -> System.out.println(t.getMessage()))
                .map(it -> it.toString())
                .collect(Collectors.toList());

        System.out.println(list);
    }

    private static <T> Stream<T> parse(String filename) {
        throw new RuntimeException("File does not exist!");
    }

    static class SafeInitializationStream<T> {
        private final Supplier<Stream<T>> streamSupplier;

        private SafeInitializationStream(Supplier<Stream<T>> streamSupplier) {
            this.streamSupplier = streamSupplier;
        }

        public static <T> SafeInitializationStream<T> from(Supplier<Stream<T>> streamSupplier) {
            return new SafeInitializationStream<>(streamSupplier);
        }

        public Stream<T> onInitializationError(Consumer<Throwable> onError) {
            try {
                return streamSupplier.get();
            } catch (Exception e) {
                onError.accept(e);
            }
            return Stream.empty();
        }
    }
}

In this example we introduce a SafeInitializationStream class that expects a Supplier<Stream<T>>:

SafeInitializationStream.from(() -> Stream.of(1,2,3,4))

Using a Supplier<Stream<T>> makes stream initialization lazy: the supplier's body is not executed until we call Supplier.get(). Now, when we call:

.onInitializationError(t -> System.out.println(t.getMessage()))

we execute the supplier's body, catch any exception that may be thrown, and handle it by passing the Throwable to the Consumer<Throwable> given to the onInitializationError method. In case of an exception, Stream.empty() is returned, so you can safely apply all the remaining operations in the chain. When there is no exception, the Stream<T> provided by the supplier is returned.

If you run this example, you will get the following console output:

10
File does not exist!
[]

The first stream was consumed without any errors and the sum was returned correctly.

The second stream threw an exception during initialization; we caught it, printed it to the console, and an empty list was returned after consuming the (empty) stream.

Of course, you can use a Function<Throwable, Stream<T>> in the onInitializationError method if you want to specify which Stream<T> is returned in case of an exception. This example simply assumes that Stream.empty() is always returned. Hope it helps.
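A minimal sketch of the Function<Throwable, Stream<T>> variant mentioned above, reusing the class and method names from the example; the fallback content is made up for illustration:

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FallbackStreamExample {
    // Variant of SafeInitializationStream whose error handler chooses the fallback stream.
    static final class SafeInitializationStream<T> {
        private final Supplier<Stream<T>> streamSupplier;

        private SafeInitializationStream(Supplier<Stream<T>> streamSupplier) {
            this.streamSupplier = streamSupplier;
        }

        static <T> SafeInitializationStream<T> from(Supplier<Stream<T>> streamSupplier) {
            return new SafeInitializationStream<>(streamSupplier);
        }

        Stream<T> onInitializationError(Function<Throwable, Stream<T>> fallback) {
            try {
                return streamSupplier.get();
            } catch (Exception e) {
                return fallback.apply(e); // caller decides what replaces the failed stream
            }
        }
    }

    static Stream<String> parse(String filename) {
        throw new RuntimeException("File does not exist!");
    }

    public static List<String> run() {
        return SafeInitializationStream.from(() -> parse("/tmp/test.log"))
                .onInitializationError(t -> Stream.of("fallback: " + t.getMessage()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [fallback: File does not exist!]
    }
}
```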

Szymon Stepniak
  • This code handles exceptions gracefully at stream creation but not at stream consumption, correct? I'm looking for the latter. I updated my question. – Jeep87c Feb 08 '18 at 15:56
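The distinction raised in this comment can be confirmed with a small sketch. It re-declares a compact copy of the answer's SafeInitializationStream and a hypothetical lazilyFailing() stream whose creation succeeds but whose traversal throws, much like the XML parser's stream:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ConsumptionErrorDemo {
    // Compact copy of the answer's SafeInitializationStream.
    static final class SafeInitializationStream<T> {
        private final Supplier<Stream<T>> streamSupplier;
        private SafeInitializationStream(Supplier<Stream<T>> streamSupplier) {
            this.streamSupplier = streamSupplier;
        }
        static <T> SafeInitializationStream<T> from(Supplier<Stream<T>> streamSupplier) {
            return new SafeInitializationStream<>(streamSupplier);
        }
        Stream<T> onInitializationError(Consumer<Throwable> onError) {
            try {
                return streamSupplier.get();
            } catch (Exception e) {
                onError.accept(e);
            }
            return Stream.empty();
        }
    }

    // Hypothetical stream: creating it succeeds, traversing it fails on the second element.
    static Stream<Integer> lazilyFailing() {
        return Stream.of(1, 2, 3).map(x -> {
            if (x == 2) {
                throw new IllegalStateException("malformed element");
            }
            return x;
        });
    }

    public static String run() {
        List<String> handled = new ArrayList<>();
        Stream<Integer> s = SafeInitializationStream.from(ConsumptionErrorDemo::lazilyFailing)
                .onInitializationError(t -> handled.add(t.getMessage()));
        try {
            s.forEach(x -> { });
            return "no error";
        } catch (IllegalStateException e) {
            // The initialization handler never ran; the error escaped at the terminal operation.
            return "handler calls=" + handled.size() + ", escaped: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // handler calls=0, escaped: malformed element
    }
}
```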