0

I need to sort large binary file of size M, using t threads. Records in file are all equal size. The task explicitly says that the amount of memory I can allocate is m, and is much smaller than M. Also hard drive is guaranteed to have at least 2 * M free space. This calls for merge sort ofc, but turned out it's not so obvious. I see three different approaches here:

A. Map files input, temp1 and temp2 into memory. Perform merge sort input -> temp1 -> temp2 -> temp1 ... until one of temps sorted. Threads only contend for selecting next portion of work , no contention on read/write.

B. fopen 3 files t times each, each thread gets 3 FILE pointers, one per file. Again they contend only for next portion of work, reads and writes should work in parallel.

C. fopen 3 files one time each, keep them under mutexes, all threads work in parallel but to grab more work or to read or to write they lock respective mutex.

Notes:

In real life I would choose A for sure. But doesn't it defeat the whole purpose of having limited buffer? (In other words isn't it cheating?). With such approach I can even radix sort whole file in place without extra buffer. Also this solution is Linux-specific, I think Linux is implied from conversation, but it's not stated explicitly in task description.

Regarding B, I think it works on Linux but isn't portable, see Linux note above.

Regarding C, it's portable but I am not sure how to optimize it (e.g. 8 threads with small enough m will just bump waiting their turn in queue, then read/write tiny portion of data, then instantly sort it and bump into each other again. IMO unlikely to work faster than 1 thread).

Questions:

  1. Which solution is a better match for the task?
  2. Which solution is a better design in real life (assuming Linux)?
  3. Does B work? In other words is opening file multiple times and writing in parallel (to different parts of it) legal?
  4. Any alternative approaches?
Uprooted
  • 941
  • 8
  • 21

3 Answers3

2

Your question has many facets, so I will try to break it down a bit, while trying to answer almost all of your questions:

  • You are given a large file on a storage device that probably operates on blocks, i.e. you can load and store many entries at the same time. If you access a single entry from storage, you have to deal with rather large access latency which you can only try to hide by loading many elements at the same time thus amortizing the latency over all element load times.

  • Your main memory is quite fast compared to the storage (especially for random access), so you want to keep as much data in main memory as possible and only read and write sequential blocks on the storage. This is also the reason why A is not really cheating, since if you tried to use your storage for random access, you would be waaay slower than using main memory.

Combining these results, you can arrive at the following approach, which is basically A but with some engineering details that are usually used in external algorithms.

  • Use only a single dedicated thread for reading and writing on the storage. This way, you need only one file descriptor for every file and could in theory even collect and reorder read and write requests from all threads within a small timeframe to get nearly sequential access patterns. Additionally, your threads can just queue a write request and continue with the next block without waiting for the IO to finish.

  • Load t blocks (from input) into main memory of a maximum size such that you can run mergesort in parallel on each of these blocks. After the blocks are sorted, write them onto the storage as temp1.
    Repeat this until all blocks in the file have been sorted.

  • Now do a so-called multiway merge on the sorted blocks: Every thread loads a certain number k of consecutive blocks from temp1 into memory and merges them using a priority queue or tournament tree to find the next minimum to be inserted into the resulting block. As soon as your block is full, you write it onto your storage at temp2 to free up memory for the next block. After this step, conceptually swap temp1 and temp2

  • You still need to do several merge steps, but this number is down by a factor of log k compared to regular two-way merges you probably meant in A. After the first few merge steps, your blocks will probably be too large to fit into main memory, so you split them into smaller blocks and, starting from the first small block, fetch the next block only when all of the previous elements have already been merged. Here, you might even be able to do some prefetching since the order of block accesses is predetermined by the block minima, but this is probably outside the scope of this question. Note that the value for k is usually only limited by available memory.

  • Finally, you arrive at t huge blocks which need to be merged together. I don't really know if there is a nice parallel approach to this, it might be necessary to just merge them sequentially, so again you can work with a t-way merge as above to result in a single sorted file.

Tobias Ribizel
  • 5,331
  • 1
  • 18
  • 33
  • If you are looking for a more detailed description and analysis, the most fitting one I could find with a quick search was in the following paper by [Sanders and Dementiev](http://algo2.iti.kit.edu/dementiev/files/DS03.pdf), although it uses a more general hardware model (parallel disks). – Tobias Ribizel Apr 01 '18 at 20:29
0

Gnu sort is a multi-threaded merge sort for text files, but it's basic features could be used here. Define a "chunk" as the number of records that can be sorted in memory of size m.

Sort phase: for each "chunk" of records, read a "chunk" of records, use a multi-threaded sort on the "chunk" then write a "chunk" of records to a temp file, ended up with ceiling(M / m) temp files. Gnu sort sorts an array of pointers to records, partially because the records are variable length. For fixed size records, in my testing, due to cache issues, it's faster to sort records directly rather than sort an array of pointers to records (which results in cache unfriendly random access of records), unless record size is greater than somewhere between 128 and 256 bytes.

Merge phase: perform single threaded k-way merges (such as priority queue) on the temp files until a single file is produced. Multi-threading doesn't help here since it's assumed that the k-way merge phase is I/O bound and not cpu bound. For Gnu sort the default for k is 16 (it does 16-way merges on the temp files).

To keep from exceeding 2 x M space, files will need to be deleted once they have been read.

rcgldr
  • 27,407
  • 3
  • 36
  • 61
0

If your file is way bigger than your RAM size then This is the solution. https://stackoverflow.com/a/49839773/1647320

If your file size is 70-80% of your RAM size then following is the solution. It's in-memory parallel merge sort.

enter image description here

Change this lines according to your system . fpath is your one big input file. shared path is where the execution log is stored.fdir is where the intermediate files will be stored and merged. Change these paths according to your machine.

public static final String fdir = "/tmp/";
    public static final String shared = "/exports/home/schatterjee/cs553-pa2a/";
    public static final String fPath = "/input/data-20GB.in";
    public static final String opLog = shared+"Mysort20GB.log";

Then run the following programme. Your final sorted file will be created with the name op2GB in fdir path. the last line Runtime.getRuntime().exec("valsort " + fdir + "op" + (treeHeight*100)+1 + " > " + opLog); checks the output is sorted or not . Remove this line if you dont have valsort installed in your machine or the input file is not generated using gensort(http://www.ordinal.com/gensort.html) .

Also, don't forget to change int totalLines = 20000000; to the total lines in your file. and thread count (int threadCount = 8) should be always in power of 2.

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.stream.Stream;


class SplitJob extends Thread {
    LinkedList<String> chunkName;
    int startLine, endLine;

    SplitJob(LinkedList<String> chunkName, int startLine, int endLine) {
        this.chunkName = chunkName;
        this.startLine = startLine;
        this.endLine = endLine;
    }

    public void run() {
        try {
            int totalLines = endLine + 1 - startLine;
            Stream<String> chunks =
                    Files.lines(Paths.get(Mysort2GB.fPath))
                            .skip(startLine - 1)
                            .limit(totalLines)
                            .sorted(Comparator.naturalOrder());
            chunks.forEach(line -> {
                chunkName.add(line);
            });
            System.out.println(" Done Writing " + Thread.currentThread().getName());

        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

class MergeJob extends Thread {
    int list1, list2, oplist;
    MergeJob(int list1, int list2, int oplist) {
        this.list1 = list1;
        this.list2 = list2;
        this.oplist = oplist;
    }

    public void run() {
        try {
            System.out.println(list1 + " Started Merging " + list2 );
            LinkedList<String> merged = new LinkedList<>();
            LinkedList<String> ilist1 = Mysort2GB.sortedChunks.get(list1);
            LinkedList<String> ilist2 = Mysort2GB.sortedChunks.get(list2);

            //Merge 2 files based on which string is greater.
            while (ilist1.size() != 0 || ilist2.size() != 0) {
                if (ilist1.size() == 0 ||
                        (ilist2.size() != 0 && ilist1.get(0).compareTo(ilist2.get(0)) > 0)) {
                    merged.add(ilist2.remove(0));
                } else {
                    merged.add(ilist1.remove(0));
                }
            }
            System.out.println(list1 + " Done Merging " + list2 );
            Mysort2GB.sortedChunks.remove(list1);
            Mysort2GB.sortedChunks.remove(list2);
            Mysort2GB.sortedChunks.put(oplist, merged);
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

public class Mysort2GB {
    //public static final String fdir = "/Users/diesel/Desktop/";
    public static final String fdir = "/tmp/";
    public static final String shared = "/exports/home/schatterjee/cs553-pa2a/";
    public static final String fPath = "/input/data-2GB.in";
    public static HashMap<Integer, LinkedList<String>> sortedChunks = new HashMap();
    public static final String opfile = fdir+"op2GB";
    public static final String opLog = shared + "mysort2GB.log";


    public static void main(String[] args) throws Exception{
        long startTime = System.nanoTime();
        int threadCount = 8; // Number of threads
        int totalLines = 20000000;
        int linesPerFile = totalLines / threadCount;
        LinkedList<Thread> activeThreads = new LinkedList<Thread>();


        for (int i = 1; i <= threadCount; i++) {
            int startLine = i == 1 ? i : (i - 1) * linesPerFile + 1;
            int endLine = i * linesPerFile;
            LinkedList<String> thisChunk = new LinkedList<>();
            SplitJob mapThreads = new SplitJob(thisChunk, startLine, endLine);
            sortedChunks.put(i,thisChunk);
            activeThreads.add(mapThreads);
            mapThreads.start();
        }
        activeThreads.stream().forEach(t -> {
            try {
                t.join();
            } catch (Exception e) {
            }
        });

        int treeHeight = (int) (Math.log(threadCount) / Math.log(2));

        for (int i = 0; i < treeHeight; i++) {
            LinkedList<Thread> actvThreads = new LinkedList<Thread>();
            for (int j = 1, itr = 1; j <= threadCount / (i + 1); j += 2, itr++) {
                int offset = i * 100;
                int list1 = j + offset;
                int list2 = (j + 1) + offset;
                int opList = itr + ((i + 1) * 100);
                MergeJob reduceThreads =
                        new MergeJob(list1,list2,opList);
                actvThreads.add(reduceThreads);
                reduceThreads.start();
            }
            actvThreads.stream().forEach(t -> {
                try {
                    t.join();
                } catch (Exception e) {
                }
            });
        }
        BufferedWriter writer = Files.newBufferedWriter(Paths.get(opfile));
        sortedChunks.get(treeHeight*100+1).forEach(line -> {
            try {
                writer.write(line+"\r\n");
            }catch (Exception e){

            }
        });
        writer.close();
        long endTime = System.nanoTime();
        double timeTaken = (endTime - startTime)/1e9;
        System.out.println(timeTaken);
        BufferedWriter logFile = new BufferedWriter(new FileWriter(opLog, true));
        logFile.write("Time Taken in seconds:" + timeTaken);
        Runtime.getRuntime().exec("valsort  " + opfile + " > " + opLog);
        logFile.close();
    }
}


  [1]: https://i.stack.imgur.com/5feNb.png
sapy
  • 8,952
  • 7
  • 49
  • 60