
I have a method which performs processing on a stream. Part of that processing must run under a lock (a single locked section covering all the elements), but the rest doesn't need the lock, and shouldn't hold it because it might be quite time-consuming. So I can't just say:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess);
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing);
}
toPostProcess.map(this::postProcess).forEach(System.out::println);

because the calls to doLockedProcessing would only be executed when the terminal operation forEach is invoked, and that is outside the lock.
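
The laziness described above can be observed directly. The following is a minimal sketch, not code from the original post: the class `LazyLockDemo`, the `LOCK` field, and the sample elements are hypothetical names chosen for illustration. It records whether the lock is held each time the mapping function actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class; none of these names come from the original post.
class LazyLockDemo {
    private static final Object LOCK = new Object();

    // Returns, per element, whether LOCK was held when the mapping function ran.
    static List<Boolean> lockHeldDuringMap() {
        List<Boolean> held = new ArrayList<>();
        Stream<String> mapped;
        synchronized (LOCK) {
            // map() only registers the function; nothing is executed here.
            mapped = Stream.of("a", "b", "c").map(s -> {
                held.add(Thread.holdsLock(LOCK));
                return s.toUpperCase();
            });
        }
        // The terminal operation runs the mapping function, after the lock was released.
        mapped.forEach(s -> { });
        return held;
    }

    public static void main(String[] args) {
        System.out.println(lockHeldDuringMap()); // prints [false, false, false]
    }
}
```

Every entry is `false` because the lambda only executes once `forEach` pulls elements through the pipeline, by which time the `synchronized` block has already exited.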

So I think I need to make a copy of the stream, using a terminal operation, at each stage so that the right bits are done at the right time. Something like:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).copy();
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).copy();
}
toPostProcess.map(this::postProcess).forEach(System.out::println);

Of course, the copy() method doesn't exist, but if it did it would perform a terminal operation on the stream and return a new stream containing all the same elements.

I'm aware of a few ways of achieving this:

(1) Via an array (not so easy if the element type is a generic type):

copy = Stream.of(stream.toArray(String[]::new));

(2) Via a list:

copy = stream.collect(Collectors.toList()).stream();

(3) Via a stream builder:

Stream.Builder<V> builder = Stream.builder();
stream.forEach(builder);
copy = builder.build();
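
For comparison, here is a sketch of the three options as generic helper methods. The class and method names (`EagerCopyDemo`, `viaArray`, `viaList`, `viaBuilder`, `run`) are hypothetical, and the array variant sidesteps the generic-array problem by going through `Object[]` with an unchecked cast, which is an assumption about what is acceptable in your code base. The `run` method logs events to show that each helper forces the upstream `map` to execute before it returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class illustrating the three options from the question.
class EagerCopyDemo {
    // Option (1): Object[] avoids creating a generic array, at the cost of an unchecked cast.
    @SuppressWarnings("unchecked")
    static <T> Stream<T> viaArray(Stream<T> s) {
        return Arrays.stream(s.toArray()).map(o -> (T) o);
    }

    // Option (2): collect to a List and stream it again.
    static <T> Stream<T> viaList(Stream<T> s) {
        return s.collect(Collectors.toList()).stream();
    }

    // Option (3): Stream.Builder<T> is itself a Consumer<T>.
    // Assumes a sequential source; forEach into a builder is not safe for parallel streams.
    static <T> Stream<T> viaBuilder(Stream<T> s) {
        Stream.Builder<T> builder = Stream.builder();
        s.forEach(builder);
        return builder.build();
    }

    // Logs when the upstream map runs relative to the copy and the later consumption.
    static List<String> run(Function<Stream<String>, Stream<String>> copy) {
        List<String> log = new ArrayList<>();
        Stream<String> copied = copy.apply(
                Stream.of("a", "b").map(s -> { log.add("map " + s); return s; }));
        log.add("copied");
        copied.forEach(s -> log.add("use " + s));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(EagerCopyDemo::viaBuilder));
        // prints [map a, map b, copied, use a, use b]
    }
}
```

All three helpers produce the same event order; they differ only in the intermediate storage they use.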

What I want to know is: which of these methods is the most efficient in terms of time and memory? Or is there another way which is better?

Jacob G.
Jeremy Hicks
  • either 2 or 3 - I would go with 3 as it is a bit more idiomatic (to me) – Eugene Jan 22 '19 at 14:39
  • Why either 2 or 3? – Jeremy Hicks Jan 22 '19 at 15:16
  • What is the type of `objects`? – Holger Jan 22 '19 at 16:07
  • In my real-world application, it's a generic type - it could be anything. – Jeremy Hicks Jan 22 '19 at 17:01
  • 1
    Related: [Copy a stream...](https://stackoverflow.com/q/23860533/1371329) – jaco0646 Jan 22 '19 at 19:27
  • 1
    Note that none of your options *copy* the stream, because your code *consumes* the original stream in order to make the new one, thus achieving nothing more than `copy = stream` and pointless processing. – Bohemian Jan 22 '19 at 19:28
  • 1
    this is the part that IMO you get wrong about streams in general *one locked section for processing all the elements* - you can't achieve that. Streams process elements one at a time, your `map` is executed for one element only, not all of them at once. – Eugene Jan 23 '19 at 10:11
  • @JeremyHicks `objects` seems to be an array, as you pass it to `Stream.of`, so is it `V[]`? I’m asking, because your code does only contain `map` steps from `V` to `V`, which raises the question whether they are actually functions receiving a value and producing a value. The requirement of synchronizing one processing step speaks against it. So it looks much like you are actually doing a multi-step process over an array and the Stream API is by far the wrong tool for this job. – Holger Jan 23 '19 at 12:48

3 Answers

3

I think you have already mentioned all possible options. There's no other structural way to do what you need. First, you'd have to consume the original stream. Then, create a new stream, acquire the lock and consume this new stream (thus invoking your locked operation). Finally, create a yet newer stream, release the lock and go on processing this newer stream.

Of the options you are considering, I would use the third one, because the number of elements it can handle is limited only by memory: it has no implicit maximum size, unlike, e.g., ArrayList (which can hold at most about Integer.MAX_VALUE elements).

Needless to say, this would be quite an expensive operation, in both time and space. You could do it as follows:

Stream<V> temp = Stream.of(objects)
        .map(this::preProcess)
        .collect(Stream::<V>builder,
                 Stream.Builder::accept,
                 (b1, b2) -> b2.build().forEach(b1))
        .build();

synchronized (lockObj) {
    temp = temp
            .map(this::doLockedProcessing)
            .collect(Stream::<V>builder,
                     Stream.Builder::accept,
                     (b1, b2) -> b2.build().forEach(b1))
            .build();
}

temp.map(this::postProcess).forEach(System.out::println);

Note that I've used a single Stream instance temp, so that intermediate streams (and their builders) can be garbage-collected, if needed.


As suggested by @Eugene in the comments, it would be nice to have a utility method to avoid code duplication. Here's such a method:

public static <T> Stream<T> copy(Stream<T> source) {
    return source.collect(Stream::<T>builder,
                          Stream.Builder::accept,
                          (b1, b2) -> b2.build().forEach(b1))
                 .build();
}

Then, you could use this method as follows:

Stream<V> temp = copy(Stream.of(objects).map(this::preProcess));

synchronized (lockObj) {
    temp = copy(temp.map(this::doLockedProcessing));
}

temp.map(this::postProcess).forEach(System.out::println);
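
As a quick check that the locked stage really runs inside the lock with this approach, here is a small self-contained sketch. The class `LockedStageDemo`, the `LOCK` field, and the arithmetic lambdas are hypothetical stand-ins for `preProcess` and `doLockedProcessing`; only the `copy` method is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class; the lambdas stand in for the real processing steps.
class LockedStageDemo {
    private static final Object LOCK = new Object();

    // Same utility as above: consumes the source eagerly into a Stream.Builder.
    static <T> Stream<T> copy(Stream<T> source) {
        return source.collect(Stream::<T>builder,
                              Stream.Builder::accept,
                              (b1, b2) -> b2.build().forEach(b1))
                     .build();
    }

    // Returns, per element, whether LOCK was held while the locked stage ran.
    static List<Boolean> run() {
        List<Boolean> held = new ArrayList<>();
        Stream<Integer> temp = copy(Stream.of(1, 2, 3).map(i -> i + 1));
        synchronized (LOCK) {
            // copy() is terminal, so the mapping below executes inside this block.
            temp = copy(temp.map(i -> {
                held.add(Thread.holdsLock(LOCK));
                return i * 2;
            }));
        }
        temp.forEach(i -> { }); // post-processing would happen here, outside the lock
        return held;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [true, true, true]
    }
}
```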
fps
  • I like this solution; under the hood it uses `SpinedBuffer` - a collection that is not exposed and used by Streams; I have not looked too much inside it, I just hope it is better to collect to it rather than an `ArrayList`. 1+ – Eugene Jan 23 '19 at 10:30
  • btw you could also update the code to provide a utility method that would generate a `Collector` that collects to a `Builder` – Eugene Jan 23 '19 at 10:48
  • 3
    @Eugene a spined buffer is an array of arrays, so when a target array’s capacity is exhausted, a new array will be added to the outer array instead of replacing the array. So unlike `ArrayList`, it does not need to copy all contained elements when expanding. When the outer array’s capacity is exhausted, resizing it only needs cheap copying of the array references, but then you’re beyond the capacity of ordinary collections anyway. So doing random access would be more complicated, but that’s why it is not a `List`. `Stream.Builder` adds special support for empty and single element cases to it. – Holger Jan 23 '19 at 13:02
  • @Holger I should have read the documentation as it seems to touch exactly this... – Eugene Jan 23 '19 at 13:09
  • @FedericoPeraltaSchaffner I don’t know, perhaps they wanted to keep the possibility of refactoring this class in later versions. But is there anything you could do with it when having direct access, which you can’t do with `Stream.Builder`? – Holger Jan 23 '19 at 15:50
  • @FedericoPeraltaSchaffner well, neither of them would work if `SpinedBuffer` was `public`, as that class does not support removal and there’s no way to add this functionality atop of it. You would have to implement that yourself anyway. – Holger Jan 23 '19 at 16:59
2

I created a benchmark test which compares the three methods. This suggested that using a List as the intermediate store is about 30% slower than using an array or a Stream.Builder, which are similar. I am therefore drawn to using a Stream.Builder because converting to an array is tricky where the element type is a generic type.

I've ended up writing a little function that creates a Collector which uses a Stream.Builder as the intermediate store:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

I can then make a copy of any stream str by doing str.collect(copyCollector()) which feels quite in keeping with the idiomatic usage of streams.

The original code I posted would then look like this:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).collect(copyCollector());
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).collect(copyCollector());
}
toPostProcess.map(this::postProcess).forEach(System.out::println);
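
To illustrate the ordering point discussed in the comments, here is a sketch that runs the same collector on a parallel stream and checks that the copy keeps the encounter order. The class `CopyCollectorDemo` and the method `parallelCopy` are hypothetical names; the range of 1 to 1000 is an arbitrary choice.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class wrapping the collector shown above.
class CopyCollectorDemo {
    static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector() {
        return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
            b2.build().forEach(b1); // append the right-hand partial result to the left-hand one
            return b1;
        }, Stream.Builder::build);
    }

    // Copies an ordered parallel stream and returns the copy's elements as a list.
    static List<Integer> parallelCopy() {
        Stream<Integer> copy = IntStream.rangeClosed(1, 1000)
                .boxed()
                .parallel()
                .collect(copyCollector());
        return copy.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = parallelCopy();
        System.out.println(result.get(0) + " .. " + result.get(result.size() - 1)); // prints 1 .. 1000
    }
}
```

Because the collector declares no `UNORDERED` characteristic and the combiner always appends the right partial result to the left one, an ordered source keeps its order in the copy.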
Jeremy Hicks
  • I find it weird to "collect" to a `Stream`. I would collect to a `Builder` instead, but it is just me... Also I am not sure about that `Characteristics.UNORDERED`, it seems it should not be present. Otherwise, not a bad solution at all, 1+ – Eugene Jan 23 '19 at 11:11
  • Collecting to a `Builder` would work equally well, but it just puts the responsibility of calling `build()` on the client which seems unnecessary. I included `Characteristics.UNORDERED` because as far as I can see, there is no way of guaranteeing the order of the output stream if the combiner (for combining two intermediate builders) is called. – Jeremy Hicks Jan 24 '19 at 09:22
  • 2
    that is the point when you merge like this, the order will be preserved. Some people call `b1/b2` `left/right` to denote that – Eugene Jan 24 '19 at 09:51
  • OK thank you, I understand now. I thought that you couldn't predict anything about the subsets of the data that would be merged using any particular call to the combiner, but it appears that you can: they will always be derived from adjacent subsets of the source stream. in which case, the collector as written does guarantee order. I'll remove the `Characteristics.UNORDERED`. – Jeremy Hicks Jan 29 '19 at 14:51
  • *adjacent subsets of the source stream* - not always, you could have an `ArrayList` as source, but call `unordered/sorted` on it - and thus lose the initial order... – Eugene Jan 29 '19 at 14:53
0

Wrap doLockedProcessing in synchronization. Here’s one way:

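import java.util.function.Function;

// Implements Function so that an instance can be passed directly to Stream.map().
class SynchronizedFunction<T, R> implements Function<T, R> {
    private final Function<T, R> function;
    public SynchronizedFunction(Function<T, R> function) {
        this.function = function;
    }
    // Only one thread at a time may run the wrapped function.
    @Override
    public synchronized R apply(T t) {
        return function.apply(t);
    }
}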

Then use that in your stream:

stream.parallel()
  .map(this::preProcess)
  .map(new SynchronizedFunction<>(this::doLockedProcessing))
  .forEach(this::postProcess);

This will process the locked code serially, but run in parallel otherwise.
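
Note that a wrapper like this takes the lock once per element rather than once for the whole stream. The sketch below makes that visible by counting monitor acquisitions; `PerElementLockDemo`, `CountingSynchronizedFunction`, and `acquisitionsFor` are hypothetical names, and the counting variant is an illustration rather than the class shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;

// Hypothetical demo: counts how often the wrapper's monitor is entered.
class PerElementLockDemo {
    static final class CountingSynchronizedFunction<T, R> implements Function<T, R> {
        private final Function<T, R> delegate;
        final AtomicInteger acquisitions = new AtomicInteger();

        CountingSynchronizedFunction(Function<T, R> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized R apply(T t) {
            acquisitions.incrementAndGet(); // the monitor was entered for this call
            return delegate.apply(t);
        }
    }

    // Runs a parallel pipeline over the given number of elements and reports the lock count.
    static int acquisitionsFor(int elements) {
        CountingSynchronizedFunction<Integer, Integer> f =
                new CountingSynchronizedFunction<>(i -> i * 2);
        IntStream.range(0, elements).boxed().parallel().map(f).forEach(i -> { });
        return f.acquisitions.get();
    }

    public static void main(String[] args) {
        System.out.println(acquisitionsFor(100)); // prints 100
    }
}
```

So the locked step is serialized, but other threads can still acquire the same lock between elements.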

Bohemian
  • isn't this the same thing I proposed (but it seems it is still not what the OP wants)? This will indeed create a single instance of `SynchronizedFunction`, but for each element of the stream, `apply` will be called, which is `synchronized`, so for each element, that lock will be tried. What OP wants is acquire the lock (once) and execute all the elements under that lock. I don't think that is possible – Eugene Jan 23 '19 at 10:09
  • @Eugene, you're correct. This will establish a separate lock for each element of the stream. I want the processing of all the elements to be done in one lock. – Jeremy Hicks Jan 23 '19 at 10:25