4

Imagine I'm building a library that will receive a Stream of Integers; all the library code needs to do is return a Stream of Strings containing the string representation of each number.

public Stream<String> convertToString(Stream<Integer> input) {
  return input.map(n -> String.valueOf(n));
}

However, imagine someone decides to invoke my function with the following code:

Stream<Integer> input = Stream.of(1, 2, 3)
  .map(n -> {
    // Fail on multiples of 3
    if (n % 3 == 0) { throw new RuntimeException("3s are not welcome here!"); }
    else { return n; }
  });

Stream<String> output = convertToString(input);

Since I want my API contract to handle this situation and still return a Stream of Strings, I'd like to know how to change my code so that it detects an exception in the stream and "recovers" from it by mapping it to a special "NaN" value.

In the end, I want my output Stream to be "1","2","NaN"
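
To make the problem concrete, here is a minimal sketch (the class name LazyFailureDemo and its members are made up for illustration). It shows that a try/catch inside my own mapper does not help: the exception is thrown by the caller's upstream lambda when the terminal operation pulls the third element, before my mapper is ever invoked for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the library in question.
final class LazyFailureDemo {

    // Strings that reached the consumer before the pipeline failed.
    static final List<String> seen = new ArrayList<>();

    // Runs the pipeline; returns the exception message, or null if nothing was thrown.
    static String run() {
        seen.clear();
        Stream<Integer> input = Stream.of(1, 2, 3).map(n -> {
            if (n % 3 == 0) { throw new RuntimeException("3s are not welcome here!"); }
            return n;
        });
        Stream<String> output = input.map(n -> {
            // This try/catch only guards String.valueOf. The caller's lambda
            // throws upstream, so this mapper is never reached for the 3.
            try { return String.valueOf(n); }
            catch (RuntimeException e) { return "NaN"; }
        });
        try {
            output.forEach(seen::add);   // nothing runs until this terminal operation
            return null;
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("failed with: " + run() + ", after seeing " + seen);
    }
}
```

So "1" and "2" get through, and then the whole pipeline fails instead of producing "NaN".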

Notes:

  • I don't want my API to have to define a special wrapper for the input.
  • I don't want to rely on a 3rd party library due to implications of dependencies.
  • I don't know how big the stream will be or at what rate it will be generated, so I can't use any sort of caching/preloading of all values.

Is this possible with Java 8 Streams?

With an Iterator, I imagine I could have done:

public class SafeIntegerToStringIterator implements Iterator<String> {
  Iterator<Integer> innerIterator;
  ...
  public String next() throws NoSuchElementException {
    try { return String.valueOf(this.innerIterator.next()); }
    catch (RuntimeException e) { return "NaN"; }
  }

}
...
public Iterator<String> convertToString(Iterator<Integer> input) {
  return new SafeIntegerToStringIterator(input);
}

Thanks

Alejandro
  • 1
    Introduce catch block inside your logic or use Java 8 Optional class (orElse) – fg78nc Jun 18 '17 at 15:40
  • What if the `Iterator` throws an exception in the `hasNext()` method? – Holger Jun 19 '17 at 07:48
  • Which `Iterator` are you referring to, @Holger? The inner or the outer (SafeIntegerToStringIterator)? In the inner, I'd hope to catch all unchecked exceptions (except for those where someone has used "sneaky throws", which I'm fine with). If the inner throws the NoSuchElementException, I just bubble it up. If it's the outer you're referring to, I'm more in control, so I can make sure my own handling logic doesn't introduce an exception of its own. – Alejandro Jun 19 '17 at 11:18
  • 5
    You show only the overriding `next()` method, but not, how you deal with the situation that `hasNext()` throws an exception. You may catch it, but do you assume that there is a next element or not? Keep in mind that you don’t know whether the source iterator actually advanced its internal state or not when an exception has been thrown, so assuming that there is a next element can bring you into an infinite loop, repeating the failed operation over and over again. – Holger Jun 19 '17 at 11:31
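
Holger's concern is easy to reproduce with an iterator obtained from a stream. In the sketch below (class and member names are made up for illustration), the failure escapes from hasNext() rather than next(), because the stream-backed iterator fetches the element ahead of time, at least in current OpenJDK implementations. A wrapper that only guards next() would therefore not catch it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class.
final class HasNextFailureDemo {

    // Elements successfully obtained before the failure.
    static final List<Integer> consumed = new ArrayList<>();

    // Returns the name of the Iterator method from which the exception escaped.
    static String run() {
        consumed.clear();
        Iterator<Integer> it = Stream.of(1, 2, 3).map(n -> {
            if (n % 3 == 0) { throw new RuntimeException("3s are not welcome here!"); }
            return n;
        }).iterator();

        while (true) {
            boolean more;
            try {
                more = it.hasNext();          // the upstream lambda runs here
            } catch (RuntimeException e) {
                return "hasNext";
            }
            if (!more) {
                return "none";
            }
            try {
                consumed.add(it.next());      // only hands out the prefetched value
            } catch (RuntimeException e) {
                return "next";
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("exception escaped from: " + run() + ", consumed " + consumed);
    }
}
```

Whether it is safe to call hasNext() again after such a failure depends on whether the source advanced its internal state, which is exactly the uncertainty described in the comment above.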

3 Answers

3

Note: Please see the edit at the end of this post, which fixes a bug in my original answer. I'm leaving my original answer anyway, because it's still useful for many cases and I think it helps solve OP's question, at least with some restrictions.


Your approach with Iterator goes in the right direction. The solution might be drafted as follows: convert the stream to an iterator, wrap the iterator as you have already done, and then create a stream from the wrapper iterator, except that you should use a Spliterator instead. Here's the code:

private static <T> Stream<T> asNonThrowingStream(
        Stream<T> stream,
        Supplier<? extends T> valueOnException) {

    // Get spliterator from original stream
    Spliterator<T> spliterator = stream.spliterator();

    // Return new stream from wrapper spliterator
    return StreamSupport.stream(

        // Extending AbstractSpliterator is enough for our purpose
        new Spliterators.AbstractSpliterator<T>(
                spliterator.estimateSize(),
                spliterator.characteristics()) {

            // We only need to implement tryAdvance
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                try {
                    return spliterator.tryAdvance(action);
                } catch (RuntimeException e) {
                    action.accept(valueOnException.get());
                    return true;
                }
            }
        }, stream.isParallel());
}

We are extending AbstractSpliterator to wrap the spliterator returned by the original stream. We only need to implement the tryAdvance method, which either delegates to the original spliterator's tryAdvance method, or catches RuntimeException and invokes the action with the supplied valueOnException value.

Spliterator's contract specifies that tryAdvance must return true if a remaining element existed and the action was performed on it. If a RuntimeException is caught, we assume that the original spliterator has thrown it from within its own tryAdvance method. Thus, we pass the replacement value to the action and return true, meaning that an element was consumed anyway.

The original spliterator's estimate size and characteristics are preserved by passing these values as arguments to the constructor of AbstractSpliterator.

Finally, we create a new stream from the new spliterator via the StreamSupport.stream method. The new stream is parallel if the original one was also parallel.

Here's how to use the above method:

public Stream<String> convertToString(Stream<Integer> input) {
    return asNonThrowingStream(input.map(String::valueOf), () -> "NaN");
}
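
As a quick check, here is a self-contained sketch that runs the example from the question through this method. The class name NonThrowingDemo and the run() helper are made up for illustration, and the wrapper is repeated only so that the snippet compiles on its own.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class.
final class NonThrowingDemo {

    // Same wrapper as above, repeated so the sketch is self-contained.
    static <T> Stream<T> asNonThrowingStream(Stream<T> stream,
                                             Supplier<? extends T> valueOnException) {
        Spliterator<T> spliterator = stream.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<T>(
                    spliterator.estimateSize(),
                    spliterator.characteristics()) {
                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    try {
                        return spliterator.tryAdvance(action);
                    } catch (RuntimeException e) {
                        action.accept(valueOnException.get());
                        return true;
                    }
                }
            }, stream.isParallel());
    }

    static Stream<String> convertToString(Stream<Integer> input) {
        return asNonThrowingStream(input.map(String::valueOf), () -> "NaN");
    }

    // Feeds the failing stream from the question into convertToString.
    static List<String> run() {
        Stream<Integer> input = Stream.of(1, 2, 3).map(n -> {
            if (n % 3 == 0) { throw new RuntimeException("3s are not welcome here!"); }
            return n;
        });
        return convertToString(input).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a sequential stream on the JDKs I'm aware of, this should yield "1", "2", "NaN". Recovery relies on the source spliterator still being usable after it has thrown, which the Spliterator contract does not guarantee.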

Edit

As per Holger's comment below, user holi-java has kindly provided a solution that avoids the pitfalls pointed out by Holger.
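
For context, the pitfall can be reproduced against asNonThrowingStream with the snippet from Holger's comment. The sketch below (the class name ContractViolationDemo is made up) repeats the wrapper so that it compiles on its own. The accumulator throws after adding each element, the catch block cannot tell that the exception came from the action rather than from the source, and so the action receives a second value within the same tryAdvance call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class.
final class ContractViolationDemo {

    // The original wrapper, repeated so the sketch is self-contained.
    static <T> Stream<T> asNonThrowingStream(Stream<T> stream,
                                             Supplier<? extends T> valueOnException) {
        Spliterator<T> spliterator = stream.spliterator();
        return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<T>(
                    spliterator.estimateSize(),
                    spliterator.characteristics()) {
                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    try {
                        // The downstream action runs inside this try block too.
                        return spliterator.tryAdvance(action);
                    } catch (RuntimeException e) {
                        action.accept(valueOnException.get());
                        return true;
                    }
                }
            }, stream.isParallel());
    }

    static List<Integer> run() {
        List<Integer> result = asNonThrowingStream(Stream.of(1, 2, 3), () -> 4)
            .collect(ArrayList::new,
                     (l, x) -> { l.add(x); if (x < 4) { throw new RuntimeException(); } },
                     List::addAll);
        return result;   // [1, 4, 2, 4, 3, 4]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each source element is followed by an extra 4, so the action is invoked twice per tryAdvance call. holi-java's version avoids this by handing the source spliterator a private consumer instead of the downstream action.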

Here's the code:

<T> Stream<T> exceptionally(Stream<T> source, BiConsumer<Exception, Consumer<? super T>> handler) {
    class ExceptionallySpliterator extends AbstractSpliterator<T>
            implements Consumer<T> {

        private Spliterator<T> source;
        private T value;
        private long fence;

        ExceptionallySpliterator(Spliterator<T> source) {
            super(source.estimateSize(), source.characteristics());
            this.fence = source.getExactSizeIfKnown();
            this.source = source;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> it = source.trySplit();
            return it == null ? null : new ExceptionallySpliterator(it);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return fence != 0 && consuming(action);
        }

        private boolean consuming(Consumer<? super T> action) {
            Boolean state = tryConsuming(action);
            if (state == null) {
                return true;
            }
            if (state) {
                action.accept(value);
                value = null;
                return true;
            }
            return false;
        }


        private Boolean tryConsuming(Consumer<? super T> action) {
            fence--;
            try {
                return source.tryAdvance(this);
            } catch (Exception ex) {
                handler.accept(ex, action);
                return null;
            }
        }

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    return stream(new ExceptionallySpliterator(source.spliterator()), source.isParallel()).onClose(source::close);
}

Please refer to the tests if you want to know more about this solution.
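
According to holi-java's comments, the handler can supply a default value, skip the invalid item, or rethrow. The sketch below tries all three. It uses a condensed adaptation of the method above (the Boolean tri-state is folded into tryAdvance), and the class ExceptionallyDemo and its helper methods are made up for illustration.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class.
final class ExceptionallyDemo {

    // Condensed adaptation of the method above, so the sketch compiles on its own.
    static <T> Stream<T> exceptionally(Stream<T> source,
                                       BiConsumer<Exception, Consumer<? super T>> handler) {
        class ExceptionallySpliterator extends AbstractSpliterator<T> implements Consumer<T> {
            private final Spliterator<T> source;
            private T value;
            private long fence;

            ExceptionallySpliterator(Spliterator<T> source) {
                super(source.estimateSize(), source.characteristics());
                this.fence = source.getExactSizeIfKnown();
                this.source = source;
            }

            @Override
            public Spliterator<T> trySplit() {
                Spliterator<T> it = source.trySplit();
                return it == null ? null : new ExceptionallySpliterator(it);
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (fence == 0) {
                    return false;
                }
                fence--;
                boolean advanced;
                try {
                    // Only this.accept runs inside the try, never the downstream action.
                    advanced = source.tryAdvance(this);
                } catch (Exception ex) {
                    handler.accept(ex, action);
                    return true;
                }
                if (advanced) {
                    action.accept(value);
                    value = null;
                }
                return advanced;
            }

            @Override
            public void accept(T value) {
                this.value = value;
            }
        }
        return StreamSupport.stream(new ExceptionallySpliterator(source.spliterator()),
                                    source.isParallel()).onClose(source::close);
    }

    // A stream whose second element fails to parse.
    static Stream<Integer> failing() {
        return Stream.of("1", "bad", "2").map(Integer::parseInt);
    }

    // Mode 1: replace the failed element with a default (null, mapped to "NaN").
    static List<String> withDefault() {
        return exceptionally(failing(), (e, action) -> action.accept(null))
                .map(n -> n == null ? "NaN" : String.valueOf(n))
                .collect(Collectors.toList());
    }

    // Mode 2: skip the failed element by not calling the action at all.
    static List<String> skipping() {
        return exceptionally(failing(), (e, action) -> { })
                .map(String::valueOf)
                .collect(Collectors.toList());
    }

    // Mode 3: rethrow, here wrapped in an IllegalStateException.
    static boolean rethrowing() {
        try {
            exceptionally(failing(), (e, action) -> { throw new IllegalStateException(e); })
                    .forEach(n -> { });
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

Note that in skipping mode the stream still reports its original size, so fewer elements arrive than a sized stream announces. Collecting to a list tolerates that, but I would not assume every terminal operation does.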

fps
  • 3
    This is not correct. If the `Consumer` throws a `RuntimeException` within its `accept` method, it may receive another element right afterwards due to this exception handler, which violates the `tryAdvance` contract. To do this correctly, you have to catch exceptions thrown in the source `tryAdvance` method, excluding exceptions thrown by the actual consumer, which is much more complicated… – Holger Jun 19 '17 at 07:52
  • @Holger I think I don't understand. Could you please show me how you would break it? – fps Jun 19 '17 at 11:17
  • 2
    `asNonThrowingStream(Stream.of(1, 2, 3), ()->4) .collect(ArrayList::new, (l,x)->{l.add(x);if(x<4) throw new RuntimeException();}, List::addAll)` – Holger Jun 19 '17 at 11:25
  • @Holger it works as expected if you move the exception throwing lambda "before" the call to `asNonThrowingStream` (`asNonThrowingStream(Stream.of(1, 2, 3).peek(x -> Exception....`). This might be good enough for Alejandro's case. But I believe that it would still be easy to come up with examples that cause side effects. – Malte Hartwig Jun 19 '17 at 13:41
  • 2
    @Malte Hartwig: this was just a contrived example, to show something with an effect, however, with the *current stream implementation*, violations of the `tryAdvance` contract have little effect (see [Is there any danger in making the action .accept() more than one element in an implementation of Spliterator's .tryAdance()?](https://stackoverflow.com/q/36921725/2711488)). It’s possible that such misbehavior will cause real havoc in some future implementation, just like violating the comparator’s contract suddenly caused exceptions when TimSort was introduced… – Holger Jun 19 '17 at 14:21
  • @Holger Thanks for your example. I will try it later and will think of a way to circumvent this problem. By the way, could you clarify why `tryAdvance`'s contract is violated? – fps Jun 19 '17 at 17:25
  • 3
    As said, it may invoke the consumer’s `accept` method two times (if the first invocation caused an exception). – Holger Jun 19 '17 at 17:38
  • @Holger sir, please help me to see the new bug here: https://stackoverflow.com/questions/45256232/the-behavior-of-stream-oft-is-different-with-stream-oft – holi-java Jul 22 '17 at 16:17
  • 1
    Hey, I know how to fix the bug, this is my great progress after sleeping, :) – holi-java Jul 22 '17 at 22:53
  • cheers, all of the tests passed. I'll put the code in your answer, but you need to describe it. my english is not very good, I wish you can describe it perfectly. – holi-java Jul 22 '17 at 23:11
  • yeah, english is your strength. `exceptionally` support 3 mode & can run it in parallel stream, the method support rethrow exception, skipping processing invalid items, providing default value for each operation. – holi-java Jul 22 '17 at 23:13
  • `fence` just tracks the spliterator's remaining size; when it reaches 0, the whole task is complete. – holi-java Jul 23 '17 at 00:16
  • " I don't fully understand all possible cases " - sir, here is all of my tests. https://github.com/holi-java/api-test/blob/master/src/test/java/test/java/streams/StreamExceptionallyTest.java – holi-java Jul 23 '17 at 00:21
  • the purpose of `fence` field is for fixing the `Stream.of(T)` bug, :) – holi-java Jul 23 '17 at 00:51
  • " Why would some operation of the stream pipeline try to advance the underlying spliterator if the source spliterator has been already consumed? " - jdk also does the same thing like as me, e.g: `java.util.stream.StreamSpliterators.DistinctSpliterator`. – holi-java Jul 23 '17 at 00:54
  • "But it's not fair, you've done all the research" - it doesn't matter. I stay on stackoverflow is not for votes. I need to learn English while enhancing my skills & make some friends like as you. let it live in your answer, and thanks @Holger reporting such a bug. I also can obtain reputation from my `Iterator` solution, :) – holi-java Jul 23 '17 at 01:08
2

The error occurs in a stream intermediate operation. A clever way to solve the problem, like yours, is to use the Proxy design pattern. To keep using the Stream API, you just need to proxy an Iterator from the source Stream to another Stream via StreamSupport#stream and Spliterators#spliterator(Iterator, long, int), for example:

Stream<String> result = convertToString(Stream.of("1", "bad", "2")
                       .map(Integer::parseInt));



public Stream<String> convertToString(Stream<Integer> input) {
    return exceptionally(input, (e, action) -> action.accept(null))
            .map(it -> String.format("%s", it == null ? "NaN" : it));
}
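
The proxying step on its own, without any exception handling, can be sketched as follows. The class IteratorProxyDemo and the method viaIterator are made-up names; an exception-handling iterator would wrap the iterator at the marked spot.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class.
final class IteratorProxyDemo {

    // Pass-through proxy: stream -> iterator -> stream.
    static <T> Stream<T> viaIterator(Stream<T> source) {
        Spliterator<T> s = source.spliterator();
        Iterator<T> it = Spliterators.iterator(s);   // wrap this iterator to intercept failures
        return StreamSupport.stream(
                Spliterators.spliterator(it, s.estimateSize(), s.characteristics()),
                source.isParallel()
        ).onClose(source::close);
    }

    static List<String> run() {
        return viaIterator(Stream.of("a", "b", "c"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());   // [A, B, C]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

One caveat of this sketch: Spliterators#spliterator(Iterator, long, int) reports the result as SIZED, so passing estimateSize() is only accurate when the source size is actually known.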

The current version is based on an Iterator, which fixes the Stream.of(T) bug; for more details, please see my question.

<T> Stream<T> exceptionally(Stream<T> source,
                            BiConsumer<Exception, Consumer<? super T>> handler) {
    Spliterator<T> s = source.spliterator();
    return StreamSupport.stream(
            spliterator(
                    exceptionally(s, handler),
                    s.estimateSize(),
                    s.characteristics()
            ),
            source.isParallel()
    ).onClose(source::close);
}


// Thread safety and robustness are not a concern here, since this class is not visible to anyone else
private <T> Iterator<T> exceptionally(Spliterator<T> spliterator,
                            BiConsumer<Exception, Consumer<? super T>> handler) {
    class ExceptionallyIterator implements Iterator<T>, Consumer<T> {
        private Iterator<T> source = Spliterators.iterator(spliterator);
        private T value;
        private boolean valueInReady = false;
        private boolean stop = false;

        @Override
        public boolean hasNext() {

            while (true) {
                if (valueInReady) return true;
                if (stop) return false;
                try {
                    return source.hasNext();
                } catch (Exception ex) {
                    stop = shouldStopTraversing(ex);
                    handler.accept(ex, this);
                }
            }
        }

        @Override
        public T next() {
            return valueInReady ? dump() : source.next();
        }

        private T dump() {
            T result = value;
            valueInReady = false;
            value = null;
            return result;
        }

        @Override
        public void accept(T value) {
            this.value = value;
            this.valueInReady = true;
        }
    }
    return new ExceptionallyIterator();
}

static final String BUG_CLASS = "java.util.stream.Streams$StreamBuilderImpl";

public static boolean shouldStopTraversing(Exception ex) {
    for (StackTraceElement element : ex.getStackTrace()) {
        if (BUG_CLASS.equals(element.getClassName())) {
            return true;
        }
    }
    return false;
}
holi-java
0
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

public class RecoverableOptional<T> {
/**
 * Common instance for {@code empty()}.
 */
private static final RecoverableOptional<?> EMPTY = new RecoverableOptional<>();

/**
 * If non-null, the value; if null, indicates no value is present
 */
private final T value;

/**
 * If non-null, the value to fall back on when the mapper throws;
 * if null, indicates no recovery value is present
 */
private final T defaultValue;

/**
 * Constructs an empty instance.
 *
 * @implNote Generally only one empty instance, {@link RecoverableOptional#EMPTY},
 * should exist per VM.
 */
private RecoverableOptional() {
    this.value = null;
    this.defaultValue = null;
}

/**
 * Returns an empty {@code Optional} instance.  No value is present for this
 * Optional.
 *
 * @param <T> Type of the non-existent value
 * @return an empty {@code Optional}
 * @apiNote Though it may be tempting to do so, avoid testing if an object
 * is empty by comparing with {@code ==} against instances returned by
 * {@code Option.empty()}. There is no guarantee that it is a singleton.
 */
public static <T> T empty() {
    @SuppressWarnings("unchecked")
    RecoverableOptional<T> t = (RecoverableOptional<T>) EMPTY;
    return t.get();
}

/**
 * Constructs an instance with the value present.
 *
 * @param value the non-null value to be present
 * @throws NullPointerException if value is null
 */
private RecoverableOptional(T value, T value2) {
    this.value = Objects.requireNonNull(value);
    this.defaultValue = value2;
}


/**
 * Returns an {@code Optional} with the specified present non-null value.
 *
 * @param <T>   the class of the value
 * @param value the value to be present, which must be non-null
 * @return an {@code Optional} with the value present
 * @throws NullPointerException if value is null
 */
private static <T> RecoverableOptional<T> of(T value, T value2) {
    return new RecoverableOptional<>(value, value2);
}

/**
 * Returns a {@code RecoverableOptional} holding the current value together
 * with the given recovery value.
 *
 * @param value2 the value to map instead if the mapper throws
 * @return a {@code RecoverableOptional} with the value and recovery value present
 * @throws NullPointerException if the current value is null
 */
public RecoverableOptional<T> recoverWith(T value2) {
    // Use the class-level T; a method-level <T> would shadow it and force an unchecked cast
    return new RecoverableOptional<>(value, value2);
}

/**
 * Returns an {@code Optional} describing the specified value, if non-null,
 * otherwise returns an empty {@code Optional}.
 *
 * @param <T>   the class of the value
 * @param value the possibly-null value to describe
 * @return an {@code Optional} with a present value if the specified value
 * is non-null, otherwise an empty {@code Optional}
 */
public static <T> RecoverableOptional<T> ofNullable(T value, T value2) {
    return value == null ? empty() : of(value, value2);
}

/**
 * Returns an {@code Optional} describing the specified value, if non-null,
 * otherwise returns an empty {@code Optional}.
 *
 * @param <T>   the class of the value
 * @param value the possibly-null value to describe
 * @return an {@code Optional} with a present value if the specified value
 * is non-null, otherwise an empty {@code Optional}
 */
public static <T> RecoverableOptional<T> ofNullable(T value) {
    return value == null ? empty() : of(value, null);
}

public T get() {
    if (value == null) {
        throw new NoSuchElementException("No value present");
    }
    return value;
}

public <U> U map(Function<? super T, ? extends U> mapper) {
    Objects.requireNonNull(mapper);
    if (!isPresent())
        return empty();
    else {
        try {
            return value == null ? null : mapper.apply(value);
        } catch (Exception e) {
            if (defaultValue == null) {
                return null;
            }
            return mapper.apply(defaultValue);
        }
    }
}

/**
 * Return {@code true} if there is a value present, otherwise {@code false}.
 *
 * @return {@code true} if there is a value present, otherwise {@code false}
 */
public boolean isPresent() {
    return value != null;
}

}

And now the tests:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.File;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


@RunWith(SpringRunner.class)
public class RecoverOptionalTest {

@Test
public void recoverSilentlyForExceptionWithRecoverValue() {
    File file = mock(File.class);

    when(file.getName()).thenThrow(new RuntimeException(""));

    String value = RecoverableOptional
            .ofNullable(file)
            .recoverWith(new File("eliko"))
            .map(f -> f.getName());

    assertEquals("eliko", value);
}

@Test
public void recoverSilentlyForExceptionWithNullForNoRecoveryValue() {
    File file = mock(File.class);

    when(file.getName()).thenThrow(new RuntimeException(""));

    String value = RecoverableOptional
            .ofNullable(file)
            .map(f -> f.getName());

    assertNull(value);
}

@Test
public void noRecover() {
    File file = new File("check");

    String value = RecoverableOptional
            .ofNullable(file)
            .recoverWith(new File("eliko"))
            .map(f -> f.getName());

    assertEquals("check", value);
}

}
Eliko