For a homework assignment, I need to implement external sorting so that I can sort a 10GB file with 1GB of physical memory. Currently, I read the large file with a BufferedReader and build and sort the smaller files one at a time. In the merge step, I keep a BufferedReader open for each small file and a single BufferedWriter for the final output, and I write the output using the merge-k-sorted-lists algorithm with a PriorityQueue. This works, but it needs to be faster (it should take half as much time, to be exact).
The entire splitting step happens sequentially and the entire merging step also happens sequentially. I think I can at least split and sort the files in parallel using multiple threads with different virtual memory spaces. Then the memory used is mostly memory-mapped files and the OS will take care of optimally paging in and out data from physical memory. I was wondering if there was a way for Java to do this using parallel streams. Something along the lines of:
largeFile.splitInParallel(100000)
.lines()
.map((s) -> new LineObject(s))
.sorted()
.forEach(writeSmallFileToDisk)
where the argument to splitInParallel is the number of lines I want in each of the smaller files. Any help is appreciated, thanks!
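To make the idea concrete, here is a minimal sketch of the split step using an ExecutorService instead of parallel streams (the splitInParallel call above is hypothetical). The names ParallelChunkSorter and splitAndSort and the chunk_N.txt file naming are assumptions for illustration, and natural String ordering stands in for the real comparator. Reading stays sequential; only sorting and writing run on worker threads, and a Semaphore limits how many chunks are in memory at once.

```java
import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

public class ParallelChunkSorter {

    /** Reads the input sequentially; each full chunk is sorted and written on a worker thread. */
    public static List<Path> splitAndSort(Path input, Path outDir, int linesPerChunk, int threads)
            throws Exception {
        Files.createDirectories(outDir);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Caps the chunks handed to workers; one more chunk may be filling in the reader.
        Semaphore inFlight = new Semaphore(threads);
        List<Future<Path>> pending = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(input, StandardCharsets.UTF_8)) {
            List<String> chunk = new ArrayList<>(linesPerChunk);
            String line;
            int chunkId = 0;
            while ((line = reader.readLine()) != null) {
                chunk.add(line);
                if (chunk.size() == linesPerChunk) {
                    pending.add(submit(pool, inFlight, chunk, outDir, chunkId++));
                    chunk = new ArrayList<>(linesPerChunk);
                }
            }
            if (!chunk.isEmpty()) {
                pending.add(submit(pool, inFlight, chunk, outDir, chunkId));
            }
            List<Path> files = new ArrayList<>();
            for (Future<Path> f : pending) {
                files.add(f.get()); // rethrows any failure from a worker
            }
            return files;
        } finally {
            pool.shutdown();
        }
    }

    private static Future<Path> submit(ExecutorService pool, Semaphore inFlight,
                                       List<String> chunk, Path outDir, int chunkId)
            throws InterruptedException {
        inFlight.acquire(); // blocks the reader until a worker slot frees up
        return pool.submit(() -> {
            try {
                Collections.sort(chunk); // natural String order stands in for the real comparator
                Path out = outDir.resolve("chunk_" + chunkId + ".txt");
                Files.write(out, chunk, StandardCharsets.UTF_8);
                return out;
            } finally {
                inFlight.release();
            }
        });
    }
}
```

With a 1GB memory limit, linesPerChunk times (threads + 1) has to fit in the heap, so more threads means smaller chunks and more files to merge later. Whether this actually halves the run time depends on whether the job is CPU-bound or disk-bound, which I have not measured.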
EDIT: My code is
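import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.PriorityQueue;

public class Main {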
private static final int BUFFER_SIZE = 10_000_000;
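/**
* Splits the input file into sorted batches, then merges them into one sorted file.
*
* @param args args[0] is the input file path, args[1] is the number of lines per batch
*/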
public static void main(String[] args) throws IOException {
System.out.println("Starting...");
String file = args[0];
int batchSize = Integer.parseInt(args[1]);
// try-with-resources closes the reader (and the streams it wraps) when the block exits
try (BufferedReader br = new BufferedReader(
new InputStreamReader(new BufferedInputStream(new FileInputStream(file), BUFFER_SIZE)), BUFFER_SIZE)) {
int lineNumber = 0;
int batchId = 0;
String line;
TaxiEntry[] batch = new TaxiEntry[batchSize];
int i = 0;
while ((line = br.readLine()) != null) {
TaxiEntry taxiEntry = parseLine(line);
batch[i++] = taxiEntry;
lineNumber++;
if (lineNumber % batchSize == 0) {
String outputFileName = String.format("batches/batch_%d.txt", batchId);
BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
Arrays.parallelSort(batch);
for (int j = 0; j < i; j++) {
bf.write(batch[j].toString());
// The old check (j != i) was always true inside this loop, so every entry ends with a newline
bf.newLine();
}
batchId++;
i = 0;
bf.close(); // close() flushes and releases the file handle and its buffer
}
}
String outputFileName = String.format("batches/batch_%d.txt", batchId);
BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
Arrays.parallelSort(batch, 0, i);
for (int j = 0; j < i; j++) {
bf.write(batch[j].toString());
// The old check (j != i) was always true inside this loop, so every entry ends with a newline
bf.newLine();
}
batchId++;
bf.close(); // close() flushes and releases the file handle and its buffer
System.out.println("Processed " + lineNumber + " lines");
merge(batchId);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void merge(int numBatches) throws IOException {
System.out.println("Starting merge...");
// Open readers
BufferedReader[] readers = new BufferedReader[numBatches];
for (int i = 0; i < numBatches; i++) {
String file = String.format("batches/batch_%d.txt", i);
FileInputStream fin = new FileInputStream(file);
BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);
readers[i] = br;
}
// Merge
String outputFileName = "result/final.txt";
BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
PriorityQueue<IndexedTaxiNode> curEntries = new PriorityQueue<>();
for (int i = 0; i < numBatches; i++) {
BufferedReader reader = readers[i];
String next = reader.readLine();
if (next != null) {
TaxiEntry curr = parseLine(next);
curEntries.add(new IndexedTaxiNode(curr, i));
}
}
while (!curEntries.isEmpty()) {
// take the head of the queue, i.e. the least element under IndexedTaxiNode's ordering
IndexedTaxiNode maxNode = curEntries.remove();
bf.write(maxNode.toString());
bf.newLine();
int index = maxNode.index;
String next = readers[index].readLine();
if (next != null) {
TaxiEntry newEntry = parseLine(next);
curEntries.add(new IndexedTaxiNode(newEntry, index));
}
}
bf.close(); // close() also flushes
for (BufferedReader reader : readers) {
reader.close();
}
}
public static TaxiEntry parseLine(String line) {
return new TaxiEntry(line, Double.parseDouble(line.split(",")[16]));
}
}