Automating Ray Workflows with Astronomer and Ray on Vertex AI
Introduction
Orchestrating distributed workloads efficiently is essential to maintaining the performance and cost-effectiveness of modern data engineering and machine learning workflows. By combining Astronomer, Ray, and Google Cloud Vertex AI, you can dynamically create Ray clusters, execute jobs on them, and clean up resources afterward. In this post, we’ll work through a process to automate these steps.
Why Astronomer, Ray, and Vertex AI?
- Astronomer provides a managed Apache Airflow platform to orchestrate complex workflows.
- Ray enables distributed computing for machine learning, deep learning, and data-intensive applications.
- Vertex AI offers managed services for scalable and cost-effective resource management on Google Cloud.
Benefits of Using Astronomer with Ray on Vertex AI
- Dynamic Ray Cluster Configuration: Spin up Ray clusters sized to the needs of each Ray job, using data retrieved from sources like Snowflake or Redshift.
- Cost Control: Spin down resources whenever the job completes to save on cluster resource costs.
- Simplified Local Development: Easily set up and test workflows locally before deploying to production.
- Effortless Deployment: Astronomer CLI provides one command to push updates to the Astronomer platform.
- Version Control: Seamlessly integrates with Git for managing your DAGs and dependencies.
Prerequisites
Before you begin, ensure the following:
Google Cloud Setup
- A Google Cloud project with Vertex AI APIs enabled.
- A Service Account with permissions to manage Vertex AI and related resources.
- Billing enabled in your Google Cloud project.
Astronomer Setup
- Astronomer CLI installed: Installation Guide.
- An Astronomer workspace created.
- Access to an Astronomer project.
Python Environment
- Python 3.10+ installed with the following libraries:
pip install ray google-cloud-aiplatform
Ray on Vertex AI
- A basic understanding of Ray clusters.
- Vertex AI configured to support Ray workloads.
Setting Up the Workflow
Using the Astronomer CLI
The Astronomer CLI simplifies project setup, local development, and deployment of workflows. Install it if you haven’t already:
For Linux:
curl -sSL install.astronomer.io | sudo bash -s
For Mac OS:
brew install astro
Verify installation:
astro version
Create a New Astronomer Project
To create a new Airflow project, run:
astro dev init
This sets up the following structure:
.
├── .env # Make sure to set GOOGLE_APPLICATION_CREDENTIALS here with a path to your creds JSON file
├── dags/ # Directory for your DAG files
├── Dockerfile # Defines your Airflow environment
├── requirements.txt # Python dependencies
├── airflow_settings.yaml # Astronomer workspace settings
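As noted above, the .env file is where the local Airflow environment picks up your Google Cloud credentials. A minimal example, with a placeholder path you would replace with the location of your own service account key:

```shell
# Placeholder path -- point this at your actual service account key file
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
```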
Define the DAG
In the dags/ directory, create a file ray_vertex_ai_dag.py with the following code:
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from google.cloud import aiplatform
from ray.job_submission import JobSubmissionClient
from vertex_ray import Resources
import logging
import ray
import vertex_ray
def cluster_creation(ti):
    try:
        cluster_name = "cluster"
        project_name = "<project_name>"
        logging.info(f"Running cluster creation commands for cluster: {cluster_name}")
        # Define a default CPU cluster: n1-standard-16 for the head node, n1-standard-8 for the worker node
        head_node_type = Resources(
            machine_type="n1-standard-16",
            node_count=1,
        )
        worker_node_types = [Resources(
            machine_type="n1-standard-8",
            node_count=1,
        )]
        # Initialize Vertex AI so downstream operations target the right project.
        aiplatform.init(project=project_name)
        # Check whether the cluster already exists
        clusters = vertex_ray.list_ray_clusters()
        for cluster in clusters:
            if cluster.cluster_resource_name.split('/')[-1] == cluster_name:
                logging.info(f"Cluster already exists, skipping cluster creation: {cluster.cluster_resource_name}")
                ti.xcom_push(key="cluster_resource_name", value=cluster.cluster_resource_name)
                return "Cluster already exists."
        # Create the Ray cluster on Vertex AI
        cluster_resource_name = vertex_ray.create_ray_cluster(
            head_node_type=head_node_type,
            worker_node_types=worker_node_types,
            python_version="3.10",  # Optional
            ray_version="2.33",  # Optional
            cluster_name=cluster_name,
            enable_metrics_collection=True,  # Optional. Enable metrics collection for monitoring.
        )
        logging.info(f"Cluster successfully created: {cluster_resource_name}")
        ti.xcom_push(key="cluster_resource_name", value=cluster_resource_name)
        return "Cluster creation commands ran successfully."
    except Exception as e:
        logging.error(f"Cluster creation commands failed: {e}")
        raise
def job_submission_script(ti):
    try:
        logging.info("Running job submission script...")
        cluster_resource_name = ti.xcom_pull(key="cluster_resource_name", task_ids="cluster_creation")
        logging.info(f"Submitting job to cluster: {cluster_resource_name}")
        client = JobSubmissionClient(f"vertex_ray://{cluster_resource_name}")
        job_id = client.submit_job(
            entrypoint="python __main__.py",
            runtime_env={
                "working_dir": "./ray_job",
                "pip": [
                    "numpy",
                    "setuptools<70.0.0",
                    "ray==2.33.0",  # Pin Ray to the same version as the cluster
                ],
            },
        )
        # Confirm that the Ray job has been created.
        logging.info(f"Submitted Job ID: {job_id}")
        return "Job submission script ran successfully."
    except Exception as e:
        logging.error("Job submission failed: %s", e)
        raise
def cluster_deletion(ti):
    try:
        cluster_resource_name = ti.xcom_pull(key="cluster_resource_name", task_ids="cluster_creation")
        logging.info(f"Terminating cluster: {cluster_resource_name}")
        ray.shutdown()
        vertex_ray.delete_ray_cluster(cluster_resource_name)
        return "Cluster deletion commands ran successfully."
    except Exception as e:
        logging.error("Cluster deletion commands failed: %s", e)
        raise
@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    description='A DAG for Ray cluster management and job submission',
    doc_md=__doc__,
    default_args={"owner": "Astro", "retries": 3},
    tags=["ray", "job management"],
)
def ray_vertex_ai_workflow():
    cluster_create = PythonOperator(
        task_id='cluster_creation',
        python_callable=cluster_creation,
    )
    submit_job = PythonOperator(
        task_id='submit_job',
        python_callable=job_submission_script,
    )
    teardown_cluster = PythonOperator(
        task_id='cluster_deletion',
        python_callable=cluster_deletion,
    )
    cluster_create >> submit_job >> teardown_cluster

ray_vertex_ai_workflow()
Test the DAG Locally
Start the local Airflow environment with the Astronomer CLI:
astro dev start
Run the Airflow DAG locally:
astro run ray_vertex_ai_workflow
Access the Airflow UI at http://localhost:8080. Use the default credentials (admin / admin) to log in. You should see your ray_vertex_ai_workflow DAG listed.
Trigger the DAG to ensure everything works as expected.
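One thing to watch for while testing: in the DAG above, cluster_deletion runs as soon as submit_job returns, which could tear the cluster down while a long job is still running. One option is to block until the job reaches a terminal state before returning from the submit task. A generic polling sketch (the terminal state names follow Ray's JobStatus values; get_status is any callable returning the current status string):

```python
import time

# Terminal states, named after Ray's JobStatus values.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}

def wait_for_job(get_status, timeout_s=600.0, poll_interval_s=5.0):
    """Poll get_status() until it returns a terminal state or time runs out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_interval_s)
    raise TimeoutError("Ray job did not reach a terminal state in time")
```

In job_submission_script you could call something like wait_for_job(lambda: client.get_job_status(job_id).value) before returning, so cluster_deletion only fires after the job finishes.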
Deploy the DAG to Astronomer Cloud
- Add any dependencies to requirements.txt:
ray
google-cloud-aiplatform
- Deploy your project:
astro deploy
- Select your Astronomer workspace and deployment during the process. Make sure that you have properly configured your Google Cloud credentials.
Source Code
The full source code is available here:
Next Steps
Please feel free to contact me at justinrmiller@gmail.com if you have any questions, comments, or corrections.
Here are some ideas on next steps:
- Experiment with more complex Ray jobs, such as hyperparameter optimization or reinforcement learning tasks.
- Add monitoring and alerts to your DAGs for enhanced observability. Both Astronomer and Ray on Vertex AI integrate with observability platforms like Datadog.
- If you’re generating embeddings or resizing media, try building a system that dynamically sizes the Ray cluster based on the input size of the batch job.
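As a sketch of that last idea, the helper below picks a worker node count from the size of the incoming batch, which could then feed Resources(node_count=...) in cluster_creation. The thresholds and the items-per-worker throughput figure are made-up placeholders you would tune for your own workload:

```python
import math

def worker_count_for_batch(num_items: int,
                           items_per_worker: int = 50_000,
                           min_workers: int = 1,
                           max_workers: int = 10) -> int:
    """Choose how many Ray worker nodes to request for a batch job.

    items_per_worker is a hypothetical throughput figure: how many
    input items a single worker node can comfortably process.
    """
    if num_items <= 0:
        return min_workers
    needed = math.ceil(num_items / items_per_worker)
    # Clamp to the allowed range so we never request zero or runaway capacity.
    return max(min_workers, min(max_workers, needed))
```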
Conclusion
By leveraging Astronomer and Ray on Vertex AI, you can streamline the entire lifecycle of your distributed workflows while maintaining scalability and cost-effectiveness.