
I'm trying to convert a TableRow containing multiple values into a KV. I can achieve this in a DoFn, but that adds complexity to the code I want to write later and makes my job harder. (Basically, I need to perform a CoGroupByKey operation on two PCollections of TableRow.)

Is there any way I can convert a PCollection<TableRow> to a PCollection<KV<String, String>>, where the keys and values keep the same format they have in the TableRow?

I wrote a snippet that looks something like this, but it doesn't give me the result I want. Is there a way to load all the entries in the TableRow and generate KVs from those values?

ImmutableList<TableRow> input = ImmutableList.of(new TableRow().set("val1", "testVal1").set("val2", "testVal2").set("val3", "testVal3"));
PCollection<TableRow> inputPC = p.apply(Create.of(input));

inputPC.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("val1"), (String) tableRow.get("val2"))));
Shriyut Jha
  • Beam doesn't really have any built-in ways to transform TableRow, since that's a data type basically only used when reading and writing to BigQuery. For that reason, almost any solution to a problem involving TableRows is going to require writing a DoFn. Can you clarify what your current DoFn solution does and why you're unsatisfied with it? – Daniel Oliveira Nov 02 '21 at 02:14
  • I wanted to perform a join operation on data extracted from two BQ tables in Dataflow. I can't achieve this directly via a query, since I get the table names dynamically. I'm a beginner in Beam, so I was exploring ways to convert a TableRow into a KV, but the documentation didn't have anything useful. Right now I have hardcoded some values in my DoFn to achieve this, but that's not ideal. – Shriyut Jha Nov 02 '21 at 18:07

2 Answers


It looks like what you want is a way to perform a Join on data obtained from BigQuery. There is no way to perform joins on TableRows directly, because TableRows are not meant to be manipulated as general-purpose elements in your pipeline; their purpose is specifically reading from and writing to BigQuery IO.

In order to use existing Beam transforms, you'll want to convert those TableRows into a more useful representation, such as a Java object you write yourself or the Beam schema Row type. Since a TableRow is essentially a dictionary of JSON values, all you need to do is write a Map function that reads the appropriate fields and parses them if necessary. For example:

PCollection<TableRow> tableRows = ... // Reading from BigQuery IO.
PCollection<Foo> foos = tableRows.apply(MapElements.via(
    new SimpleFunction<TableRow, Foo>() {
        @Override
        public Foo apply(TableRow row) {
            String bar = (String) row.get("bar");
            int baz = Integer.parseInt((String) row.get("baz"));
            return new Foo(bar, baz);
        }
    }));

Once you have the data in a type of your choice, you can find a way to perform a Join with built-in Beam transforms. There are many potential ways to do this, so I won't list them all, but a clear first choice to look at is the Join class.
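As a rough sketch of that first choice (assuming the `beam-sdks-java-extensions-join-library` dependency is on the classpath; the field names `id`, `leftVal`, and `rightVal` are hypothetical), keying both collections and calling `Join.innerJoin` might look like:

```java
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// leftRows / rightRows are PCollection<TableRow> read from the two tables.
// First, key each collection by the join column.
PCollection<KV<String, String>> left = leftRows.apply(MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
    .via(row -> KV.of((String) row.get("id"), (String) row.get("leftVal"))));

PCollection<KV<String, String>> right = rightRows.apply(MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
    .via(row -> KV.of((String) row.get("id"), (String) row.get("rightVal"))));

// Inner join: emits one KV<key, KV<leftVal, rightVal>> per matching key pair.
PCollection<KV<String, KV<String, String>>> joined = Join.innerJoin(left, right);
```

Under the hood the join library builds on CoGroupByKey, so if you need outer-join semantics or more than two inputs, dropping down to CoGroupByKey directly is the other option.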

Daniel Oliveira

To convert a PCollection<String> into a PCollection<TableRow>, you can use the following DoFn:

static class StringConverter extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }
}

Here you can read more on how to transform from a TableRow to a String.
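For the direction the question actually asks about (TableRow → KV), note that a TableRow implements Map<String, Object>, so flattening all of its entries into string pairs is plain map iteration. The helper below is a hypothetical sketch written against Map so the logic stands on its own; the commented lines show where it could plug into a FlatMapElements in a pipeline:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: emits one key/value pair per column of the row.
// A TableRow is a Map<String, Object>, so this works on TableRow directly.
static List<Map.Entry<String, String>> rowToPairs(Map<String, Object> row) {
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    for (Map.Entry<String, Object> e : row.entrySet()) {
        pairs.add(new SimpleImmutableEntry<>(e.getKey(), String.valueOf(e.getValue())));
    }
    return pairs;
}

// In a pipeline this could back a FlatMapElements (sketch, not tested here):
// inputPC.apply(FlatMapElements
//     .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
//     .via(row -> rowToPairs(row).stream()
//         .map(e -> KV.of(e.getKey(), e.getValue()))
//         .collect(Collectors.toList())));
```

One caveat: `String.valueOf` stringifies non-string columns, which may or may not match the formatting BigQuery used; parse the individual fields explicitly if the types matter.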

Eduardo Ortiz
  • Hi Eduardo, thanks for commenting, but I'm looking to convert a TableRow to a KV; is it possible to achieve this? – Shriyut Jha Nov 29 '21 at 15:14