Use this to combine ORC files with different schemas into a single ORC file. My schemas were:
- file 1: first:int,second:int
- file 2: first:int,fourth:string
- file 3: first:int,third:map
I can post the file generators if you need them as well (a minimal sketch of one is at the end).
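With those inputs, the System.out.println(schema) call below should print the union of the fields in first-seen order, something like struct<first:int,second:int,fourth:string,third:map<...>> (the exact map parameters depend on how file 3 was written).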
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import com.google.common.collect.ImmutableList;
public class ReaderSample {

    public static void main(String[] args) throws IOException {
        Path testFilePath1 = new Path("file1.orc");
        Path testFilePath2 = new Path("file2.orc");
        Path testFilePath3 = new Path("file3.orc");
        Path mergePath = new Path("merge.orc");
        Configuration conf = new Configuration();

        // Remove any output left over from a previous run.
        FileSystem fileSystem = mergePath.getFileSystem(conf);
        fileSystem.delete(mergePath, false);

        List<Path> fileList = ImmutableList.of(testFilePath1, testFilePath2, testFilePath3);

        // Build the union of all input schemas, then copy each file into the writer.
        TypeDescription schema = mergeSchema(conf, fileList);
        System.out.println(schema);
        try (Writer writer = OrcFile.createWriter(mergePath,
                OrcFile.writerOptions(conf).setSchema(schema))) {
            VectorizedRowBatch writerBatch = schema.createRowBatch();
            for (Path file : fileList) {
                merge(file, conf, writer, writerBatch, schema);
            }
        }
    }
    private static TypeDescription mergeSchema(Configuration conf, List<Path> fileList) throws IOException {
        List<TypeDescription> schemaList = new ArrayList<>();
        for (Path path : fileList) {
            Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
            schemaList.add(reader.getSchema());
        }

        TypeDescription masterSchema = TypeDescription.createStruct();
        for (TypeDescription td : schemaList) {
            List<String> fieldNames = td.getFieldNames();
            for (int f = 0; f < fieldNames.size(); f++) {
                String field = fieldNames.get(f);
                List<String> mergeFields = masterSchema.getFieldNames();
                int indexOf = mergeFields.indexOf(field);
                if (indexOf < 0) {
                    // First time we see this field: add it with the type from this file.
                    // Clone so the master schema does not re-parent the reader's schema nodes.
                    masterSchema.addField(field, td.getChildren().get(f).clone());
                } else {
                    // Field already present; a real implementation should verify that
                    // the types match instead of silently keeping the first one.
                }
            }
        }
        return masterSchema;
    }
    private static void merge(Path file, Configuration conf, Writer writer, VectorizedRowBatch writerBatch,
            TypeDescription masterSchema) throws IOException {
        Reader reader = OrcFile.createReader(file, OrcFile.readerOptions(conf));
        // mapping[c] is the column index in this file for master column c, or -1 if absent.
        int[] mapping = createMapping(masterSchema, reader.getSchema());
        try (RecordReader rows = reader.rows()) {
            VectorizedRowBatch readerBatch = reader.getSchema().createRowBatch();
            while (rows.nextBatch(readerBatch)) {
                // Columns present in this file are aliased straight into the writer batch;
                // row indices line up because writerBatch is reset for every reader batch
                // and both batches use the same default capacity.
                for (int c = 0; c < mapping.length; c++) {
                    if (mapping[c] != -1) {
                        writerBatch.cols[c] = readerBatch.cols[mapping[c]];
                    }
                }
                // Columns missing from this file are padded with nulls row by row.
                for (int r = 0; r < readerBatch.size; ++r) {
                    for (int c = 0; c < mapping.length; c++) {
                        if (mapping[c] == -1) {
                            writerBatch.cols[c].isNull[writerBatch.size] = true;
                            writerBatch.cols[c].noNulls = false;
                        }
                    }
                    writerBatch.size++;
                }
                writer.addRowBatch(writerBatch);
                writerBatch.reset();
            }
        }
    }
    private static int[] createMapping(TypeDescription masterSchema, TypeDescription currentSchema) {
        List<String> masterFieldNames = masterSchema.getFieldNames();
        List<String> fieldNames = currentSchema.getFieldNames();
        int[] mappings = new int[masterFieldNames.size()];
        for (int f = 0; f < masterFieldNames.size(); f++) {
            // indexOf is already -1 when the master field is missing from this file.
            mappings[f] = fieldNames.indexOf(masterFieldNames.get(f));
        }
        return mappings;
    }
}
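For reference, here is a minimal sketch of one of the file generators, for file1.orc with schema first:int,second:int. The class name WriterSample1 and the row values are made up; the other two files follow the same pattern with their own schemas (ORC stores all integer types in a LongColumnVector, and string/map columns use BytesColumnVector/MapColumnVector instead).

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class WriterSample1 {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.fromString("struct<first:int,second:int>");
        // Assumes file1.orc does not already exist; delete it first when re-running.
        try (Writer writer = OrcFile.createWriter(new Path("file1.orc"),
                OrcFile.writerOptions(conf).setSchema(schema))) {
            VectorizedRowBatch batch = schema.createRowBatch();
            LongColumnVector first = (LongColumnVector) batch.cols[0];
            LongColumnVector second = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < 10; r++) { // 10 made-up rows
                int row = batch.size++;
                first.vector[row] = r;
                second.vector[row] = r * 2;
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
            // Flush any rows left in the final, partially filled batch.
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }
        }
    }
}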