
I want to use a Stream to parallelize processing of a heterogeneous set of remotely stored JSON files; the number of files is not known upfront. The files can vary widely in size, from 1 JSON record per file up to 100,000 records in other files. A JSON record in this case means a self-contained JSON object represented as one line in the file.

I really want to use Streams for this, so I implemented this Spliterator:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

The problem I'm having is that while the Stream parallelizes beautifully at first, eventually the largest file is left processing in a single thread. I believe the proximal cause is well documented: the spliterator is "unbalanced".

More concretely, it appears that the trySplit method is not called after a certain point in the Stream.forEach lifecycle, so the extra logic to distribute small batches at the end of trySplit is rarely executed.

Notice how all the spliterators returned from trySplit share the same paths iterator. I thought this was a really clever way to balance the work across all spliterators, but it hasn't been enough to achieve full parallelism.

I would like the parallel processing to proceed across files first, and then, when only a few large files are still left spliterating, to parallelize across chunks of the remaining files. That was the intent of the else block at the end of trySplit.

Is there an easy / simple / canonical way around this problem?

Alex R
    You need a size estimate. It can be totally bogus, as long as it roughly reflects the ratio of your unbalanced split. Otherwise, the stream doesn’t know that the splits are unbalanced and will stop once a certain number of chunks has been created. – Holger Oct 29 '19 at 09:55
  • @Holger can you elaborate on "will stop once a certain number of chunks has been created" or point me at JDK source for this? What is the number of chunks where it stops? – Alex R Oct 29 '19 at 15:39
  • The code is irrelevant, as it would show too many irrelevant implementation details, which could change at any time. The relevant point is, that the implementation tries to call split often enough, so that every worker thread (adjusted towards the number of CPU cores) has something to do. To compensate unpredictable differences in the computing time, it will likely produce even more chunks than worker threads to allow work-stealing and use the estimated sizes as heuristic (e.g. to decide which sub spliterator to split further). See also https://stackoverflow.com/a/48174508/2711488 – Holger Oct 29 '19 at 16:04
  • I did some experiments to try to understand your comment. The heuristics seem to be quite primitive. It looks like returning `Long.MAX_VALUE` causes excessive and unnecessary splitting, while any estimate other than `Long.MAX_VALUE` causes further splitting to halt, killing parallelism. Returning a mix of accurate estimates doesn't seem to lead to any intelligent optimizations. – Alex R Nov 10 '19 at 05:41
  • I'm not claiming that the implementation's strategy was very smart, but at least, it works for some scenarios with estimated sizes (otherwise, there were far more bug reports about that). So it seems, there were some errors on your side during the experiments. E.g., in your question's code, you're extending `AbstractSpliterator` but overriding `trySplit()` which is a bad combo for anything other than `Long.MAX_VALUE`, as you are not adapting the size estimate in `trySplit()`. After `trySplit()`, the size estimate should be reduced by the number of elements that have been split off. – Holger Nov 11 '19 at 07:39
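
To make the last point concrete, here is a minimal sketch of the bookkeeping that comment describes (the class name, batch size, and characteristics are assumptions, not code from the question): the spliterator keeps its own running estimate, reports it from estimateSize(), and reduces it by however many elements each trySplit() hands off.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;

// Sketch only: a spliterator over an Iterator that keeps a mutable size estimate
// and shrinks it by the number of elements each trySplit() hands off, so the
// stream framework can see how much work remains on each side of a split.
public class EstimateAdjustingSpliterator<T> extends AbstractSpliterator<T> {

    private static final int BATCH_SIZE = 1024;   // assumed batch size, tune to the workload
    private final Iterator<T> source;
    private long estimate;                        // rough, possibly bogus, running estimate

    public EstimateAdjustingSpliterator(Iterator<T> source, long roughEstimate) {
        super(roughEstimate, Spliterator.IMMUTABLE | Spliterator.NONNULL);
        this.source = source;
        this.estimate = roughEstimate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) {
            return false;
        }
        action.accept(source.next());
        if (estimate != Long.MAX_VALUE && estimate > 0) {
            estimate--;                           // one element consumed locally
        }
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        List<T> batch = new ArrayList<>(BATCH_SIZE);
        while (batch.size() < BATCH_SIZE && source.hasNext()) {
            batch.add(source.next());
        }
        if (batch.isEmpty()) {
            return null;
        }
        if (estimate != Long.MAX_VALUE) {
            estimate = Math.max(0, estimate - batch.size());   // reduce by what was split off
        }
        return batch.spliterator();               // exactly sized, so it balances well on its own
    }

    @Override
    public long estimateSize() {
        return estimate;   // report the adjusted estimate, not the value given to the constructor
    }
}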

3 Answers


Your trySplit should output splits of equal size, regardless of the size of the underlying files. You should treat all the files as a single unit and fill up the ArrayList-backed spliterator with the same number of JSON objects each time. The number of objects should be such that processing one split takes between 1 and 10 milliseconds: lower than 1 ms and you start approaching the costs of handing off the batch to a worker thread, higher than that and you start risking uneven CPU load due to tasks which are too coarse-grained.

The spliterator is not obliged to report a size estimate, and yours already does this correctly: the estimate is Long.MAX_VALUE, which is the special value meaning "unknown". However, if you have many files with a single JSON object, resulting in batches of size 1, this will hurt your performance in two ways: the overhead of opening, reading, and closing the file may become a bottleneck and, if you manage to escape that, the cost of the thread handoff may be significant compared to the cost of processing one item, again causing a bottleneck.

Five years ago I was solving a similar problem; you can have a look at my solution.
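
The linked solution is built around handing out fixed-size batches. A rough sketch of that idea could look like the following (the class and method names are illustrative, not necessarily the linked code): every split is an exactly-sized batch, so the per-batch cost can be tuned into the 1-10 ms window regardless of how large any individual file is.

import java.util.Comparator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Sketch of a fixed-batch wrapper: splits off exactly-sized batches from any source spliterator.
public class FixedBatchSpliterator<T> implements Spliterator<T> {

    private final Spliterator<T> source;   // e.g. the spliterator of a flat Stream of JSON records
    private final int batchSize;

    public FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    // Convenience: wrap an existing stream and return a parallel stream over the wrapper.
    public static <T> Stream<T> withBatchSize(Stream<T> in, int batchSize) {
        return StreamSupport.stream(new FixedBatchSpliterator<>(in.spliterator(), batchSize), true);
    }

    @Override
    public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!source.tryAdvance(holder)) {
            return null;
        }
        final Object[] batch = new Object[batchSize];
        int j = 0;
        do {
            batch[j] = holder.value;
        } while (++j < batchSize && source.tryAdvance(holder));
        // The split reports an exact size, which the stream implementation can balance on.
        return Spliterators.spliterator(batch, 0, j, source.characteristics() | SIZED | SUBSIZED);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        source.forEachRemaining(action);
    }

    @Override
    public long estimateSize() {
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source.characteristics();
    }

    @Override
    public Comparator<? super T> getComparator() {
        return source.getComparator();
    }

    // Captures the single element handed over by tryAdvance.
    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;
        @Override
        public void accept(T value) {
            this.value = value;
        }
    }
}

Each split reports an exact size, which gives the stream implementation something concrete to balance on; the withBatchSize helper shows how such a wrapper might be applied to an existing stream.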

Marko Topolnik
  • Yes, you are "not obliged to report a size estimate", and `Long.MAX_VALUE` correctly describes an unknown size, but that doesn't help when the actual Stream implementation then performs poorly. Even using the result of `ThreadLocalRandom.current().nextInt(100, 100_000)` as the estimated size yields better results. – Holger Oct 29 '19 at 16:11
  • It performed well for my use cases, where the computational cost of each item was substantial. I was easily achieving 98% total CPU usage and throughput scaled almost linearly with parallelism. Basically, it's important to get the batch size right so that processing it takes between 1 and 10 milliseconds. That's well above any thread handoff costs and not too long to cause task granularity issues. I have published benchmark results towards the end of [this post](https://www.airpair.com/java/posts/parallel-processing-of-io-based-data-with-java-streams). – Marko Topolnik Oct 29 '19 at 16:41
  • Your solution splits off an `ArraySpliterator` which *has* an estimated size (even an exact size). So the Stream implementation will see the array size vs `Long.MAX_VALUE`, consider this unbalanced and split the "larger" spliterator (ignoring that `Long.MAX_VALUE` means "unknown"), until it can't split further. Then, if there are not enough chunks, it will split the array based spliterators utilizing their known sizes. Yes, this works very well, but doesn't contradict my statement that you need a size estimate, regardless of how poor it is. – Holger Oct 29 '19 at 16:51
  • OK, so it seems to be a misunderstanding---because you don't need a size estimate on the input. Just on the individual splits, and you can always have that. – Marko Topolnik Oct 29 '19 at 16:57
  • Well, my [first comment](https://stackoverflow.com/questions/58601518/can-you-rebalance-an-unbalanced-spliterator-of-unknown-size/58611185?noredirect=1#comment103521117_58601518) was "*You need a size estimate. It can be totally bogus, as long as it roughly reflects the ratio of your unbalanced split.*" The key point here was that the OP's code creates another spliterator containing a single element but still reporting an unknown size. This is what makes the Stream implementation helpless. Any estimate number for the new spliterator being significantly smaller the `Long.MAX_VALUE` would do. – Holger Oct 29 '19 at 17:03
  • Maybe you're better familiar with the intimate details of that implementation, but my experience with it is that it never tries to subdivide the splits taken from an unbounded spliterator. So I don't know how the declared size matters to it. It is more probable to me that the many single-object files are a bottleneck at the open-read-close IO cycle, and the cost of thread handoff vs. the processing of a single object is unfavourable so it may also be the bottleneck. – Marko Topolnik Oct 29 '19 at 17:09
  • We can summarize the actual behavior as horrible, as it blatantly ignores the fact that `Long.MAX_VALUE` has been specified as "unknown size", which *should* be appropriate for streams that might be short, and instead treats it like a literal estimate of `Long.MAX_VALUE` (no different from, e.g., `Long.MAX_VALUE-1`), which is unsuited for any case where the stream isn't anywhere near that number (in other words, every real-life stream). – Holger Oct 29 '19 at 17:55
  • In terms of performance optimization, they only cared about the known-size case. When I mentioned to the Java core team the complaint that most real-life streams are of unknown size and, even when the size is known, aren't randomly accessible, they asked on what authority I claim that this is the case, saying that their user research shows otherwise. – Marko Topolnik Oct 29 '19 at 18:56
  • @MarkoTopolnik your `FixedBatchSpliteratorWrapper` is not directly usable for me (because `trySplit` method would be too blocking and slow, taking a long time to ramp up all CPUs), but it gives me an idea of something to try. – Alex R Oct 30 '19 at 03:01
  • Can you explain what would the bottleneck be? – Marko Topolnik Oct 30 '19 at 05:43
  • @MarkoTopolnik because the files are stored in a Cloud filesystem, there is a long latency in opening each file. So if the opening of files occurs in `trySplit`, there is a long delay in ramping up the computation. The opening of files should be deferred to the `tryAdvance` method, which is called in worker threads, instead of blocking the calling thread in `trySplit`. My implementation actually handles this initial ramp very well, because it hands off a separate file to each thread (that's what the top half of `trySplit` is doing). My problem is the parallelism dies off after initial ramp. – Alex R Oct 30 '19 at 06:06
  • If your cloud file operations are blocking, you have a mismatch between the optimum for getting the data vs. the optimum for processing it (assuming the processing itself is CPU-heavy). I think you should have file opening and buffering in a separate step ahead of the Stream pipeline, using an elastic thread pool that will use many more threads than cores for the blocking operations. Then you can stream the data into the computational part of the pipeline (a rough sketch of this two-stage setup follows this comment thread). – Marko Topolnik Oct 30 '19 at 06:47
  • What’s puzzling me the most is that any attempt to move some of the splitting into the tryAdvance (by synchronizing on shared collections) leads to a deadlock! – Alex R Oct 30 '19 at 19:14
  • Have you found out the cause of this? Do you have critical sections covered by more than one lock? – Marko Topolnik Oct 31 '19 at 16:48
  • I ended up discarding that particular branch of code and rewriting to utilize the file sizes available in my metadata. I got it working but the solution wasn't what I expected. – Alex R Nov 10 '19 at 01:41
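
A rough sketch of the two-stage setup suggested in the comment thread above (all names are placeholders, and buffering whole files in memory is a deliberate simplification): an elastic pool performs the blocking reads, and only the already-loaded lines enter the CPU-bound parallel stream.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

// Placeholder names throughout; a sketch of separating blocking IO from CPU-bound work.
public class TwoStagePipeline {

    /** Hypothetical abstraction over "open one remote file", e.g. an S3 GET. */
    public interface FileOpener {
        InputStream open(String path) throws IOException;
    }

    public static <R> void process(List<String> paths,
                                   FileOpener opener,
                                   Function<String, R> parseLine,
                                   Consumer<R> action) {
        // Elastic pool: many threads may sit blocked on slow cloud reads without starving the CPU stage.
        ExecutorService ioPool = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<List<String>>> reads = paths.stream()
                    .map(path -> CompletableFuture.supplyAsync(() -> readAllLines(opener, path), ioPool))
                    .collect(Collectors.toList());

            // Simplification: this buffers every line in memory. The payoff is that the
            // resulting list is exactly sized, so the parallel stream below splits evenly.
            List<String> allLines = reads.stream()
                    .flatMap(future -> future.join().stream())
                    .collect(Collectors.toList());

            allLines.parallelStream()      // CPU-bound stage only: parse and process
                    .map(parseLine)
                    .forEach(action);
        } finally {
            ioPool.shutdown();
        }
    }

    private static List<String> readAllLines(FileOpener opener, String path) {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(opener.open(path), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

A bounded queue between the two stages would avoid holding every line in memory at once, at the cost of more coordination code.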

After much experimentation, I was still not able to get any added parallelism by playing with the size estimates. Basically, any value other than Long.MAX_VALUE will tend to cause the spliterator to terminate too early (and without any splitting), while on the other hand a Long.MAX_VALUE estimate will cause trySplit to be called relentlessly until it returns null.

The solution I found is to internally share resources among the spliterators and let them rebalance amongst themselves.

Working code:

import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Function;

import com.amazonaws.services.s3.model.S3ObjectSummary;

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
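
For completeness, such a spliterator would presumably be consumed through StreamSupport, roughly like this (the bucket name, the naive single-byte line reader, and the maxBuffer value of 100 are assumptions for illustration, using AWS SDK v1 types):

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.stream.StreamSupport;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.S3ObjectSummary;

// Hypothetical usage sketch; not part of the answer above.
public class AwsS3LineSpliteratorExample {

    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        Iterator<S3ObjectSummary> files = S3Objects.inBucket(s3, "my-bucket").iterator();

        AwsS3LineSpliterator<String> spliterator = new AwsS3LineSpliterator<String>(
                files,
                summary -> s3.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(),
                AwsS3LineSpliteratorExample::readLine,
                100);

        StreamSupport.stream(spliterator, true)
                .forEach(input -> System.out.println(input.s3ObjectSummary.getKey() + ": " + input.lineItem));
    }

    // Naive stateless line reader: reads bytes until '\n', returns null at end of stream.
    // (Assumes single-byte characters; a real reader would decode UTF-8 properly.)
    private static String readLine(InputStream in) {
        try {
            StringBuilder sb = new StringBuilder();
            int c;
            while ((c = in.read()) != -1 && c != '\n') {
                sb.append((char) c);
            }
            return (c == -1 && sb.length() == 0) ? null : sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}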
Alex R

This is not a direct answer to your question, but I think it is worth trying the Stream from the abacus-common library:

void test_58601518() throws Exception {
    final File tempDir = new File("./temp/");

    // Prepare the test files:
    //    if (!(tempDir.exists() && tempDir.isDirectory())) {
    //        tempDir.mkdirs();
    //    }
    //
    //    final Random rand = new Random();
    //    final int fileCount = 1000;
    //
    //    for (int i = 0; i < fileCount; i++) {
    //        List<String> lines = Stream.repeat(TestUtil.fill(Account.class), rand.nextInt(1000) * 100 + 1).map(it -> N.toJSON(it)).toList();
    //        IOUtil.writeLines(new File("./temp/_" + i + ".json"), lines);
    //    }

    N.println("Xmx: " + IOUtil.MAX_MEMORY_IN_MB + " MB");
    N.println("total file size: " + Stream.listFiles(tempDir).mapToLong(IOUtil::sizeOf).sum() / IOUtil.ONE_MB + " MB");

    final AtomicLong counter = new AtomicLong();
    final Consumer<Account> yourAction = it -> {
        counter.incrementAndGet();
        it.toString().replace("a", "bbb");
    };

    long startTime = System.currentTimeMillis();
    Stream.listFiles(tempDir) // the file/data source could be local file system or remote file system.
            .parallel(2) // thread number used to load the file/data and convert the lines to Java objects.
            .flatMap(f -> Stream.lines(f).map(line -> N.fromJSON(Account.class, line))) // only a limited number of lines (fewer than 1024) will be loaded into memory at a time.
            .parallel(8) // thread number used to execute your action. 
            .forEach(yourAction);

    N.println("Took: " + ((System.currentTimeMillis()) - startTime) + " ms" + " to process " + counter + " lines/objects");

    // IOUtil.deleteAllIfExists(tempDir);
}

From start to finish, the CPU usage on my laptop was pretty high (about 70%), and it took about 70 seconds to process 51,899,100 lines/objects from 1,000 files on an Intel(R) Core(TM) i5-8365U CPU with -Xmx256m of JVM memory. The total file size is about 4524 MB. If yourAction is not a heavy operation, a sequential stream could even be faster than a parallel one.

FYI: I'm the developer of abacus-common.

user_3380739