Fluid ForgeFluid Forge
Home
Get Started
  • Local (DuckDB)
  • GCP (BigQuery)
  • Snowflake Team Collaboration
  • Declarative Airflow
  • Orchestration Export
  • Jenkins CI/CD
  • Universal Pipeline
CLI Reference
  • Overview
  • Architecture
  • GCP (BigQuery)
  • AWS (S3 + Athena)
  • Snowflake
  • Local (DuckDB)
  • Custom Providers
  • Roadmap
GitHub
GitHub
Home
Get Started
  • Local (DuckDB)
  • GCP (BigQuery)
  • Snowflake Team Collaboration
  • Declarative Airflow
  • Orchestration Export
  • Jenkins CI/CD
  • Universal Pipeline
CLI Reference
  • Overview
  • Architecture
  • GCP (BigQuery)
  • AWS (S3 + Athena)
  • Snowflake
  • Local (DuckDB)
  • Custom Providers
  • Roadmap
GitHub
GitHub
  • Introduction

    • /
    • Getting Started
    • Snowflake Quickstart
    • Vision & Roadmap
  • Walkthroughs

    • Walkthrough: Local Development
    • Walkthrough: Deploy to Google Cloud Platform
    • Walkthrough: Snowflake Team Collaboration
    • Declarative Airflow DAG Generation - The FLUID Way
    • Generating Orchestration Code from Contracts
    • Jenkins CI/CD for FLUID Data Products
    • Universal Pipeline
  • CLI Reference

    • CLI Reference
    • init Command
    • validate Command
    • plan Command
    • apply Command
    • verify Command
    • generate-airflow Command
  • Providers

    • Providers
    • Provider Architecture
    • GCP Provider
    • AWS Provider
    • Snowflake Provider
    • Local Provider
    • Creating Custom Providers
    • Provider Roadmap
  • Advanced

    • Blueprints
    • Governance & Compliance
    • Airflow Integration
    • Built-in And Custom Forge Agents
    • FLUID Forge Contract GPT Packet
    • Forge Copilot Discovery Guide
    • Forge Copilot Memory Guide
  • Project

    • Contributing to Fluid Forge
    • Fluid Forge v0.7.1 - Multi-Provider Export Release

Declarative Airflow DAG Generation - The FLUID Way

Learn how FLUID transforms your data product contract into production-ready Airflow DAGs without writing orchestration code.


Overview

Apache Airflow is the industry-standard tool for orchestrating data pipelines, but creating DAGs typically requires writing hundreds of lines of Python code for each workflow. FLUID changes this by automatically generating production-ready Airflow DAGs from your data product contract.

This walkthrough shows you how to go from a YAML contract to a fully functional Airflow DAG with a single commandβ€”reducing 300+ lines of imperative Python to just one declarative statement.

What you'll learn:

  • Why manual DAG development is time-consuming and error-prone
  • How FLUID's declarative approach works
  • Step-by-step tutorial using the Bitcoin tracker example
  • Advanced patterns like multi-provider deployments
  • Customization options for schedules, environments, and DAG IDs

🎯 The Problem: Manual Airflow DAG Development

Traditional Approach (Imperative)

When building data pipelines with Airflow, you typically write 300+ lines of Python code like this:

# bitcoin_tracker_manual.py - THE OLD WAY
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator
from datetime import datetime, timedelta
import sys
import os

# Add project to path
sys.path.insert(0, os.path.dirname(__file__))
from ingest_bitcoin_prices import fetch_bitcoin_price, insert_to_bigquery

# Default arguments
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
}

# DAG definition
dag = DAG(
    dag_id="bitcoin_tracker_manual",
    default_args=default_args,
    description="Hourly Bitcoin price ingestion",
    schedule_interval="0 * * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["crypto", "bitcoin", "manual"],
)

# Task 1: Validate contract
validate = BashOperator(
    task_id="validate_contract",
    bash_command="cd /path/to/project && python3 -m fluid_build.cli validate contract.fluid.yaml",
    dag=dag,
)

# Task 2: Fetch price
def fetch_btc_task(**context):
    price_data = fetch_bitcoin_price()
    context["ti"].xcom_push(key="price_usd", value=price_data["price_usd"])
    return price_data

fetch_price = PythonOperator(
    task_id="fetch_bitcoin_price",
    python_callable=fetch_btc_task,
    provide_context=True,
    dag=dag,
)

# Task 3: Insert to BigQuery
def insert_task(**context):
    price_data = context["ti"].xcom_pull(task_ids="fetch_bitcoin_price")
    success = insert_to_bigquery(
        price_data,
        "your-project-id",
        "crypto_data",
        "bitcoin_prices"
    )
    if not success:
        raise Exception("Insert failed")

insert_price = PythonOperator(
    task_id="insert_to_bigquery",
    python_callable=insert_task,
    provide_context=True,
    dag=dag,
)

# Task 4: Run dbt
run_dbt = BashOperator(
    task_id="run_dbt_models",
    bash_command="cd /path/to/project/dbt && dbt run --profiles-dir .",
    dag=dag,
)

# Task 5: Quality check
check_quality = BigQueryCheckOperator(
    task_id="check_data_quality",
    sql="""
        SELECT COUNT(*) > 0
        FROM `your-project.crypto_data.bitcoin_prices`
        WHERE DATE(timestamp) = CURRENT_DATE()
    """,
    use_legacy_sql=False,
    dag=dag,
)

# Define dependencies
validate >> fetch_price >> insert_price >> run_dbt >> check_quality

Problems with this approach:

  • ❌ 300+ lines of boilerplate code
  • ❌ Hardcoded project paths and IDs
  • ❌ Manual dependency management
  • ❌ No single source of truth (contract vs DAG can drift)
  • ❌ Copy-paste errors across similar DAGs
  • ❌ Hard to maintain (change contract, must manually update DAG)
  • ❌ Not portable (GCP-specific, can't easily switch providers)

✨ The FLUID Solution: Declarative DAG Generation

What You Declare (One Source of Truth)

Your data product contract (contract.fluid.yaml) already contains everything needed:

fluidVersion: "0.7.1"
kind: DataProduct
id: crypto.bitcoin_prices_gcp
name: bitcoin-prices-gcp

# 1️⃣ Metadata drives DAG configuration
metadata:
  layer: Gold
  owner:
    team: data-engineering
    email: data-eng@example.com

tags:
  - crypto
  - bitcoin

# 2️⃣ Builds define the workflow
builds:
  - id: ingest_bitcoin_prices
    description: Fetch Bitcoin prices from CoinGecko API
    pattern: hybrid-reference
    engine: python
    repository: ./runtime
    properties:
      model: ingest_bitcoin_prices
    
    execution:
      trigger:
        type: manual
      runtime:
        resources:
          memory: "512Mi"
        timeout: "PT5M"
      retries:
        maxAttempts: 3
        backoffStrategy: exponential
    
    outputs:
      - bitcoin_prices_table

  - id: calculate_daily_summary
    description: Calculate daily price statistics
    engine: dbt
    repository: ./dbt
    properties:
      model: daily_price_summary
    outputs:
      - daily_price_summary

  - id: calculate_price_trends
    description: Calculate moving averages
    engine: dbt
    repository: ./dbt
    properties:
      model: price_trends
    outputs:
      - price_trends

# 3️⃣ Exposes define datasets and bindings
exposes:
  - exposeId: bitcoin_prices_table
    kind: table
    title: Bitcoin Prices Table
    
    binding:
      platform: gcp
      format: bigquery_table
      location:
        project: <<YOUR_PROJECT_HERE>>
        dataset: crypto_data
        table: bitcoin_prices
        region: us-central1

What FLUID Generates (Automatically)

One command:

fluid generate-airflow contract.fluid.yaml \
  -o dags/bitcoin_tracker.py \
  --dag-id bitcoin_tracker \
  --schedule "0 * * * *"

Output: Production-ready Airflow DAG

"""
Airflow DAG for FLUID Data Product: bitcoin-prices-gcp

Auto-generated from FLUID contract v0.7.1
Generated at: 2026-01-21T12:00:00

Domain: crypto
Description: Bitcoin price tracking data product
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta

# DAG configuration (from contract metadata)
default_args = {
    'owner': 'fluid',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,  # ← From contract.builds[].execution.retries
    'retry_delay': timedelta(minutes=5),
}

# DAG definition
dag = DAG(
    dag_id="bitcoin_tracker",
    description="""Bitcoin price tracking data product""",
    schedule_interval="0 * * * *",  # ← From --schedule argument
    start_date=days_ago(1),
    catchup=False,
    tags=["fluid", "crypto", "bitcoin"],  # ← From contract.tags
    default_args=default_args
)

# Provision dataset: bitcoin_prices_table (from exposes[0])
provision_bitcoin_prices_table = BashOperator(
    task_id="provision_bitcoin_prices_table",
    bash_command="bq mk --project_id=<<YOUR_PROJECT_HERE>> --dataset crypto_data || true",
    dag=dag
)

# Schedule task: ingest_bitcoin_prices (from builds[0])
schedule_ingest_bitcoin_prices = BashOperator(
    task_id="schedule_ingest_bitcoin_prices",
    bash_command="python3 runtime/ingest_bitcoin_prices.py",
    dag=dag
)

# Schedule task: calculate_daily_summary (from builds[1])
schedule_calculate_daily_summary = BashOperator(
    task_id="schedule_calculate_daily_summary",
    bash_command="dbt run --models daily_price_summary",
    dag=dag
)

# Schedule task: calculate_price_trends (from builds[2])
schedule_calculate_price_trends = BashOperator(
    task_id="schedule_calculate_price_trends",
    bash_command="dbt run --models price_trends",
    dag=dag
)

# Task dependencies (inferred from builds.outputs)
provision_bitcoin_prices_table >> schedule_ingest_bitcoin_prices
schedule_ingest_bitcoin_prices >> schedule_calculate_daily_summary
schedule_ingest_bitcoin_prices >> schedule_calculate_price_trends

πŸ“Š Side-by-Side Comparison

Here's how the traditional manual approach compares to FLUID's declarative generation across key development aspects:

AspectManual ApproachFLUID Declarative
Lines of Code300+ lines Python1 command
MaintenanceUpdate DAG + ContractUpdate contract only
PortabilityGCP-specific codeProvider-agnostic contract
ConsistencyCan drift from contractContract IS source of truth
TestingTest Python codeTest YAML contract
OnboardingLearn Airflow + PythonLearn FLUID contracts
Multi-envCopy-paste DAGSame contract, diff env
Provider switchRewrite operatorsChange binding.platform

πŸš€ Step-by-Step: Bitcoin Tracker with Declarative Airflow

Now let's put this into practice. We'll use the Bitcoin price tracker example to demonstrate how FLUID generates Airflow DAGs from contracts. This tutorial takes about 10-15 minutes and assumes you have a working FLUID installation.

What We'll Build

We'll generate an Airflow DAG that:

  • Provisions BigQuery datasets and tables
  • Runs Bitcoin price ingestion (Python)
  • Executes dbt transformations
  • Manages task dependencies automatically
  • Includes retry logic and scheduling

Prerequisites

# 1. Ensure FLUID CLI is available
fluid --version  # Should show v0.7.1 or higher

# 2. Navigate to example
cd examples/bitcoin-tracker

# 3. Validate contract
fluid validate contract.fluid.yaml

Step 1: Generate Airflow DAG (Declarative Way)

# Generate production-ready DAG from contract
fluid generate-airflow contract.fluid.yaml \
  -o airflow/dags/bitcoin_tracker_declarative.py \
  --dag-id bitcoin_tracker_declarative \
  --schedule "0 * * * *" \
  --verbose

Output:

βœ“ Loading contract from contract.fluid.yaml
βœ“ Generating Airflow DAG...
βœ“ DAG written to: airflow/dags/bitcoin_tracker_declarative.py
  Contract ID: crypto.bitcoin_prices_gcp
  DAG ID: bitcoin_tracker_declarative
  Schedule: 0 * * * *

What just happened?

  1. βœ… FLUID read your contract
  2. βœ… Parsed builds, exposes, and execution config
  3. βœ… Generated provider-specific commands (GCP BigQuery)
  4. βœ… Created task dependencies from outputs
  5. βœ… Applied retry/schedule configuration
  6. βœ… Wrote production-ready Python DAG

Step 2: Review Generated DAG

# View the generated DAG
cat airflow/dags/bitcoin_tracker_declarative.py

Notice:

  • βœ… Tasks match your builds array
  • βœ… Dataset provisioning from exposes bindings
  • βœ… Retry config from execution.retries
  • βœ… Schedule from command argument
  • βœ… Tags from contract metadata

Step 3: Test DAG Syntax

# Validate DAG syntax without running
python3 airflow/dags/bitcoin_tracker_declarative.py
echo $?  # Should be 0 (success)

Step 4: Deploy to Airflow

Option A: Local Airflow

# Set Airflow home
export AIRFLOW_HOME=$PWD/airflow

# Initialize Airflow
airflow db init

# Create admin user
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# Start webserver (terminal 1)
airflow webserver --port 8080

# Start scheduler (terminal 2)
airflow scheduler

Access UI: http://localhost:8080 (admin/admin)

Option B: Cloud Composer (GCP)

# Create Composer environment
gcloud composer environments create bitcoin-tracker \
  --location us-central1 \
  --image-version composer-2.6.0-airflow-2.6.3

# Upload DAG
BUCKET=$(gcloud composer environments describe bitcoin-tracker \
  --location us-central1 \
  --format="get(config.dagGcsPrefix)")

gsutil cp airflow/dags/bitcoin_tracker_declarative.py $BUCKET/dags/

πŸ”„ The Declarative Workflow

The real power of FLUID's declarative approach becomes clear when you need to make changes. Instead of manually editing DAG code, you simply update your contract and regenerate. Here's the three-step workflow:

1. Define Once (Contract)

# contract.fluid.yaml
builds:
  - id: new_transformation
    engine: dbt
    repository: ./dbt
    properties:
      model: price_volatility
    outputs:
      - price_volatility_view

2. Regenerate DAG

# Regenerate from updated contract
fluid generate-airflow contract.fluid.yaml \
  -o airflow/dags/bitcoin_tracker_declarative.py \
  --schedule "0 * * * *"

3. Deploy

# DAG automatically includes new task!
# No manual code changes needed

🎨 Advanced: Multi-Provider Example

One of FLUID's most powerful features is provider portability. The same contract can deploy to different cloud platforms by simply changing the binding.platform field. This makes migrations and multi-cloud strategies straightforward.

Let's look at migrating our Bitcoin tracker from GCP to Snowflake:

GCP β†’ Snowflake Migration

Before (GCP):

exposes:
  - exposeId: bitcoin_prices
    binding:
      platform: gcp
      format: bigquery_table
      location:
        project: my-gcp-project
        dataset: crypto_data

After (Snowflake):

exposes:
  - exposeId: bitcoin_prices
    binding:
      platform: snowflake
      format: snowflake_table
      location:
        account: xy12345.us-east-1
        database: CRYPTO_DB
        schema: PROD

Regenerate:

fluid generate-airflow contract.fluid.yaml -o dags/bitcoin_tracker.py

Result: DAG automatically uses Snowflake operators instead of BigQuery! πŸŽ‰


βœ… Why Use Declarative DAG Generation?

FLUID's declarative approach offers several compelling advantages over manual DAG development:

1. Single Source of Truth

  • Your contract defines both infrastructure and orchestration
  • No drift between documentation, DAGs, and actual deployments
  • Version control one file instead of many

2. Drastically Improved Maintainability

  • Update contract β†’ regenerate DAG
  • No manual synchronization
  • Version control the contract, not generated code

3. Testability

# Test contract before generating DAG
fluid validate contract.fluid.yaml

# Generate and test DAG
fluid generate-airflow contract.fluid.yaml -o test_dag.py
python3 test_dag.py  # Syntax check

4. Portability

  • Same contract works on GCP, AWS, Snowflake
  • Just change binding.platform
  • Regenerate DAG with provider-specific operators

5. Consistency

  • All data products follow same DAG structure
  • Standardized retry/schedule patterns
  • Centralized configuration management

6. Developer Experience

# Before: Write 300 lines of Python
# After: Run 1 command
fluid generate-airflow contract.yaml -o dag.py

πŸ”§ Customization Options

While FLUID generates DAGs declaratively from your contract, you can customize the output for different environments and use cases using command-line flags:

Override Schedule

# Different schedules for different environments
fluid generate-airflow contract.yaml -o dag.py --schedule "*/15 * * * *"  # Every 15 min
fluid generate-airflow contract.yaml -o dag.py --schedule "0 2 * * *"      # Daily at 2 AM

Override DAG ID

# Environment-specific DAG IDs
fluid generate-airflow contract.yaml \
  -o dags/bitcoin_prod.py \
  --dag-id bitcoin_tracker_prod

fluid generate-airflow contract.yaml \
  -o dags/bitcoin_dev.py \
  --dag-id bitcoin_tracker_dev

Use Environment Overlays

# Generate from dev environment
fluid generate-airflow contract.yaml \
  -o dags/bitcoin_dev.py \
  --env dev

# Generate from prod environment
fluid generate-airflow contract.yaml \
  -o dags/bitcoin_prod.py \
  --env prod

πŸ“š Additional Resources

FLUID Documentation

  • CLI Reference - Complete command reference
  • GCP Deployment Guide - Deploy to Google Cloud Platform
  • Local Development Guide - Test with DuckDB locally
  • Advanced Topics - Airflow integration details

Airflow Resources

  • Apache Airflow Docs
  • GCP Cloud Composer

Examples

  • Bitcoin Tracker Example - Working code with this walkthrough
  • Netflix Preferences Example - Local development example

πŸŽ“ Summary

You've learned how FLUID's declarative approach transforms Airflow DAG development from a manual, code-heavy process into a simple, contract-driven workflow.

Traditional Airflow Development (The Old Way):

  1. Write data product contract
  2. Write Python DAG manually (300+ lines)
  3. Test DAG syntax and logic
  4. Deploy DAG to Airflow
  5. Maintain both contract and DAG in sync

FLUID Declarative Approach (The New Way):

  1. Write data product contract βœ…
  2. Run fluid generate-airflow contract.yaml -o dag.py βœ…
  3. Deploy generated DAG βœ…
  4. Maintain only the contract βœ…

Key Takeaways:

  • 90% less code - One command replaces 300+ lines of Python
  • 100% consistency - Contract is the single source of truth
  • Infinite portability - Same contract works on GCP, AWS, Snowflake
  • Zero drift - DAG always matches contract when regenerated
  • Faster development - Minutes instead of hours per pipeline

πŸš€ Ready to go declarative?

cd examples/bitcoin-tracker
fluid generate-airflow contract.fluid.yaml -o airflow/dags/my_dag.py --schedule "0 * * * *"

Welcome to the declarative future of data orchestration! πŸŽ‰

Edit this page on GitHub
Last Updated: 3/12/26, 1:03 PM
Contributors: khanya_ai
Prev
Walkthrough: Snowflake Team Collaboration
Next
Generating Orchestration Code from Contracts