I have to read an Avro file from Cloud Storage and then write each record into Bigtable, with a row key and the Avro payload as bytes in a column cell. I am using AvroIO.read to read the data as GenericRecord. How do I apply a ParDo to transform the data into something that can be written to Bigtable?
// Read Avro from GCS
pipeline
    .apply("Read from Avro",
        AvroIO
            .readGenericRecords(schema)
            .from(options.getInputFilePattern()))
    // .apply(...) <- ParDo transformation goes here
    .apply("Write to Bigtable", write);
Any help on the second step in the pipeline would be really appreciated
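For completeness, the write transform at the end is just the standard Bigtable sink; it is built roughly like the sketch below (the project, instance and table IDs are placeholders for my real values):

import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;

// Bigtable sink; it expects a PCollection<KV<ByteString, Iterable<Mutation>>>
BigtableIO.Write write =
    BigtableIO.write()
        .withProjectId("my-project")    // placeholder
        .withInstanceId("my-instance")  // placeholder
        .withTableId("my-table");       // placeholder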
Update:
Thanks Anton for the quick help. I now understand what I have to do and came up with the ParDo below:
pipeline
    .apply("Read from Avro",
        AvroIO
            .readGenericRecords(schema)
            .from(options.getInputFilePattern()))
    .apply(ParDo.of(new DoFn<GenericRecord, KV<ByteString, Iterable<Mutation>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            GenericRecord gen = c.element();

            // Collect one SetCell mutation per Avro field:
            // column qualifier = field name, cell value = field value as UTF-8 text
            ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
            for (Schema.Field field : gen.getSchema().getFields()) {
                String fieldName = field.name();
                byte[] fieldNameByte = fieldName.getBytes(StandardCharsets.UTF_8);
                byte[] fieldValueByte =
                    String.valueOf(gen.get(fieldName)).getBytes(StandardCharsets.UTF_8);

                mutations.add(
                    Mutation.newBuilder()
                        .setSetCell(
                            Mutation.SetCell.newBuilder()
                                .setFamilyName(COLUMN_FAMILY_NAME)
                                .setColumnQualifier(ByteString.copyFrom(fieldNameByte))
                                .setValue(ByteString.copyFrom(fieldValueByte)))
                        .build());
            }

            // TODO: not sure yet what to use as the row key -- placeholder for now
            ByteString rowKey = ByteString.copyFromUtf8(gen.toString());

            Iterable<Mutation> mutationList = mutations.build();
            c.output(KV.of(rowKey, mutationList));
        }
    }))
    .apply("Write to Bigtable", write);
return pipeline.run();
This is just pseudo code and I am still learning and trying things out. I am least sure about the row key and about emitting the mutations from the ProcessContext so that the write picks them up. Please take a look and let me know whether I am on the right track.
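One more thought: the original goal was to store the whole record as Avro bytes in a single cell rather than one column per field. If that is the way to go, I imagine the body of the DoFn would serialize the GenericRecord and emit a single SetCell, roughly like the sketch below (the "record" column qualifier and the use of a hypothetical "key" field for the row key are just assumptions on my part):

import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
    GenericRecord gen = c.element();

    // Serialize the whole record to Avro binary (just the datum, no file header)
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(gen.getSchema());
    writer.write(gen, encoder);
    encoder.flush();

    // Single cell holding the serialized record; "record" is an arbitrary qualifier I chose
    Iterable<Mutation> mutations =
        ImmutableList.of(
            Mutation.newBuilder()
                .setSetCell(
                    Mutation.SetCell.newBuilder()
                        .setFamilyName(COLUMN_FAMILY_NAME)
                        .setColumnQualifier(ByteString.copyFromUtf8("record"))
                        .setValue(ByteString.copyFrom(out.toByteArray())))
                .build());

    // Assumes the record has a "key" field that can serve as the row key
    ByteString rowKey = ByteString.copyFromUtf8(String.valueOf(gen.get("key")));
    c.output(KV.of(rowKey, mutations));
}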