
I want to loop over a huge array and do a complicated set of instructions that takes a long time. However, if more than 30 seconds have passed, I want it to give up.

For example:

final long start = System.currentTimeMillis();
myDataStructure.stream()
    // wished-for syntax: Stream has no while(...) operation
    .while(() -> System.currentTimeMillis() <= start + 30000)
    .forEach(e ->
    {
      ...
    });

I want to avoid simply calling `return` inside the `forEach` lambda once a certain condition is met, because that only skips the current element instead of ending the loop.

Stefan Zobel
Hatefiend
  • https://stackoverflow.com/questions/41392286/java-8-completablefuture-stream-and-timeouts Here is a sample answer; maybe it helps you. – utkusonmez Aug 01 '17 at 08:17
  • If you were computing the changes of the side effects and applying them afterwards (assuming computing is much more expensive than applying), you may be able to use a special implementation of `Collector` that stops collecting when the timeout is reached. – SpaceTrucker Aug 01 '17 at 09:06
  • Related: https://stackoverflow.com/questions/20746429/limit-a-stream-by-a-predicate – Klitos Kyriacou Aug 01 '17 at 09:13
  • Also, Java 9 Streams will have a new `takeWhile` method, which will probably be exactly what you're requesting. – Klitos Kyriacou Aug 01 '17 at 09:16
  • Question... are the complicated instructions IO-bound or CPU-bound? – fps Aug 01 '17 at 21:48
  • @FedericoPeraltaSchaffner CPU-bound. – Hatefiend Aug 04 '17 at 11:42

5 Answers


I would create a custom pool for that, something like:

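import java.util.List;
import java.util.concurrent.*;
import java.util.stream.*;

ForkJoinPool forkJoinPool = new ForkJoinPool(1);
Future<List<Integer>> future = forkJoinPool.submit(() ->
        IntStream.range(1, 1_000_000).filter(x -> x > 2).boxed().collect(Collectors.toList()));
try {
    List<Integer> result = future.get(30, TimeUnit.SECONDS); // 30 seconds, not milliseconds
} catch (TimeoutException e) {
    // job not done in your interval
} catch (InterruptedException | ExecutionException e) {
    // interrupted while waiting, or the task itself threw
} finally {
    forkJoinPool.shutdownNow(); // a task that is already running may keep computing
}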
Eugene
  • As far as I understand, this should return early on timeouts, but how would the fork join pool stop processing so that it does not consume more CPU than necessary? In my mind it will continue processing the stream in the background. Also, where is the OP's operation in your code? – SpaceTrucker Aug 01 '17 at 11:21
  • @SpaceTrucker you could explicitly call `shutdown` or `shutdownNow` when the timeout is reached. About the code: this was just an example... – Eugene Aug 01 '17 at 12:24
  • I think your solution is the best solution. +1 – holi-java Aug 01 '17 at 12:54
  • @SpaceTrucker Alternatively, the `submit` method from `ForkJoinPool` returns a `Future` which can be cancelled by doing `future.cancel(true)`, which would send an interruption to the submitted task, which if written properly could be stopped this way. – Edwin Dalorzo Aug 03 '17 at 23:16
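One caveat on the thread above: the `ForkJoinTask.cancel` Javadoc says `mayInterruptIfRunning` has no effect in the default implementation, so cancelling a task submitted to a `ForkJoinPool` does not interrupt it. Below is a minimal sketch of the same submit-then-`get`-with-timeout idea that swaps in a plain `ExecutorService` (whose `FutureTask` does interrupt the worker thread on `cancel(true)`) and lets the stream poll that interrupt flag through the short-circuiting `allMatch`. `expensiveWork`, `processWithTimeout` and the sizes are hypothetical placeholders for the OP's CPU-bound work, not a drop-in solution.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

public class TimeoutWithFuture {

    // Written by the placeholder work so the JIT cannot discard the loop.
    static volatile long sink;

    // Hypothetical stand-in for the OP's expensive, CPU-bound per-element work.
    static void expensiveWork(long e) {
        long x = e;
        for (int i = 0; i < 10_000; i++) {
            x = x * 31 + i;
        }
        sink = x;
    }

    // Processes elements 0..size-1 on a worker thread, giving up after timeoutMillis.
    // Returns how many elements were processed before the stream stopped.
    static long processWithTimeout(long size, long timeoutMillis) {
        AtomicLong processed = new AtomicLong();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Boolean> future = executor.submit(() ->
                LongStream.range(0, size).allMatch(e -> {
                    expensiveWork(e);
                    processed.incrementAndGet();
                    // allMatch stops on false: quit once the worker has been interrupted
                    return !Thread.currentThread().isInterrupted();
                }));
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            future.cancel(true); // interrupts the thread running the task
        } catch (InterruptedException ex) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        } catch (ExecutionException ex) {
            throw new RuntimeException(ex.getCause());
        } finally {
            executor.shutdownNow();
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS); // let the worker notice and exit
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processWithTimeout(100, 10_000));         // prints 100
        System.out.println(processWithTimeout(1_000_000_000L, 200)); // gives up after ~200 ms
    }
}
```

The per-element interrupt check is what actually stops the CPU work; without it, cancelling only stops the caller from waiting.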

Since `Stream.forEach` has no `break`, you can create a custom exception and throw it to break out of the loop:

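final long start = System.currentTimeMillis();
try {
    myDataStructure.stream()
        .forEach(e ->
        {
          if (System.currentTimeMillis() > start + 30000) {
              throw new MyTimeOutException(); // must extend RuntimeException (unchecked)
          }
          ... // process e
        });
} catch (MyTimeOutException ex) {
    // more than 30 seconds passed: the loop was abandoned
}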

and then catch that exception outside the stream.

chengpohi
  • Exceptions have a bit too much overhead for this application unfortunately. – Hatefiend Aug 01 '17 at 08:23
  • @Hatefiend The exception is only thrown once after 30 seconds. I think the overhead of creating and throwing the exception there is negligible. – SpaceTrucker Aug 01 '17 at 08:25
  • This is the most efficient way of doing it. Throwing one exception, once, is much more efficient than checking a condition for each unneeded element of a huge data set. – nafas Aug 01 '17 at 08:44
  • You have a typo: it should be `30000`, not `30`, because it's in milliseconds. – Lino Aug 01 '17 at 09:26
  • Actually, [this does not work](https://stackoverflow.com/a/27668305/4718288). You cannot throw checked exceptions from inside a stream and catch them outside. – Hatefiend Aug 01 '17 at 10:09
  • Hi @Hatefiend, that post points out that a `Stream` can't throw a **checked exception** without you catching and **rethrowing** it. The code above throws the custom `Exception` in its own code block, so it should work. – chengpohi Aug 01 '17 at 10:19
  • Would the custom exception extend `Exception` or `RuntimeException`? Because `Exception` is a checked exception, surely? – Hatefiend Aug 01 '17 at 11:25
  • This would break with parallel streams. – Ferrybig Aug 01 '17 at 11:28
  • Hi, this can't work if the `forEach` operation is blocked. In fact, I'd prefer @Eugene's solution. – holi-java Aug 01 '17 at 12:53
  • @SpaceTrucker: indeed, one exception in 30 seconds doesn’t hurt. Further, `MyTimeOutException` could use [a super constructor that disables stack traces](https://docs.oracle.com/javase/8/docs/api/java/lang/RuntimeException.html#RuntimeException-java.lang.String-java.lang.Throwable-boolean-boolean-) to avoid the most expensive part of exceptions. – Holger Aug 01 '17 at 12:56
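A runnable sketch of this answer with the points from its comments folded in: `MyTimeOutException` extends `RuntimeException`, so no checked-exception wrapping is needed; it calls the protected four-argument `RuntimeException` constructor Holger links to, with `writableStackTrace` set to `false`; and it is written for a sequential stream only, per Ferrybig's remark. `processUntil` and the list-building "work" are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TimeoutByException {

    // Unchecked, so it can be thrown out of the forEach lambda without wrapping.
    static final class MyTimeOutException extends RuntimeException {
        MyTimeOutException() {
            // (message, cause, enableSuppression, writableStackTrace):
            // with writableStackTrace = false no stack trace is captured
            super("time budget exceeded", null, false, false);
        }
    }

    // Handles elements in order until the budget runs out; returns the ones it handled.
    static List<Integer> processUntil(List<Integer> data, long budgetMillis) {
        final long deadline = System.currentTimeMillis() + budgetMillis;
        List<Integer> done = new ArrayList<>(); // safe only because the stream is sequential
        try {
            data.stream().forEach(e -> {
                if (System.currentTimeMillis() > deadline) {
                    throw new MyTimeOutException();
                }
                done.add(e); // stand-in for the real per-element work
            });
        } catch (MyTimeOutException ex) {
            // budget exceeded: `done` holds whatever finished before the deadline
        }
        return done;
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 1_000).boxed().collect(Collectors.toList());
        System.out.println(processUntil(data, 30_000).size()); // 1000
        System.out.println(processUntil(data, -1).size());     // 0: deadline already passed
    }
}
```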

If iterating the stream (or, in this case, the array) is cheap compared to actually executing the operation, then just use a predicate and filter on whether the time is up:

final long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(30L);
myDataStructure.stream()
    .filter(e -> System.nanoTime() <= end)
    .forEach(e ->
    {
      ...
    });

The question is whether you need to know which elements have been processed. With the above, you have to inspect afterwards whether the side effect took place for a specific element.

SpaceTrucker
  • Oh I didn't think to use `filter` like that – Hatefiend Aug 01 '17 at 08:22
  • Nice idea, but if the stream is huge, it may be wasteful to evaluate `System.currentTimeMillis() <= start + 30000L` for each of the elements after the 30 seconds have passed. – Eran Aug 01 '17 at 08:32
  • @Eran yes, looks like a custom pool with a `get` would be much more suitable – Eugene Aug 01 '17 at 09:13
  • @Eran Then perhaps `takeWhile()` instead of `filter()`? But really using such an impure predicate is not in the style of the stream API, I think a normal `for` loop would be better. – Tavian Barnes Aug 01 '17 at 15:25
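To make Eran's point concrete, here is a small sketch, assuming an in-memory `List<Integer>` and a counter as placeholder "work", that counts how many elements the pipeline pulls. With the `filter` approach every element is still pulled and tested after the deadline; only the work is skipped. The comparison `System.nanoTime() - end <= 0` is the overflow-safe form that the `System.nanoTime` Javadoc recommends for comparing two readings.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FilterDeadline {

    // Returns {elements the stream pulled, elements that passed the deadline filter}.
    static int[] run(List<Integer> data, long budgetNanos) {
        final long end = System.nanoTime() + budgetNanos;
        AtomicInteger visited = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        data.stream()
            .peek(e -> visited.incrementAndGet())       // counts every element pulled
            .filter(e -> System.nanoTime() - end <= 0)  // overflow-safe "now <= end"
            .forEach(e -> processed.incrementAndGet()); // stand-in for the expensive work
        return new int[] {visited.get(), processed.get()};
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 1_000).boxed().collect(Collectors.toList());
        int[] inTime = run(data, TimeUnit.SECONDS.toNanos(30));
        int[] late = run(data, -1); // deadline already passed
        System.out.println(inTime[0] + " visited, " + inTime[1] + " processed"); // 1000 visited, 1000 processed
        System.out.println(late[0] + " visited, " + late[1] + " processed");     // 1000 visited, 0 processed
    }
}
```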

You can use the fact that `allMatch()` is a short-circuiting operation to terminate the stream early:

final long start = System.currentTimeMillis();
myDataStructure.stream()
    .allMatch(e ->
    {
      // your task here
      return System.currentTimeMillis() <= start + 30000;
    });
Robin Topper
  • This will work but `allMatch` terminates the stream. I am using an `anyMatch` currently and the two do not mix well together. I cannot short circuit with an `anyMatch` either. – Hatefiend Aug 01 '17 at 10:13
  • @Hatefiend It is a solution to the problem described in the question. If you have restrictions/boundary conditions/additional requirements like you describe now, it would be a good idea to mention those in the question/problem statement – Robin Topper Aug 01 '17 at 10:19
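A self-contained sketch of the `allMatch` trick, with a counter standing in for "your task here" (an assumption for illustration). Because the task runs before the time check, the element during which the deadline passes is still processed; after that, `allMatch` stops pulling elements.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AllMatchDeadline {

    // Returns how many elements had the (placeholder) task run on them.
    static int process(List<Integer> data, long budgetMillis) {
        final long deadline = System.currentTimeMillis() + budgetMillis;
        AtomicInteger processed = new AtomicInteger();
        data.stream().allMatch(e -> {
            processed.incrementAndGet(); // stand-in for the real per-element task
            // returning false makes allMatch stop pulling further elements
            return System.currentTimeMillis() <= deadline;
        });
        return processed.get();
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 1_000).boxed().collect(Collectors.toList());
        System.out.println(process(data, 30_000)); // 1000
        System.out.println(process(data, -1));     // 1: the first element runs, then the check fails
    }
}
```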

As the comments under the question say, `takeWhile`/`dropWhile` are missing in Java 8 (they will be added in Java 9). There is no reason to implement this logic with exceptions or other workarounds: that code looks ugly and makes little sense, even just for practice. I think using a third-party library is a much, much better solution, for example StreamEx:

StreamEx.of(myDataStructure)
        .takeWhile(e -> System.currentTimeMillis() <= start + 30000)
        .forEach(e -> { ... });
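On Java 9 or later, the JDK's own `Stream.takeWhile` covers this without a third-party library. A minimal sketch, assuming the data is an ordered `List<Integer>` and using a counter as placeholder work: `takeWhile` is short-circuiting, so once the deadline check fails, no further elements reach `forEach`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TakeWhileDeadline {

    // Requires Java 9+. Returns how many elements reached the placeholder work.
    static int process(List<Integer> data, long budgetMillis) {
        final long deadline = System.currentTimeMillis() + budgetMillis;
        AtomicInteger processed = new AtomicInteger();
        data.stream()
            .takeWhile(e -> System.currentTimeMillis() <= deadline) // ends the stream at the first failure
            .forEach(e -> processed.incrementAndGet());             // stand-in for the real work
        return processed.get();
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 1_000).boxed().collect(Collectors.toList());
        System.out.println(process(data, 30_000)); // 1000
        System.out.println(process(data, -1));     // 0
    }
}
```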
123-xyz