
Hello, I've encountered a problem; I'll describe it in general terms. I need to read data from a .csv file as quickly as possible and process it. The processing itself is not a problem, but how do I make the reading multithreaded? I see several options. The order of the data is not important.

  1. Split the file into several parts and read them concurrently.
  2. I've heard that you can share a BufferedReader between threads and synchronize it, but I couldn't find a working example (a minimal sketch follows this list).
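
For option 2, a minimal sketch of a shared reader. It relies on the fact that `BufferedReader.readLine()` is internally synchronized, so several threads can safely pull lines from one reader; the thread count and the process method are illustrative placeholders, not real code from my project.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;

public class SharedReaderDemo {

    // Placeholder for the real per-line work (verifyLine/groupAdder)
    static void process(String line) { }

    public static void main(String[] args) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader("lng.csv"))) {
            Runnable worker = () -> {
                try {
                    String line;
                    // readLine() is synchronized internally, so each call
                    // hands a distinct line to exactly one thread
                    while ((line = reader.readLine()) != null) {
                        process(line);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };
            Thread[] threads = new Thread[4]; // illustrative thread count
            for (int t = 0; t < threads.length; t++) {
                threads[t] = new Thread(worker);
                threads[t].start();
            }
            for (Thread t : threads) {
                t.join(); // wait for all workers before the reader is closed
            }
        }
    }
}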

My code

Instant start = Instant.now();
int i = 0;
try (BufferedReader reader = new BufferedReader(new FileReader("lng.csv"))) {
    String line = reader.readLine();
    while (line != null && i < 150000) {
        System.out.println(i + ") " + line);

        // my data processing (on the line just read)
        if (verifyLine(line)) {
            groupAdder(line);
        } else {
            System.out.println("Wrong line: " + line);
        }
        i++;

        // read the next line
        line = reader.readLine();
    }
} catch (IOException e) {
    e.printStackTrace();
}

Instant end = Instant.now();
System.out.println(Duration.between(start, end));

I would be glad to see your solutions to this problem, and code examples would be very welcome.

WBLord
  • You should explore java.util.concurrent.RecursiveTask to implement a fork/join approach that reads the file in chunks; this will definitely improve your performance and solve your problem (see the sketch after these comments). – Ravi Sapariya Jul 27 '20 at 04:27
  • [This question](https://stackoverflow.com/questions/18971951/multithreading-to-read-a-file-in-java) seems to be related. You could also [check this one](https://stackoverflow.com/questions/40412008/how-to-read-a-file-using-multiple-threads-in-java-when-a-high-throughput3gb-s/40432357#40432357). In both of these, the answers advise against trying to read a file with multiple threads, as it is counter-productive in most cases. What you could do, however, is to process the lines in parallel after they were read from the file by a single thread. I can write a solution for that if you want. – Patrick Jul 27 '20 at 06:38
  • @Patrick That would help me, if you don't mind writing it. – WBLord Jul 27 '20 at 13:08
  • Why? You can read millions of lines per second with `BufferedReader`. That's already more than fast enough. In any case the disk isn't multi-threaded, so there is no advantage in what you propose. Your bottleneck lies elsewhere. – user207421 Jul 28 '20 at 02:13
  • Have you profiled the task to find where it is slow? Premature optimization is the root of all evil. – NomadMaker Jul 28 '20 at 02:36
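
For reference, a minimal sketch of the fork/join idea from the first comment, written with RecursiveAction (the no-result variant of RecursiveTask). Following the advice in the linked answers, it reads the file in a single thread and only splits the processing across worker threads; THRESHOLD and process are illustrative placeholders.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinLines {

    static final int THRESHOLD = 10_000; // tune to the cost of processing one line

    // Placeholder for the real per-line work (verifyLine/groupAdder)
    static void process(String line) { }

    static class ChunkTask extends RecursiveAction {
        final List<String> lines;
        final int from, to;

        ChunkTask(List<String> lines, int from, int to) {
            this.lines = lines;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) {
                    process(lines.get(i)); // small chunk: process directly
                }
            } else {
                int mid = (from + to) >>> 1;
                invokeAll(new ChunkTask(lines, from, mid),  // split large chunks in two
                          new ChunkTask(lines, mid, to));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = Files.readAllLines(Paths.get("lng.csv")); // single-threaded read
        ForkJoinPool.commonPool().invoke(new ChunkTask(lines, 0, lines.size()));
    }
}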

1 Answer


General idea

One thread reads the entire file and submits "processing tasks" to a thread pool. Each task submitted to the thread pool is processed independently and in parallel.

A possible implementation

Class LineProcessingTask is in charge of processing one line.

public class LineProcessingTask implements Runnable {

    public static boolean verifyLine(String line) {
        return false; // Use your implementation
    }
    
    public static void groupAdder(String s) { 
        //Use your implementation
    }
    
    private final String s;
    
    public LineProcessingTask(String line) {
        s = line;
    }
    
    @Override
    public void run() {
        if (verifyLine(s)) {
            groupAdder(s);
        }
    }
}

Main method:

public static void main(String[] args) {
    // Create an executor service to which tasks will be submitted
    final int PARALLELISM = 4;
    ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);

    // Submit each line as a processing task
    int i = 0;
    try (BufferedReader reader = new BufferedReader(new FileReader("lng.csv"))) {
        String line = reader.readLine();
        while (line != null && i < 150000) {
            System.out.println(i + ") " + line);
            pool.execute(new LineProcessingTask(line));
            i++;
            line = reader.readLine(); // for the next iteration
        }
    } catch (IOException e) {
        e.printStackTrace();
    }

    // Stop accepting new tasks, then wait for the submitted ones to finish
    pool.shutdown();
    try {
        if (!pool.awaitTermination(60L, TimeUnit.SECONDS)) {
            System.err.println("All tasks did not complete in the allocated time");
            return;
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
        return;
    }

    // Rest of your program (after line processing)
}

Nota Bene

Depending on how computation-heavy your processing is, you may not experience significant speedups. If individual tasks are very small, they may get completed so fast that the thread that reads the file cannot submit tasks fast enough to keep all threads in the ExecutorService busy. It all depends on where your bottleneck is in the first place: was it reading the file from disk or processing the read data?
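If the per-line tasks do turn out to be too small, one common variation (not part of the original answer) is to batch lines, so that each submitted task amortizes the queueing overhead over many lines. A sketch of the modified read loop, with BATCH_SIZE as an illustrative value (requires java.util.ArrayList and java.util.List):

final int BATCH_SIZE = 1_000; // illustrative; tune to your workload
List<String> batch = new ArrayList<>(BATCH_SIZE);
String line;
while ((line = reader.readLine()) != null) {
    batch.add(line);
    if (batch.size() == BATCH_SIZE) {
        final List<String> chunk = batch; // capture the filled batch for the task
        pool.execute(() -> chunk.forEach(l -> {
            if (LineProcessingTask.verifyLine(l)) {
                LineProcessingTask.groupAdder(l);
            }
        }));
        batch = new ArrayList<>(BATCH_SIZE); // start a fresh batch
    }
}
// submit the trailing partial batch the same way before shutting down the pool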

You need to make sure that what you do in the method groupAdder(String) can be done by multiple threads concurrently. Be careful not to create any bottlenecks in that method.
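For example, assuming groupAdder counts lines per group keyed by the first CSV field (an assumption; the original post does not show its body), a ConcurrentHashMap with LongAdder counters keeps it thread-safe without a global lock:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class Groups {

    // One counter per group; both classes are designed for concurrent use
    private static final ConcurrentHashMap<String, LongAdder> COUNTS = new ConcurrentHashMap<>();

    public static void groupAdder(String line) {
        int comma = line.indexOf(',');
        String group = (comma >= 0) ? line.substring(0, comma) : line; // illustrative key
        // computeIfAbsent is atomic; LongAdder scales well under contention
        COUNTS.computeIfAbsent(group, k -> new LongAdder()).increment();
    }
}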

Patrick