generate-airflow Command

Generate Airflow DAG Python code from a Fluid Forge contract.

Available Now

This command is fully available in FLUID v0.7.1 for the AWS, GCP, and Snowflake providers.

Syntax

fluid generate-airflow <contract-file> [options]

Options

Option | Description | Default
-o, --output <file> | Output file path for generated DAG | dags/<contract-name>.py
--env <environment> | Environment name (dev/staging/prod) | dev
--verbose | Show detailed generation logs | false

What It Generates

The command creates a production-ready Airflow DAG with:

  • ✅ Task orchestration - Proper task dependencies and sequencing
  • ✅ Error handling - Retry logic and failure callbacks
  • ✅ Resource management - Connection pooling and cleanup
  • ✅ Monitoring - Task duration tracking and logging
  • ✅ Idempotency - Safe to re-run without side effects
  • ✅ Type hints - Full Python typing for IDE support

Supported Providers

Provider | Status | Example
AWS | ✅ Available | S3, Redshift, Lambda tasks
GCP | ✅ Available | BigQuery, GCS, Dataflow tasks
Snowflake | ✅ Available | Warehouse, table operations

Examples

Basic Generation

Generate DAG for a GCP contract:

fluid generate-airflow gcp-analytics.yaml -o dags/gcp_pipeline.py

Output:

✅ Validated contract: gcp-analytics.yaml
✅ Generated Airflow DAG: dags/gcp_pipeline.py
⏱️  Generation time: 1.8ms
📊 Tasks created: 5 (3 BigQuery + 2 GCS)

With Environment Specification

Generate for production environment:

fluid generate-airflow contract.yaml \
  --output dags/prod_pipeline.py \
  --env prod \
  --verbose

Verbose Output:

[INFO] Loading contract: contract.yaml
[INFO] Validating schema and dependencies
[INFO] Detected provider: gcp
[INFO] Generating Airflow DAG for environment: prod
[INFO] Creating task graph with 8 tasks
[INFO] Adding error handling and retries
[INFO] Writing DAG to: dags/prod_pipeline.py
✅ Complete in 2.1ms

AWS Example

Generate DAG for AWS data pipeline:

fluid generate-airflow aws-etl.yaml -o dags/aws_etl.py

The generated DAG includes:

  • S3 file operations
  • Redshift data loading
  • Lambda function triggers
  • SNS notifications
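
This reference shows full generated code only for GCP (below), so as a rough sketch of the AWS equivalent — bucket, schema, table, topic ARN, and task IDs here are illustrative assumptions, not actual command output — the generated DAG might pair operators like these:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator

# Illustrative sketch only: bucket, schema, table, and ARN are placeholders.
with DAG(
    dag_id="aws_etl",
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 2 * * *",
    catchup=False,
) as dag:
    # Load raw files from S3 into Redshift
    load = S3ToRedshiftOperator(
        task_id="load_s3_to_redshift",
        s3_bucket="etl-bucket",
        s3_key="raw/events/",
        schema="analytics",
        table="events",
        copy_options=["FORMAT AS PARQUET"],
    )

    # Notify downstream consumers via SNS
    notify = SnsPublishOperator(
        task_id="notify_success",
        target_arn="arn:aws:sns:us-east-1:123456789012:data-alerts",
        message="aws_etl finished loading analytics.events",
    )

    load >> notify
```

Requires apache-airflow-providers-amazon to run inside an Airflow deployment.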

Snowflake Example

Generate DAG for Snowflake data warehouse:

fluid generate-airflow snowflake-dwh.yaml -o dags/snowflake_pipeline.py

The generated DAG includes:

  • Warehouse management
  • Table creation/updates
  • SQL transformations
  • Query monitoring
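
A hedged sketch of what that Snowflake output can look like — the DAG ID, connection ID, warehouse name, and SQL paths are assumptions for illustration, not the command's actual output:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Illustrative sketch only: connection ID, warehouse, and SQL are placeholders.
with DAG(
    dag_id="snowflake_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule_interval="0 2 * * *",
    catchup=False,
) as dag:
    # Make sure the warehouse is running before transformations
    resume_wh = SnowflakeOperator(
        task_id="resume_warehouse",
        snowflake_conn_id="snowflake_default",
        sql="ALTER WAREHOUSE compute_wh RESUME IF SUSPENDED;",
    )

    # Run the contract's SQL transformation
    transform = SnowflakeOperator(
        task_id="transform_orders",
        snowflake_conn_id="snowflake_default",
        sql="sql/transform_orders.sql",  # resolved via Airflow's template search path
    )

    resume_wh >> transform
```

Requires apache-airflow-providers-snowflake to run inside an Airflow deployment.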

Generated DAG Structure

Imports and Configuration

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'fluid-forge',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

DAG Definition

with DAG(
    dag_id='gcp_customer_analytics',
    default_args=default_args,
    description='Customer analytics pipeline for GCP',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['gcp', 'analytics', 'customer'],
) as dag:

Tasks with Dependencies

    # Task 1: Load raw data
    load_raw = GCSToBigQueryOperator(
        task_id='load_raw_customer_data',
        bucket='analytics-bucket',
        source_objects=['raw/customers/*.parquet'],
        destination_project_dataset_table='analytics.raw_customers',
        write_disposition='WRITE_TRUNCATE',
    )
    
    # Task 2: Transform data
    transform = BigQueryExecuteQueryOperator(
        task_id='transform_customer_metrics',
        sql='sql/transform_customers.sql',
        use_legacy_sql=False,
        destination_dataset_table='analytics.customer_metrics',
        write_disposition='WRITE_TRUNCATE',
    )
    
    # Task 3: Export to GCS
    export = BigQueryToGCSOperator(
        task_id='export_customer_reports',
        source_project_dataset_table='analytics.customer_metrics',
        destination_cloud_storage_uris=['gs://reports/customers-{{ ds }}.csv'],
        export_format='CSV',
    )
    
    # Set dependencies
    load_raw >> transform >> export

Performance Benchmarks

Provider | Avg Generation Time | Lines of Code | Tasks (Typical)
GCP | 1.83ms | 280-350 | 4-8
AWS | 2.08ms | 320-400 | 5-10
Snowflake | 1.91ms | 250-300 | 3-6

Production Ready

All generated DAGs include production best practices:

  • Proper error handling and retries
  • Task timeout configuration
  • SLA monitoring
  • Resource cleanup
  • Logging and observability
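
Timeouts and retries interact. With hypothetical values (the names follow Airflow's `BaseOperator` arguments; actual values come from the contract), the worst-case time a single task can occupy works out as follows:

```python
from datetime import timedelta

# Hypothetical per-task settings; names follow Airflow's BaseOperator arguments.
task_defaults = {
    "execution_timeout": timedelta(minutes=30),  # hard cap per attempt
    "retries": 3,                                # re-attempts after failure
    "retry_delay": timedelta(minutes=5),         # wait between attempts
    "sla": timedelta(hours=1),                   # alert if not done in time
}

# Worst case: the first try plus every retry runs up to the timeout,
# with a retry delay before each retry.
worst_case = (
    (task_defaults["retries"] + 1) * task_defaults["execution_timeout"]
    + task_defaults["retries"] * task_defaults["retry_delay"]
)
print(worst_case)  # 2:15:00
```

Note that the worst case (2h15m) can exceed a 1-hour SLA, which is why SLA monitoring is configured alongside timeouts rather than instead of them.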

Contract Requirements

Minimal Contract

fluidVersion: "0.7.1"
kind: Contract
id: my-pipeline
name: "My Data Pipeline"

metadata:
  provider: gcp
  
exposes:
  - id: customer_data
    name: "Customer Analytics"
    location:
      type: bigquery
      properties:
        dataset: analytics
        table: customers
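
To illustrate what the generator minimally needs from this file, here is the same contract as a Python dict with a quick structural check. The `has_required_fields` helper is purely illustrative and not part of the CLI:

```python
# The minimal contract above, as a Python dict (the real file is YAML).
contract = {
    "fluidVersion": "0.7.1",
    "kind": "Contract",
    "id": "my-pipeline",
    "name": "My Data Pipeline",
    "metadata": {"provider": "gcp"},
    "exposes": [
        {
            "id": "customer_data",
            "name": "Customer Analytics",
            "location": {
                "type": "bigquery",
                "properties": {"dataset": "analytics", "table": "customers"},
            },
        }
    ],
}

def has_required_fields(c: dict) -> bool:
    """Hypothetical check: the keys generation relies on are present."""
    return (
        c.get("kind") == "Contract"
        and "provider" in c.get("metadata", {})
        and bool(c.get("exposes"))
    )

print(has_required_fields(contract))  # True
```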

Full-Featured Contract

fluidVersion: "0.7.1"
kind: Contract
id: advanced-pipeline
name: "Advanced Analytics Pipeline"

metadata:
  provider: gcp
  environment: prod
  owner: data-team
  tags: [analytics, ml, customer]

dependencies:
  - id: raw-data
    source: gs://raw-bucket/data/*.parquet
  
exposes:
  - id: transformed_customers
    name: "Customer Metrics"
    location:
      type: bigquery
      properties:
        dataset: analytics
        table: customer_metrics
    schema:
      - {name: customer_id, type: STRING}
      - {name: total_value, type: FLOAT64}
      - {name: last_purchase, type: TIMESTAMP}
    transforms:
      - type: sql
        path: sql/customer_aggregations.sql

schedule:
  cron: "0 2 * * *"
  timezone: "America/New_York"
  
monitoring:
  sla_seconds: 3600
  alert_email: data-ops@company.com

Common Use Cases

1. Simple ETL Pipeline

Scenario: Load data from GCS to BigQuery daily

fluid generate-airflow gcs-to-bq.yaml -o dags/daily_load.py

2. Multi-Stage Transformation

Scenario: Extract → Transform → Load with multiple steps

fluid generate-airflow etl-pipeline.yaml \
  --output dags/etl_full.py \
  --env prod

3. Cross-Provider Pipeline

Scenario: AWS S3 → Processing → Snowflake

fluid generate-airflow s3-to-snowflake.yaml -o dags/cross_cloud.py

Troubleshooting

Issue: Import errors in generated DAG

Cause: Missing Airflow providers

Solution:

pip install apache-airflow-providers-google
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-snowflake

Issue: Connection not found

Cause: Airflow connections not configured

Solution:

# Set up GCP connection
airflow connections add 'google_cloud_default' \
  --conn-type 'google_cloud_platform' \
  --conn-extra '{"key_path": "/path/to/service-account.json"}'
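
AWS and Snowflake DAGs need their own connections. A sketch with placeholder connection IDs and credentials — match the IDs to whatever the generated DAG references:

```shell
# Placeholder IDs and credentials -- replace with your own values.
airflow connections add 'aws_default' \
  --conn-type 'aws' \
  --conn-extra '{"region_name": "us-east-1"}'

airflow connections add 'snowflake_default' \
  --conn-type 'snowflake' \
  --conn-login 'PIPELINE_USER' \
  --conn-password 'REPLACE_ME' \
  --conn-extra '{"account": "my-account", "warehouse": "COMPUTE_WH", "database": "ANALYTICS"}'
```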

Issue: Tasks failing immediately

Cause: Invalid contract configuration

Solution:

# Validate contract first
fluid validate contract.yaml --verbose

# Check provider configuration
fluid validate contract.yaml --provider gcp

Integration with Airflow

1. Copy Generated DAG

# Generate DAG
fluid generate-airflow contract.yaml -o my_pipeline.py

# Copy to Airflow DAGs folder
cp my_pipeline.py $AIRFLOW_HOME/dags/

2. Verify DAG

# Check DAG syntax
python $AIRFLOW_HOME/dags/my_pipeline.py

# List DAGs
airflow dags list | grep my_pipeline

# Test DAG
airflow dags test my_pipeline 2026-01-30

3. Enable and Monitor

# Unpause DAG
airflow dags unpause my_pipeline

# Trigger manual run
airflow dags trigger my_pipeline

# Check status
airflow dags state my_pipeline

Next Steps

  • 📖 See Airflow Walkthrough for detailed examples
  • 🔧 Learn about contract validation
  • 📊 Explore GCP provider features
  • 🚀 Check out AWS provider capabilities

Multi-Engine Export

The fluid export command supports all three engines:

# Airflow (also available via generate-airflow)
fluid export contract.yaml --engine airflow -o dags/

# Dagster
fluid export contract.yaml --engine dagster -o pipelines/

# Prefect
fluid export contract.yaml --engine prefect -o flows/

See Also

  • validate command - Validate contracts before generation
  • plan command - Preview infrastructure changes
  • GCP Provider - GCP-specific features
  • AWS Provider - AWS integration
  • Snowflake Provider - Data warehouse pipelines
Last Updated: 3/12/26, 1:03 PM
Contributors: khanya_ai