In this example files reader the solution focuses on just reading any file any file and loading it into the memory.
I've been working on it to improve it so it processes a csv file with keeping the header in each thread, so each thread can output a separate and a correctly-formatted csv file.
Unfortunately I'm not able to do so since it reads from random locations (lines), this means it might read from the middle of the line and I'll get lines mixed up.
Is there a way to utilize this code and make is csv specific?
Here is the code I changed:
public static void main(String[] args) throws IOException {
long start = System.currentTimeMillis();
CSVReader reader = new CSVReader(new FileReader("file.csv"));
String[] columnsNames = reader.readNext();
reader.close();
FileInputStream fileInputStream = new FileInputStream("file.csv");
FileChannel channel = fileInputStream.getChannel();
long remaining_size = channel.size(); //get the total number of bytes in the file
long chunk_size = remaining_size / Integer.parseInt("4"); //file_size/threads
//Max allocation size allowed is ~2GB
if (chunk_size > (Integer.MAX_VALUE - 5))
{
chunk_size = (Integer.MAX_VALUE - 5);
}
//thread pool
ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt("4"));
long start_loc = 0;//file pointer
int i = 0; //loop counter
while (remaining_size >= chunk_size)
{
//launches a new thread
executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i, String.join(",", columnsNames)));
remaining_size = remaining_size - chunk_size;
start_loc = start_loc + chunk_size;
i++;
}
//load the last remaining piece
executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i, String.join(",", columnsNames)));
//Tear Down
executor.shutdown();
//Wait for all threads to finish
while (!executor.isTerminated())
{
//wait for infinity time
}
System.out.println("Finished all threads");
fileInputStream.close();
long finish = System.currentTimeMillis();
System.out.println( "Time elapsed: " + (finish - start) );
}
class FileRead implements Runnable {
private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;
String _columns;
public FileRead(long loc, int size, FileChannel chnl, int sequence, String header) {
_startLocation = loc;
_size = size;
_channel = chnl;
_sequence_number = sequence;
_columns = header;
}
@Override
public void run() {
try {
System.out.println( "Reading the channel: " + _startLocation + ":" + _size );
//allocate memory
ByteBuffer buff = ByteBuffer.allocate( _size );
//Read file chunk to RAM
_channel.read( buff, _startLocation );
//chunk to String
String string_chunk = new String( buff.array(), Charset.forName( "UTF-8" ) );
string_chunk = _columns + System.getProperty( "line.separator" ) + string_chunk;
if (string_chunk.length() > 0) {
BufferedWriter out = new BufferedWriter( new FileWriter( "output_" + System.currentTimeMillis() + ".csv" ) );
try {
out.write( string_chunk ); //Replace with the string
//you are trying to write
} catch (IOException e) {
System.out.println( "Exception " );
} finally {
out.close();
}
}
System.out.println( "Done Reading the channel: " + _startLocation + ":" + _size );
} catch (Exception e) {
e.printStackTrace();
}
}
}