
I have a pipeline where files are processed in parallel, but I am a bit suspicious of the peek function.

File file = articles.parallelStream( )
                    .map( article -> {
                        String fileName = processer.getFriendlyName( article, locale );
                        currentCount.incrementAndGet();
                        return new ImmutablePair<>( fileName, converted );
                    } )
                    .peek( pair -> statusMessageSender.sendStatusMessage( totalCount, currentCount.get(), pair.getKey( ) ) )
                    .collect( new Archiver( archivePath ) );

From reading the Javadoc, I am not completely sure whether the counter that is supposed to report the current progress is doing its job (basically, I am looking for assurance from the docs here). The Javadoc for peek says:

For parallel stream pipelines, the action may be called at whatever time and in whatever thread the element is made available by the upstream operation.

It seems to me that an observer would get the current count regardless of whether the file name matches the processing order, which is fine. But at the end of the day, I am starting to distrust peek and leaning towards synchronizing on the receiving side of sendStatusMessage.

In short, I am looking for a way to send status updates from a parallel stream. Any thoughts?
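The Javadoc sentence quoted above can be illustrated with a minimal, self-contained sketch. PeekThreadsDemo and threadsUsedByPeek are hypothetical names, and integers stand in for articles. The peek action records the name of every thread that runs it. Because that action mutates shared state, the state has to be thread-safe (here a concurrent set), which is exactly the synchronization the Javadoc leaves to the caller. Which threads appear, and in what order the action fires, can differ from run to run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PeekThreadsDemo {

    // Hypothetical helper: runs a parallel pipeline over n elements and returns
    // the names of the threads that executed the peek action.
    static Set<String> threadsUsedByPeek(int n) {
        // peek mutates shared state, so that state must be thread-safe.
        Set<String> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n)
                .boxed()
                .parallel()
                .map(i -> i * 2)
                .peek(i -> threads.add(Thread.currentThread().getName()))
                // collect needs every element, so every element passes through peek.
                .collect(Collectors.toList());
        return threads;
    }

    public static void main(String[] args) {
        // Usually the calling thread plus some common-pool workers; varies by run.
        System.out.println(threadsUsedByPeek(10_000));
    }
}
```

The pipeline ends in collect on purpose. Since Java 9, Stream.count() may skip evaluating intermediate operations, peek included, when it can compute the count directly from the source. For that reason peek is a poor place for work that must happen.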

Stefan Zobel
Victor
  • Looks fine to me! Peek is definitely the way to go, although you should use `.peek( pair -> statusMessageSender.sendStatusMessage( totalCount, currentCount.incrementAndGet(), pair.getKey( ) ) )`. Does this not work for you? – Bohemian Aug 13 '18 at 23:43
  • It works; I am just afraid it is working by chance. The case I am interested in is two messages being sent where, due to their order, the status count could decrease. – Victor Aug 13 '18 at 23:51
  • How many articles are in that `articles` list? Calling `parallelStream` may not execute the stream in parallel at all. – Lino Aug 14 '18 at 05:51
  • No, this isn't a correct way to do things. Your map increments a counter, then your peek reads it. So if 4 threads execute the mapping function in parallel, and then those 4 threads execute the peek consumer, you'll send the same counter value 4 times. The counter value (returned by incrementAndGet()) should be part of the object returned by the mapping function. – JB Nizet Aug 14 '18 at 05:58
  • Using the result of `incrementAndGet` is indeed the key (i.e. don't use `get` which gives other threads the chance to increment it in the meantime). Extend your pair to a triple, or send the message as part of the `map` already. – Malte Hartwig Aug 14 '18 at 08:43
  • I love it when a gold-badge holder marks a question as a duplicate without being helpful at all. Anyway, I ended up keeping the peek there, since in my mind the map function is supposed to, well, map. However, I moved the synchronization issue to an accumulator, as I really do not care about the order in which the files are processed, so I would not need a triple. So peek sends a message to a synchronized accumulator that keeps track of what is going on, while map just maps and the collector just collects (I considered letting the collector track the current status, but that was not really its job). – Victor Aug 15 '18 at 00:10
  • Thanks everyone for your input. I would post the results here, but someone single-handedly closed the question. – Victor Aug 15 '18 at 00:10
  • What do you mean with “I moved the synchronization issue to an accumulator”? Do you have a counter or not? – Holger Aug 20 '18 at 16:38
  • I moved the place where the counting occurs. Instead of the count being sent in the message along with the name of the current file, the message receiver is now responsible for keeping track of the numbers. As the order is not important, the message receiver could take over this job. – Victor Aug 20 '18 at 17:28
  • @Victor I've reopened it, and if you really feel that I closed something for a bad reason, just ping me on any other question and I'll review it... sorry, I did not mean to be rude. – Eugene Aug 20 '18 at 18:59
  • By no means do I think you were rude, @Eugene; premature, maybe, but never rude. I can now post an answer and maybe help someone in the future. This problem is different because of the unordered, synchronized message queue. Thanks for reconsidering. – Victor Aug 20 '18 at 20:04
  • @Victor your comments create the impression that there is some connection between the file name and the counter, whereas in your question’s code example there is no relationship at all. There, you are passing a file name as returned by `getFriendlyName` and an additional number whose purpose lies in the dark. If you remove that number aspect from the code (as now the message receiver is responsible for keeping track of the numbers, for whatever purpose), there’s no actual question remaining (as your question was about something connected to a counter incremented in `peek`). – Holger Aug 21 '18 at 16:20
  • “There’s no actual question remaining”: yes, you are correct, that was actually the final result of this discussion (as now the message receiver is responsible for keeping track of the numbers). I will post an answer; the question was closed when we got to this point. – Victor Aug 21 '18 at 16:28
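The fix JB Nizet and Malte Hartwig describe, using the value returned by incrementAndGet() inside map and carrying it along with the element, might look like the sketch below. Progress is a hypothetical stand-in for the pair extended to a triple, strings stand in for articles, and a concurrent queue stands in for statusMessageSender. Each element gets a distinct count from 1 to n, although the reports can still be delivered out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class CountInMappedValue {

    // Hypothetical stand-in for "the pair extended to a triple": the count is
    // taken once, inside map, and then travels with the element.
    static final class Progress {
        final int count;
        final String fileName;

        Progress(int count, String fileName) {
            this.count = count;
            this.fileName = fileName;
        }
    }

    // Returns the counts that peek reported, in the order they were reported.
    static List<Integer> reportedCounts(List<String> articles) {
        AtomicInteger currentCount = new AtomicInteger();
        // Thread-safe queue standing in for statusMessageSender.
        Queue<Integer> reported = new ConcurrentLinkedQueue<>();
        articles.parallelStream()
                // Use the value returned by incrementAndGet(), never a later get().
                .map(article -> new Progress(currentCount.incrementAndGet(), article + ".pdf"))
                .peek(p -> reported.add(p.count))
                .collect(Collectors.toList());
        return new ArrayList<>(reported);
    }
}
```

Calling incrementAndGet() directly inside peek, as Bohemian suggests, gives the same uniqueness guarantee. What breaks is reading the counter with a separate get(), because other threads can increment it in between.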

1 Answer


Initially, much of the discussion was about peek and why I was splitting the messaging part out of the mapping expression. That was mostly a matter of style: I prefer mapping functions to do mapping and nothing more.

I can see why people would defend peek or argue against it. But bottom line, it consumes a value and passes it along the pipeline. So, as I was looking for a side effect (sending a message), peek seemed perfect.

In a parallel stream, the issue is that one cannot predict when peek is actually called. But there were two aspects to consider: when the message was sent was irrelevant to the problem at hand, and the message itself could be sent at any time.

In the end, the counter could just as well live in the peek step; the message receiver was the only real factor here. The receiver could keep its own counter, or only consider the highest count received within a time frame.
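The receiver-side option of only considering the highest count received could look like the following minimal sketch. MonotonicProgressReceiver and its methods are hypothetical names, not part of the original code. AtomicInteger.accumulateAndGet with Math::max keeps the maximum atomically, so a late message carrying a lower count can never make the displayed progress go backwards.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical receiver: accepts counts in any order, from any thread, but
// only ever reports the highest one seen so far.
class MonotonicProgressReceiver {
    private final int total;
    private final AtomicInteger highest = new AtomicInteger();

    MonotonicProgressReceiver(int total) {
        this.total = total;
    }

    // Atomically keeps max(previous, count) and returns the value to display.
    int receive(int count) {
        return highest.accumulateAndGet(count, Math::max);
    }

    double percentDone() {
        return 100.0 * highest.get() / total;
    }
}
```

In the pipeline, the peek action would pass currentCount.incrementAndGet() to receive(...). Out-of-order delivery then no longer causes the "status message could decrease" case raised in the comments.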

Bottom line: the question, which began with suggestions around peek, ended with the following conclusion. In terms of functionality, peek would do its job just fine, mainly because the problem did not depend on the order of elements in the pipeline.

However, only the message consumer could tell whether it could consume that message correctly. Given that only one consumer was using the counter and the others were not, the final conclusion was that we had a problem in the protocol design, not with the peek function. We removed the counter from the standard message and the problem was gone. Could peek be used safely for this problem? Yes, it could, but...

So it could be either:

File archive = articles.parallelStream( )
                       .map( article -> {
                           File converted = converter.getFile( ... );
                           String fileName = converter.getFriendlyName( ... );
                           return new ImmutablePair<>( fileName, converted );
                       } )
                       .peek( pair -> statusMessageSender.sendStatusMessage( pair.getKey() ) )
                       .collect( new Archiver( archivePath, deleteArchivedFiles ) );

or:

File archive = articles.parallelStream( )
                       .map( article -> {
                           File converted = converter.getFile( ... );
                           String fileName = converter.getFriendlyName( ... );
                           return new ImmutablePair<>( fileName, converted );
                       } )
                       .peek( pair -> statusMessageSender.sendStatusMessage( currentCount.incrementAndGet(), pair.getKey() ) )
                       .collect( new Archiver( archivePath, deleteArchivedFiles ) );

But in the end, it was about the protocol and not about peek. peek could definitely be used, and the unordered nature of the problem was the reason why it could be used. (Thanks for your help, people on SO.)
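The protocol change described in this answer, dropping the counter from the standard message and letting the receiver count, can be sketched as follows. CountingStatusReceiver, progressText and process are hypothetical names. Strings stand in for articles and Collectors.toList() stands in for the custom Archiver collector, whose code is not shown in the question.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical receiver for the simplified protocol: a status message carries
// only the file name, and the receiver derives progress by counting messages.
class CountingStatusReceiver {
    private final int total;
    private final AtomicInteger received = new AtomicInteger();

    CountingStatusReceiver(int total) {
        this.total = total;
    }

    // Safe to call from any stream worker thread.
    void sendStatusMessage(String fileName) {
        // The receiver, not the stream, owns the counter.
        // A real receiver might also display fileName here.
        received.incrementAndGet();
    }

    String progressText() {
        return received.get() + "/" + total;
    }

    // Mirrors the answer's first variant of the pipeline.
    static String process(List<String> articles) {
        CountingStatusReceiver receiver = new CountingStatusReceiver(articles.size());
        articles.parallelStream()
                .map(article -> article + ".pdf")   // stands in for the conversion step
                .peek(receiver::sendStatusMessage)   // no counter in the message
                .collect(Collectors.toList());       // stands in for Archiver
        return receiver.progressText();
    }
}
```

Because the count lives in one thread-safe place, the stream no longer has to keep a counter consistent with the messages it sends, which is why the order of peek calls stops mattering.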

Victor