I am working on a utility that reads multiple Parquet files at a time and writes them into a single output file.
The implementation is very straightforward: the utility reads the Parquet files from a directory, reads the Groups
from every file into a list, and then uses ParquetWriter to write all of those Groups into a single file.
After reading 600 MB, it throws an OutOfMemoryError (Java heap space). It also takes 15-20 minutes to read and write 500 MB of data.
Is there a way to make this operation more efficient?
The read method looks like this:
    // Open the file once to read the footer, which holds the schema.
    ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
    ParquetMetadata readFooter = reader.getFooter();
    MessageType schema = readFooter.getFileMetaData().getSchema();
    // Open it a second time, with the footer, to read the row groups.
    ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
    reader.close();
    PageReadStore pages = null;
    try {
        while (null != (pages = r.readNextRowGroup())) {
            long rows = pages.getRowCount();
            System.out.println("Number of rows: " + rows);
            MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
            for (int i = 0; i < rows; i++) {
                Group g = (Group) recordReader.read();
                //printGroup(g);
                // Every record read is kept in the in-memory list.
                groups.add(g);
            }
        }
    } finally {
        System.out.println("close the reader");
        r.close();
    }
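For comparison, here is a minimal sketch of the two ways the records could flow from readers to a writer. It does not use the Parquet API at all: `String` is a hypothetical stand-in for `Group`, a `List<String>` stands in for one input file, and a `Consumer<String>` stands in for the writer. The class and method names are made up for illustration. It only shows that the collect-everything pattern holds every record at once, while the streaming pattern hands each record on as soon as it is read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins: String plays the role of Group,
// List<String> plays the role of one input file,
// and Consumer<String> plays the role of the writer.
class StreamingMergeSketch {

    // Collect-everything pattern: all records from all files
    // sit in one list before anything is written.
    static List<String> collectAll(List<List<String>> files) {
        List<String> groups = new ArrayList<>();
        for (List<String> file : files) {
            groups.addAll(file);
        }
        return groups;
    }

    // Streaming pattern: each record is passed to the sink as soon as
    // it is read, so no list of all records is ever built.
    // Returns the number of records passed on.
    static long streamAll(List<List<String>> files, Consumer<String> sink) {
        long written = 0;
        for (List<String> file : files) {
            for (String record : file) {
                sink.accept(record);
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<List<String>> files = Arrays.asList(
                Arrays.asList("r1", "r2"),
                Arrays.asList("r3"));

        System.out.println("Held in memory at once: " + collectAll(files).size());

        long written = streamAll(files, record -> System.out.println("write " + record));
        System.out.println("Streamed records: " + written);
    }
}
```

Whether the real reader and writer can be wired together this way depends on the Parquet classes in use, so treat this as an outline of the idea rather than a drop-in change.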
The write method looks like this:
    // Collect the Groups from every input file before writing anything.
    for (Path file : files) {
        groups.addAll(readData(file));
    }
    System.out.println("Number of groups from the parquet files " + groups.size());
    Configuration configuration = new Configuration();
    Map<String, String> meta = new HashMap<String, String>();
    meta.put("startkey", "1");
    meta.put("endkey", "2");
    GroupWriteSupport.setSchema(schema, configuration);
    ParquetWriter<Group> writer = new ParquetWriter<Group>(
            new Path(outputFile),
            new GroupWriteSupport(),
            CompressionCodecName.SNAPPY,
            2147483647,
            268435456,
            134217728,
            true,
            false,
            ParquetProperties.WriterVersion.PARQUET_2_0,
            configuration);
    System.out.println("Number of groups to write:" + groups.size());
    // Write every collected Group to the single output file.
    for (Group g : groups) {
        writer.write(g);
    }
    writer.close();
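The three numeric arguments passed to the writer are easier to judge in readable units. The sketch below only does the arithmetic. It assumes they are, in order, the block (row group) size, the page size, and the dictionary page size, as in the ten-argument `ParquetWriter` constructor; the class and method names are hypothetical. If that reading is right, the writer may buffer a row group of almost 2 GiB before flushing, which could add to the heap pressure on top of the list of Groups.

```java
// Converts the byte counts passed to the writer into readable units.
// The parameter names in main are an assumption about the constructor's
// argument order, not something stated in the code above.
class WriterSizeSketch {

    static final long MIB = 1024L * 1024L;

    // Returns a human-readable description of a byte count.
    static String describe(long bytes) {
        if (bytes == Integer.MAX_VALUE) {
            return "Integer.MAX_VALUE bytes (one byte short of 2 GiB)";
        }
        return (bytes / MIB) + " MiB";
    }

    public static void main(String[] args) {
        System.out.println("block size (assumed):           " + describe(2147483647L));
        System.out.println("page size (assumed):            " + describe(268435456L));
        System.out.println("dictionary page size (assumed): " + describe(134217728L));
    }
}
```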