
I have to read a huge text file, around 3 GB (about 40 million lines). Just reading it is really fast:

try (BufferedReader br = new BufferedReader(new FileReader("file.txt"))) {
    String line;
    while ((line = br.readLine()) != null) {
        // nothing here
    }
}

With each line read from the above code I do some parsing on the string and process it further (a huge task). I am trying to do that with multiple threads.

A) I have tried BlockingQueue like this

try (BufferedReader br = new BufferedReader(new FileReader("file.txt"))) {
    String line;
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
    int numThreads = 5;
    Consumer[] consumer = new Consumer[numThreads];
    for (int i = 0; i < consumer.length; i++) {
        consumer[i] = new Consumer(queue);
        consumer[i].start();
    }
    while ((line = br.readLine()) != null) {
        queue.put(line);
    }
    queue.put("exit"); // poison pill; each consumer re-inserts it before exiting
} catch (FileNotFoundException ex) {
    Logger.getLogger(ReadFileTest.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException | InterruptedException ex) {
    Logger.getLogger(ReadFileTest.class.getName()).log(Level.SEVERE, null, ex);
}

class Consumer extends Thread {
    private final BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> q) {
        queue = q;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String result = queue.take();
                if (result.equals("exit")) {
                    queue.put("exit"); // re-insert the poison pill for the next consumer
                    break;
                }
                System.out.println(result);
            } catch (InterruptedException ex) {
                Logger.getLogger(ReadFileTest.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

This approach took more time than normal single-threaded processing. I am not sure why -- what am I doing wrong?

B) I have tried ExecutorService:

try (BufferedReader br = new BufferedReader(new FileReader("file.txt"))) {
    String line;
    ExecutorService pool = Executors.newFixedThreadPool(10);
    while ((line = br.readLine()) != null) {
        pool.execute(getRunnable(line));
    }
    pool.shutdown();
} catch (FileNotFoundException ex) {
    Logger.getLogger(ReadFileTest.class.getName()).log(Level.SEVERE, null, ex);
} catch (IOException ex) {
    Logger.getLogger(ReadFileTest.class.getName()).log(Level.SEVERE, null, ex);
}

private static Runnable getRunnable(String run) {
    Runnable task = () -> {
        System.out.println(run);
    };
    return task;
}

This approach also takes more time than printing directly inside while loop. What am I doing wrong?

What is the correct way to do it?

How can I efficiently process the read line with multiple threads?

GhostCat
user3164187
  • This is just my guess: your bottleneck is the reading itself, not the processing, so processing the already-read lines on a different thread will not speed things up. A quick search gave me these results: https://stackoverflow.com/questions/25711616/how-to-read-all-lines-of-a-file-in-parallel-in-java-8 and from the accepted response: https://stackoverflow.com/questions/9093888/fastest-way-of-reading-relatively-huge-byte-files-in-java – Chrisport Jun 29 '17 at 09:05
  • Your second proposal seems quite idiomatic. I would recommend to parallelize larger chunks of data rather than single lines, e.g., 1 MB per chunk. – Sebastian Kruse Jun 29 '17 at 09:05
  • 2
    Your examples are not really suited to test multithreading. For example, in case A, your all threads are getting blocked by the 'queue' and then system i/o. Try with some real code and may see difference. – mesibo Jun 29 '17 at 09:06
  • 2
    You said that the parsing stuff of the read data will be a "huge task". Just add some waiting time in your Consumers and after each read of line in your original one-thread-piece-of-code and check if the multi threaded version performs better then. – Florian S. Jun 29 '17 at 09:09
  • By the way, using `System.out.println()` to test multithreading is an extra bad idea, as it is synchronized. So your code will effectively be single-threaded, but with all the overhead of multithreading added on. – biziclop Jun 29 '17 at 10:20
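Building on the chunking suggestion in the comments, here is one possible sketch (the poison-pill list, chunk size, and thread count are all assumptions, not code from the question) that hands worker threads batches of lines instead of single lines, so the queue synchronization cost is paid once per chunk rather than once per line:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class ChunkedReader {
    // Empty list used as a poison pill; compared by identity, so it cannot
    // collide with real data the way the "exit" string sentinel can.
    static final List<String> POISON = new ArrayList<>();

    static long processChunked(BufferedReader br, int numThreads, int chunkSize)
            throws IOException, InterruptedException {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(100);
        AtomicLong processed = new AtomicLong();
        Thread[] consumers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    while (true) {
                        List<String> chunk = queue.take();
                        if (chunk == POISON) {   // identity check on the sentinel
                            queue.put(POISON);   // hand it on to the next consumer
                            break;
                        }
                        for (String line : chunk) {
                            processed.incrementAndGet(); // real parsing would go here
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }

        // Producer: collect lines into chunks and enqueue whole chunks.
        List<String> chunk = new ArrayList<>(chunkSize);
        String line;
        while ((line = br.readLine()) != null) {
            chunk.add(line);
            if (chunk.size() == chunkSize) {
                queue.put(chunk);
                chunk = new ArrayList<>(chunkSize);
            }
        }
        if (!chunk.isEmpty()) {
            queue.put(chunk);
        }
        queue.put(POISON);
        for (Thread t : consumers) {
            t.join();
        }
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        // Demo on an in-memory reader; use new FileReader("file.txt") for the real file.
        BufferedReader br = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
        System.out.println(processChunked(br, 4, 2)); // prints 5
    }
}
```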

3 Answers


Answering one part here - why is the BlockingQueue option slower.

It is important to understand that threads don't come for "free". There is always certain overhead required to get them up and "manage" them.

And of course, when you are actually using more threads than your hardware can support "natively" then context switching is added to the bill.

Beyond that, also the BlockingQueue doesn't come free either. You see, in order to preserve order, that ArrayBlockingQueue probably has to synchronize somewhere. Worst case, that means locking and waiting. Yes, the JVM and JIT are usually pretty good about such things; but again, a certain "percentage" gets added to the bill.

But just for the record, that shouldn't matter. From the javadoc:

This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

As you are not setting "fairness"

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

that shouldn't affect you. On the other hand: I am pretty sure you expected those lines to be processed in order, so you would actually want to go for

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100, true);

and thereby further slowing down the whole thing.

Finally: I agree with the comments given so far. Benchmarking such things is a complex undertaking, and many aspects influence the results. The most important question is definitely: where is your bottleneck?! Is it I/O performance (then more threads don't help much!) - or is it really overall processing time (and then the "correct" number of threads for processing should definitely speed things up).

And regarding "how to do this in the correct way" - I suggest to check out this question on softwareengineering.SE.

GhostCat
  • A good starting point for benchmarking code like this is to simply monitor the size of your queue. If the queue is almost always empty, it means the bottleneck is on the producer side (i.e. your I/O operation is so slow anyway that you don't win anything by multithreading the processing part). – biziclop Jun 29 '17 at 10:19
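That monitoring idea can be sketched in a few lines; the sampling interval, the `diagnose` helper, and the simulated loop below are illustrative assumptions, not part of the original code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueMonitor {
    // Turns a sampled queue size into a one-line bottleneck diagnosis.
    static String diagnose(int sampledSize, int capacity) {
        if (sampledSize == 0) {
            return "queue empty: producer (I/O) is the bottleneck";
        } else if (sampledSize == capacity) {
            return "queue full: consumers (CPU) are the bottleneck";
        }
        return "queue partially filled: producer and consumers roughly balanced";
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        long linesRead = 0;
        // Stand-in for the real read loop; sample queue.size() every 1000 lines.
        for (int i = 0; i < 3000; i++) {
            queue.put("line " + i);
            queue.take(); // a real consumer thread would drain the queue instead
            if (++linesRead % 1000 == 0) {
                System.out.println("after " + linesRead + " lines: "
                        + diagnose(queue.size(), 100));
            }
        }
    }
}
```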

How to process contents from large text file using multiple threads?

If your computer has enough RAM, I would do the following:

  • read the entire file into a variable (an ArrayList for example) - using only one thread to read the whole file.

  • then launch one ExecutorService (with a thread pool that uses no more than the maximum number of cores that your computer can run simultaneously)

        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(cores);
    
  • finally, divide the lines read, among a limited number of callables/runnables and submit those callables/runnables to your ExecutorService (so all of them can execute simultaneously in your ExecutorService).

Unless your processing of lines uses I/O, I assume that you will reach near 100% CPU utilization, and none of your threads will be in a waiting state.
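A minimal sketch of the steps above, under the assumption that the whole file fits in memory (`processAll` and the slice size are illustrative names, not a standard API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionedProcessing {
    // Splits 'lines' into one slice per worker and processes the slices in parallel.
    static int processAll(List<String> lines, int cores) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(cores);
        int chunk = (lines.size() + cores - 1) / cores; // ceiling division
        List<Future<Integer>> futures = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += chunk) {
            List<String> slice = lines.subList(start, Math.min(start + chunk, lines.size()));
            Callable<Integer> task = () -> {
                int processed = 0;
                for (String line : slice) {
                    processed++; // real parsing of 'line' would go here
                }
                return processed;
            };
            futures.add(executorService.submit(task));
        }
        int total = 0;
        for (Future<Integer> f : futures) {
            total += f.get(); // blocks until that slice is done
        }
        executorService.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        // In the real program: List<String> lines = Files.readAllLines(Paths.get("file.txt"));
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(processAll(lines, cores)); // prints 7
    }
}
```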

Do you want even faster processing?

Scaling vertically is the easiest option: buy more RAM, a better CPU (with more cores), and use a solid-state drive.

Jose Zevallos

Maybe all the threads are accessing the same shared resource concurrently, which makes it more contended. One thing you can try: have the reader thread attach a key to each line and submit the work in a partitioned way, so there is less contention.

public void execute(Runnable command) {
    // assumes 'command' is a custom Runnable subtype (here called KeyedRunnable)
    // that exposes getKey(); plain Runnable has no such method
    final int key = ((KeyedRunnable) command).getKey();
    // some code to check if it is running
    final int index = key != Integer.MIN_VALUE ? Math.abs(key) % size : 0;
    workers[index].execute(command);
}
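The `execute` method above assumes a Runnable subtype that carries a key; here is a hypothetical, self-contained sketch of that idea (`KeyedRunnable` and `StripedDispatcher` are invented names, not from a real library):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StripedDispatcher {
    // A Runnable that knows which partition ("stripe") it belongs to.
    interface KeyedRunnable extends Runnable {
        int getKey();
    }

    private final ExecutorService[] workers;
    private final int size;

    StripedDispatcher(int size) {
        this.size = size;
        this.workers = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            // One single-threaded worker per stripe keeps per-key ordering.
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void execute(KeyedRunnable command) {
        final int key = command.getKey();
        final int index = key != Integer.MIN_VALUE ? Math.abs(key) % size : 0;
        workers[index].execute(command);
    }

    public void shutdown() throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
        for (ExecutorService w : workers) {
            w.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        StripedDispatcher dispatcher = new StripedDispatcher(4);
        for (int i = 0; i < 8; i++) {
            final int key = i;
            dispatcher.execute(new KeyedRunnable() {
                public int getKey() { return key; }
                public void run() { /* per-line work for partition 'key' */ }
            });
        }
        dispatcher.shutdown();
        System.out.println("all tasks dispatched");
    }
}
```

Tasks that share a key always land on the same single-threaded worker, so they run in submission order while different keys proceed in parallel.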

Create each worker with its own queue, so that tasks that must run sequentially for the same key are executed in order:

private final AtomicBoolean scheduled = new AtomicBoolean(false);

private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maximumQueueSize);

public void execute(Runnable command) {
    long timeout = 0;
    TimeUnit timeUnit = TimeUnit.SECONDS;
    if (command instanceof TimeoutRunnable) {
        TimeoutRunnable timeoutRunnable = ((TimeoutRunnable) command);
        timeout = timeoutRunnable.getTimeout();
        timeUnit = timeoutRunnable.getTimeUnit();
    }

    boolean offered;
    try {
        if (timeout == 0) {
            offered = workQueue.offer(command);
        } else {
            offered = workQueue.offer(command, timeout, timeUnit);
        }
    } catch (InterruptedException e) {
        throw new RejectedExecutionException("Thread is interrupted while offering work");
    }

    if (!offered) {
        throw new RejectedExecutionException("Worker queue is full!");
    }

    schedule();
}

private void schedule() {
    //if it is already scheduled, we don't need to schedule it again.
    if (scheduled.get()) {
        return;
    }

    if (!workQueue.isEmpty() && scheduled.compareAndSet(false, true)) {
        try {
            executor.execute(this);
        } catch (RejectedExecutionException e) {
            scheduled.set(false);
            throw e;
        }
    }
}

public void run() {
    try {
        Runnable r;
        do {
            r = workQueue.poll();
            if (r != null) {
                r.run();
            }
        }
        while (r != null);
    } finally {
        scheduled.set(false);
        schedule();
    }
}

As suggested above, there is no fixed rule for thread pool size, but there are some suggestions and best practices that can be applied depending on your use case.

CPU-Bound Tasks

For CPU-bound tasks, Goetz (2002, 2006) recommends

threads = number of CPUs + 1

I/O-Bound Tasks

Working out the optimal number for I/O-bound tasks is less obvious. During an I/O-bound task, a CPU is left idle (waiting or blocking). That idle time can be better used to initiate another I/O-bound request.
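Goetz's full sizing formula is `threads = cores * utilization * (1 + waitTime / computeTime)`; with a target utilization of 1 it reduces to the sketch below (the 5.0 wait/compute ratio is a placeholder you must measure for your own workload):

```java
public class PoolSizing {
    // Goetz (simplified, utilization = 1): threads = cores * (1 + wait/compute).
    static int ioBoundPoolSize(int cores, double waitOverCompute) {
        return (int) (cores * (1 + waitOverCompute));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();

        // CPU-bound rule of thumb: one thread per core, plus one spare.
        System.out.println("CPU-bound pool size: " + (cores + 1));

        // I/O-bound: the 5.0 ratio is an assumed placeholder, not a measurement.
        System.out.println("I/O-bound pool size: " + ioBoundPoolSize(cores, 5.0));
    }
}
```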
gati sahu