2

I have a use case with a data source that emits a new string, let's say, every second.

I want to create a pipeline so that whenever a new string arrives, it is pushed through that pipeline for processing.

I'd guess the Stream API introduced in Java 8 could do exactly that, since it has convenient functions for processing the data of an arbitrary collection. However, I'd like to skip the part where I collect my data into a separate collection, and instead dispatch the arriving data straight to the Stream I just created.

Is there any way to do that?

László Stahorszki
  • 6
    Java 8 streams are not for that; have a look at reactive frameworks. – Sleiman Jneidi Jul 15 '18 at 14:40
  • That should be possible. If you want to try, implement [the `Spliterator` interface](https://docs.oracle.com/javase/9/docs/api/java/util/Spliterator.html) and use [`StreamSupport.stream(Spliterator, boolean)`](https://docs.oracle.com/javase/9/docs/api/java/util/stream/StreamSupport.html#stream-java.util.Spliterator-boolean-) to create your stream. – Ole V.V. Jul 15 '18 at 14:44
  • You could do that with java streams, but it would be a hack. – Johannes Kuhn Jul 15 '18 at 14:44
  • Are you saying that your code is *already* processing data in a stream pipeline, and that arriving new data should be immediately *injected* into the stream data? What if the stream processing has already completed? Do you then start a new stream? Your question doesn't seem to make sense for how streams work. – Andreas Jul 15 '18 at 14:53
  • okay, multiple answers here: 1) isn't reactive programming for something much bigger? I mean for complete web apps and stuff? I think it'd be too heavy for this purpose (I don't really know much about reactive programming) – László Stahorszki Jul 15 '18 at 17:30
  • 2) why exactly is it a hack? I think I have a pretty good grasp of the Stream API, even though I'm far from an expert, and that should be something it can do. I just needed some implementation of the Supplier interface that could provide the data, and the BlockingQueue seems to be exactly that. – László Stahorszki Jul 15 '18 at 17:32
  • 3
    The problems start with the part you have omitted from your question. What are you going to do with your stream? “Dispatch the arriving data straight to the Stream” is not an end in itself. Start discussing the actual goal and you’ll learn why the stream is not the best choice. And no, reactive programming doesn’t have to be “big”. Tasks like “whenever a new item arrives, do xyz” are the simplest example of it. – Holger Jul 16 '18 at 09:42
  • I'm implementing a Spring Autowire-like functionality (not with actual beans, it's just similar). The thing is that I have to do some stuff to the fields I get, for which I wrote some pure functions, to be clean and nice. I have a function which takes a functional interface to retrieve the fields in a class (field -> ...). I don't really want to fill up that functional interface with all these transformations; I want something more readable and separated. That led me to the idea of using pipelines, and at this time I don't know any better way to do this other than streams. – László Stahorszki Jul 16 '18 at 10:41
  • 2
    See [`java.util.concurrent.Flow`](https://docs.oracle.com/javase/9/docs/api/?java/util/concurrent/Flow.html)… – Holger Jul 16 '18 at 13:45
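
For readers following the `Flow` pointer in the last comment, here is a minimal sketch of what "whenever a new item arrives, do xyz" could look like with the JDK's `SubmissionPublisher` (Java 9+). The class name `FlowDemo`, the method `run`, and the upper-casing step are made up for illustration; they are not from the thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class: pushes items to a subscriber as they "arrive".
class FlowDemo {

    static List<String> run(String... items) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;
        // SubmissionPublisher is AutoCloseable; close() signals completion to subscribers.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes a simple callback and returns a future that
            // completes once the publisher has been closed and all items were delivered.
            done = publisher.consume(s -> seen.add(s.toUpperCase()));
            for (String item : items) {
                publisher.submit(item); // hands the item to the subscriber asynchronously
            }
        }
        done.join(); // wait until the subscriber has processed everything
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b", "c"));
    }
}
```

In a real program the `submit` calls would come from wherever the data arrives, and the publisher would stay open for as long as the source is alive.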

2 Answers

6

In order to do what you described you'd need some kind of blocking. I'd use a BlockingQueue (any kind would do - if you want to avoid collections, use SynchronousQueue, which has no internal state at all), and create an infinite Stream from it using Stream.generate.

Example:

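import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream;

class StreamableQueue<T> {

    private final BlockingQueue<T> dataSource;

    StreamableQueue(BlockingQueue<T> dataSource) {
        this.dataSource = dataSource;
    }

    // Infinite stream: each element is pulled from the queue on demand.
    Stream<T> asStream() {
        return Stream.generate(this::takeFromDataSource);
    }

    // Blocks until another thread puts an element into the queue.
    private T takeFromDataSource() {
        try {
            return dataSource.take();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(ex);
        }
    }
}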

Of course, the BlockingQueue provided as dataSource to this class would need to be fed elements from a different thread.
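
To illustrate the "fed from a different thread" part, here is a minimal, self-contained sketch. The names `QueueFeedDemo`, `takeOrFail` and `run` are made up for this example, and the `limit(count)` call is only there so the demo terminates; with a truly endless source you would leave it out and the terminal operation would never return.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: a producer thread feeds a queue, a stream consumes it.
class QueueFeedDemo {

    // Blocks until the producer hands over an element.
    static <T> T takeOrFail(BlockingQueue<T> queue) {
        try {
            return queue.take();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    static List<String> run(int count) {
        BlockingQueue<String> queue = new SynchronousQueue<>();

        // Producer: stands in for the real data source.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("message-" + i); // blocks until the consumer takes it
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer: the stream pipeline pulls elements as they arrive.
        return Stream.generate(() -> takeOrFail(queue))
                .limit(count)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [MESSAGE-0, MESSAGE-1, MESSAGE-2]
    }
}
```

Note that the pipeline is pull-based: the thread running the terminal operation blocks in `take()` until the next element shows up.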


EDIT: A minor addition - instead of using try-catch, you can use:

Tomasz Linkowski
0

Okay, so the answer to this particular case has become a bit different.

While the answer of Tomasz Linkowski seemed like a really nice solution, the main problem with it was that my function is more sequential than BlockingQueues would suggest, which was detrimental to readability.

So I came up with Stream.Builder, which is actually just what I need and nothing more.
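
A minimal sketch of how that can look, assuming the data arrives through a callback. `Stream.Builder` extends `Consumer`, so the builder itself can be passed wherever a `Consumer` is expected. The class `BuilderDemo`, the method `emitAll` (a stand-in for the real source) and the trim/filter steps are hypothetical and only for illustration.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: collects callback-delivered items with Stream.Builder.
class BuilderDemo {

    // Stand-in for the real source: hands each item to the given callback.
    static void emitAll(Consumer<String> sink) {
        sink.accept(" alpha ");
        sink.accept("");
        sink.accept("beta");
    }

    static List<String> run() {
        Stream.Builder<String> builder = Stream.builder();
        emitAll(builder); // the builder is the callback; no separate collection needed

        // build() ends the building phase; adding afterwards throws IllegalStateException.
        return builder.build()
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [alpha, beta]
    }
}
```

This only works when the set of elements is complete before `build()` is called.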

Tomasz Linkowski
László Stahorszki
  • 1
    It is not as much about the data source being sequential as about being finite, because the _only_ way to create a `Stream` using `Stream.Builder` is to know all data it should contain. This is something you have misspecified in your question, apparently. But I'm glad you've found a solution to your problem! :) – Tomasz Linkowski Jul 16 '18 at 05:03
  • I'm sorry if I wasn't upfront enough. The _I'd like to skip the part where I collect my data to a separate collection_ part made the solution easy to find, since almost the exact same words are used in the Stream Builder documentation – László Stahorszki Jul 16 '18 at 10:43