I'm trying to ingest an Avro file (produced with Spark 3.0) into Bigtable using the Dataflow template [1], and I get the error below.

N.B. This file can be read with Spark and with a Python Avro library without apparent issue.
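
For reference, a quick read check along these lines (using fastavro, one of the Python Avro libraries; the local path stands in for the GCS file) completes without errors:

import fastavro

# Sanity check: the Spark-produced Avro file opens and iterates cleanly.
with open("myfile.avro", "rb") as f:
    reader = fastavro.reader(f)
    print(reader.writer_schema)               # shows the topLevelRecord schema
    print(sum(1 for _ in reader), "records")  # full scan raises no errors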

Any idea?

Thanks for your support!

Error (short)

Caused by: org.apache.avro.AvroTypeException: Found topLevelRecord, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key

Avro schema (extract)

{"type":"record","name":"topLevelRecord","fields":[{"name":"a_a","type": ["string", "null"]}, ...]}

Error (full)

java.io.IOException: Failed to start reading from source: gs://myfolder/myfile.avro range [0, 15197631)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start (WorkerCustomSources.java:610)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start (ReadOperation.java:361)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:194)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start (ReadOperation.java:159)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork (BatchDataflowWorker.java:417)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork (BatchDataflowWorker.java:386)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork (BatchDataflowWorker.java:311)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork (DataflowBatchWorkerHarness.java:140)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call (DataflowBatchWorkerHarness.java:120)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call (DataflowBatchWorkerHarness.java:107)
at java.util.concurrent.FutureTask.run (FutureTask.java:264)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:628)
at java.lang.Thread.run (Thread.java:834)
Caused by: org.apache.avro.AvroTypeException: Found topLevelRecord, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key
at org.apache.avro.io.ResolvingDecoder.doAction (ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance (Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder (ResolvingDecoder.java:130)
at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:215)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
at org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord (AvroSource.java:644)
at org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord (BlockBasedSource.java:210)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl (FileBasedSource.java:484)
at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl (FileBasedSource.java:479)
at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start (OffsetBasedSource.java:249)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start (WorkerCustomSources.java:607)

References:

[1] https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#avrofiletocloudbigtable

1 Answer


Bigtable is a scalable NoSQL database service, which means it is schema-free, whereas your Spark SQL data has a schema, as you indicated in your question.

The error below is referring you to the Bigtable row key:

expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key
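
For context, the template deserializes every record against the BigtableRow schema, which looks roughly like the following (the key field is confirmed by the error message; the cells part is reconstructed from the template's bundled bigtable.avsc, so verify it against the DataflowTemplates source). A Spark topLevelRecord with arbitrary columns cannot be resolved against this schema:

{"namespace":"com.google.cloud.teleport.bigtable","type":"record","name":"BigtableRow","fields":[
  {"name":"key","type":"bytes"},
  {"name":"cells","type":{"type":"array","items":{"name":"BigtableCell","type":"record","fields":[
    {"name":"family","type":"string"},
    {"name":"qualifier","type":"bytes"},
    {"name":"timestamp","type":"long"},
    {"name":"value","type":"bytes"}]}}}]}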

Therefore, you would need to create your Bigtable schema design by following this process.
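
If you want to keep using the Avro-to-Bigtable template, one option is to rewrite the Spark output into that BigtableRow shape before launching the job. Below is a minimal sketch with fastavro, assuming a single column family "cf", a row key taken from the "a_a" field, UTF-8 string encoding of values, and microsecond timestamps (all illustrative choices, not requirements of your data):

import time
import fastavro

# Mirrors the BigtableRow/BigtableCell schema shown above; verify it against
# the template's bigtable.avsc before relying on it.
BIGTABLE_ROW_SCHEMA = fastavro.parse_schema({
    "namespace": "com.google.cloud.teleport.bigtable",
    "type": "record",
    "name": "BigtableRow",
    "fields": [
        {"name": "key", "type": "bytes"},
        {"name": "cells", "type": {"type": "array", "items": {
            "name": "BigtableCell",
            "type": "record",
            "fields": [
                {"name": "family", "type": "string"},
                {"name": "qualifier", "type": "bytes"},
                {"name": "timestamp", "type": "long"},
                {"name": "value", "type": "bytes"},
            ],
        }}},
    ],
})

def to_bigtable_row(record, family="cf"):
    # Map one flat Spark record to key + cells; using "a_a" as the row key
    # and stringifying every other value are illustrative choices.
    now_micros = int(time.time() * 1_000_000)
    cells = [
        {
            "family": family,
            "qualifier": name.encode("utf-8"),
            "timestamp": now_micros,
            "value": str(value).encode("utf-8"),
        }
        for name, value in record.items()
        if value is not None
    ]
    return {"key": str(record["a_a"]).encode("utf-8"), "cells": cells}

with open("myfile.avro", "rb") as src, open("bigtable_rows.avro", "wb") as dst:
    fastavro.writer(dst, BIGTABLE_ROW_SCHEMA,
                    (to_bigtable_row(r) for r in fastavro.reader(src)))

The resulting bigtable_rows.avro can then be uploaded back to GCS and pointed at by the template.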

Since HBase is also schema-free, your use case could also be solved with Bigtable and the HBase API, if you're flexible enough to use Spark 2.4.0.

As for the above use case, it looks like a valid feature request; I will file it with the product team and update you with the report number.

Ismail
  • Thanks @Ismail. So you suggest either defining a Bigtable schema upfront that maps to the Avro schema, or using the mentioned connector to write from Spark directly into Bigtable? – py-r Dec 22 '20 at 21:19
  • @py-r I would start with the connector – Ismail Dec 22 '20 at 21:25
  • Thanks. Giving it a try, but facing [new issues](https://stackoverflow.com/questions/65429730/spark-hbase-where-to-find-an-up-to-date-connector) – py-r Dec 23 '20 at 19:20