
Let's say I have a huge web server log file that does not fit in memory. I need to stream this file through a map-reduce process and save the results to a database. I do this using the Java 8 Stream API. For example, after the map-reduce step I get lists such as consumption by client, consumption by IP, and consumption by content. My real needs are not exactly those in this example; since I cannot share code, I just want to give a basic example.

Using the Java 8 Stream API, I want to read the file exactly once and get 3 lists at the same time while streaming it, sequentially or in parallel (parallel would be preferable). Is there any way to do that?

Yılmaz
    only with a custom collector. Java-12 has a proposal to implement something like a `BiCollector` (name is not exact yet), but definitely not a `TriCollector`... – Eugene Aug 13 '18 at 08:37
  • You can check out [this question](https://stackoverflow.com/questions/51545341/split-java-stream-into-two-lazy-streams-without-terminal-operation/51556624#comment90494511_51556624). In my answer, I use a custom spliterator to wrap the stream. It works in parallel (you can simplify it if you do not need the consumers to run in sync), but I don't know if it can easily be applied to reductions. – Malte Hartwig Aug 13 '18 at 08:38
  • what you should be aware of is that lines from a File are by definition hard to parallelize, so IIRC there will be an internal buffer of 1024 lines initially, which is then increased to 2048, then 3072, and so on... so if your file is smaller than 1024 lines, parallel processing is much worse than sequential – Eugene Aug 13 '18 at 08:41
  • actually, I am getting source from NoSQL database. I just wanted to keep example simple. Thanks for your reply @Eugene – Yılmaz Aug 13 '18 at 08:43
  • 1
    There's a proposal for a future JDK for this sort of thing - see the extensive discussion at http://marxsoftware.blogspot.com/2018/08/jdk-12-merging-collectors-naming-challenge.html. I think you'll find some code there doing something similar to what you want (IIRC it will stream into 2 collectors, but I'm sure you could expand further) – Brian Agnew Aug 13 '18 at 08:52
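For reference, the merging collector discussed in the comments did eventually ship in JDK 12 as `Collectors.teeing`, which feeds one pass of the stream into two collectors and merges their results. A minimal sketch (requires Java 12+; the class and method names here are illustrative, not from the question):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TeeingSketch {

    // One pass over the stream, two reductions: element count and sum.
    static List<Long> countAndSum(Stream<Integer> numbers) {
        return numbers.collect(Collectors.teeing(
                Collectors.counting(),                       // first collector
                Collectors.summingLong(Integer::longValue),  // second collector
                List::of));                                  // merge the two results
    }

    public static void main(String[] args) {
        System.out.println(countAndSum(Stream.of(1, 2, 3, 4))); // [4, 10]
    }
}
```

Note that `teeing` only combines two collectors; for three or more results in one pass you still need a custom collector, as the answers below show.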

2 Answers


Generally, collecting to anything beyond what the standard API gives you is pretty easy via a custom Collector. In your case, collecting to 3 lists at a time looks like this (just a small example that compiles, since you can't share your code either):

private static <T> Collector<T, ?, List<List<T>>> to3Lists() {
    class Acc {
        List<T> left = new ArrayList<>();
        List<T> middle = new ArrayList<>();
        List<T> right = new ArrayList<>();
        List<List<T>> list = Arrays.asList(left, middle, right);

        void add(T elem) {
            // obviously do whatever you want here
            left.add(elem);
            middle.add(elem);
            right.add(elem);
        }

        Acc merge(Acc other) {
            left.addAll(other.left);
            middle.addAll(other.middle);
            right.addAll(other.right);

            return this;
        }

        public List<List<T>> finisher() {
            return list;
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}

And using it via:

Stream.of(1, 2, 3)
      .collect(to3Lists());

Obviously this custom collector does not do anything useful; it is just an example of how you could work with it.
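To make the accumulator do something useful, the `add` method can route each element to exactly one of the three lists instead of all of them. A hypothetical, self-contained sketch that splits integers into negatives, zeros, and positives in a single pass (the names `bySign`, `neg`, `zero`, `pos` are mine, not from the answer):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class ThreeWaySplit {

    // Collects into [negatives, zeros, positives] in one pass over the stream.
    static Collector<Integer, ?, List<List<Integer>>> bySign() {
        class Acc {
            final List<Integer> neg = new ArrayList<>();
            final List<Integer> zero = new ArrayList<>();
            final List<Integer> pos = new ArrayList<>();

            void add(Integer i) {
                if (i < 0)       neg.add(i);   // route each element to one list
                else if (i == 0) zero.add(i);
                else             pos.add(i);
            }

            Acc merge(Acc other) {             // used when the stream is parallel
                neg.addAll(other.neg);
                zero.addAll(other.zero);
                pos.addAll(other.pos);
                return this;
            }

            List<List<Integer>> finish() {
                return Arrays.asList(neg, zero, pos);
            }
        }
        return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finish);
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(-2, 0, 3, -1, 5).collect(bySign()));
        // [[-2, -1], [0], [3, 5]]
    }
}
```

The same shape works for the question's case: replace the sign test with whatever per-element classification or per-list accumulation the map-reduce needs.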

Eugene
  • Thank you for your response. I tried it, but it is hard for me to implement. Could you help me with this? What kind of custom class can I write to pass to this method? Stream.of(1, 2, 3).collect(Collectors.toMap(g-> { return null; }, v -> { return null; }, (t, u) -> { return t; } )); – Yılmaz Aug 13 '18 at 09:29
  • @Yılmaz I would help you, if you could help me... the piece of code you wrote makes very very little sense – Eugene Aug 13 '18 at 09:31
  • I created github project. This is the link https://github.com/ftylmz1/java-stream-multiple-grouping – Yılmaz Aug 13 '18 at 12:13

I have adapted the answer to this question to your case. The custom Spliterator will "split" the stream into multiple streams that collect by different properties:

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

public static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>         sourceSpliterator;

    private List<BlockingQueue<T>> queues = new ArrayList<>();

    private volatile boolean       sourceDone; // volatile: written by the driving thread, read by the consumer threads

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();

        for (Consumer<Stream<T>> fork : consumers)
        {
            LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private BlockingQueue<T> queue;

        private ForkedConsumer(BlockingQueue<T> queue)
        {
            super(Long.MAX_VALUE, 0);
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            while (queue.peek() == null)
            {
                if (sourceDone)
                {
                    // queue is empty and the source is exhausted, so "terminate" this sub-stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(queue.poll());

            return true;
        }
    }
}

You can use it as follows:

streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
                       new Row("content2", "client1", "location1", 2),
                       new Row("content1", "client1", "location2", 3),
                       new Row("content2", "client2", "location2", 4),
                       new Row("content1", "client2", "location2", 5)),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getContent,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))));

// Output
// {client2={location2=9}, client1={location1=3, location2=3}}
// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}

Note that you can do pretty much anything you want with the copies of the stream. As per your example, I used a stacked groupingBy collector to group the rows by two properties and then sum up the int property, so the result is a Map<String, Map<String, Integer>>. But you could also use it for other scenarios:

rows -> System.out.println(rows.count())
rows -> rows.forEach(row -> System.out.println(row))
rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))
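For completeness, here is the stacked groupingBy reduction on its own as a compilable sketch; the `Row` class below is a minimal stand-in I wrote for the one assumed by the answer (the original's fields and getters are only implied):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingSketch {

    // Minimal stand-in for the Row type used in the answer above.
    static class Row {
        final String content, client, location;
        final int consumption;

        Row(String content, String client, String location, int consumption) {
            this.content = content;
            this.client = client;
            this.location = location;
            this.consumption = consumption;
        }

        String getContent()  { return content; }
        String getClient()   { return client; }
        String getLocation() { return location; }
        int getConsumption() { return consumption; }
    }

    // Outer groupingBy keys on client, inner on location, summing consumption.
    static Map<String, Map<String, Integer>> summarize(Stream<Row> rows) {
        return rows.collect(Collectors.groupingBy(Row::getClient,
                Collectors.groupingBy(Row::getLocation,
                        Collectors.summingInt(Row::getConsumption))));
    }

    public static void main(String[] args) {
        System.out.println(summarize(Stream.of(
                new Row("content1", "client1", "location1", 1),
                new Row("content1", "client1", "location2", 3),
                new Row("content2", "client2", "location2", 4))));
    }
}
```

The outer map's keys come from the first classifier, the inner map's keys from the second, and `summingInt` reduces all rows that share both keys (map iteration order is unspecified for the default HashMap).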
Malte Hartwig
  • I tried your method. I feel that we are on the same page. Your code is beyond my java skills. I created a github project. Could you check my project, please? https://github.com/ftylmz1/java-stream-multiple-grouping – Yılmaz Aug 13 '18 at 12:15
  • @Yılmaz I have updated my code now that I understand what you want to do. You should be able to copy and paste that (might have to remove some 'static' modifiers from the classes, I wrote it all in one file). – Malte Hartwig Aug 13 '18 at 13:26
  • Thank you much for your answer @Malte Hartwig. I implemented and tested your code. It works for me. The usage method like I wanted. Thank you again :) – Yılmaz Aug 13 '18 at 14:06
  • @MalteHartwig +1, I did not even have the time to open that GitHub project, but you... nice! – Eugene Aug 13 '18 at 18:45
  • @MalteHartwig how can I make the streamForked method synchronize with the three map-reduce methods? I mean, after completing those three jobs, the streamForked method goes to the next line. – Yılmaz Aug 14 '18 at 11:58
  • Do you mean that the three groupings process one Row, and only when they are all done, then continue with the next Row? In that case, please follow the link at the beginning of my answer to a similar question. There you'll find how to have the consuming streams wait for one another by having them all access the same queue. – Malte Hartwig Aug 14 '18 at 12:12