4

I have a "plain old text file" where lines end with a new line character. For arbitrary reasons I need to read and parse this text file 4 (X for generality) lines at a time.

I'd like to use Java streams for this task and I know I can turn the file into a stream like so:

try (Stream<String> stream = Files.lines(Paths.get("file.txt""))) {
    stream.forEach(System.out::println);
} catch (IOException e) {
    e.printStackTrace();
}

But how can I use Java's Stream API to "bunch" the file into groups of 4 consecutive lines?

Szymon Stepniak
  • 40,216
  • 10
  • 104
  • 131
urig
  • 16,016
  • 26
  • 115
  • 184
  • 2
    This is often called "chunking," though I'm not aware of an easy, standard way of doing it Java streams. – yshavit Jan 11 '18 at 21:43

5 Answers5

4

There is a way to partition and process your file content into n-size chunks using standard Java 8 Stream API. You can use Collectors.groupingBy() to partition your file content into chunks - you can collect them as a Collection<List<String>> or you can apply some processing while collecting all lines (e.g. you can join them to a single String).

Take a look at following example:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ReadFileWithStream {

    public static void main(String[] args) throws IOException {
        // Path to a file to read
        final Path path = Paths.get(ReadFileWithStream.class.getResource("/input.txt")‌​.toURI());
        final AtomicInteger counter = new AtomicInteger(0);
        // Size of a chunk
        final int size = 4;

        final Collection<List<String>> partitioned = Files.lines(path)
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / size))
                .values();

        partitioned.forEach(System.out::println);
    }
}

My input file contains some numbers (one number at a line), and when I run following code I get something like:

[0, 0, 0, 2]
[0, -3, 2, 0]
[1, -3, -8, 0]
[2, -12, -11, -11]
[-8, -1, -8, 0]
[2, -1, 2, -1]
... and so on

Collectors.groupingBy() allows me also to use different downstream collector. By default Collectors.toList() is being used so my result is accumulated into a List<String> and I get Collection<List<String>> as a final result.

Let's say I want to read 4-size chunks and I want to sum all numbers in a chunk. In this case I will use Collectors.summingInt() as my downstream function and the returned result is Collection<Integer>:

final Collection<Integer> partitioned = Files.lines(path)
        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / size, Collectors.summingInt(Integer::valueOf)))
        .values();

Output:

2
-1
-10
-32
-17
2
-11
-49
... and so on

And last but not least. Collectors.groupingBy() returns a map where values are grouped by specific keys. That's why in the end we call Map.values() to get a collection of the values this contained in this map.

Hope it helps.

Szymon Stepniak
  • 40,216
  • 10
  • 104
  • 131
  • this will work indeed, but there are better ways https://stackoverflow.com/a/48216421/1059372 without side-effects... – Eugene Jan 12 '18 at 09:55
  • `Paths.get(ReadFileWithStream.class.getClassLoader().getResource("input.txt").getPath())` should be `Paths.get(ReadFileWithStream.class.getResource("/input.txt").toURI())`… – Holger Jan 12 '18 at 12:51
4

This is a job for java.util.Scanner. In Java 9, you can simply use

try(Scanner s = new Scanner(PATH)) {
    s.findAll("(.*\\R){1,4}")
     .map(mr -> Arrays.asList(mr.group().split("\\R")))
     .forEach(System.out::println);
}

For Java 8, you can use the back-port of findAll of this answer. After adding an import static for that method, you can use it like

try(Scanner s = new Scanner(PATH)) {
    findAll(s, Pattern.compile("(.*\\R){1,4}"))
        .map(mr -> Arrays.asList(mr.group().split("\\R")))
        .forEach(System.out::println);
}

Note that the result of the match operation is a single string containing up to four lines (less for the last line(s)). If that’s suitable for your follow-up operation, you can skip splitting that string into individual lines.

You may even use the MatchResult’s properties for a more sophisticated processing of the chunks, e.g.

try(Scanner s = new Scanner(PATH)) {
    findAll(s, Pattern.compile("(.*)\\R(?:(.*)\\R)?(?:(.*)\\R)?(?:(.*)\\R)?"))
        .flatMap(mr -> IntStream.rangeClosed(1, 4)
                           .mapToObj(ix -> mr.group(ix)==null? null: ix+": "+mr.group(ix)))
        .filter(Objects::nonNull)
        .forEach(System.out::println);
}
Holger
  • 285,553
  • 42
  • 434
  • 765
3

Here's a straightforward way using Guava's Iterators.partition method:

try (Stream<String> stream = Files.lines(Paths.get("file.txt""))) {

    Iterator<List<String>> iterator = Iterators.partition(stream.iterator(), 4);

    // iterator.next() returns each chunk as a List<String>

} catch (IOException e) {
    // handle exception properly
}

This is only suitable for sequential processing, but if you are reading a file from disk, I can hardly imagine any benefit from parallel processing...


EDIT: If you want, instead of working with the iterator, you could convert it again to a stream:

Stream<List<String>> targetStream = StreamSupport.stream(
      Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
      false);
fps
  • 33,623
  • 8
  • 55
  • 110
2

If you want to stick with the streams, the only solution I see is to write your own custom collector. It's not intended for that purpose, but you can make use of it.

private static final class CustomCollector {

    private List<String> list = new ArrayList<>();

    private List<String> acumulateList = new ArrayList<>();

    public void accept(String str) {
        acumulateList.add(str);
        if (acumulateList.size() == 4) { // acumulate 4 strings
            String collect = String.join("", acumulateList);
            // I just joined them in on string, you can do whatever you want
            list.add(collect);
            acumulateList = new ArrayList<>();
        }
    }

    public CustomCollector combine(CustomCollector other) {
        throw new UnsupportedOperationException("Parallel Stream not supported");
    }

    public List<String> finish() {
        if(!acumulateList.isEmpty()) {
            list.add(String.join("", acumulateList));
        }
        return list;
    }

    public static Collector<String, ?, List<String>> collector() {
        return Collector.of(CustomCollector::new, CustomCollector::accept, CustomCollector::combine, CustomCollector::finish);
    }
}

And use it like so :

stream.collect(CustomCollector.collector());
Schidu Luca
  • 3,897
  • 1
  • 12
  • 27
  • 2
    a slightly better name would be `PartitioningByCollector` or something along the lines... also seems like a `StringBuilder` (or plain concatenation since java-9 would fit some much better then `acumulateList`) – Eugene Jan 12 '18 at 08:09
  • 2
    and the entire beauty of Streams is when you really can compute the parallel processing... for this case you can ;) see this https://stackoverflow.com/a/44357446/1059372 – Eugene Jan 12 '18 at 08:22
  • @FedericoPeraltaSchaffner I admit I just re-read it to understand it again ))) and well, we both think something is smart(er), until this: https://stackoverflow.com/a/48225443/1059372 :) – Eugene Jan 12 '18 at 13:03
  • @FedericoPeraltaSchaffner and another confession - the idea is not mine. It's Tagir Valeev actually. In java-9 he added this `(left, right) -> { if (left.size() < right.size()) { right.addAll(left); return right; } else { left.addAll(right); return left; } }` does not look like much and neither very cleaver, but it is – Eugene Jan 12 '18 at 13:08
  • @FedericoPeraltaSchaffner he is not, still at IntelliJ; but was granted committer rights for the enormous work in Streams API and StreamEx. If only you would understand russian - he has amazing talks on Stream API... – Eugene Jan 12 '18 at 13:11
2

If you're open to using RxJava, you could use its buffer capability:

Stream<String> stream = Files.lines(Paths.get("file.txt"))

Observable.fromIterable(stream::iterator)
          .buffer(4)                      // Observable<List<String>>
          .map(x -> String.join(", ", x)) // Observable<String>
          .forEach(System.out::println);

buffer creates an Observable that collects elements in lists of a certain size. In the above example, I added another transformation via map to make the list more print-friendly, but you can transform the Observable as you see fit. For example, if you had a method processChunk that took as an argument a List<String> and returned a String, you could do:

Observable<String> fileObs =
    Observable.fromIterable(stream::iterator)
              .buffer(4)
              .map(x -> processChunk(x));
Jeffrey Chung
  • 19,319
  • 8
  • 34
  • 54