Whenever a file is written to Cloud Storage, I want it to trigger a Cloud Function that executes a DataFlow template to transform the file content and write the results to BigQuery.

I think I have a handle on most of that. The problem is that I don't just need to insert into a BQ table; I need to upsert (using the MERGE operation). This seems like it would be a common requirement, but the Apache Beam BQ connector doesn't offer this option (only write, create and truncate/write).

So then I thought... OK, if I can just capture when the DataFlow pipeline is done executing, I could have DataFlow write to a temporary table and then I could call a SQL Merge query to merge data from the temp table to the target table. However, I'm not seeing any way to trigger a cloud function upon pipeline execution completion.

Any suggestions on how to accomplish the end goal?

Thanks

Jim Ott

3 Answers

Interesting question. There are some good ideas already, but I'd like to show another possibility using just Dataflow and BigQuery. If this is a non-templated batch job, we can use PipelineResult.waitUntilFinish(), which:

Waits until the pipeline finishes and returns the final status.

Then we check if State is DONE and proceed with the MERGE statement if needed:

PipelineResult res = p.run();
res.waitUntilFinish();

if (res.getState() == PipelineResult.State.DONE) {
    LOG.info("Dataflow job is finished. Merging results...");
    MergeResults();
    LOG.info("All done :)");
}

In order to test this we can create a BigQuery table (upsert.full) which will contain the final results and be updated each run:

bq mk upsert
bq mk -t upsert.full name:STRING,total:INT64
bq query --use_legacy_sql=false "INSERT upsert.full (name, total) VALUES('tv', 10), ('laptop', 20)"

At the start, the table holds a total of 10 TVs. Now let's imagine that we sell 5 extra TVs and, in our Dataflow job, we write a single row to a temporary table (upsert.temp) with the new corrected value (15):

p
.apply("Create Data", Create.of("Start"))
.apply("Write", BigQueryIO
                .<String>write()
                .to(output)
                .withFormatFunction(
                    (String dummy) ->
                    new TableRow().set("name", "tv").set("total", 15))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withSchema(schema));

So now we want to update the original table with the following query (DML syntax):

MERGE upsert.full F
USING upsert.temp T
ON T.name = F.name
WHEN MATCHED THEN
  UPDATE SET total = T.total
WHEN NOT MATCHED THEN
  INSERT(name, total)
  VALUES(name, total)
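
For reference, the same statement can be assembled as a Java string before it is handed to a query job. This is only a sketch: the class name `MergeSqlSketch` and the method `mergeSql` are hypothetical and not part of the original example, and no escaping is done, so it assumes trusted table names.

```java
final class MergeSqlSketch {
    /**
     * Builds the MERGE statement shown above for a target table and a
     * staging table. Hypothetical helper; assumes both tables have the
     * name and total columns used in this answer.
     */
    static String mergeSql(String target, String staging) {
        return String.join("\n",
            "MERGE " + target + " F",
            "USING " + staging + " T",
            "ON T.name = F.name",
            "WHEN MATCHED THEN",
            "  UPDATE SET total = T.total",
            "WHEN NOT MATCHED THEN",
            "  INSERT(name, total)",
            "  VALUES(name, total)");
    }
}
```

Calling `MergeSqlSketch.mergeSql("upsert.full", "upsert.temp")` yields the query above, one clause per line.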

To run it, we can use BigQuery's Java client library in MergeResults:

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
          "MERGE upsert.full F "
        + ...
        + "VALUES(name, total)")
        .setUseLegacySql(false)
        .build();

JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

This is based on this snippet which includes some basic error handling. Note that you'll need to add this to your pom.xml or equivalent:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquery</artifactId>
  <version>1.82.0</version>
</dependency>

and it works for me:

INFO: 2020-02-08T11:38:56.292Z: Worker pool stopped.
Feb 08, 2020 12:39:04 PM org.apache.beam.runners.dataflow.DataflowPipelineJob logTerminalState
INFO: Job 2020-02-08_REDACTED finished with status DONE.
Feb 08, 2020 12:39:04 PM org.apache.beam.examples.BigQueryUpsert main
INFO: Dataflow job is finished. Merging results...
Feb 08, 2020 12:39:09 PM org.apache.beam.examples.BigQueryUpsert main
INFO: All done :)
$ bq query --use_legacy_sql=false "SELECT name,total FROM upsert.full LIMIT 10"
+--------+-------+
|  name  | total |
+--------+-------+
| tv     |    15 |
| laptop |    20 |
+--------+-------+

Tested with the 2.17.0 Java SDK and both the Direct and Dataflow runners.

Full example here

Guillem Xercavins
  • This looks great, thanks! The problem is that I think I need to run it from a template since I don't have any server infrastructure myself. I guess the alternative would be to have a GAE instance running Java? – Jim Ott Feb 12 '20 at 20:37
  • You have several alternatives, some of them in [this question](https://stackoverflow.com/questions/43816707/easiest-way-to-schedule-a-google-cloud-dataflow-job) or [this faq](https://cloud.google.com/dataflow/docs/resources/faq#is_there_a_built-in_scheduling_mechanism_to_execute_pipelines_at_given_time_or_interval): cron in GCE/GAE, Cloud Functions, etc. Airflow/Composer is a good one and can launch a [self-executing jar](https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar) but in that case you can just run the query task after the Dataflow job is successful – Guillem Xercavins Feb 13 '20 at 20:03

There is no native, built-in way to generate an event at the end of a Dataflow job. However, you can cheat by using the logs.

For this:

  • Go to logs, select advanced filter (arrow on the right of the filter bar) and paste this custom filter:
resource.type="dataflow_step" textPayload="Worker pool stopped."

You should see only the end-of-job entries for your Dataflow jobs. Then create a sink that exports these entries to PubSub, and attach your function to those PubSub messages; from there you can do whatever you want.

To do this, after filling in your custom filter:

  • Click on create sink
  • Set a sink name
  • Set the destination to PubSub
  • Select your topic
  • Now attach a function to this topic; it will be triggered only at the end of a Dataflow job.
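
The check inside such a function could look roughly like the sketch below. It assumes the function receives the Pub/Sub message data as a base64 string and that the data is the exported log entry serialized as JSON. `JobEndFilterSketch` and `isJobEnd` are hypothetical names, and a plain substring test stands in for a real JSON parser to keep the example dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class JobEndFilterSketch {
    // Log text used in the filter above; it may change without notice.
    static final String END_MARKER = "Worker pool stopped.";

    /**
     * Decodes base64 Pub/Sub message data and reports whether the decoded
     * text contains the end-of-job log line. Hypothetical helper.
     */
    static boolean isJobEnd(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        String logEntryJson = new String(raw, StandardCharsets.UTF_8);
        return logEntryJson.contains(END_MARKER);
    }
}
```

Since the sink filter already restricts what reaches the topic, this check is only a second line of defence before running the MERGE.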
guillaume blaquiere
  • Another way is not to use a template. Instead, your Cloud Function calls Cloud Build, which runs the pipeline in blocking mode. Then just wait for it to finish, and carry on with your next step when it's done. Downside, of course, is you're paying for Cloud Build while you're waiting, but it's very cheap. Also, if your pipeline needs to run for a very long time (hours), then you may have problems. – Graham Polley Feb 06 '20 at 11:48
  • Thanks Guillaume. I actually saw your similar answer on another question. Nice and creative! It seemed like you were having a discussion with someone else where they weren't getting the same log output as you. So I kind of shied away from that approach as it seems Google may change their logging text at any time without warning. I was hoping to find a more robust solution, but I will definitely keep this in mind if it's the only option. Thanks for sharing! – Jim Ott Feb 07 '20 at 04:16
  • Thanks @GrahamPolley. I'm pretty new to DataFlow, so I'm not entirely familiar with what you're suggesting. But I *think* I mostly understand it. I'm going to look into that. – Jim Ott Feb 07 '20 at 04:18

I have implemented this exact use case, but instead of using 2 different pipelines, you can create just 1 pipeline.

Step 1: Read the file from GCS and convert it into TableRows.

Step 2: Read all the existing rows from BigQuery.

Step 3: Create a ParDo that holds your custom upsert operation, working on collections like those in the code below.

PCollection<KV<String, TableRow>> val = p.apply(BigQueryIO.readTableRows().from(""))
    .apply(/* key each TableRow by its merge key */);

PCollection<KV<String, TableRow>> val1 = p.apply(TextIO.read().from(""))
    .apply(/* convert each line to a TableRow keyed the same way */);

Step 4: Perform a CoGroupByKey and apply the ParDo to its result to get the updated rows (equivalent to the MERGE operation).

Step 5: Write the complete set of TableRows to BQ using WRITE_TRUNCATE mode. The code is a little more complicated, but a single pipeline should perform better.
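
The per-key decision made after the CoGroupByKey can be pictured with plain Java maps. This is not Beam code, only an in-memory sketch of the upsert semantics under the assumption that each row is identified by a single string key. The class name `UpsertSketch` and the use of a `Long` total per key are illustrative choices.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UpsertSketch {
    /**
     * Returns the existing rows with the incoming rows applied on top:
     * keys present in both are updated, keys only in incoming are
     * inserted, and keys only in existing are kept unchanged.
     */
    static Map<String, Long> upsert(Map<String, Long> existing,
                                    Map<String, Long> incoming) {
        Map<String, Long> merged = new LinkedHashMap<>(existing);
        merged.putAll(incoming);
        return merged;
    }
}
```

In the pipeline, the same rule would run once per key inside the ParDo, and the merged output is what gets written back with WRITE_TRUNCATE.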

miles212
  • Thanks @miles212. I believe I was only talking about a single pipeline to start with (I'm pretty new to DataFlow, so maybe I'm not using the correct terminology). But if I understand you correctly, I think you're saying to query the db to check for the existence of the row first (from within the pipeline) and, if it exists, do my own merge, then update the row as the output accordingly. Seems to make sense (if I'm understanding correctly). I'll look into that! – Jim Ott Feb 07 '20 at 04:22
  • Exactly, you got my point. Try to build the pipeline; if you face any difficulties, let me know and I'll help you with the code part. – miles212 Feb 07 '20 at 05:10
  • After I posted my question and nobody replied for a while, I had to move on with a different (non-DataFlow) solution. But I want to come back to it soon and learn DataFlow better. I will acknowledge the solution I use when I settle on one. The problem I see with yours is the number of insert/update operations. Only 1,000 per day are allowed per table. I would be going WAY over that. Unless I'm thinking about this wrong? – Jim Ott Feb 12 '20 at 20:16