
My Google Dataflow pipeline looks something like this:

pipeline
        // list of tables to backfill
        .apply("Backfill table list", Create.of("example_table"))
        // register the from and to IDs to backfill
        .apply("Start table IDs batch tracking", ParDo.of(new StartTableTracking()))
        // get ID ranges not copied yet
        .apply("Get IDs to copy", ParDo.of(new GetRangesToCopy()))
        // copy ID ranges
        .apply("Copy ID ranges", ParDo.of(new CopyIDRanges()))
        // stream insert rows
        .apply("Stream insert rows", ParDo.of(new StreamInsertRow()));

The CopyIDRanges step reads data from the source DB and outputs multiple Map<String, Object> elements, which are then stream-inserted into BigQuery.

In each Map<String, Object>, the key is the column name and the value is the value to insert for that column, which can be a String, a Long, an array, etc.
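
For context, here is a simplified sketch of the kind of element CopyIDRanges emits; the input type, column names, and values below are made-up placeholders, not my real schema:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Simplified sketch only; the input type and columns are placeholders.
    class CopyIDRanges extends DoFn<KV<String, Long>, KV<String, Map<String, Object>>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String tableName = c.element().getKey();
            // ... read the rows for this ID range from the source DB ...
            Map<String, Object> row = new HashMap<>();
            row.put("id", 42L);                         // Long value
            row.put("name", "example");                 // String value
            row.put("tags", new String[] { "a", "b" }); // array value
            c.output(KV.of(tableName, row));            // one output per source row
        }
    }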

I got an error asking me to call setCoder after the CopyIDRanges step, but I guess I cannot use a SerializableCoder for Object:

    .setCoder(
            KvCoder.of(
                StringUtf8Coder.of(), // Specify a coder for the String key
                MapCoder.of(
                        StringUtf8Coder.of(),
                        SerializableCoder.of(Object.class)
                ) // Specify a coder for Map<String, Object>
            )
    )

What is my best option here? I would prefer not to explicitly declare a class for each table I will be copying; I would rather keep this dynamic. That said, I guess declaring a class per table (roughly like the sketch below) would be a sure way of fixing this.
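
For illustration, a rough sketch of what the per-table class route might look like; the class and field names here are made up, not my real schema:

    import java.io.Serializable;

    // Hypothetical row class for one table; every field is a made-up example.
    public class ExampleTableRow implements Serializable {
        long id;       // e.g. an INTEGER column
        String name;   // e.g. a STRING column
        String[] tags; // e.g. a REPEATED STRING column
    }

The coder in the snippet above would then just be SerializableCoder.of(ExampleTableRow.class) inside the KvCoder, but that means maintaining one class per table, which is what I am trying to avoid.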
