6

I am trying to read a very big file using Java. The file has one user ID per line, like this:

149905320
1165665384
66969324
886633368
1145241312
286585320
1008665352

The file will contain around 30 million user IDs. I am trying to read every user ID from the file exactly once. For example, if the file has 30 million user IDs, the program should print each of those 30 million IDs exactly once, using multithreaded code.

Below is my multithreaded code, which runs with 10 threads. With this program, however, I cannot make sure that each user ID is selected only once.

public class ReadingFile {


    public static void main(String[] args) {

        // create thread pool with given size
        ExecutorService service = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            service.submit(new FileTask());
        }
    }
}

class FileTask implements Runnable {

    @Override
    public void run() {

        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("D:/abc.txt"));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
                //do things with line
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                br.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

Can anybody help me with this? What am I doing wrong? And what is the fastest way to do this?

arsenal
  • This code seems to send a thread to read the whole file each time. If there are ten threads in your pool, it seems to me you would want to split the file into ten different sections, give each thread a section, and then combine the results at the end. Do you know of any packages that would give you random access to anywhere in the file? – nook Jun 20 '13 at 18:20
  • possible duplicate of [NumberFormatException while selecting random elements from a big file](http://stackoverflow.com/questions/17206864/numberformatexception-while-selecting-random-elements-from-a-big-file) – FThompson Jun 20 '13 at 18:20
  • You can't read from a file in a multithreaded way. This is a very bad idea. The constraining factor is IO. More threads will _slow things down_, not speed things up. Use one thread. – Boris the Spider Jun 20 '13 at 18:21
  • Please don't duplicate the question; amend the original one instead. – sunleo Jun 20 '13 at 18:23
  • In the previous question I was working on a different problem. In this one I am trying to read all the IDs line by line, only once. – arsenal Jun 20 '13 at 18:24

3 Answers

18

You really can't improve on having one thread reading the file sequentially, assuming that you haven't done anything like stripe the file across multiple disks. With one thread, you do one seek and then one long sequential read; with multiple threads you're going to have the threads causing multiple seeks as each gains control of the disk head.
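To get a feel for how long one sequential pass actually takes, a single-threaded read can simply be timed. The following is a minimal sketch, not a benchmark: the class name `SequentialRead` and the helper `countLines` are made up for illustration, and the default path is the one from the question.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class SequentialRead {

    /** Reads the file once, front to back, and returns the number of lines. */
    static long countLines(Path file) throws IOException {
        long count = 0;
        // try-with-resources closes the reader even if reading fails
        try (BufferedReader br = Files.newBufferedReader(file)) {
            while (br.readLine() != null) {
                count++; // a real program would handle the line here
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Path from the question; pass another path as the first argument
        Path file = Paths.get(args.length > 0 ? args[0] : "D:/abc.txt");
        long start = System.nanoTime();
        long lines = countLines(file);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(lines + " lines read in " + millis + " ms");
    }
}
```

If this pass finishes quickly but the full program does not, the time is being spent in the per-line work rather than in the file I/O.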

Edit: This is a way to parallelize the line processing while still using serial I/O to read the lines. It uses a BlockingQueue to communicate between threads; the FileTask adds lines to the queue, and the CPUTasks take them off the queue and process them. The queue is a thread-safe data structure, so there is no need to add any synchronization around it. You're using put(E e) to add strings to the queue, so if the queue is full (it can hold up to 200 strings, as defined in the declaration in ReadingFile) the FileTask blocks until space frees up; likewise you're using take() to remove items from the queue, so a CPUTask will block until an item is available.

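import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReadingFile {
    // get() and awaitTermination() throw checked exceptions
    public static void main(String[] args) throws InterruptedException, ExecutionException {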

        final int threadCount = 10;

        // BlockingQueue with a capacity of 200
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(200);

        // create thread pool with given size
        ExecutorService service = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < (threadCount - 1); i++) {
            service.submit(new CPUTask(queue));
        }

        // Wait til FileTask completes
        service.submit(new FileTask(queue)).get();

        service.shutdownNow();  // interrupt CPUTasks

        // Wait til CPUTasks terminate
        service.awaitTermination(365, TimeUnit.DAYS);

    }
}

class FileTask implements Runnable {

    private final BlockingQueue<String> queue;

    public FileTask(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        // try-with-resources closes the reader, even if opening or reading fails
        try (BufferedReader br = new BufferedReader(new FileReader("D:/abc.txt"))) {
            String line;
            while ((line = br.readLine()) != null) {
                // block if the queue is full
                queue.put(line);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // put() was interrupted: restore the flag and stop reading
            Thread.currentThread().interrupt();
        }
    }
}

class CPUTask implements Runnable {

    private final BlockingQueue<String> queue;

    public CPUTask(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String line;
        while(true) {
            try {
                // block if the queue is empty
                line = queue.take(); 
                // do things with line
            } catch (InterruptedException ex) {
                break; // FileTask has completed
            }
        }
        // poll() returns null if the queue is empty
        while((line = queue.poll()) != null) {
            // do things with line;
        }
    }
}
Zim-Zam O'Pootertoot
  • That makes sense. So you are saying I should read the file with a single thread, right? But if the file is big enough, that is going to take a lot of time, right? – arsenal Jun 20 '13 at 18:25
  • @TechGeeky That's correct. However, you can speed up your I/O by doing something like using [Hadoop](http://en.wikipedia.org/wiki/Apache_Hadoop), which distributes the file across multiple disks/machines which then lets you do parallel I/O. – Zim-Zam O'Pootertoot Jun 20 '13 at 18:30
  • @TechGeeky yup. Buy a faster hard drive. – Boris the Spider Jun 20 '13 at 18:31
  • Any idea how long it will take to read a 332 MB file line by line? The file contains around 30 million user IDs, one per line. How do I calculate that time? I have already started the program with one thread and it is still running, but I just need a rough guess of how long it will take in general. – arsenal Jun 20 '13 at 22:38
  • @TechGeeky Depends on your hard drive specs, but 332MB is nothing - even with a USB drive you could read that in a few seconds. What exactly is "do things with line" in your program above? Depending on its complexity, it may be worth parallelizing the line processing (but not the line I/O). – Zim-Zam O'Pootertoot Jun 20 '13 at 22:49
  • As I said, each line contains a user ID. So I will read each line, get the user ID from it, and use it to make a call to the database to see whether that user ID exists there. If it doesn't exist, I increment a counter and log it. Then I read the next line and do the same thing. – arsenal Jun 20 '13 at 22:52
  • @TechGeeky In that case you can probably benefit from parallelizing line **processing**, just not from line **I/O**. See the edit to my answer for an example of how to do this. – Zim-Zam O'Pootertoot Jun 20 '13 at 23:12
  • Can you also add a little bit of explanation about the program to help me understand it better? Thanks for the help. – arsenal Jun 20 '13 at 23:18
  • @TechGeeky Sure, I've added some documentation, let me know if you have any questions about any specific methods I'm using. – Zim-Zam O'Pootertoot Jun 20 '13 at 23:30
  • One quick question on this. It will make sure that I read each user ID only once, right? – arsenal Jun 21 '13 at 00:34
  • So suppose I have `1000 user id` in a text file. It will read all these `1000 user id` only once, right? – arsenal Jun 21 '13 at 00:45
  • @TechGeeky Assuming each id only appears once in the file, then yes, each id will be read and processed only once. `FileTask` reads a line and puts it in the queue, and a `CPUTask` removes the line (with `poll`) and processes it; the queue is thread-safe, so even if multiple `CPUTasks` call `poll` at the same time, any given line will only be removed by one of them. – Zim-Zam O'Pootertoot Jun 21 '13 at 03:34
  • @Zim-ZamO'Pootertoot it's usually better to block indefinitely and use interrupts to stop the `Thread`s when you're done - that is what the interrupt is for after all. This is _verging_ on busy waiting. Use a `Future` or the `ExecutorService` itself to signal shutdown, no need for the `AtomicBoolean`. If you use Guava then you can use a `ListeningExecutorService` and then things are even neater. – Boris the Spider Jun 21 '13 at 10:51
  • @Boris the Spider I've edited my answer, let me know if this is what you were talking about – Zim-Zam O'Pootertoot Jun 21 '13 at 13:36
  • @Zim-ZamO'Pootertoot much better! – Boris the Spider Jun 21 '13 at 15:49
  • @Zim-ZamO'Pootertoot Why do we need both `queue.take()` and `queue.poll()` in the `CPUTask` class? – syfantid Feb 10 '16 at 19:20
  • @Sofia `service.shutdownNow()` in the `main` method will interrupt `queue.take()` in `CPUTask.run()` and break out of the infinite `while` loop; the `poll` loop then ensures that the queue is empty before `CPUTask.run()` exits. – Zim-Zam O'Pootertoot Feb 10 '16 at 19:26
  • So, the code that processes the line must be used identical both after `queue.take()` and `queue.poll()`? – syfantid Feb 10 '16 at 19:28
  • @Sofia Yup, to avoid code duplication wrap it in a function. Alternatively, create a boolean `isDone` variable initialized to false, and replace the `break` in the `catch` block with `isDone = true`, and replace `line = queue.take()` with `line = isDone ? queue.poll() : queue.take()` and break out of the infinite `while` loop if `line == null`; then you can eliminate the `poll` loop – Zim-Zam O'Pootertoot Feb 10 '16 at 19:48
  • @Zim-ZamO'Pootertoot Thanks! ;) – syfantid Feb 10 '16 at 20:08
  • @Zim-ZamO'Pootertoot Also, one more question. Imagine that all the code in the ReadingFile class (after the threadCount initialization) needs to be in a `while` loop for multiple file reads. This means that the thread pool will be initialized and then shut down on every iteration, which takes some time. Can I declare the `ExecutorService` outside the while loop and shut it down after the loop as well? Or won't it work? I'm really new to the threads logic, so I'm very grateful for your help! – syfantid Feb 11 '16 at 17:16
  • @Sofia You can declare it outside of the loop, but in that case create a `List<Future<?>>` out of the CPUTasks that you're submitting, and call `cancel(true)` on all of them (which interrupts the running tasks) when the FileTask completes. Previously `shutdownNow` was interrupting these tasks, but if you're moving the shutdown outside of the loop then you'll need to interrupt the tasks manually – Zim-Zam O'Pootertoot Feb 11 '16 at 17:43
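
The single-loop consumer suggested in the comments above (an `isDone` flag that switches from `take()` to `poll()`) might look roughly like the sketch below. To keep it self-contained, the producer reads from an in-memory list instead of a file, and "do things with line" is replaced by a counter; the class name `SingleLoopConsumerDemo`, the `run` helper and the `processed` counter are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SingleLoopConsumerDemo {

    /** Consumer with one processing site: take() until interrupted, then poll() until empty. */
    static class CPUTask implements Runnable {
        private final BlockingQueue<String> queue;
        private final AtomicLong processed;

        CPUTask(BlockingQueue<String> queue, AtomicLong processed) {
            this.queue = queue;
            this.processed = processed;
        }

        @Override
        public void run() {
            boolean isDone = false;
            while (true) {
                String line;
                try {
                    // block while the producer is running, drain without blocking afterwards
                    line = isDone ? queue.poll() : queue.take();
                } catch (InterruptedException ex) {
                    isDone = true; // producer has completed; switch to poll()
                    continue;
                }
                if (line == null) {
                    break; // only poll() returns null: the queue is empty
                }
                processed.incrementAndGet(); // stand-in for "do things with line"
            }
        }
    }

    /** Pushes all lines through the queue and returns how many were processed. */
    static long run(List<String> lines, int threadCount) throws Exception {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(200);
        AtomicLong processed = new AtomicLong();
        ExecutorService service = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount - 1; i++) {
            service.submit(new CPUTask(queue, processed));
        }
        // Stand-in for FileTask: feeds the queue from memory and blocks when it is full
        service.submit(() -> {
            for (String line : lines) {
                queue.put(line);
            }
            return null;
        }).get();
        service.shutdownNow(); // interrupt the CPUTasks
        service.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lines.add(String.valueOf(i));
        }
        System.out.println(run(lines, 10) + " of " + lines.size() + " lines processed");
    }
}
```

Because every line is removed from the queue by exactly one consumer, the processed count should equal the number of lines put in.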
0

We are talking about a file of roughly 315 MB with lines separated by newlines. I presume this easily fits into memory. It is implied that there is no particular order of the user IDs that has to be preserved. So I would recommend the following algorithm:

  • Get the file length.
  • Copy each tenth of the file into its own byte buffer (a binary copy should be fast).
  • Start a thread for processing each of these buffers.
  • Each thread processes all lines in its block except the first and the last one.
  • Each thread must return the first and last partial line of its block when done.
  • The "last" fragment of each thread must be recombined with the "first" fragment of the thread working on the next block, because the cut may have gone through a line. These recombined lines must then be processed afterwards.
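
The steps above can be sketched as follows. This is only an illustration under several assumptions: the whole file is already in a `byte[]`, lines end in `\n`, and the content is plain ASCII so a block boundary never splits a character. The names `ChunkedLines`, `ChunkResult`, `scan` and `splitLines` are made up, and the workers merely collect their lines where a real program would process them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedLines {

    /** What one worker returns for its block of the file. */
    static final class ChunkResult {
        final String head;        // text before the first '\n' (possibly the end of a cut line)
        final String tail;        // text after the last '\n'; null if the block has no '\n'
        final List<String> lines; // complete lines between head and tail

        ChunkResult(String head, String tail, List<String> lines) {
            this.head = head;
            this.tail = tail;
            this.lines = lines;
        }
    }

    // Assumes single-byte (ASCII) content
    private static String text(byte[] data, int from, int to) {
        return new String(data, from, to - from, StandardCharsets.US_ASCII);
    }

    /** Scans data[from, to); a real worker would process each complete line here. */
    static ChunkResult scan(byte[] data, int from, int to) {
        int first = -1;
        for (int i = from; i < to && first < 0; i++) {
            if (data[i] == '\n') first = i;
        }
        if (first < 0) { // no line break at all: the whole block is one fragment
            return new ChunkResult(text(data, from, to), null, new ArrayList<>());
        }
        List<String> lines = new ArrayList<>();
        int start = first + 1;
        for (int i = start; i < to; i++) {
            if (data[i] == '\n') {
                lines.add(text(data, start, i));
                start = i + 1;
            }
        }
        return new ChunkResult(text(data, from, first), text(data, start, to), lines);
    }

    /** Splits data into `parts` blocks, scans them in parallel and stitches the cut lines. */
    static List<String> splitLines(byte[] data, int parts) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        try {
            List<Future<ChunkResult>> futures = new ArrayList<>();
            for (int p = 0; p < parts; p++) {
                final int from = (int) ((long) data.length * p / parts);
                final int to = (int) ((long) data.length * (p + 1) / parts);
                Callable<ChunkResult> task = () -> scan(data, from, to);
                futures.add(pool.submit(task));
            }
            List<String> all = new ArrayList<>();
            StringBuilder carry = new StringBuilder(); // unfinished line from earlier blocks
            for (Future<ChunkResult> f : futures) {
                ChunkResult r = f.get();
                carry.append(r.head);
                if (r.tail == null) continue; // block had no '\n': keep accumulating
                all.add(carry.toString());    // previous "last" + this "first"
                all.addAll(r.lines);
                carry = new StringBuilder(r.tail);
            }
            if (carry.length() > 0) all.add(carry.toString()); // file without trailing '\n'
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "149905320\n1165665384\n66969324\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(splitLines(data, 4));
    }
}
```

The stitching loop also covers blocks that contain no line break at all, which can happen when there are more blocks than lines.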
Matthias Ronge
-2

The Fork/Join API introduced in Java 7 is a great fit for this use case. Check out http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html. If you search, you will find lots of examples out there.
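
As a rough illustration of what a Fork/Join version could look like, the sketch below counts "missing" IDs by recursively splitting an array in half. It assumes the IDs have already been read into memory; the class name `CountMissingIds`, the `THRESHOLD` value and the `isMissing` predicate (a stand-in for the real database lookup) are all made up. Fork/Join is designed mainly for CPU-bound work, so whether it suits a task dominated by blocking database calls is an open question.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.LongPredicate;

class CountMissingIds extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // hypothetical cutoff for splitting

    private final long[] ids;
    private final int from;
    private final int to;
    private final LongPredicate isMissing;

    CountMissingIds(long[] ids, int from, int to, LongPredicate isMissing) {
        this.ids = ids;
        this.from = from;
        this.to = to;
        this.isMissing = isMissing;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // small enough: check the IDs in this range directly
            long missing = 0;
            for (int i = from; i < to; i++) {
                if (isMissing.test(ids[i])) missing++;
            }
            return missing;
        }
        // otherwise split the range in half and combine the two counts
        int mid = (from + to) >>> 1;
        CountMissingIds left = new CountMissingIds(ids, from, mid, isMissing);
        CountMissingIds right = new CountMissingIds(ids, mid, to, isMissing);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // do the right half in this thread
    }

    public static void main(String[] args) {
        long[] ids = new long[1_000_000];
        for (int i = 0; i < ids.length; i++) ids[i] = i;
        // Pretend that every even ID is missing from the database
        long missing = new ForkJoinPool().invoke(
                new CountMissingIds(ids, 0, ids.length, id -> id % 2 == 0));
        System.out.println(missing + " missing IDs");
    }
}
```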

helios