My Google Dataflow pipeline looks something like this:
pipeline
// list of tables to backfill
.apply("Backfill table list", Create.of("example_table"))
// register the from and to IDs to backfill
.apply("Start table IDs batch tracking", ParDo.of(new StartTableTracking()))
// get ID ranges not copied yet
.apply("Get IDs to copy", ParDo.of(new GetRangesToCopy()))
// copy ID ranges
.apply("Copy ID ranges", ParDo.of(new CopyIDRanges()))
// stream insert rows
.apply("Stream insert rows", ParDo.of(new StreamInsertRow()));
The CopyIDRanges step reads data from the source DB and outputs multiple Map<String, Object>
elements, which are then inserted into BigQuery via streaming inserts.
In each Map<String, Object>, the key is the column name and the value is the value to insert for that column, which can be a String, a Long, an array, etc.
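To illustrate the shape of a single output element (the column names and values here are made up, just to show the mix of types; uses java.util.HashMap and java.util.Arrays):

Map<String, Object> row = new HashMap<>();
row.put("id", 123L);                       // hypothetical Long column
row.put("name", "example");                // hypothetical String column
row.put("tags", Arrays.asList("a", "b"));  // hypothetical array/list column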
I got an error asking me to call setCoder after the CopyIDRanges step, but I guess I cannot use a SerializableCoder for Object. This is what I tried:
.setCoder(
    KvCoder.of(
        StringUtf8Coder.of(),                 // coder for the String key
        MapCoder.of(
            StringUtf8Coder.of(),
            SerializableCoder.of(Object.class)
        )                                     // coder for the Map<String, Object> value
    )
)
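One workaround I have been sketching (untested, and the MapToTableRow name is just a placeholder) is to convert each map into a BigQuery TableRow and rely on the built-in TableRowJsonCoder, so no coder for Object is needed, roughly:

import com.google.api.services.bigquery.model.TableRow;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

// Placeholder DoFn: copies each column/value pair into a TableRow so the
// resulting PCollection<TableRow> can be encoded with TableRowJsonCoder.
static class MapToTableRow extends DoFn<Map<String, Object>, TableRow> {
    @ProcessElement
    public void processElement(@Element Map<String, Object> row, OutputReceiver<TableRow> out) {
        TableRow tableRow = new TableRow();
        row.forEach(tableRow::set);  // TableRow accepts arbitrary JSON-compatible values
        out.output(tableRow);
    }
}

// ...
// .apply("To TableRow", ParDo.of(new MapToTableRow()))
// .setCoder(TableRowJsonCoder.of())

I am not sure this is the idiomatic way to handle it, hence the question.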
What is my best option here? I would prefer not to explicitly declare a class for each table I will be copying, and would rather keep this dynamic, although I guess declaring a class per table would be a sure way of fixing this.
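For reference, by "declaring a class" I mean something like the sketch below (the fields are hypothetical, based on an imagined example_table schema), with one such class per table and an explicit SerializableCoder set on its PCollection:

import java.io.Serializable;
import java.util.List;

// Hypothetical hand-written row class for example_table; every table would
// need its own class like this, plus
// .setCoder(SerializableCoder.of(ExampleTableRow.class)) on its PCollection.
static class ExampleTableRow implements Serializable {
    Long id;            // example_table.id
    String name;        // example_table.name
    List<String> tags;  // example_table.tags
}

That is exactly the per-table boilerplate I would like to avoid maintaining.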