Automating Ray Workflows with Astronomer and Ray on Vertex AI

Introduction

Orchestrating distributed workloads efficiently is essential to maintaining the performance and cost-effectiveness of modern data engineering and machine learning workflows. By combining Astronomer, Ray, and Google Cloud Vertex AI, you can dynamically create Ray clusters, execute jobs on them, and clean up resources afterward. In this post, we'll walk through automating these steps.

Why Astronomer, Ray, and Vertex AI?

Benefits of Using Astronomer with Ray on Vertex AI

Prerequisites

Before you begin, ensure the following:

Google Cloud Setup

Astronomer Setup

Python Environment

pip install ray google-cloud-aiplatform
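
To confirm the packages installed correctly and that your credentials can reach Vertex AI, you can run a short sanity check (the project placeholder below is whatever GCP project you configured above):

import ray
import vertex_ray
from google.cloud import aiplatform

print(f"Ray version: {ray.__version__}")

# Uses your GOOGLE_APPLICATION_CREDENTIALS / application default credentials
aiplatform.init(project="<project_name>")

# Returns an empty list if no Ray clusters exist in the project yet
print(vertex_ray.list_ray_clusters())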

Ray on Vertex AI

Setting Up the Workflow

Using the Astronomer CLI

The Astronomer CLI simplifies project setup, local development, and deployment of workflows. Install it if you haven’t already:

For Linux:

curl -sSL install.astronomer.io | sudo bash -s

For macOS:

brew install astro

Verify installation:

astro version

Create a New Astronomer Project

To create a new Airflow project, run:

astro dev init

This sets up the following structure:

.
├── .env                   #  Make sure to set GOOGLE_APPLICATION_CREDENTIALS here with a path to your creds JSON file
├── dags/                  #  Directory for your DAG files
├── Dockerfile             #  Defines your Airflow environment
├── requirements.txt       #  Python dependencies
├── airflow_settings.yaml  #  Astronomer workspace settings
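
For example, the .env file can point Airflow at the service-account key you created during the Google Cloud setup (the path below is just a placeholder):

GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-service-account.json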

Define the DAG

In the dags/ directory, create a file ray_vertex_ai_dag.py with the following code:

from datetime import datetime
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from google.cloud import aiplatform
from ray.job_submission import JobSubmissionClient
from vertex_ray import Resources

import logging
import ray
import vertex_ray

def cluster_creation(ti):
    try:
        cluster_name = "cluster"
        project_name = "<project_name>"

        logging.info(f"Running cluster creation commands for cluster: {cluster_name}")

        # Define a default CPU cluster, machine_type is n1-standard-16 for the head node, n1-standard-8 for the worker node
        head_node_type = Resources(
            machine_type="n1-standard-16",
            node_count=1
        )

        worker_node_types = [Resources(
            machine_type="n1-standard-8",
            node_count=1,
        )]

        # Initialize the Vertex AI SDK so subsequent vertex_ray calls target this project
        aiplatform.init(project=project_name)

        # Check to see if cluster already exists
        clusters = vertex_ray.list_ray_clusters()
        for cluster in clusters:
            if cluster.cluster_resource_name.split('/')[-1] == cluster_name:
                logging.info(f"Cluster already exists, skipping cluster creation: {cluster.cluster_resource_name}")

                ti.xcom_push(key="cluster_resource_name", value=cluster.cluster_resource_name)

                return "Cluster already exists."

        # Create the Ray cluster on Vertex AI
        cluster_resource_name = vertex_ray.create_ray_cluster(
            head_node_type=head_node_type,
            worker_node_types=worker_node_types,
            python_version="3.10",           # Optional
            ray_version="2.33",              # Optional
            cluster_name=cluster_name,
            enable_metrics_collection=True,  # Optional. Enable metrics collection for monitoring.
        )

        logging.info(f"Cluster successfully created: {cluster_resource_name}")

        ti.xcom_push(key="cluster_resource_name", value=cluster_resource_name)

        return "Cluster creation commands ran successfully."
    except Exception as e:
        logging.error(f"Cluster creation commands failed: {e}")
        raise

def job_submission_script(ti):
    try:
        logging.info("Running job submission script...")

        cluster_resource_name = ti.xcom_pull(key="cluster_resource_name", task_ids="cluster_creation")

        print(f"Submitting job to cluster: {cluster_resource_name}")

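        # The vertex_ray:// address scheme tells the Ray client to resolve the
        # cluster by its full Vertex AI resource name instead of a host:port address.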
        client = JobSubmissionClient("vertex_ray://{}".format(cluster_resource_name))

        job_id = client.submit_job(
            entrypoint="python __main__.py",
            runtime_env={
                "working_dir": "./ray_job",
                "pip": [
                    "numpy",
                    "setuptools<70.0.0",
                    "ray==2.33.0",  # pin the Ray version to the same version as the cluster
                ]
            }
        )

        # Ensure that the Ray job has been created.
        logging.info(f"Submitted Job ID: {job_id}")

        return "Job submission script ran successfully."
    except Exception as e:
        logging.error("Job submission failed: %s", e)
        raise

def cluster_deletion(ti):
    try:
        cluster_resource_name = ti.xcom_pull(key="cluster_resource_name", task_ids="cluster_creation")

        logging.info(f"Terminating cluster: {cluster_resource_name}")

        ray.shutdown()
        vertex_ray.delete_ray_cluster(cluster_resource_name)

        return "Cluster deletion commands ran successfully."
    except Exception as e:
        logging.error("Cluster deletion commands failed: %s", e)
        raise

@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    description='A DAG for Ray cluster management and job submission',
    doc_md=__doc__,
    default_args={"owner": "Astro", "retries": 3},
    tags=["ray", "job management"],
)
def ray_vertex_ai_workflow():
    cluster_create = PythonOperator(
        task_id='cluster_creation',
        python_callable=cluster_creation,
    )

    submit_job = PythonOperator(
        task_id='submit_job',
        python_callable=job_submission_script,
    )

    teardown_cluster = PythonOperator(
        task_id='cluster_deletion',
        python_callable=cluster_deletion,
    )
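
    # Note: with Airflow's default trigger rule (all_success), cluster_deletion is
    # skipped when job submission fails, which can leave the cluster running.
    # Setting trigger_rule="all_done" on this task is one way to guarantee teardown.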

    cluster_create >> submit_job >> teardown_cluster

ray_vertex_ai_workflow()
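
The submit_job task points the Ray job at a local ray_job/ directory with a __main__.py entrypoint, which isn't shown above. As a placeholder while you plug in your own workload, a minimal ray_job/__main__.py along these lines will run on the cluster (the square function and its inputs are purely illustrative):

import ray
import numpy as np

# Inside a submitted job, ray.init() connects to the cluster that is running the job
ray.init()

@ray.remote
def square(x: int) -> int:
    # Stand-in for your real distributed work
    return x * x

if __name__ == "__main__":
    results = ray.get([square.remote(i) for i in range(10)])
    print(f"Squares: {results}")
    print(f"Sum of squares (numpy): {np.sum(results)}")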

Test the DAG Locally

Start the local Airflow environment with the Astronomer CLI:

astro dev start

Run the Airflow DAG locally:

astro run ray_vertex_ai_workflow

Access the Airflow UI at http://localhost:8080. Use the default credentials (admin / admin) to log in. You should see your ray_vertex_ai_workflow DAG listed.

Trigger the DAG to ensure everything works as expected.

Deploy the DAG to Astronomer Cloud

  1. Add any dependencies to requirements.txt:

ray
google-cloud-aiplatform

  2. Deploy your project:

astro deploy

  3. Select your Astronomer workspace and deployment during the process. Make sure that you have properly configured your GCP credentials.

Source Code

The full source code is available here:

Link

Next Steps

Please feel free to contact me at justinrmiller@gmail.com if you have any questions, comments, or corrections.

Here are some ideas on next steps:

Conclusion

By leveraging Astronomer and Ray on Vertex AI, you can streamline the entire lifecycle of your distributed workflows while maintaining scalability and cost-effectiveness.