
I have packaged my training code as a Python package and can run it as a custom training job on Vertex AI. Now I want to schedule this job to run, say, every 2 weeks and retrain the model. The scheduling settings in the CustomJobSpec allow only 2 fields, "timeout" and "restartJobOnWorkerRestart", so it is not possible to do this through the CustomJobSpec. One way I could think of was to create a Vertex AI pipeline with a single step using the "CustomPythonPackageTrainingJobRunOp" Google Cloud Pipeline Component and then schedule the pipeline to run as I see fit. Are there better alternatives?

Edit:

I was able to schedule the custom training job using Cloud Scheduler, but I found the create_schedule_from_job_spec method of the AIPlatformClient, which schedules a Vertex AI pipeline, much easier to use. The steps I took to schedule the custom job using Cloud Scheduler in GCP are as follows (link to Google docs):

  1. Set target type to HTTP
  2. For the URL that specifies the custom job, I followed this link to get the URL
  3. For authentication, under Auth header, I selected "Add OAuth token"

You also need a "Cloud Scheduler service account" with the "Cloud Scheduler Service Agent" role granted to it in your project. Although the docs say this should have been set up automatically if you enabled the Cloud Scheduler API after March 19, 2019, this was not the case for me, and I had to add the service account with the role manually.

racerX
  • How do you run your job today? Through the UI or the gcloud CLI? If with the UI, try to perform the same thing with the CLI. Then add the --log-http parameter to view the API call which is performed. Finally, you can reproduce this API call with Cloud Scheduler. If you have difficulties, update your question when you are stuck in this process, and ping with a comment hereafter. – guillaume blaquiere Aug 15 '21 at 19:45
  • Neither UI nor gcloud CLI, I used a VertexAI notebook and Vertex Python SDK, and followed this link, https://cloud.google.com/vertex-ai/docs/training/create-custom-job#create. – racerX Aug 15 '21 at 20:01
  • It's easier, you have the REST request performed. Try it with your parameters. When it works you can paste it in Cloud Scheduler. – guillaume blaquiere Aug 16 '21 at 08:27
  • Yes, I had been looking into that, see the new Edit. I was looking for something similar to how we can schedule a Vertex AI pipeline which I found much more straightforward to use – racerX Aug 16 '21 at 21:08
  • Under the hood, a Cloud Scheduler job will be created by Vertex Pipelines, but embedded in a function call, not directly deployed by yourself ;) – guillaume blaquiere Aug 16 '21 at 21:48
  • True, but it abstracts away that whole cloud scheduler layer which I liked – racerX Aug 16 '21 at 22:05

1 Answer


As per your requirement, the possible ways to schedule the job are:

1. Cloud Composer

Cloud Composer is a managed Apache Airflow service that helps you create, schedule, monitor and manage workflows.

You can follow the steps below to schedule your job every two weeks using Composer:

  • Create a Composer environment.
  • Write a DAG file and add your custom training Python code to it.
  • Since the custom training job is Python code, the PythonOperator can be used to run it as a task.
  • In the DAG file, provide the start time (the time from which scheduling begins) and define the schedule interval as two weeks, as shown:
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

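Alternatively, you can use the unix-cron string format (* * * * *) for the schedule.

In your case, a cron expression such as 0 0 1,15 * * runs the job at midnight on the 1st and 15th of every month, which is roughly every two weeks. Note that leaving the minute and hour fields as * * would instead trigger a run every minute of those two days.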
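The two schedule forms mentioned above are close but not identical. As a rough illustration using only the standard library (the helper names `cron_1_15_runs` and `timedelta_runs` are hypothetical, and no Airflow scheduling logic is involved), this sketch compares run dates for a fixed two-week interval with run dates for a cron day-of-month field of 1,15:

```python
import datetime


def cron_1_15_runs(year):
    """Dates in one year on which a day-of-month field of 1,15 fires."""
    return [datetime.date(year, month, day)
            for month in range(1, 13)
            for day in (1, 15)]


def timedelta_runs(start, end, interval=datetime.timedelta(weeks=2)):
    """Dates from start to end (inclusive) spaced by a fixed interval."""
    runs = []
    current = start
    while current <= end:
        runs.append(current)
        current += interval
    return runs


cron_runs = cron_1_15_runs(2021)
# Distinct gaps, in days, between consecutive cron-style runs.
cron_gaps = sorted({(b - a).days for a, b in zip(cron_runs, cron_runs[1:])})
fixed_runs = timedelta_runs(datetime.date(2021, 1, 1),
                            datetime.date(2021, 12, 31))

print("cron-style runs:", len(cron_runs), "gaps in days:", cron_gaps)
print("fixed two-week runs:", len(fixed_runs))
```

The cron form gives 24 runs per year with uneven gaps that depend on month length, while the fixed interval gives runs exactly 14 days apart. Which one is preferable depends on whether calendar alignment or even spacing matters more for retraining.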

You can pass the parameters required by the custom job inside the PythonOperator using op_args and op_kwargs arguments.

After the DAG file is written, you need to push it to the dags/ folder inside the Composer Environment Bucket.

You can check the status of the scheduled DAG in the Airflow UI.

The scheduled DAG file would look like this:

sample_dag.py :

from __future__ import print_function

import datetime

from google.cloud import aiplatform

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

def create_custom_job_sample(
    project: str,
    display_name: str,
    container_image_uri: str,
    location: str,
    api_endpoint: str,
):
    """Submit a Vertex AI custom training job that runs the given container."""
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.JobServiceClient(client_options=client_options)
    custom_job = {
        "display_name": display_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-4",
                        "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                        "accelerator_count": 1,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": container_image_uri,
                        "command": [],
                        "args": [],
                    },
                }
            ]
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_custom_job(parent=parent, custom_job=custom_job)
    print("response:", response)


with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

    # The hello_python task submits the custom training job; op_kwargs are
    # passed to create_custom_job_sample as keyword arguments.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={
            "project": "your_project",
            "display_name": "name",
            "container_image_uri": "uri path",
            "location": "us-central1",
            "api_endpoint": "us-central1-aiplatform.googleapis.com",
        })

    # Likewise, the goodbye_bash task runs a Bash command.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo "job scheduled"')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

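2. Cloud Scheduler: To schedule a job using Cloud Scheduler, you need the following configuration:

  • Target : HTTP
  • URL : The customJobs endpoint of the regional Vertex AI API, called with POST and the job definition as the request body (example : “https://us-central1-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/customJobs”)
  • Auth header : OAuth token, which is the option for Google APIs hosted on *.googleapis.com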
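As a minimal sketch of the configuration above, the following builds the Cloud Scheduler job resource as a plain dictionary in the REST JSON shape, where the HTTP body is base64-encoded. The function name `build_scheduler_job`, the machine type, and all project, bucket, module, image and service account values are placeholders assumed for illustration; the resulting dictionary would still have to be submitted through the console, gcloud, or the Cloud Scheduler API.

```python
import base64
import json


def build_scheduler_job(project, location, display_name, package_uri,
                        python_module, executor_image_uri,
                        service_account_email,
                        schedule="0 0 1,15 * *", time_zone="Etc/UTC"):
    """Return a Cloud Scheduler job (REST JSON shape) that creates a custom job."""
    # Regional Vertex AI endpoint for creating custom jobs.
    uri = (f"https://{location}-aiplatform.googleapis.com/v1/"
           f"projects/{project}/locations/{location}/customJobs")
    # Request body: a CustomJob that runs a Python training package.
    custom_job = {
        "displayName": display_name,
        "jobSpec": {
            "workerPoolSpecs": [{
                "machineSpec": {"machineType": "n1-standard-4"},
                "replicaCount": 1,
                "pythonPackageSpec": {
                    "executorImageUri": executor_image_uri,
                    "packageUris": [package_uri],
                    "pythonModule": python_module,
                },
            }]
        },
    }
    body = base64.b64encode(json.dumps(custom_job).encode("utf-8"))
    return {
        "schedule": schedule,
        "timeZone": time_zone,
        "httpTarget": {
            "uri": uri,
            "httpMethod": "POST",
            "headers": {"Content-Type": "application/json"},
            "body": body.decode("ascii"),
            # OAuth token, as required for *.googleapis.com targets.
            "oauthToken": {
                "serviceAccountEmail": service_account_email,
                "scope": "https://www.googleapis.com/auth/cloud-platform",
            },
        },
    }


# Placeholder values, for illustration only.
job = build_scheduler_job(
    project="your-project",
    location="us-central1",
    display_name="retrain-model",
    package_uri="gs://your-bucket/trainer-0.1.tar.gz",
    python_module="trainer.task",
    executor_image_uri="your-executor-image-uri",
    service_account_email="scheduler@your-project.iam.gserviceaccount.com",
)
print(json.dumps(job, indent=2))
```

Only a display name is supplied here; as noted in the comments below, Vertex AI assigns a new job ID each time a training job is started.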

3. Scheduling a recurring pipeline run using the Kubeflow Pipelines SDK:

You can schedule a recurring pipeline run using Python and the Kubeflow Pipelines SDK.

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
                           region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule="0 0 1,15 * *",  # midnight on the 1st and 15th of each month
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS
)
Sandeep Mohanty
  • For this use case I think using Cloud Scheduler is probably more straightforward, or a scheduled pipeline. – racerX Aug 16 '21 at 21:09
  • Hi @racerX, as you are already aware of Cloud Scheduler and recurring pipeline, I have updated the answer with various possible ways for scheduling. – Sandeep Mohanty Aug 17 '21 at 07:24
  • I am working on a very similar use case where I want to 1) serve the model for inference and 2) retrain the model every week and redeploy it. I am looking for CT and CD. Can you please share a few articles or documents you referred to when creating this? I am planning to use Vertex AI. – Chaitanya Patil Aug 26 '21 at 03:05
  • Hi @ChaitanyaPatil Could you please post your question regarding articles and documents in the [Cloud Community](https://www.googlecloudcommunity.com/gc/Google-Cloud/ct-p/google-cloud) forum. This community hosts peer-to-peer product forums and is a great resource to get answers. – Sandeep Mohanty Aug 27 '21 at 14:29
  • @racerX How do you provide the unique jobId with cloud scheduler every time? – adarsh Sep 08 '21 at 15:29
  • Hi @adarsh, I don't, I simply provide the training job name, VertexAI creates the unique ID every time a new training job is started. – racerX Sep 08 '21 at 20:37
  • @racerX thanks, it seems like that bit is different between vertex-ai and ai-platform. Time to switch to vertex-ai! – adarsh Sep 10 '21 at 09:59