Based on your requirement, these are the possible ways to schedule the job:
1. Cloud Composer:
Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow that helps you create, schedule, monitor, and manage workflows.
You can follow the steps below to schedule your job every two weeks using Composer:
- Create a Composer environment.
- Write a DAG file and add your custom training Python code to it.
- Since the custom training job is Python code, the PythonOperator can be used to schedule the task.
- In the DAG file, provide the start_date (the time from which scheduling begins) and define the schedule_interval as two weeks, as shown:
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:
Alternatively, you can use the unix-cron string format (* * * * *) for scheduling.
In your case, to run on the 1st and 15th of every month (roughly every two weeks), the cron expression would be: 0 0 1,15 * *, as shown in the sketch below.
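For illustration, this is the same DAG declaration with the cron string in place of the timedelta (the DAG id here is just a placeholder):
with models.DAG(
        'composer_sample_bq_notify',
        # Runs at midnight UTC on the 1st and 15th of each month.
        schedule_interval='0 0 1,15 * *',
        default_args=default_dag_args) as dag: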
You can pass the parameters required by the custom job to the PythonOperator using the op_args and op_kwargs arguments.
After the DAG file is written, you need to upload it to the dags/ folder inside the Composer environment's bucket (a sketch using the Cloud Storage client library is shown below).
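As a minimal sketch (the bucket name below is an assumption; use the bucket shown on your Composer environment's details page, or upload via the console or gsutil instead):
from google.cloud import storage

# Assumed values; replace with your project and your Composer environment's bucket.
client = storage.Client(project="your_project")
bucket = client.bucket("us-central1-your-env-bucket")

# DAG files placed under dags/ are picked up automatically by Composer.
blob = bucket.blob("dags/sample_dag.py")
blob.upload_from_filename("sample_dag.py")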
You can check the status of the scheduled DAG in the Airflow UI.
The scheduled DAG file would look like this:
sample_dag.py:
from __future__ import print_function

import datetime

from google.cloud import aiplatform

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

    def create_custom_job_sample(
        project: str,
        display_name: str,
        container_image_uri: str,
        location: str,
        api_endpoint: str,
    ):
        # The AI Platform services require regional API endpoints.
        client_options = {"api_endpoint": api_endpoint}
        # Initialize client that will be used to create and send requests.
        # This client only needs to be created once, and can be reused for multiple requests.
        client = aiplatform.gapic.JobServiceClient(client_options=client_options)
        custom_job = {
            "display_name": display_name,
            "job_spec": {
                "worker_pool_specs": [
                    {
                        "machine_spec": {
                            "machine_type": "n1-standard-4",
                            "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                            "accelerator_count": 1,
                        },
                        "replica_count": 1,
                        "container_spec": {
                            "image_uri": container_image_uri,
                            "command": [],
                            "args": [],
                        },
                    }
                ]
            },
        }
        parent = f"projects/{project}/locations/{location}"
        response = client.create_custom_job(parent=parent, custom_job=custom_job)
        print("response:", response)

    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={
            "project": "your_project",
            "display_name": "name",
            "container_image_uri": "uri path",
            "location": "us-central1",
            "api_endpoint": "us-central1-aiplatform.googleapis.com",
        })

    # Likewise, the goodbye_bash task runs a Bash command.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo "custom job scheduled"')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
2. Cloud Scheduler:
To schedule the job using Cloud Scheduler you will need the following configuration (a sketch using the Cloud Scheduler client library follows the list):
- Target: HTTP
- URL: the REST endpoint that creates the custom job, for example: https://us-central1-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/customJobs
- HTTP method: POST, with the CustomJob JSON as the request body
- Auth header: OAuth token, for Google APIs hosted on *.googleapis.com
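As a minimal sketch (the project, region, service account, and job body below are placeholder assumptions; the service account must be allowed to call Vertex AI), the same scheduler job could be created programmatically:
import json

from google.cloud import scheduler_v1

PROJECT_ID = "your_project"        # placeholder
REGION = "us-central1"             # placeholder
SERVICE_ACCOUNT = "scheduler-sa@your_project.iam.gserviceaccount.com"  # placeholder

# JSON body of the CustomJob to create (same structure as in the DAG above,
# using the REST API's camelCase field names).
custom_job = {
    "displayName": "name",
    "jobSpec": {
        "workerPoolSpecs": [{
            "machineSpec": {"machineType": "n1-standard-4"},
            "replicaCount": 1,
            "containerSpec": {"imageUri": "uri path"},
        }]
    },
}

client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT_ID}/locations/{REGION}"

job = scheduler_v1.Job(
    name=f"{parent}/jobs/biweekly-custom-training",
    # Midnight on the 1st and 15th of every month.
    schedule="0 0 1,15 * *",
    time_zone="Etc/UTC",
    http_target=scheduler_v1.HttpTarget(
        uri=f"https://{REGION}-aiplatform.googleapis.com/v1/{parent}/customJobs",
        http_method=scheduler_v1.HttpMethod.POST,
        headers={"Content-Type": "application/json"},
        body=json.dumps(custom_job).encode("utf-8"),
        oauth_token=scheduler_v1.OAuthToken(service_account_email=SERVICE_ACCOUNT),
    ),
)

response = client.create_job(parent=parent, job=job)
print("Created scheduler job:", response.name)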
3. Scheduling a recurring pipeline run using the Kubeflow Pipelines SDK:
You can schedule a recurring pipeline run using Python and the Kubeflow Pipelines SDK. The snippet below assumes the pipeline has already been compiled into a job spec (a compile sketch follows the snippet).
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    # Midnight on the 1st and 15th of every month.
    schedule='0 0 1,15 * *',
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS
)
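For completeness, here is a minimal sketch of producing COMPILED_PIPELINE_PATH with the KFP v2 compiler; the component and pipeline names are placeholders, and your real pipeline would contain the steps that launch the custom training job:
from kfp.v2 import compiler, dsl

@dsl.component
def train_step():
    # Placeholder component; put the real training/launch logic here.
    print("launch custom training here")

@dsl.pipeline(name="custom-training-pipeline")
def my_pipeline():
    train_step()

COMPILED_PIPELINE_PATH = "custom_training_pipeline.json"
compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path=COMPILED_PIPELINE_PATH,
)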