I just need to run a Dataflow pipeline on a daily basis, but the suggested solutions, like the App Engine Cron Service, which requires building a whole web app, seem like overkill. I was thinking of just running the pipeline from a cron job on a Compute Engine Linux VM, but maybe that's far too simple :). What's the problem with doing it that way, and why isn't anybody (besides me, I guess) suggesting it?
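Concretely, what I have in mind is something like the sketch below, assuming a Python Beam pipeline submitted with the DataflowRunner (the project, bucket, and paths are just placeholders):

# run_daily_pipeline.py -- sketch of the cron-on-a-VM approach.
# Example crontab entry on the Compute Engine VM (placeholder paths):
#   0 3 * * * /usr/bin/python3 /opt/pipelines/run_daily_pipeline.py >> /var/log/daily_pipeline.log 2>&1
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="YOUR-PROJECT-ID",              # placeholder
    region="us-central1",                   # placeholder
    temp_location="gs://YOUR-BUCKET/temp",  # placeholder
    job_name="daily-pipeline",
)

with beam.Pipeline(options=options) as pipeline:
    # Replace with the real pipeline; this just illustrates the daily submission.
    (pipeline
     | "Create" >> beam.Create(["hello", "world"])
     | "Log" >> beam.Map(print))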
-
How did you end up doing it? Does it work well with Compute Engine? – IoT user Nov 28 '18 at 08:42
-
It does work well; it's been running that way for more than a year. – CCC Nov 28 '18 at 12:16
-
Use Composer to schedule the Dataflow job. Composer is managed by Google; you just need to create your DAG and schedule your job. It's well suited to Dataflow jobs. – miles212 Feb 13 '20 at 11:48
-
@Miles212: Personally, I do not like Composer for kicking off (python) dataflow jobs. The dataflow operators available with composer still require your pipeline to be written in python 2. – Thomas W. Dec 01 '20 at 10:25
4 Answers
This is how I did it using Cloud Functions, PubSub, and Cloud Scheduler (this assumes you've already created a Dataflow template and it exists somewhere in your GCS bucket):
1. Create a new topic in PubSub. This will be used to trigger the Cloud Function.
2. Create a Cloud Function that launches a Dataflow job from a template. I find it easiest to create this from the Cloud Functions console. Make sure the service account you choose has permission to create a Dataflow job. The function's index.js looks something like:
const google = require('googleapis');

exports.triggerTemplate = (event, context) => {
  // In this case the PubSub message payload and attributes are not used,
  // but they can be used to pass parameters needed by the Dataflow template.
  const pubsubMessage = event.data;
  console.log(Buffer.from(pubsubMessage, 'base64').toString());
  console.log(event.attributes);

  google.google.auth.getApplicationDefault(function (err, authClient, projectId) {
    if (err) {
      console.error('Error occurred: ' + err.toString());
      throw new Error(err);
    }

    const dataflow = google.google.dataflow({ version: 'v1b3', auth: authClient });

    dataflow.projects.templates.create({
      projectId: projectId,
      resource: {
        parameters: {},
        jobName: 'SOME-DATAFLOW-JOB-NAME',
        gcsPath: 'gs://PATH-TO-YOUR-TEMPLATE'
      }
    }, function (err, response) {
      if (err) {
        console.error('Problem running dataflow template, error was: ', err);
      }
      console.log('Dataflow template response: ', response);
    });
  });
};
The package.json looks like:
{
  "name": "pubsub-trigger-template",
  "version": "0.0.1",
  "dependencies": {
    "googleapis": "37.1.0",
    "@google-cloud/pubsub": "^0.18.0"
  }
}
3. Go to PubSub and the topic you created, and manually publish a message. This should trigger the Cloud Function and start a Dataflow job.
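If you'd rather publish the test message from code than from the console, here's a quick sketch using the Python Pub/Sub client (the project and topic names are placeholders):

# pip install google-cloud-pubsub
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Placeholders: your project and the topic created in step 1.
topic_path = publisher.topic_path("YOUR-PROJECT-ID", "dataflow-trigger-topic")

# The payload is ignored by the function above, but it could carry template parameters.
future = publisher.publish(topic_path, data=b"manual test trigger")
print("Published message ID:", future.result())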
4. Use Cloud Scheduler to publish a PubSub message on a schedule: https://cloud.google.com/scheduler/docs/tut-pub-sub
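The linked tutorial creates the scheduler job in the console / with gcloud. As an alternative sketch (not part of the original recipe), the same job can be created with the Python Cloud Scheduler client; the project, region, schedule, and topic below are placeholders:

# pip install google-cloud-scheduler
from google.cloud import scheduler_v1

client = scheduler_v1.CloudSchedulerClient()
# Placeholders: substitute your own project and a supported Cloud Scheduler region.
parent = "projects/YOUR-PROJECT-ID/locations/us-central1"

job = scheduler_v1.Job(
    name=f"{parent}/jobs/daily-dataflow-trigger",
    schedule="0 3 * * *",  # every day at 03:00
    time_zone="Etc/UTC",
    pubsub_target=scheduler_v1.PubsubTarget(
        topic_name="projects/YOUR-PROJECT-ID/topics/dataflow-trigger-topic",
        data=b"scheduled trigger",
    ),
)
client.create_job(parent=parent, job=job)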

-
Nice, Step 4 is the trick. Pity Google don't just do that for you; if they hid the complication of having to create a PubSub topic and a Cloud Function, it would effectively enable scheduling Dataflow from Cloud Scheduler. – Davos May 06 '19 at 04:19
-
@Davos I think you CAN do this. See https://cloud.google.com/firestore/docs/solutions/schedule-export#create_a_cloud_function_and_a_job, specifically the "Firebase CLI" based example, which I interpret as "subscribe to a built-in once-per-day event and run this cloud function." Much simpler than the console-based version of the same. If I can get this working I'll write an answer to the OP. – Chris Sep 15 '20 at 20:43
-
@Chris This was quite a while ago, I hope it's come along since then. I look forward to your answer. – Davos Sep 22 '20 at 06:24
There's absolutely nothing wrong with using a cron job to kick off your Dataflow pipelines. We do it all the time for our production systems, whether it's our Java or our Python pipelines.
That said, we are trying to wean ourselves off cron jobs and move toward using either AWS Lambdas (we run multi-cloud) or Cloud Functions. Unfortunately, Cloud Functions don't have scheduling yet. AWS Lambdas do.

-
Now that I am back reading this comment I'm curious what you implemented as a multi cloud solution, as I'm now struggling with that :) – CCC Jan 25 '19 at 21:01
-
I have used Cloud Scheduler to kick off the Cloud Function, where I generate some dynamic parameters needed by my Dataflow jobs. Flow: Cloud Scheduler -> Cloud Function -> Dataflow job. In another use case I'm using Airflow, as I have to trigger the Dataflow job in between my other stages. – Rakesh Sabbani Mar 22 '23 at 10:49
There is a FAQ answer to that question: https://cloud.google.com/dataflow/docs/resources/faq#is_there_a_built-in_scheduling_mechanism_to_execute_pipelines_at_given_time_or_interval
- You can automate pipeline execution by using Google App Engine (Flexible Environment only) or Cloud Functions.
- You can use Apache Airflow's Dataflow Operator, one of several Google Cloud Platform Operators in a Cloud Composer workflow.
- You can use custom (cron) job processes on Compute Engine.
The Cloud Functions approach is described as "Alpha", and it's still true that Cloud Functions have no scheduling (no equivalent of an AWS CloudWatch scheduled event); the only triggers are Pub/Sub messages, Cloud Storage changes, and HTTP invocations.
Cloud Composer looks like a good option. It's effectively a re-badged Apache Airflow, which is itself a great orchestration tool. Definitely not "too simple" like cron :)
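If you go the Composer route, a minimal Airflow DAG that launches a templated Dataflow job daily could look roughly like this. This is only a sketch using DataflowTemplatedJobStartOperator from the apache-airflow-providers-google package; the project, bucket, and schedule are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)

with DAG(
    dag_id="daily_dataflow_job",
    schedule_interval="0 3 * * *",  # every day at 03:00
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    launch_template = DataflowTemplatedJobStartOperator(
        task_id="launch_dataflow_template",
        template="gs://YOUR-BUCKET/templates/your-template",  # placeholder template path
        project_id="YOUR-PROJECT-ID",                         # placeholder project
        location="us-central1",
        parameters={},  # template-specific parameters, if any
    )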

You can use Cloud Scheduler to schedule your job as well. See my post:
https://medium.com/@zhongchen/schedule-your-dataflow-batch-jobs-with-cloud-scheduler-8390e0e958eb
Terraform script
data "google_project" "project" {}
resource "google_cloud_scheduler_job" "scheduler" {
name = "scheduler-demo"
schedule = "0 0 * * *"
# This needs to be us-central1 even if the app engine is in us-central.
# You will get a resource not found error if just using us-central.
region = "us-central1"
http_target {
http_method = "POST"
uri = "https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/templates:launch?gcsPath=gs://zhong-gcp/templates/dataflow-demo-template"
oauth_token {
service_account_email = google_service_account.cloud-scheduler-demo.email
}
# need to encode the string
body = base64encode(<<-EOT
{
"jobName": "test-cloud-scheduler",
"parameters": {
"region": "${var.region}",
"autoscalingAlgorithm": "THROUGHPUT_BASED",
},
"environment": {
"maxWorkers": "10",
"tempLocation": "gs://zhong-gcp/temp",
"zone": "us-west1-a"
}
}
EOT
)
}
}
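For context, the http_target above is just the Dataflow templates:launch REST call. If the scheduler job fails with INVALID_ARGUMENT (see the comments below), it can help to issue the same call directly and inspect the error; here's a debugging sketch with the Python API client, using placeholder paths:

# pip install google-api-python-client google-auth
import google.auth
from googleapiclient.discovery import build

credentials, project_id = google.auth.default()
dataflow = build("dataflow", "v1b3", credentials=credentials)

response = dataflow.projects().locations().templates().launch(
    projectId=project_id,
    location="us-central1",                              # placeholder region
    gcsPath="gs://YOUR-BUCKET/templates/your-template",  # placeholder template path
    body={
        "jobName": "test-cloud-scheduler",
        # Only pass parameters that your template actually declares;
        # unexpected keys are one cause of INVALID_ARGUMENT (see comments below).
        "parameters": {},
        "environment": {"tempLocation": "gs://YOUR-BUCKET/temp"},
    },
).execute()
print(response)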

-
Links are generally not accepted as answers. Please describe the process instead of simply linking it. Links can break or go down, or the poster might not be able to follow it due to geographical restrictions, etc. – Vendetta Feb 07 '20 at 00:16
-
https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/templates:launch?gcsPath=gs://latest_Word_Count httpRequest: { status: 400 } insertId: "n1mrp7g2l8yr3o" jsonPayload: { @type: "type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished" jobName: "projects/projectid/locations/europe-west2/jobs/scheduler-WordCount" status: "INVALID_ARGUMENT" targetType: "HTTP" url: "https://dataflow.googleapis.com/v1b3/projects/projecid/locations/europe-west2/templates:launch?gcsPath=gs://latest_Word_Count" } – Rakesh Sabbani Jan 19 '22 at 10:22
-
We have followed the same syntax but are getting the above error, status: "INVALID_ARGUMENT". We have added all the roles to the service account we are using to schedule. – Rakesh Sabbani Jan 19 '22 at 10:25
-
@RakeshSabbani You probably put some invalid parameters in the "parameters" field of the body. These are parameters passed to your job, but it's very specific to your job, so you probably need to change them or delete them. Personally, I just deleted the whole "parameters" field and everything worked fine. ref: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.templates/launch, https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters – The Foxy Dec 20 '22 at 14:14
-
@TheFoxy But my requirement is to pass certain dynamic values as parameters to my Dataflow job; how can that be achieved? – Rakesh Sabbani Jan 11 '23 at 11:28