
I have hundreds of large (6GB) gzipped log files that I'm reading using GZIPInputStream, and I wish to parse them. Suppose each one has the format:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

I'm streaming the gzipped file contents line by line through BufferedReader.lines(). The stream looks like:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 3",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]

The start of every log entry can be identified by the predicate: line -> line.startsWith("Start of log entry"). I would like to transform this Stream<String> into a Stream<Stream<String>> according to this predicate. Each "substream" should start when the predicate is true and collect lines while the predicate is false, until the next time the predicate is true, which denotes the end of this substream and the start of the next. The result would look like:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

From there, I can take each substream and map it through new LogEntry(Stream<String> logLines) so as to aggregate related log lines into LogEntry objects.

Here's a rough idea of how that would look:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
        = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream ); 
             BufferedReader reader = new BufferedReader( inputStreamReader )) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

Constraint: I have hundreds of these large files to process in parallel (but only a single sequential stream per file), which makes loading them entirely into memory (e.g. by storing them as a List<String> of lines) infeasible.

Any help appreciated!

    Sounds like a job for [StreamEx](https://github.com/amaembo/streamex). – shmosel Mar 28 '18 at 00:01
  • @shmosel Interesting, looking into it! Do you have any idea what the API might be called? I tried keywords like "partition", "slice", "chunk" and "separated" to no avail – Alexander Mar 28 '18 at 00:07
  • Maybe you can use [`collapse()`](http://amaembo.github.io/streamex/javadoc/one/util/streamex/StreamEx.html#collapse-java.util.function.BiPredicate-java.util.stream.Collector-) with a predicate of `(line1, line2) -> line1.startsWith(...) && !line2.startsWith(...)`. – shmosel Mar 28 '18 at 00:11
  • I would suggest Spring Integration to process multiple files in parallel. I have used it to process 50 files of 4-5 GB each on a separate core in parallel: https://stackoverflow.com/questions/31819189/move-file-after-successful-ftp-transfer-using-java-dsl – Harish Mar 28 '18 at 00:15
  • On second thought, `collapse()` probably won't work if it's evaluating against adjacent elements. – shmosel Mar 28 '18 at 00:21
  • Would you be able to index the file by looking for the lines "Start of log entry"... [0,150], "Start of log entry" [151,323], "Start of log entry" [324,534] and so on? Then you could group much more easily. – Harish Mar 28 '18 at 00:28
  • @Harish no, different log entries have different line counts, so they're not randomly accessible by index – Alexander Mar 28 '18 at 00:32
  • Which is precisely why I am asking to index it first and then read the file separately and group based on it. Meaning I am saying just have the start index of "Start of log entry"; don't worry about other lines. The assumption here is the beginning of a line is +1 character after the previous group's end. – Harish Mar 28 '18 at 00:34
  • `try (BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { String line; boolean logStart = false; while ((line = bufferedReader.readLine()) != null) { if (line.startsWith("Start of log entry")) { logStart = true; } if (!logStart) { } } } catch (Exception ex) { }` – Harish Mar 28 '18 at 00:57

2 Answers


Federico's answer is probably the nicest way to solve this particular problem. Following his last thought about a custom Spliterator, I'll add an adapted version of an answer to a similar question, where I proposed using a custom iterator to create a chunked stream. This approach also works on streams that are not created by input readers.

import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T>  incoming;
    private Predicate<T> startOfNewEntry;
    private T            nextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        if (incoming.hasNext())
            nextLine = incoming.next();
    }

    @Override
    public boolean hasNext()
    {
        return nextLine != null;
    }

    @Override
    public Stream<T> next()
    {
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
        } while (incoming.hasNext()
                 && !startOfNewEntry.test((nextLine = incoming.next())));

        if (!startOfNewEntry.test(nextLine)) // incoming does not have next
            nextLine = null;

        return nextEntrysLines.stream();
    }
}

Example

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

The iterator always looks one line ahead. As soon as that line is the beginning of a new entry, it wraps the previous entry in a stream and returns it from next. The factory method streamOf turns this iterator into a stream to be used as in the example above.

I changed the split condition from a regex to a Predicate, so you can specify more complicated conditions with the help of multiple regexes, if-conditions, and so on.
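For instance, several start-of-entry markers can be combined into one Predicate via Predicate.or. The timestamp pattern below is a hypothetical second marker of my own, purely to illustrate the composition:

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

class CompositePredicateDemo {
    // Hypothetical extra marker: lines beginning with an ISO-style date
    static final Pattern TIMESTAMPED = Pattern.compile("\\d{4}-\\d{2}-\\d{2}.*");

    // A line starts a new entry if either condition holds
    static final Predicate<String> isEntryStart =
            ((Predicate<String>) line -> line.startsWith("Start of log entry"))
                    .or(line -> TIMESTAMPED.matcher(line).matches());

    public static void main(String[] args) {
        System.out.println(isEntryStart.test("Start of log entry 1"));    // true
        System.out.println(isEntryStart.test("2018-03-28 00:01 boot"));   // true
        System.out.println(isEntryStart.test("    ...some log details")); // false
    }
}
```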

Note that I only tested it with the example data above, so I don't know how it would behave with more complicated, erroneous, or empty input.


I think the main problem is that you are reading line by line and trying to create a LogEntry instance out of the lines, instead of reading block by block (which might cover many lines).

For this, you could use Scanner.findAll (available since Java 9) with a proper regex:

String input =
        "Start of log entry 1\n"        +
        "    ...some log details 1.1\n" +
        "    ...some log details 1.2\n" +
        "    ...some log details 1.3\n" +
        "Start of log entry 2\n"        +
        "    ...some log details 2.1\n" +
        "    ...some log details 2.2\n" +
        "    ...some log details 2.3\n" +
        "Start of log entry 3\n"        +
        "    ...some log details 3.1\n" +
        "    ...some log details 3.2\n" +
        "    ...some log details 3.3";

try (ByteArrayInputStream gzip = 
         new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
     InputStreamReader reader = new InputStreamReader(gzip);
     Scanner scanner = new Scanner(reader)) {

    String START = "Start of log entry \\d+";
    Pattern pattern = Pattern.compile(
            START + "(?<=" + START + ").*?(?=" + START + "|$)", 
            Pattern.DOTALL);

    scanner.findAll(pattern)
            .map(MatchResult::group)
            .map(s -> s.split("\\R"))
            .map(LogEntry::new)
            .forEach(System.out::println);

} catch (IOException e) {
    throw new UncheckedIOException(e);
}

So, this works by lazily finding matches in the Scanner instance. Scanner.findAll returns a Stream<MatchResult> and MatchResult.group() returns the matched String. Then we split this string by line breaks (\\R). This returns a String[] with each element of the array being one line. Then, assuming LogEntry has a constructor that accepts a String[] argument, we transform each one of these arrays into a LogEntry instance. Finally, assuming LogEntry has an overridden toString() method, we print each LogEntry instance to the output.

It is worth mentioning that the Scanner only starts its work when the terminal operation forEach is invoked on the stream.

A separate note concerns the regex we're using to match log entries in the input. I'm not an expert in the regex world, so I'm almost sure there's quite some room for improvement here. First of all, we're using Pattern.DOTALL so that the . matches not only common characters but also line breaks. Then, there is the actual regex. The idea is that it matches and consumes Start of log entry \\d+, then it uses a look-behind against Start of log entry \\d+, then it consumes characters from the input in a non-greedy manner (this is the .*? part) and finally it looks ahead to check if there is another occurrence of Start of log entry \\d+ or if the end of the input has been reached. Please refer to this amazing article about regular expressions if you want to dig into this subject.
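As a quick sanity check of that idea, here is a simplified variant of my own (without the look-behind, so it is not the exact pattern above) run through Matcher.results() (Java 9+, like Scanner.findAll) on a tiny input:

```java
import java.util.List;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class RegexBlockDemo {
    public static void main(String[] args) {
        String input = "Start of log entry 1\n  a\nStart of log entry 2\n  b";

        // Consume a header, then lazily take characters until the next header
        // (look-ahead) or the end of input (\z). DOTALL lets '.' cross line breaks.
        Pattern pattern = Pattern.compile(
                "Start of log entry \\d+.*?(?=Start of log entry|\\z)",
                Pattern.DOTALL);

        List<String> blocks = pattern.matcher(input).results()
                .map(MatchResult::group)
                .collect(Collectors.toList());

        blocks.forEach(b -> System.out.println("---\n" + b));
        // Produces two blocks, one per "Start of log entry" header
    }
}
```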


I don't know of any similar alternative if you're not on Java 9+. What you could do, though, is to create a custom Spliterator that wraps the Spliterator returned by the stream returned by BufferedReader.lines() and add the desired parsing behavior to it. Then, you'd need to create a new Stream out of this Spliterator. Not a trivial task at all...
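For the pre-Java-9 route, here is a rough sketch of my own of such a wrapping Spliterator (only lightly tested, grouping lines into List<String> blocks rather than substreams):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Sketch: regroups a flat Stream<String> into Stream<List<String>> blocks,
// starting a new block whenever the predicate matches a line.
class BlockSpliterator extends Spliterators.AbstractSpliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> startOfBlock;
    private String pending; // header line already read; belongs to the next block

    BlockSpliterator(Spliterator<String> source, Predicate<String> startOfBlock) {
        super(Long.MAX_VALUE, ORDERED | NONNULL);
        this.source = source;
        this.startOfBlock = startOfBlock;
    }

    static Stream<List<String>> blocks(Stream<String> lines,
                                       Predicate<String> startOfBlock) {
        return StreamSupport.stream(
                new BlockSpliterator(lines.spliterator(), startOfBlock), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        List<String> block = new ArrayList<>();
        if (pending != null) {
            block.add(pending);
            pending = null;
        }
        // Pull lines until the next header shows up (stashed in 'pending')
        while (pending == null && source.tryAdvance(line -> {
            if (startOfBlock.test(line) && !block.isEmpty()) {
                pending = line;
            } else {
                block.add(line);
            }
        })) { /* keep pulling */ }

        if (block.isEmpty())
            return false; // source exhausted
        action.accept(block);
        return true;
    }

    public static void main(String[] args) {
        Stream<String> lines = Stream.of(
                "Start of log entry 1", "    ...details",
                "Start of log entry 2", "    ...details");
        blocks(lines, l -> l.startsWith("Start of log entry"))
                .forEach(System.out::println);
    }
}
```

Note this buffers one entry at a time (not the whole file), which matches the memory constraint in the question; lines before the first header, if any, simply end up in the first block.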

    Nice, this looks like a good approach. The start of log entries are varied and no-so-easy to match, but I'll give it a shot and report back! – Alexander Mar 28 '18 at 16:19
  • Great find, this method should make a lot of input processing much simpler! Probably better suited for this particular case than a custom (Spl)Iterator. The only advantage that would have is being able to process streams from sources other than readers. – Malte Hartwig Mar 29 '18 at 09:08