1

In this example files reader the solution focuses on just reading any file any file and loading it into the memory.

I've been working on it to improve it so it processes a csv file with keeping the header in each thread, so each thread can output a separate and a correctly-formatted csv file.

Unfortunately I'm not able to do so since it reads from random locations (lines), this means it might read from the middle of the line and I'll get lines mixed up.

Is there a way to utilize this code and make is csv specific?

Here is the code I changed:

public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        CSVReader reader = new CSVReader(new FileReader("file.csv"));
        String[] columnsNames = reader.readNext();
        reader.close();
        FileInputStream fileInputStream = new FileInputStream("file.csv");
        FileChannel channel = fileInputStream.getChannel();
        long remaining_size = channel.size(); //get the total number of bytes in the file
        long chunk_size = remaining_size / Integer.parseInt("4"); //file_size/threads

        //Max allocation size allowed is ~2GB
        if (chunk_size > (Integer.MAX_VALUE - 5))
        {
            chunk_size = (Integer.MAX_VALUE - 5);
        }

        //thread pool
        ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt("4"));

        long start_loc = 0;//file pointer
        int i = 0; //loop counter
        while (remaining_size >= chunk_size)
        {
            //launches a new thread
            executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i, String.join(",", columnsNames)));
            remaining_size = remaining_size - chunk_size;
            start_loc = start_loc + chunk_size;
            i++;
        }

        //load the last remaining piece
        executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i, String.join(",", columnsNames)));

        //Tear Down
        executor.shutdown();

        //Wait for all threads to finish
        while (!executor.isTerminated())
        {
            //wait for infinity time
        }
        System.out.println("Finished all threads");
        fileInputStream.close();


        long finish = System.currentTimeMillis();
        System.out.println( "Time elapsed: " + (finish - start) );
    }



class FileRead implements Runnable {

    private FileChannel _channel;
    private long _startLocation;
    private int _size;
    int _sequence_number;
    String _columns;

    public FileRead(long loc, int size, FileChannel chnl, int sequence, String header) {
        _startLocation = loc;
        _size = size;
        _channel = chnl;
        _sequence_number = sequence;
        _columns = header;
    }

    @Override
    public void run() {
        try {
            System.out.println( "Reading the channel: " + _startLocation + ":" + _size );

            //allocate memory
            ByteBuffer buff = ByteBuffer.allocate( _size );

            //Read file chunk to RAM
            _channel.read( buff, _startLocation );

            //chunk to String
            String string_chunk = new String( buff.array(), Charset.forName( "UTF-8" ) );

            string_chunk = _columns + System.getProperty( "line.separator" ) + string_chunk;
            if (string_chunk.length() > 0) {
                BufferedWriter out = new BufferedWriter( new FileWriter( "output_" + System.currentTimeMillis() + ".csv" ) );

                try {

                    out.write( string_chunk );  //Replace with the string
                    //you are trying to write
                } catch (IOException e) {
                    System.out.println( "Exception " );

                } finally {
                    out.close();
                }
            }
            System.out.println( "Done Reading the channel: " + _startLocation + ":" + _size );

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
basel.ai
  • 157
  • 2
  • 15
  • I would suggest you to look at following thread (if you haven't seen) https://stackoverflow.com/questions/11206130/a-java-theading-program-which-reads-lines-of-a-huge-csv – i.bondarenko Aug 29 '19 at 12:42

0 Answers0