
I have to read an Avro file from Cloud Storage and then write each record into Bigtable, with a row key and the Avro as bytes in a column cell. I am using AvroIO.read to read the data as GenericRecord. How do I apply a ParDo to transform the data into something that can be written to Bigtable?

// Read AVRO from GCS

pipeline
  .apply("Read from Avro",
    AvroIO
       .readGenericRecords(schema)
       .from(options.getInputFilePattern()))

// TODO: .apply(...) - ParDo transformation goes here

.apply("Write to Bigtable", write);

Any help on the second step in the pipeline would be really appreciated.

Update:

Thanks Anton for the quick help. I now understand what I have to do and came up with the ParDo below:

 pipeline
   .apply("Read from Avro",
       AvroIO
         .readGenericRecords(schema)
         .from(options.getInputFilePattern()))
   .apply(ParDo.of(new DoFn<GenericRecord, Iterable<Mutation>>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
           GenericRecord gen = c.element();
           byte[] fieldNameByte = null;
           byte[] fieldValueByte = null;

           // Iterate over every field in the record's schema
           for (Schema.Field field : gen.getSchema().getFields()) {
               try {
                   String fieldName = field.name();
                   fieldNameByte = fieldName.getBytes("UTF-8");
                   String value = String.valueOf(gen.get(fieldName));
                   fieldValueByte = value.getBytes("UTF-8");
               } catch (Exception e) {
                   e.printStackTrace();
               }

               // One SetCell mutation per field: the field name is the
               // column qualifier, the field value is the cell value
               Iterable<Mutation> mutations =
                   ImmutableList.of(
                       Mutation.newBuilder()
                           .setSetCell(
                               Mutation.SetCell.newBuilder()
                                   .setFamilyName(COLUMN_FAMILY_NAME)
                                   .setColumnQualifier(ByteString.copyFrom(fieldNameByte))
                                   .setValue(ByteString.copyFrom(fieldValueByte)))
                           .build());
               c.output(mutations); // <-- stuck here: how do I emit this for the Bigtable write?
           }
       }
   }))
   .apply("Write to Bigtable", write);
 return pipeline.run();

This is just pseudo-code; I am just learning and trying things out. I need help with adding the mutations to the ProcessContext and doing the write. Please take a look and let me know whether I am headed in the right direction, and how to add the mutation to the context.


1 Answer


Something along these lines:

Pipeline p = Pipeline.create(options);
p.apply(GenerateSequence.from(0).to(numRows))
 .apply(
     ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
             int index = c.element().intValue();

             Iterable<Mutation> mutations =
                 ImmutableList.of(
                     Mutation.newBuilder()
                         .setSetCell(
                             Mutation.SetCell.newBuilder()
                                 .setValue(testData.get(index).getValue())
                                 .setFamilyName(COLUMN_FAMILY_NAME))
                         .build());

             // BigtableIO.write() consumes KV<ByteString, Iterable<Mutation>>:
             // the key is the row key, the value is the set of cell mutations.
             c.output(KV.of(testData.get(index).getKey(), mutations));
         }
     }))
 .apply(
     BigtableIO
         .write()
         .withBigtableOptions(bigtableOptions)
         .withTableId(tableId));

Copied from a Bigtable integration test.
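In case it is not obvious where `bigtableOptions` and `tableId` come from, they can be built up roughly like this. This is a minimal sketch; the project, instance, and table IDs are placeholders, not values from the question:

// Placeholders only - substitute your real project, instance, and table IDs.
BigtableOptions bigtableOptions =
    new BigtableOptions.Builder()
        .setProjectId("my-project")
        .setInstanceId("my-instance")
        .build();
String tableId = "my-table";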

Also, here is the Beam doc on ParDo in general, and here is the javadoc for BigtableIO, which has some explanation.

  • Thanks Anton. I looked at the example and I am still confused about how to iterate over the Avro GenericRecord and convert the values into mutations that can then be written to Bigtable: `public void processElement(ProcessContext ctx) { GenericRecord genericRecord = ctx.element(); Schema schema = new Schema.Parser().parse(schemaJson); ... }` I need some help understanding the conversion of genericRecord to mutations (extracting the bytes from the Avro record) that can be inserted into Bigtable columns. – Joe Dec 12 '18 at 14:53
  • I am not sure I fully understand. To get values from a generic record you use `genericRecord.get("field_name")`, which gives you an object. Then you have to convert it to byte strings depending on what you store in Bigtable. That part is your business logic; you yourself determine how you want your objects to be serialized. You can try using helper classes other people use if they fit your use case, e.g.: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java#L154 – Anton Dec 12 '18 at 17:07
  • If you have questions about how to serialize objects in general, you should read up on that topic; e.g. here is an example of how to convert an object to a byte array: https://stackoverflow.com/questions/2836646/java-serializable-object-to-byte-array (this is an example; you choose how your object is represented). You can then use `ByteString.copyFrom(byteArray)` if you need it. – Anton Dec 12 '18 at 17:12
  • Anton, I edited my original question with an update. Please take a look and provide feedback. – Joe Dec 13 '18 at 02:19
  • Your approach makes sense in general. The next step depends on how your Bigtable is supposed to look. One thing to note, though: in your example you emit a separate cell mutation (`c.output(mutations)`) for every field of your input Avro object, and I am not sure whether that is what you intend. Another thing is that you need to emit a key-value pair from your `ParDo`: a byte-string key and a collection of mutations as the value. So you must pick a key (e.g. one of the fields of your input Avro object) and construct a `KV` similar to my example. – Anton Dec 13 '18 at 04:09
  • And you probably want to emit a single `KV` for your whole Avro input: instead of emitting a `KV` with a mutation for each field, collect the mutations for all fields and then emit them in a single `KV` (see the sketch after these comments). This may not be what you intend, though; it depends on your business logic. – Anton Dec 13 '18 at 04:11
  • Thanks Anton. For the single `KV` with all mutations, is that byte-string key the row key of the Bigtable table? – Joe Dec 13 '18 at 14:57
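Putting the comments together, here is a minimal sketch of the middle step: one `SetCell` mutation per Avro field, all collected into a single `KV` whose `ByteString` key becomes the Bigtable row key. The `"id"` field used for the row key is hypothetical; pick whichever field identifies your row.

.apply(ParDo.of(new DoFn<GenericRecord, KV<ByteString, Iterable<Mutation>>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        GenericRecord record = c.element();

        // Collect one SetCell mutation per Avro field: field name as
        // column qualifier, stringified field value as cell value.
        ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
        for (Schema.Field field : record.getSchema().getFields()) {
            String fieldName = field.name();
            String fieldValue = String.valueOf(record.get(fieldName));
            mutations.add(
                Mutation.newBuilder()
                    .setSetCell(
                        Mutation.SetCell.newBuilder()
                            .setFamilyName(COLUMN_FAMILY_NAME)
                            .setColumnQualifier(ByteString.copyFromUtf8(fieldName))
                            .setValue(ByteString.copyFromUtf8(fieldValue)))
                    .build());
        }

        // The ByteString key of the KV is used as the Bigtable row key.
        // "id" is a hypothetical field; use whatever identifies the row.
        ByteString rowKey = ByteString.copyFromUtf8(String.valueOf(record.get("id")));
        Iterable<Mutation> allMutations = mutations.build();
        c.output(KV.of(rowKey, allMutations));
    }
}))

This produces the `KV<ByteString, Iterable<Mutation>>` elements that `BigtableIO.write()` expects, so the `.apply("Write to Bigtable", write)` step can stay as it is.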