
I am receiving messages via Pub/Sub and would like to upload them to BigQuery, using the message data to determine which table to upload to.

I tried doing the following:

```java
Pipeline pipeline = Pipeline.create(options);
String bigQueryTable;

PCollection<String> input = pipeline
    .apply(PubsubIO.Read.subscription("projects/my-data-analysis/subscriptions/myDataflowSub"));

input
    .apply(ParDo.of(new DoFn<String, TableRow>() {
        @Override
        public void processElement(DoFn<String, TableRow>.ProcessContext c) throws Exception {
            JSONObject firstJSONObject = new JSONObject(c.element());
            bigQueryTable = firstJSONObject.get("tableName").toString();

            TableRow tableRow = convertJsonToTableRow(firstJSONObject);
            c.output(tableRow);
        }
    }))
    .apply(BigQueryIO.Write
        .to("my-data-analysis:mydataset." + bigQueryTable)
        .withSchema(tableSchema));
```

Is there any way to do this without writing my own DoFn?

If I do need to implement my own DoFn, how do I implement it to upload to BigQuery?

dina
  • Possible duplicate of [Writing different values to different BigQuery tables in Apache Beam](https://stackoverflow.com/questions/43505534/writing-different-values-to-different-bigquery-tables-in-apache-beam) – jkff Aug 02 '17 at 18:45

1 Answer


Right now this is not directly possible, but there are various workarounds covering some potential use cases (one is sketched below). See these related questions:

Dynamic table name when writing to BQ from dataflow pipelines

Specifying dynamically generated table name based on line contents
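
For illustration, here is a minimal sketch of the partition-per-known-table workaround those questions discuss, written in the same SDK style as the question. The `knownTables` list, the `tableRows` collection, and the unknown-table handling are all assumptions, not part of the original answer:

```java
// A minimal sketch of the "known tables" workaround, assuming the set of
// destination tables is small and fixed at pipeline construction time.
// "knownTables", "tableRows", and "tableSchema" are hypothetical stand-ins.
// Imports omitted; Partition and PCollectionList come from the same SDK
// as the rest of the pipeline.
final List<String> knownTables = Arrays.asList("tableA", "tableB");

PCollectionList<TableRow> partitions = tableRows.apply(
    Partition.of(knownTables.size(), new Partition.PartitionFn<TableRow>() {
        @Override
        public int partitionFor(TableRow row, int numPartitions) {
            // Assumes each row still carries the "tableName" field it was
            // parsed with; a row naming an unknown table would need its
            // own handling (as written, it would fail on index -1).
            return knownTables.indexOf((String) row.get("tableName"));
        }
    }));

// One statically configured sink per known table.
for (int i = 0; i < knownTables.size(); i++) {
    partitions.get(i).apply(
        BigQueryIO.Write
            .to("my-data-analysis:mydataset." + knownTables.get(i))
            .withSchema(tableSchema));
}
```

Later Apache Beam releases added direct support for per-element table routing via BigQueryIO's dynamic destinations (see the duplicate linked in the comments above), so on a current SDK this workaround is no longer necessary.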

jkff