LLM Inference with the Airflow AI SDK and Ollama
In this blog post, I will demonstrate how to use the Airflow AI SDK and Ollama to develop Airflow DAGs locally, at no cost, with the Astro CLI. This allows for full end-to-end testing of an Airflow DAG without calling out to a third-party LLM provider, and it opens up a broader range of model choices. The example takes a collection of product names, sends each one to Ollama with a system prompt designed to generate blog ideas, and then prints out the resulting ideas.
Technology Stack
- Apache Airflow: Workflow orchestrator for automating data pipelines.
- Airflow AI SDK: Simplifies integration of LLM workflows directly into Apache Airflow DAGs.
- Ollama: Enables local hosting of powerful LLMs, ensuring secure, efficient, and private content generation.
Advantages of Using the Airflow AI SDK
- Ease of Integration: Easily embed LLM-driven tasks directly within Airflow DAGs.
- Scalability: Efficiently scale your AI workflows across multiple tasks.
- Improved Visibility: Track and monitor AI-driven tasks within Airflow’s familiar and user-friendly interface, simplifying troubleshooting and optimization.
Prerequisites
Ensure the following before proceeding:
- Apache Airflow environment
- If you don’t have an Airflow environment locally, you can set one up with the following commands:
# install Astro CLI
brew install astro

# initialize the Astro project
astro dev init

# place the DAG in the dags directory
...

# start Astronomer/Airflow
astro dev start
- Make sure that your Airflow requirements.txt includes:
airflow-ai-sdk[openai]
- Ollama with a configured local model (llama3.1:8b). Make sure the model you’re using supports tool calling; the commands below show one way to set this up.
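If you haven’t pulled the model yet, the setup is quick. A minimal sketch, assuming Ollama is already installed and serving on its default port 11434:

# download the model used in this example
ollama pull llama3.1:8b

# confirm the model is available locally
ollama list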
Integrating with the Airflow AI SDK
Step 1: Define Products
Create a task to list products for content ideation:
@task
def get_products():
    return [
        {"name": "Apache Airflow"},
        {"name": "Astronomer"},
        {"name": "Astro CLI"},
    ]
Step 2: Generate Blog Ideas with Airflow AI SDK and Ollama
Use Airflow AI SDK to interact with local LLMs:
@task.llm(
    model=OpenAIModel(
        model_name="llama3.1:8b",
        provider=OpenAIProvider(base_url="http://host.docker.internal:11434/v1"),
    ),
    result_type=BlogIdea,
    system_prompt="""
    You are an experienced content strategist tasked with generating an engaging and informative blog idea based on a given product name. Given the product name provided, produce a compelling idea for a blog post.
    Return only the name and the idea.
    Product Name: [Insert Product Name Here]
    """,
)
def generate_blog_idea(product: dict):
    return f"Product Name: {product['name']}"
Step 3: Review and Display Ideas
Implement a task for clear visualization of generated ideas:
@task
def display_blog_ideas(blog_ideas: list[BlogIdea]):
    from pprint import pprint
    ideas_list = [{"product": idea["name"], "idea": idea["idea"]} for idea in blog_ideas]
    pprint(ideas_list)
    return ideas_list
Step 4: Define the DAG
Automate your ideation workflow:
@dag(schedule=None, start_date=pendulum.datetime(2025, 3, 1, tz="UTC"), catchup=False)
def ollama_blog_idea_generation():
    products = get_products()
    blog_ideas = generate_blog_idea.expand(product=products)
    display_blog_ideas(blog_ideas)

ollama_blog_idea_generation_dag = ollama_blog_idea_generation()
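You can exercise the whole pipeline end to end without deploying anything. One option, a sketch assuming the file lives in the dags directory of your Astro project, is to invoke Airflow’s dags test command inside the local containers (depending on your Airflow version, you may also need to pass a logical date):

# run a one-off test of the DAG inside the local Airflow environment
astro dev run dags test ollama_blog_idea_generation

Alternatively, the full source below ends with an if __name__ == "__main__" block that calls dag.test(), so the file can also be run directly in any environment where Airflow is importable.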
Contribution Back to Airflow AI SDK Examples
I have created a PR that adds this example to the Airflow AI SDK Examples project. If it is merged, I will update this link:
Airflow AI SDK - Ollama Example
Conclusion
By combining Apache Airflow, the Airflow AI SDK, and Ollama, you can develop LLM inference jobs entirely locally and write tests that can run consistently without relying on third-party LLM providers.
Full DAG Source
"""
This example consumes a list of products and produces a collection of blog ideas
"""
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
import airflow_ai_sdk as ai_sdk
@task
def get_products() -> list[dict]:
"""Get the list of products."""
return [
{"name": "Apache Airflow"},
{"name": "Astronomer"},
{"name": "Astro CLI"},
]
class BlogIdea(ai_sdk.BaseModel):
name: str
idea: str
@task.llm(
model=OpenAIModel(
model_name="llama3.1:8b",
provider=OpenAIProvider(
base_url="http://host.docker.internal:11434/v1"
),
),
result_type=BlogIdea,
system_prompt="""
You are an experienced content strategist tasked with generating an engaging and informative blog idea based on a given product name. Given the product name provided, produce a compelling idea for a blog post.
Return only the name and the idea.
Product Name: [Insert Product Name Here]
""",
)
def generate_blog_idea(product: dict | None = None):
if product is None:
raise AirflowSkipException("No product provided")
return f"Product Name: {product['name']}"
@task
def display_blog_ideas(blog_ideas: list[BlogIdea]):
"""Display the list of generated blog ideas."""
from pprint import pprint
ideas_list = [{"product": idea['name'], "idea": idea['idea']} for idea in blog_ideas]
pprint(ideas_list)
return ideas_list
@dag(schedule=None, start_date=pendulum.datetime(2025, 3, 1, tz="UTC"), catchup=False)
def ollama_blog_idea_generation():
products = get_products()
blog_ideas = generate_blog_idea.expand(product=products)
display_blog_ideas(blog_ideas)
ollama_blog_idea_generation_dag = ollama_blog_idea_generation()
if __name__ == "__main__":
ollama_blog_idea_generation_dag.test()