1

We are saving our processing output to HDFC in ORC format. Now we have files with more than one schema and i want to read all files and create Dataset.

One option is that i will write some job and convert all these files to single schema which i want to avoid bcz data is too big and it is one time solution if after some day schema will again change i have to regenerate all data

My question is that is there any mechanism so that i can read these files let say i will supply higher schema while reading this and reader will automatically assign null value if some field is not present in any orc file.

aibotnet
  • 1,326
  • 1
  • 13
  • 27
  • Have you tried something like this? https://stackoverflow.com/questions/39758045/how-to-perform-union-on-two-dataframes-with-different-amounts-of-columns-in-spar – FurryMachine Aug 23 '17 at 09:28
  • No but my problem is different – aibotnet Aug 23 '17 at 09:58
  • 1
    In which way is it different? If I understand what you want to do correctly, the answer you seek is that, yes, you can read an ORC reader with a schema that has more columns that the ORC file you are reading, but only if the additional columns are added at the end and you don't have any column reordering. If you need schema evolution with column reordering, then you should probably aim for Parquet or Avro. – FurryMachine Aug 23 '17 at 14:58
  • I will support FurryMachine. If you need schema evolution/merging, you need to stay away from orc. This is the main reason we switched to parquet. – Michel Lemay Aug 24 '17 at 03:54
  • yes true but current i want fix of orc bcz data is too much' – aibotnet Aug 24 '17 at 17:25

2 Answers2

1

I had a similar problem. The ORC schema merge is an open feature request, we also switched to parquet like other users in your comments.

It is still possible (not recommented because it's very slow) to load file by file and save it to .parquet and then load all .parquet files with automatic schema merge and save that big in-memory to .orc

Fabian
  • 3,139
  • 2
  • 23
  • 49
0

AIBOTNET--

Use this to combined ORC files of different schema into single ORC file. My schemas were:

  1. file 1: first:int,second:int
  2. file 2: first:int,fourth:string
  3. file 3: first:int,third:map

I can post the file generators if you need them as well.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.google.common.collect.ImmutableList;

public class ReaderSample {

  public static void main(String[] args) throws IOException {

    Path testFilePath1 = new Path("file1.orc");
    Path testFilePath2 = new Path("file2.orc");
    Path testFilePath3 = new Path("file3.orc");
    Path mergePath = new Path("merge.orc");
    Configuration conf = new Configuration();

    FileSystem fileSystem = mergePath.getFileSystem(conf);
    fileSystem.delete(mergePath, false);

    List<Path> fileList = ImmutableList.of(testFilePath1, testFilePath2, testFilePath3);

    TypeDescription schema = mergeSchema(conf, fileList);
    System.out.println(schema);
    try (Writer writer = OrcFile.createWriter(mergePath, OrcFile.writerOptions(conf)
                                                                .setSchema(schema))) {
      VectorizedRowBatch writerBatch = schema.createRowBatch();
      for (Path file : fileList) {
        merge(file, conf, writer, writerBatch, schema);
      }
    }
  }

  private static TypeDescription mergeSchema(Configuration conf, List<Path> fileList) throws IOException {
    List<TypeDescription> schemaList = new ArrayList<>();
    for (Path path : fileList) {
      Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
      schemaList.add(reader.getSchema());
    }

    TypeDescription masterSchema = new TypeDescription(TypeDescription.Category.STRUCT);
    for (TypeDescription td : schemaList) {
      List<String> fieldNames = td.getFieldNames();
      for (int f = 0; f < fieldNames.size(); f++) {
        String field = fieldNames.get(f);
        List<String> mergeFields = masterSchema.getFieldNames();
        int indexOf = mergeFields.indexOf(field);
        if (indexOf < 0) {
          // add
          masterSchema.addField(field, td.getChildren()
                                         .get(f));
        } else {
          // check type at some point...
        }
      }
    }
    return masterSchema;

  }

  private static void merge(Path testFilePath1, Configuration conf, Writer writer, VectorizedRowBatch writerBatch,
      TypeDescription masterSchema) throws IOException {
    Reader reader = OrcFile.createReader(testFilePath1, OrcFile.readerOptions(conf));

    int[] mapping = createMapping(masterSchema, reader.getSchema());

    try (RecordReader rows = reader.rows()) {
      VectorizedRowBatch readerBatch = reader.getSchema()
                                             .createRowBatch();
      while (rows.nextBatch(readerBatch)) {
        for (int r = 0; r < readerBatch.size; ++r) {
          for (int c = 0; c < mapping.length; c++) {
            int index = mapping[c];
            if (index == -1) {
              writerBatch.cols[c].isNull[writerBatch.size] = true;
              writerBatch.cols[c].noNulls = false;
            } else {
              writerBatch.cols[c] = readerBatch.cols[index];
            }
          }
          writerBatch.size++;
        }
        writer.addRowBatch(writerBatch);
        writerBatch.reset();
      }
    }
  }

  private static int[] createMapping(TypeDescription masterSchema, TypeDescription currentSchema) {
    List<String> masterFieldNames = masterSchema.getFieldNames();
    List<String> fieldNames = currentSchema.getFieldNames();
    int[] mappings = new int[masterFieldNames.size()];
    for (int f = 0; f < masterFieldNames.size(); f++) {
      String name = masterFieldNames.get(f);
      int indexOf = fieldNames.indexOf(name);
      if (indexOf < 0) {
        mappings[f] = -1;
      } else {
        mappings[f] = indexOf;
      }
    }
    return mappings;
  }

}
Chris C
  • 1,012
  • 2
  • 12
  • 19