
For a homework assignment, I need to implement external sorting such that I can sort a 10GB file with 1GB physical memory. Currently, I'm using a BufferedReader on the large file and constructing/sorting the smaller files sequentially. Then in the merge step, I have BufferedReaders open for all small files and a single BufferedWriter for the large final file where I write to the large file using the merge k sorted lists algorithm with a PriorityQueue. This works, but it needs to be faster (take half as much time to be exact).

The entire splitting step happens sequentially and the entire merging step also happens sequentially. I think I can at least split and sort the files in parallel using multiple threads with different virtual memory spaces. Then the memory used is mostly memory-mapped files and the OS will take care of optimally paging in and out data from physical memory. I was wondering if there was a way for Java to do this using parallel streams. Something along the lines of:

largeFile.splitInParallel(100000)
.lines()
.map((s) -> new LineObject(s))
.sorted()
.forEach(writeSmallFileToDisk)

where the argument to splitInParallel is the number of lines I want in the smaller files. Any help is appreciated, thanks!

EDIT: My code is

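import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.PriorityQueue;

public class Main {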

    private static final int BUFFER_SIZE = 10_000_000;

    /**
     * A main method to run examples.
     *
     * @param args not used
     */
    public static void main(String[] args) throws IOException {
        System.out.println("Starting...");
        String file = args[0];
        int batchSize = Integer.parseInt(args[1]);

        try {
            FileInputStream fin = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
            BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);

            int lineNumber = 0;
            int batchId = 0;
            String line;
            TaxiEntry[] batch = new TaxiEntry[batchSize];
            int i = 0;
            while ((line = br.readLine()) != null) {
                TaxiEntry taxiEntry = parseLine(line);
                batch[i++] = taxiEntry;
                lineNumber++;
                if (lineNumber % batchSize == 0) {
                    String outputFileName = String.format("batches/batch_%d.txt", batchId);
                    BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
                    Arrays.parallelSort(batch);
                    for (int j = 0; j < i; j++) {
                        bf.write(batch[j].toString());
                        // j < i always holds inside this loop, so every line gets a terminator
                        bf.newLine();
                    }
                    batchId++;
                    i = 0;
                    bf.close(); // close() flushes and releases the file handle
                }
            }

            String outputFileName = String.format("batches/batch_%d.txt", batchId);
            BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
            Arrays.parallelSort(batch, 0, i);
            for (int j = 0; j < i; j++) {
                bf.write(batch[j].toString());
                // j < i always holds inside this loop, so every line gets a terminator
                bf.newLine();
            }
            batchId++;
            bf.close(); // close() flushes and releases the file handle

            System.out.println("Processed " + lineNumber + " lines");
            merge(batchId);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void merge(int numBatches) throws IOException {
        System.out.println("Starting merge...");
        // Open readers
        BufferedReader[] readers = new BufferedReader[numBatches];
        for (int i = 0; i < numBatches; i++) {
            String file = String.format("batches/batch_%d.txt", i);
            FileInputStream fin = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
            BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);
            readers[i] = br;
        }
        
        // Merge
        String outputFileName = "result/final.txt";
        BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);

        PriorityQueue<IndexedTaxiNode> curEntries = new PriorityQueue<>();
        for (int i = 0; i < numBatches; i++) {
            BufferedReader reader = readers[i];
            String next = reader.readLine();
            if (next != null) {
                TaxiEntry curr = parseLine(next);
                curEntries.add(new IndexedTaxiNode(curr, i));
            }
        }

        while (!curEntries.isEmpty()) {
            // remove the head of the queue (the least entry under IndexedTaxiNode's compareTo)
            IndexedTaxiNode maxNode = curEntries.remove();
            bf.write(maxNode.toString());
            bf.newLine();

            int index = maxNode.index;
            String next = readers[index].readLine();
            if (next != null) {
                TaxiEntry newEntry = parseLine(next);
                curEntries.add(new IndexedTaxiNode(newEntry, index));
            }
        }

        bf.close(); // close() flushes and releases the file handle
    }

    public static TaxiEntry parseLine(String line) {
        return new TaxiEntry(line, Double.parseDouble(line.split(",")[16]));
    }

}
Badr B
  • Seems like you need *external sorting* rather than sorting via parallel streams. Have a look at [*this question*](https://stackoverflow.com/questions/7918060/how-do-i-sort-very-large-files). – Alexander Ivanchenko Sep 03 '22 at 18:38
  • @AlexanderIvanchenko Yeah that's what I have right now and everything runs on a single thread. It'll be nice if I could include some parallelism where I can use multiple threads on both CPU cores in order to achieve the time reduction required for this assignment. – Badr B Sep 03 '22 at 18:45
  • It’s not clear whether you really understood the issue. Your pseudo code still is sorting everything as a whole in memory, *before* writing chunks into smaller files, which is entirely pointless. The task definition itself rules out parallel processing. The point of splitting the file into chunks of size X is that there’s only memory for processing one X, hence you can’t process more than one X at a time. Because there’s not enough memory for that. – Holger Sep 05 '22 at 07:58
  • Which phase is taking the most time for you? Is it the splitting and sorting phase, or the merging phase? – mpette Sep 05 '22 at 11:10
  • @Holger I was wondering if Java had some implementation that used thread-private memory-mapped files that the OS automatically optimizes with paging. Yes this would still require disk IO for paging, but I heard that using memory-mapped files was a lot faster than manually reading from disk and allocating on the heap and I can also adjust the batch size to be smaller such that multiple batches can be computed in parallel. – Badr B Sep 05 '22 at 23:45
  • @mpette It's the merging phase. I don't have exact metrics, but it's considerably slower than batching the data, sorting it, and writing it. – Badr B Sep 05 '22 at 23:46
  • Using memory mapped files does not affect the performance much, as long as you still create `String` instances and even wrap them into another object, `LineObject`, as the code snippet suggests. You’d have to change your program logic to operate on the memory mapped file directly, e.g. by assuming a particular charset encoding. See [this answer](https://stackoverflow.com/a/25566876/2711488) or [that answer](https://stackoverflow.com/a/52062570/2711488), for example. Since JDK 11, there’s a method for comparing `CharSequence` but even `ByteBuffer`s can be compared which would work for UTF-8. – Holger Sep 06 '22 at 07:48
  • Can you share your current code? – Alexander Ivanchenko Sep 06 '22 at 14:44
  • @AlexanderIvanchenko Yep just updated the post! – Badr B Sep 08 '22 at 03:16
  • Your buffer size is quite big at `10_000_000`. When you do the merging phase, how many separate files do you have? Multiply the number of files by 10_000_000 and that's how much memory the buffers will occupy. Perhaps you need two different buffer sizes. – mpette Sep 08 '22 at 23:40
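
The buffer arithmetic in the last comment can be made concrete with a rough sketch. It assumes each reader is built the way the question's code builds it: a BufferedInputStream, which holds a byte[] of the given size, wrapped by a BufferedReader, which holds a char[] of the same length at 2 bytes per char. The class name, the helper names and the count of 100 batch files are hypothetical, and the estimate ignores object headers and the small internal buffer of InputStreamReader.

```java
public class MergeBufferBudget {

    /** Approximate heap bytes held by one reader stack built like the question's. */
    static long bytesPerReader(int bufferSize) {
        return (long) bufferSize   // byte[] inside BufferedInputStream
             + 2L * bufferSize;    // char[] inside BufferedReader (2 bytes per char)
    }

    /** Largest buffer size that keeps numBatches readers within budgetBytes. */
    static int bufferSizeFor(int numBatches, long budgetBytes) {
        long perReader = budgetBytes / numBatches;
        // Each reader stack costs about 3 bytes per unit of buffer size.
        return (int) Math.min(Integer.MAX_VALUE, Math.max(1, perReader / 3));
    }

    public static void main(String[] args) {
        int numBatches = 100; // hypothetical number of batch files
        System.out.println("Bytes held by all readers at BUFFER_SIZE = 10_000_000: "
                + bytesPerReader(10_000_000) * numBatches);
        System.out.println("Buffer size that fits a 512 MiB budget: "
                + bufferSizeFor(numBatches, 512L * 1024 * 1024));
    }
}
```

Under these assumptions the merge-phase readers alone would need far more than 1GB, which is one reason to use a smaller buffer size for merging than for splitting.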

1 Answer

Doing some timings, I found that the time to read from disk and the time to sort are of a similar order of magnitude.

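System.out.println("Begin loading file");
long startTime = System.nanoTime();
// do loading stuff
long finishTime = System.nanoTime();
System.out.format("elapsed %.03f ms%n%n", (finishTime - startTime) / 1e6);

System.out.println("Sorting lines");
startTime = System.nanoTime();
// do sorting stuff
finishTime = System.nanoTime();
System.out.format("elapsed %.03f ms%n", (finishTime - startTime) / 1e6);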

Console output is:

Begin loading file
elapsed 918.933 ms

Sorting lines
elapsed 1360.896 ms

I used a modest file of about 150 MB for the timings. It might not be a good idea to have lots of threads all reading from disk at the same time.

My suggestion, for what it's worth, is to have one thread that does all of the disk reading and writing, and another thread that does the sorting concurrently. I could only see a way to do this for the splitting and sorting phase.

Separate threads for disk and sort

For the splitting phase, you cannot read all the segments in one go because that would consume too much memory. So you read a few segments, write a few, read a few, and so on. The idea of this interleaving is to keep the disk continuously busy by delegating the sorting to another thread. Hopefully, by the time the disk is ready to write a segment, the sort on that segment has completed, so the disk never has to wait.

List<String> lines = new ArrayList<>();
int i = 0; // position of the current batch within the message queue
String line;
while ((line = reader.readLine()) != null) { // stop at end of file
    lines.add(line);
    if (lines.size() == BATCH_SIZE) {
        sendMsgToWorker(lines); // send to worker thread for sorting
        if (i == MAX_MESSAGE_QUEUE - 1) {
            // queue is full: collect and write every sorted batch before reading more
            for (int j = 0; j < MAX_MESSAGE_QUEUE; j++) {
                List<String> sortedLines = waitForLineFromWorker(); // wait for worker thread
                writeTmpFile(sortedLines);
            }
        }
        lines = new ArrayList<>();
        i = (i + 1) % MAX_MESSAGE_QUEUE;
    }
}

An outline for the splitting and sorting phase is shown above, without covering any edge cases. The amount of memory used would be proportional to BATCH_SIZE * MAX_MESSAGE_QUEUE.
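
For illustration only, here is one way the outline could be fleshed out with java.util.concurrent. It is a sketch under assumptions, not the code used for the timings: the class PipelinedSplit, the BatchSink interface and the maxInFlight parameter are hypothetical names, lines are sorted by their natural String order rather than by a parsed field, and it drains the oldest batch as soon as the queue is full instead of draining all batches at once. A single-thread executor plays the role of the sorting worker, and the calling thread does all the reading and writing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PipelinedSplit {

    /** Hypothetical sink for a sorted batch; a real version would write a temp file. */
    interface BatchSink {
        void write(List<String> sortedBatch) throws IOException;
    }

    /** Reads and writes on the calling thread, sorts on one worker; returns the batch count. */
    static int split(BufferedReader reader, int batchSize, int maxInFlight, BatchSink sink)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService sorter = Executors.newSingleThreadExecutor();
        Deque<Future<List<String>>> inFlight = new ArrayDeque<>();
        int batches = 0;
        try {
            List<String> lines = new ArrayList<>(batchSize);
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
                if (lines.size() == batchSize) {
                    inFlight.add(submitSort(sorter, lines));
                    lines = new ArrayList<>(batchSize);
                    // Bound memory: once maxInFlight batches are pending, write the oldest.
                    if (inFlight.size() == maxInFlight) {
                        sink.write(inFlight.remove().get());
                        batches++;
                    }
                }
            }
            // Edge case: a final, partially filled batch.
            if (!lines.isEmpty()) {
                inFlight.add(submitSort(sorter, lines));
            }
            // Write whatever is still pending, in submission order.
            while (!inFlight.isEmpty()) {
                sink.write(inFlight.remove().get());
                batches++;
            }
        } finally {
            sorter.shutdown();
        }
        return batches;
    }

    private static Future<List<String>> submitSort(ExecutorService sorter, List<String> batch) {
        return sorter.submit(() -> {
            Collections.sort(batch);
            return batch;
        });
    }
}
```

With this variant, at most maxInFlight batches are pending while one more is being filled, so peak memory is roughly proportional to batchSize * (maxInFlight + 1).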

Unfortunately, I don't see a way to apply concurrency to the phase of merging the multiple files. The merge is limited by how fast the disk can read and write, so adding threads cannot make it go any faster.

You could try investigating parallel quicksort, but the problem with quicksort is choosing a pivot point so that the partitions end up a reasonable size.

mpette
  • I think this is a very clever way to minimize busy waiting from disk IO. Following this same logic, do you think it'll be worth it to have another thread just for writing? This way reading isn't bottlenecked by the slower writing speeds? – Badr B Sep 05 '22 at 23:52
  • Possibly, but you'd always have to wait for the write to finish so it frees up memory, to avoid going over the memory limitation. – mpette Sep 06 '22 at 11:36
  • As OP has confirmed the merging phase is much slower, it is probably not worth implementing this. – mpette Sep 08 '22 at 23:34