
I'm trying to read a simple Parquet file into my Google Dataflow pipeline using the following code:

Read.Bounded<KV<Void, GenericData>> results =
    HadoopFileSource.readFrom("/home/avi/tmp/db_demo/simple.parquet",
        AvroParquetInputFormat.class, Void.class, GenericData.class);

It always triggers the following exception when the pipeline runs:

IllegalStateException: Cannot find coder for class org.apache.avro.generic.GenericData

It seems this method inside HadoopFileSource can't produce a coder for this class:

private <T> Coder<T> getDefaultCoder(Class<T> c) {
  if (Writable.class.isAssignableFrom(c)) {
    Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
    return (Coder<T>) WritableCoder.of(writableClass);
  } else if (Void.class.equals(c)) {
    return (Coder<T>) VoidCoder.of();
  }
  // TODO: how to use registered coders here?
  throw new IllegalStateException("Cannot find coder for " + c);
}
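
For illustration, the fallback the TODO hints at might look like the sketch below. This is hypothetical, not actual SDK code: it assumes the SDK's AvroCoder plus org.apache.avro.generic.IndexedRecord, and since AvroCoder.of(Class) derives the schema by reflection, it would only cover generated or reflectable Avro classes; a plain GenericRecord would still need an explicit Schema.

// Hypothetical extension of getDefaultCoder, for illustration only.
private <T> Coder<T> getDefaultCoder(Class<T> c) {
  if (Writable.class.isAssignableFrom(c)) {
    Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
    return (Coder<T>) WritableCoder.of(writableClass);
  } else if (Void.class.equals(c)) {
    return (Coder<T>) VoidCoder.of();
  } else if (IndexedRecord.class.isAssignableFrom(c)) {
    // Delegate Avro record types to AvroCoder instead of failing.
    // Only works where a schema can be derived from the class itself.
    return AvroCoder.of(c);
  }
  throw new IllegalStateException("Cannot find coder for " + c);
}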

Any help would be appreciated.

Avi

  • Hello, are you building with Maven? Would it be possible to provide a dump of your dependency tree? Here are some instructions: http://stackoverflow.com/questions/7953888/how-to-get-maven-dependencies-printed-to-a-file-in-a-readable-format – Alex Amato Jan 30 '17 at 18:37

1 Answer


This is a problem with the design of HadoopFileSource. I would suggest moving to Apache Beam (or scio), which is the Apache "version" (and the future) of the Dataflow SDK. Once you are on Beam, you can do the following (this is Scala, but you can easily translate it to Java):

// AvroSchemaClass stands in for your Avro-generated record class.
HDFSFileSource.from(
  input,
  classOf[AvroParquetInputFormat[AvroSchemaClass]],
  AvroCoder.of(classOf[AvroSchemaClass]),
  new SerializableFunction[KV[Void, AvroSchemaClass], AvroSchemaClass]() {
    override def apply(e: KV[Void, AvroSchemaClass]): AvroSchemaClass =
      // Clone the value: Hadoop input formats may reuse record objects.
      CoderUtils.clone(AvroCoder.of(classOf[AvroSchemaClass]), e.getValue)
  }
)

This uses an alternative version of from that accepts a coder explicitly, sidestepping the default-coder lookup that fails above.
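
A rough Java equivalent might look like the sketch below, under the same assumptions (AvroSchemaClass is a placeholder for your generated Avro class). Note that CoderUtils.clone throws a checked CoderException, which the Java version has to handle inside apply:

// Sketch only: assumes the same HDFSFileSource.from overload as above.
HDFSFileSource.from(
    input,
    AvroParquetInputFormat.class,  // raw class; generics are erased anyway
    AvroCoder.of(AvroSchemaClass.class),
    new SerializableFunction<KV<Void, AvroSchemaClass>, AvroSchemaClass>() {
      @Override
      public AvroSchemaClass apply(KV<Void, AvroSchemaClass> e) {
        try {
          // Clone the value: Hadoop input formats may reuse record objects.
          return CoderUtils.clone(AvroCoder.of(AvroSchemaClass.class), e.getValue());
        } catch (CoderException ex) {
          throw new RuntimeException(ex);
        }
      }
    });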

rav