
I'd like to import bigquery data to bigtable using Google Composer.

Exporting BigQuery rows in Avro format to GCS was successful. However, importing the Avro data into Bigtable was not.

The error says

Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key

I guess the Avro schema exported from BigQuery has to match the schema Bigtable expects, but I have no idea how to do this.

Eric Lee
  • Does this answer your question? [how to export bigquery to bigtable using airflow? schema issue](https://stackoverflow.com/questions/68663873/how-to-export-bigquery-to-bigtable-using-airflow-schema-issue) – Krish Aug 09 '21 at 13:29

2 Answers


For every record read from the Avro files:

  • Attributes present in the files and in the table are loaded into the table.
  • Attributes present in the file but not in the table are subject to ignore_unknown_fields.
  • Attributes that exist in the table but not in the file will use their default value, if there is one set.

The links below are helpful; a minimal Composer sketch follows them.

[1] https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#cloud-storage-avro-to-bigtable

[2] https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/resources/schema/avro/bigtable.avsc

[3] Avro to BigTable - Schema issue?
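
Since the question mentions Composer, here is an untested sketch of launching the template from link [1] out of an Airflow DAG. The operator and template path are taken from the Airflow Google provider and the Dataflow template docs as I understand them; every project, instance, table, and bucket name below is a placeholder.

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

# All project/instance/table/bucket names below are placeholders.
with DAG("avro_to_bigtable", start_date=days_ago(1), schedule_interval=None) as dag:
    load_avro_to_bigtable = DataflowTemplatedJobStartOperator(
        task_id="load_avro_to_bigtable",
        template="gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable",
        project_id="my-project",
        location="us-central1",
        parameters={
            "bigtableProjectId": "my-project",
            "bigtableInstanceId": "my-instance",
            "bigtableTableId": "my-table",
            "inputFilePattern": "gs://my-bucket/avro/*.avro",
        },
    )

The Avro files matched by inputFilePattern must already conform to the BigtableRow schema from link [2]; the second answer below shows one way to produce them.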

Lakshmi
  • I still don't get how it works, but I will try the docs you mentioned carefully this time. – Eric Lee Aug 11 '21 at 06:01
  • Yes. I eventually resolved the problem with a lot of testing to match schema. Thanks!! – Eric Lee Sep 01 '21 at 02:04
  • Hi @EricLee do you mind sharing the working example schema in python. I've encountered the same problem and have tried different types of schema but nothing works so far. Thanks! – mitbal Aug 09 '22 at 07:58
  • 1
    Hi @mitbal All I did was to reference the 2nd link Lakshmi had shared to create my own. The python example is just like a Dictionary with some nested fields in List like schema = {"type": "record", "name": "my_name", "namespace": "default", "fields": [{"name": "field_name", "type": ["null", "string"], "default": None}]} – Eric Lee Aug 11 '22 at 07:25

For those of you who, like me, still have this problem because you are not familiar with Avro, here is one working schema transformation that I found after some tinkering.

For example, suppose you have a table in BigQuery like the one in the screenshot from the original post (a stand-in sketch is shown below), with a user_id column and feature columns channel, zip_code, and history.
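
As a stand-in for the screenshot, a tiny pandas DataFrame with made-up values and the column names used in the code below would look like this:

import pandas as pd

# Hypothetical stand-in for the BigQuery query result shown in the screenshot
df = pd.DataFrame({
    "user_id":  ["u001", "u002"],
    "channel":  ["web", "app"],
    "zip_code": ["10001", "94043"],
    "history":  ["h1", "h2"],
})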

If you want to use user_id as the Bigtable row key and ingest all the other columns, here is example code to encode them into an Avro file.

import json
from datetime import datetime

from avro.schema import Parse
from avro.io import DatumWriter
from avro.datafile import DataFileWriter

# Schema expected by the Avro-to-Bigtable Dataflow template (bigtable.avsc, link [2] above)
bigtable_schema = {
    "name" : "BigtableRow",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "key", "type" : "bytes"},
      { "name" : "cells",
        "type" : {
          "type" : "array",
          "items": {
            "name": "BigtableCell",
            "type": "record",
            "fields": [
              { "name" : "family", "type" : "string"},
              { "name" : "qualifier", "type" : "bytes"},
              { "name" : "timestamp", "type" : "long", "logicalType" : "timestamp-micros"},
              { "name" : "value", "type" : "bytes"}
            ]
          }
        }
      }
   ]
}

parsed_schema = Parse(json.dumps(bigtable_schema))

row_key = 'user_id'           # column used as the Bigtable row key
family_name = 'feature_name'  # Bigtable column family to write into
feature_list = ['channel', 'zip_code', 'history']  # columns to ingest as qualifiers

with open('features.avro', 'wb') as f:

    writer = DataFileWriter(f, DatumWriter(), parsed_schema)

    for item in df.iterrows():

        row = item[1]
        # Timestamp in microseconds, to match the timestamp-micros logical type
        ts = int(datetime.now().timestamp()) * 1000 * 1000

        for feat in feature_list:

            # One Avro record per cell: row key, family, qualifier, timestamp, value
            writer.append({
                "key": row[row_key].encode('utf-8'),
                "cells": [{"family": family_name,
                           "qualifier": feat.encode('utf-8'),
                           "timestamp": ts,
                           "value": str(row[feat]).encode('utf-8')}]
            })

    writer.close()

Then you can upload the Avro file to GCS (see the sketch below) and use the Cloud Storage Avro to Bigtable Dataflow template job to run the ingestion.
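
A minimal upload sketch with the google-cloud-storage client; the bucket name and destination path are placeholders:

from google.cloud import storage

# "my-bucket" and the destination path are placeholders
client = storage.Client()
bucket = client.bucket("my-bucket")
bucket.blob("avro/features.avro").upload_from_filename("features.avro")

The template's inputFilePattern would then point at gs://my-bucket/avro/*.avro.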

Complete code can be found here: https://github.com/mitbal/sidu/blob/master/bigquery_to_bigtable.ipynb

mitbal