
I would like to use the Java Streams API to create a pipeline and then terminate it with `iterator()`, but I want it to "prepare X elements" asynchronously in advance (the Streams API is not required, just preferred).

The situation I have is:

  • Our application loads data from remote files (over network) to a database
    1. Opening a remote file (i.e. executing the pipeline synchronously for a single element) takes a non-trivial amount of time
    2. I cannot open too many files at once (results in connection timeout)
  • The order in which the files are loaded matters

The Java Streams API can express the pipeline, but to my knowledge it cannot satisfy both requirements above. It can do per-element execution:

files.stream().map(this::convertToInputStream).iterator()

which runs into the first problem, because each call to `next()` blocks while a single file is opened. Or it can do wholesale execution:

files.stream().map(this::convertToInputStream).collect(toList())

which fails the second.

I have implemented the requirements using a Deque<InputStream> and Thread logic to keep it populated up to a certain element count, but it is not pretty. I am asking if anyone knows of a way to create pipelines like so (perhaps using libraries) in a more elegant fashion. It seemed like a reasonable use case for the Streams API.
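One way to tidy up the `Deque` + `Thread` approach is to wrap the source iterator in a small prefetching iterator that keeps a bounded window of mapping tasks in flight on an `ExecutorService`, handing results back in source order. This is a minimal sketch, not a library API; `PrefetchingIterator` and its constructor parameters are names I am inventing here, and you would plug in `files.iterator()` and `this::convertToInputStream`:

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper: keeps up to `prefetch` mapping tasks running ahead of
// the consumer, and returns results strictly in source order.
final class PrefetchingIterator<T, R> implements Iterator<R> {
    private final Iterator<T> source;
    private final Function<T, R> mapper;
    private final ExecutorService pool;
    private final int prefetch;
    private final ArrayDeque<Future<R>> inFlight = new ArrayDeque<>();

    PrefetchingIterator(Iterator<T> source, Function<T, R> mapper,
                        int prefetch, ExecutorService pool) {
        this.source = source;
        this.mapper = mapper;
        this.prefetch = prefetch;
        this.pool = pool;
        fill(); // kick off the first window immediately
    }

    // Submit tasks until the window is full or the source is exhausted.
    private void fill() {
        while (inFlight.size() < prefetch && source.hasNext()) {
            T next = source.next();
            inFlight.add(pool.submit(() -> mapper.apply(next)));
        }
    }

    @Override public boolean hasNext() {
        return !inFlight.isEmpty();
    }

    @Override public R next() {
        Future<R> head = inFlight.poll();
        if (head == null) throw new NoSuchElementException();
        fill(); // top up the window before blocking on the oldest task
        try {
            return head.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

With a fixed pool sized to the connection limit, at most `prefetch` files are being opened at any moment, and `next()` returns streams in the original file order. Note that an element is "opened" up to `prefetch` steps before it is consumed, so the downstream code still needs to close each stream promptly.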

  • 1. Looks like you want to `loadFile` in `parallel` with a `limit` (too many). 2. Still, keep an offset of what was last open and continue processing from there. 3. Not an apt use case for java-streams in my opinion. – Naman Feb 25 '19 at 08:26
  • Do you want the first batch to be loaded before the stream has been queried at all? If yes, if the first batch is exhausted, do you want further elements to also be processed in batches? – Felk Feb 25 '19 at 09:03
  • @nullpointer The more I think about it, the more it feels like I'm stretching the Streams API's use case. I'm asking to be able to [control its parallelism](https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream), and I also want it to run in parallel in the order I want (not as strict a requirement). I'm still asking for it though :) – Hau Feb 25 '19 at 09:06
  • @Felk I wouldn't say I want the elements to be processed in batches per se, but batches would somewhat satisfy the requirements. I was looking for something like: Initially, files 1-5 are opened asynchronously. When `iter.next()` is called to get the 1st stream, the 6th file is opened asynchronously. When `iter.next()` returns the 2nd stream, the 7th file is opened, like a pipeline. The downstream code will close the stream to prevent connection timeout. – Hau Feb 25 '19 at 09:11
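A simpler, closer-to-Streams variant of the sliding window described in the comments is to map each file to a `CompletableFuture` submitted to a bounded pool, then iterate the futures in order. This is a sketch under an assumption: `open` here is a stand-in for the question's `convertToInputStream`. The pool size caps how many files are being *opened* concurrently, but completed streams can pile up ahead of the consumer, so this only bounds open connections if the consumer keeps pace:

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class EagerPipeline {
    // Stand-in for the slow per-file mapping (the question's convertToInputStream).
    static String open(int file) {
        return "stream-" + file;
    }

    // Submit every open() to a bounded pool, then yield results in list order.
    static List<String> loadAll(List<Integer> files, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<CompletableFuture<String>> futures = files.stream()
                    .map(f -> CompletableFuture.supplyAsync(() -> open(f), pool))
                    .collect(Collectors.toList());
            // join() in list order preserves the required ordering.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Iterator<String> iter = loadAll(List.of(1, 2, 3, 4, 5), 3).iterator();
        while (iter.hasNext()) System.out.println(iter.next());
    }
}
```

The trade-off versus the windowed-iterator approach: this version is almost all stock Streams code, but it eagerly schedules every file, so the "open at most N at a time from the consumer's point of view" guarantee is weaker.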

0 Answers