Generating Orchestration Code from Contracts

Version: 0.7.1
Status: ✅ Production Ready


Overview

Fluid Forge transforms your declarative contracts into production-ready orchestration code for three engines: Airflow, Dagster, and Prefect.

Why Generate DAGs?

  • 🚀 Fast Deployment - Generate 100+ lines of orchestration code in <3ms
  • ☁️ Multi-Cloud - Support for AWS, GCP, and Snowflake
  • ✅ Validated - Contract validation with circular dependency detection
  • 📦 Production-Ready - Error handling, retries, logging built-in
  • 🔄 Multi-Engine - Airflow, Dagster, and Prefect all available via CLI

Quick Start

1. Create a Contract

# crypto-analytics.fluid.yaml
fluidVersion: "0.7.1"
kind: DataProduct
id: crypto.bitcoin_analytics
name: bitcoin-analytics

metadata:
  owner: data-engineering
  description: Bitcoin price tracking and analytics

orchestration:
  schedule: "@hourly"
  tasks:
    - taskId: fetch_prices
      action: bigquery_query
      config:
        query: "SELECT * FROM crypto.raw_prices WHERE timestamp > CURRENT_TIMESTAMP() - INTERVAL 1 HOUR"
    
    - taskId: calculate_metrics
      action: bigquery_query
      dependsOn: [fetch_prices]
      config:
        query: "INSERT INTO crypto.hourly_metrics SELECT price_timestamp, AVG(price_usd) as avg_price..."

2. Generate Airflow DAG

# Generate Airflow DAG
fluid generate-airflow crypto-analytics.fluid.yaml -o dags/crypto_bitcoin_analytics.py

# With verbose output
fluid generate-airflow crypto-analytics.fluid.yaml -o dags/pipeline.py --verbose

3. Deploy to Airflow

# Copy to Airflow DAGs folder
cp dags/crypto_bitcoin_analytics.py $AIRFLOW_HOME/dags/

# Or for Cloud Composer (GCP)
gsutil cp dags/crypto_bitcoin_analytics.py gs://your-composer-bucket/dags/

# Or for MWAA (AWS)
aws s3 cp dags/crypto_bitcoin_analytics.py s3://your-mwaa-bucket/dags/

Provider Examples

GCP + BigQuery

Contract:

fluidVersion: "0.7.1"
kind: DataProduct
id: gcp.customer_analytics
name: customer-analytics

platform:
  provider: gcp
  project: my-project-id
  region: us-central1

orchestration:
  schedule: "@daily"
  tasks:
    - taskId: create_dataset
      action: create_bigquery_dataset
      config:
        dataset: analytics
        location: US
    
    - taskId: create_table
      action: create_bigquery_table
      dependsOn: [create_dataset]
      config:
        dataset: analytics
        table: customers
        schema:
          - name: customer_id
            type: INTEGER
          - name: name
            type: STRING
    
    - taskId: load_data
      action: bigquery_query
      dependsOn: [create_table]
      config:
        query: |
          INSERT INTO analytics.customers
          SELECT * FROM raw.customer_data
          WHERE date = CURRENT_DATE()

Generate Airflow DAG:

fluid generate-airflow gcp-analytics.yaml -o dags/gcp_customer_analytics.py

Generated Airflow DAG:

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator
)
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='gcp_customer_analytics',
    default_args=default_args,
    description='Customer analytics pipeline',
    schedule_interval='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['analytics', 'customers']
) as dag:
    
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create_dataset',
        dataset_id='analytics',
        location='US',
        project_id='my-project-id'
    )
    
    create_table = BigQueryCreateEmptyTableOperator(
        task_id='create_table',
        dataset_id='analytics',
        table_id='customers',
        schema_fields=[
            {'name': 'customer_id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}
        ],
        project_id='my-project-id'
    )
    
    load_data = BigQueryInsertJobOperator(
        task_id='load_data',
        configuration={
            'query': {
                'query': """
                    INSERT INTO analytics.customers
                    SELECT * FROM raw.customer_data
                    WHERE date = CURRENT_DATE()
                """,
                'useLegacySql': False
            }
        },
        project_id='my-project-id'
    )
    
    create_dataset >> create_table >> load_data
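
The final line of the DAG mirrors the dependsOn fields in the contract. As a rough illustration of that mapping (not Fluid Forge's actual generator), the sketch below turns each dependsOn entry into one Airflow-style `>>` edge. The helper name `render_edges` is hypothetical, and the task dictionaries are a hand-copied subset of the GCP contract above.

```python
# Sketch only: `render_edges` is a hypothetical helper, not a Fluid Forge API.
# The task list is copied by hand from the GCP contract above.
tasks = [
    {"taskId": "create_dataset"},
    {"taskId": "create_table", "dependsOn": ["create_dataset"]},
    {"taskId": "load_data", "dependsOn": ["create_table"]},
]


def render_edges(tasks):
    """Return one 'upstream >> downstream' line per dependsOn entry."""
    lines = []
    for task in tasks:
        for upstream in task.get("dependsOn", []):
            lines.append(f"{upstream} >> {task['taskId']}")
    return lines


for line in render_edges(tasks):
    print(line)
```

The sketch emits one line per edge, while the generated DAG chains them as `create_dataset >> create_table >> load_data`. Airflow treats both forms as the same dependency graph.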

AWS + S3 + Glue (Dagster Example)

Contract:

fluidVersion: "0.7.1"
kind: DataProduct
id: aws.sales_analytics
name: sales-analytics

platform:
  provider: aws
  account_id: "123456789012"
  region: us-east-1

orchestration:
  schedule: "0 */6 * * *"  # Every 6 hours
  tasks:
    - taskId: create_bucket
      action: create_s3_bucket
      config:
        bucket: sales-analytics-data
        region: us-east-1
    
    - taskId: create_database
      action: create_glue_database
      dependsOn: [create_bucket]
      config:
        database: sales
    
    - taskId: create_table
      action: create_glue_table
      dependsOn: [create_database]
      config:
        database: sales
        table: transactions
        location: s3://sales-analytics-data/transactions/
        format: PARQUET

Generate Dagster Pipeline:

fluid export aws-sales.yaml --engine dagster -o pipelines/

Generated Dagster Code:

from dagster import op, job, resource, In, Out
import boto3

@resource
def aws_s3_client(context):
    return boto3.client('s3', region_name='us-east-1')

@resource
def aws_glue_client(context):
    return boto3.client('glue', region_name='us-east-1')

@op(required_resource_keys={'s3_client'})
def create_bucket(context):
    s3 = context.resources.s3_client
    bucket_name = 'sales-analytics-data'
    
    try:
        s3.create_bucket(Bucket=bucket_name)
        context.log.info(f"Created S3 bucket: {bucket_name}")
        return bucket_name
    except Exception as e:
        context.log.error(f"Error creating bucket: {e}")
        raise

@op(required_resource_keys={'glue_client'}, ins={'bucket': In()})
def create_database(context, bucket):
    glue = context.resources.glue_client
    
    try:
        glue.create_database(
            DatabaseInput={
                'Name': 'sales',
                'Description': 'Sales analytics database'
            }
        )
        context.log.info("Created Glue database: sales")
        return 'sales'
    except Exception as e:
        context.log.error(f"Error creating database: {e}")
        raise

@op(required_resource_keys={'glue_client'}, ins={'database': In()})
def create_table(context, database):
    glue = context.resources.glue_client
    
    try:
        glue.create_table(
            DatabaseName=database,
            TableInput={
                'Name': 'transactions',
                'StorageDescriptor': {
                    'Location': 's3://sales-analytics-data/transactions/',
                    'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
                    'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
                }
            }
        )
        context.log.info("Created Glue table: transactions")
        return 'transactions'
    except Exception as e:
        context.log.error(f"Error creating table: {e}")
        raise

@job(resource_defs={
    's3_client': aws_s3_client,
    'glue_client': aws_glue_client
})
def aws_sales_analytics():
    database = create_database(create_bucket())
    create_table(database)

Snowflake + Data Warehousing (Prefect Example)

Contract:

fluidVersion: "0.7.1"
kind: DataProduct
id: snowflake.inventory_analytics
name: inventory-analytics

platform:
  provider: snowflake
  account: xy12345.us-east-1
  warehouse: COMPUTE_WH
  database: ANALYTICS

orchestration:
  schedule: "@hourly"
  tasks:
    - taskId: create_database
      action: create_database
      config:
        database: ANALYTICS
    
    - taskId: create_schema
      action: create_schema
      dependsOn: [create_database]
      config:
        schema: INVENTORY
    
    - taskId: create_table
      action: create_table
      dependsOn: [create_schema]
      config:
        table: INVENTORY.STOCK_LEVELS
        columns:
          - product_id: NUMBER
          - quantity: NUMBER
          - last_updated: TIMESTAMP
    
    - taskId: load_data
      action: run_query
      dependsOn: [create_table]
      config:
        query: |
          INSERT INTO INVENTORY.STOCK_LEVELS
          SELECT product_id, SUM(quantity), CURRENT_TIMESTAMP()
          FROM RAW.INVENTORY_UPDATES
          WHERE update_time > DATEADD(hour, -1, CURRENT_TIMESTAMP())
          GROUP BY product_id

Generate Prefect Flow:

fluid export snowflake-inventory.yaml --engine prefect -o flows/

Generated Prefect Code:

from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
import snowflake.connector

def get_snowflake_connection():
    return snowflake.connector.connect(
        account='xy12345.us-east-1',
        user='...',
        password='...',
        warehouse='COMPUTE_WH',
        database='ANALYTICS'
    )

@task(retries=3, retry_delay_seconds=300)
def create_database():
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    
    try:
        cursor.execute('CREATE DATABASE IF NOT EXISTS ANALYTICS')
        print("Created database: ANALYTICS")
    finally:
        cursor.close()
        conn.close()

@task(retries=3, retry_delay_seconds=300)
def create_schema():
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    
    try:
        cursor.execute('CREATE SCHEMA IF NOT EXISTS INVENTORY')
        print("Created schema: INVENTORY")
    finally:
        cursor.close()
        conn.close()

@task(retries=3, retry_delay_seconds=300)
def create_table():
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    
    try:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS INVENTORY.STOCK_LEVELS (
                product_id NUMBER,
                quantity NUMBER,
                last_updated TIMESTAMP
            )
        """)
        print("Created table: INVENTORY.STOCK_LEVELS")
    finally:
        cursor.close()
        conn.close()

@task(retries=3, retry_delay_seconds=300)
def load_data():
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    
    try:
        cursor.execute("""
            INSERT INTO INVENTORY.STOCK_LEVELS
            SELECT product_id, SUM(quantity), CURRENT_TIMESTAMP()
            FROM RAW.INVENTORY_UPDATES
            WHERE update_time > DATEADD(hour, -1, CURRENT_TIMESTAMP())
            GROUP BY product_id
        """)
        print(f"Loaded {cursor.rowcount} rows")
    finally:
        cursor.close()
        conn.close()

@flow(name='snowflake_inventory_analytics')
def main():
    create_database()
    create_schema()
    create_table()
    load_data()

# Create deployment
if __name__ == '__main__':
    deployment = Deployment.build_from_flow(
        flow=main,
        name='inventory-analytics-deployment',
        schedule=CronSchedule(cron='0 * * * *'),  # @hourly
        work_queue_name='default'
    )
    deployment.apply()

Engine Comparison

All Engines Available

  • Airflow: fluid generate-airflow or fluid export --engine airflow
  • Dagster: fluid export --engine dagster
  • Prefect: fluid export --engine prefect

| Feature | Airflow | Dagster | Prefect |
|---------|---------|---------|---------|
| CLI Availability | ✅ Available | ✅ Available | ✅ Available |
| Ease of Use | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Type Safety | ❌ | ✅ | ✅ |
| Resource Management | Manual | Built-in | Built-in |
| Testing | Limited | Excellent | Good |
| UI Quality | Good | Excellent | Excellent |
| Community | Largest | Growing | Growing |
| Cloud Hosting | Cloud Composer (GCP) | Dagster Cloud | Prefect Cloud |
| Best For | Traditional ETL | Data engineering teams | Modern data workflows |

Generation Performance (Benchmarked)

All three engines are available in the CLI.

| Provider | Airflow | Dagster | Prefect |
|----------|---------|---------|---------|
| AWS | 2.05ms | 0.38ms | 0.32ms |
| GCP | 1.83ms | 0.34ms | 1.91ms |
| Snowflake | 2.08ms | 0.35ms | 0.33ms |

Output Size (Small Contract)

| Provider | Airflow | Dagster | Prefect |
|----------|---------|---------|---------|
| AWS | 1.91KB | 3.98KB | 3.84KB |
| GCP | 2.10KB | 2.43KB | 2.29KB |
| Snowflake | 1.83KB | 1.72KB | 2.52KB |

Advanced Features

Contract Validation

All exports include automatic validation:

# Invalid contract (circular dependency)
fluid export bad-contract.yaml --engine airflow

# Output:
# ❌ Export failed: Circular dependencies detected in tasks: task_a, task_b

Validation Checks:

  • ✅ Orchestration section present
  • ✅ Non-empty task list
  • ✅ Unique task IDs
  • ✅ Valid task dependencies
  • ✅ No circular dependencies
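
The checks above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Fluid Forge's validator: the function name `validate_tasks` is hypothetical, and every message except the circular-dependency one (which copies the CLI output shown earlier) is invented for the example. Cycle detection uses a depth-first search that tracks which tasks are on the current path.

```python
def validate_tasks(contract):
    """Return a list of error strings; an empty list means all checks passed.

    Hypothetical helper illustrating the five checks listed above.
    """
    orchestration = contract.get("orchestration")
    if orchestration is None:
        return ["Contract missing 'orchestration'"]

    tasks = orchestration.get("tasks") or []
    if not tasks:
        return ["Task list is empty"]

    errors = []

    # Unique task IDs
    ids = [task["taskId"] for task in tasks]
    duplicates = sorted({i for i in ids if ids.count(i) > 1})
    if duplicates:
        errors.append("Duplicate task IDs: " + ", ".join(duplicates))

    # Every dependsOn entry must name a task defined in the contract
    deps = {task["taskId"]: list(task.get("dependsOn", [])) for task in tasks}
    for task_id, upstream in deps.items():
        for dep in upstream:
            if dep not in deps:
                errors.append(f"Task {task_id} depends on unknown task {dep}")

    # Depth-first search: meeting an in-progress task again means a cycle
    state = {}
    in_cycle = set()

    def visit(node, path):
        if state.get(node) == "done":
            return
        if state.get(node) == "in_progress":
            in_cycle.update(path[path.index(node):])
            return
        state[node] = "in_progress"
        for dep in deps[node]:
            if dep in deps:
                visit(dep, path + [node])
        state[node] = "done"

    for task_id in deps:
        visit(task_id, [])

    if in_cycle:
        errors.append(
            "Circular dependencies detected in tasks: "
            + ", ".join(sorted(in_cycle))
        )
    return errors
```

Returning a list rather than raising on the first problem lets a caller report every issue in one pass. Whether the real CLI stops at the first error is not stated here.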

Schedule Conversion

Fluid Forge automatically converts schedule expressions:

| Fluid Schedule | Airflow | Dagster | Prefect |
|----------------|---------|---------|---------|
| @hourly | @hourly | "0 * * * *" | CronSchedule(cron="0 * * * *") |
| @daily | @daily | "0 0 * * *" | CronSchedule(cron="0 0 * * *") |
| 0 */6 * * * | 0 */6 * * * | "0 */6 * * *" | CronSchedule(cron="0 */6 * * *") |
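
The conversion in the table can be reproduced with a small lookup. The sketch below is an assumption about how such a mapping might look, not the tool's implementation: `convert_schedule` is a hypothetical name, and only the two presets listed in the table are covered.

```python
# Presets taken from the table above; other presets are deliberately omitted
# because this page does not say how Fluid Forge handles them.
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
}


def convert_schedule(schedule, engine):
    """Render a contract schedule the way the table shows for each engine."""
    if engine == "airflow":
        # Airflow accepts both presets and cron strings unchanged
        return schedule
    cron = PRESETS.get(schedule, schedule)
    if engine == "dagster":
        return f'"{cron}"'
    if engine == "prefect":
        return f'CronSchedule(cron="{cron}")'
    raise ValueError(f"Unknown engine: {engine}")


print(convert_schedule("@hourly", "prefect"))
```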

Custom Configuration

Inject custom settings into generated code:

orchestration:
  schedule: "@daily"
  config:
    # Airflow-specific
    airflow:
      retries: 5
      retry_delay_minutes: 10
      email_on_failure: true
      email: ["ops@company.com"]
    
    # Dagster-specific
    dagster:
      max_runtime_seconds: 3600
      
    # Prefect-specific
    prefect:
      timeout_seconds: 7200
      tags: ["production", "critical"]
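
This page does not show how the engine-specific settings reach the generated code. As one plausible reading for the Airflow block, the sketch below merges those keys into a `default_args` dictionary, converting `retry_delay_minutes` into a `timedelta`. The function name `build_default_args` and the mapping itself are assumptions. The fallback values (3 retries, 5-minute delay) are taken from the generated DAG shown earlier.

```python
from datetime import timedelta

# Fallbacks copied from the generated Airflow DAG earlier on this page
DEFAULTS = {"retries": 3, "retry_delay": timedelta(minutes=5)}


def build_default_args(orchestration, owner):
    """Merge orchestration.config.airflow into Airflow-style default_args.

    Hypothetical helper: the real mapping used by Fluid Forge may differ.
    """
    overrides = dict(orchestration.get("config", {}).get("airflow", {}))
    args = {"owner": owner, **DEFAULTS}
    # Airflow expects a timedelta, so the minutes value is converted
    if "retry_delay_minutes" in overrides:
        minutes = overrides.pop("retry_delay_minutes")
        args["retry_delay"] = timedelta(minutes=minutes)
    args.update(overrides)
    return args


orchestration = {
    "schedule": "@daily",
    "config": {
        "airflow": {
            "retries": 5,
            "retry_delay_minutes": 10,
            "email_on_failure": True,
            "email": ["ops@company.com"],
        }
    },
}
print(build_default_args(orchestration, "data-engineering"))
```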

Best Practices

1. Version Control Your Contracts

git add contracts/
git commit -m "Add customer analytics contract"

# Regenerate when contract changes
fluid export contracts/customer-analytics.yaml --engine airflow -o dags/

2. Test Generated Code

# Python syntax check
python -m py_compile dags/customer_analytics.py

# Airflow validation
airflow dags test customer_analytics 2026-01-30

# Dagster validation
dagster job execute -f pipelines/customer_analytics.py
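
When a directory holds many generated files, the syntax check can be run over all of them at once. The sketch below is a small convenience wrapper around the standard-library `py_compile` module. The function name `check_syntax` is hypothetical and the script is not part of Fluid Forge.

```python
import py_compile
from pathlib import Path


def check_syntax(directory):
    """Compile every .py file under `directory`.

    Returns a list of (path, message) pairs, one per file that failed.
    """
    failures = []
    for path in sorted(Path(directory).rglob("*.py")):
        try:
            # doraise=True turns syntax errors into PyCompileError
            py_compile.compile(str(path), doraise=True)
        except py_compile.PyCompileError as exc:
            failures.append((str(path), exc.msg))
    return failures
```

Calling `check_syntax("dags")` after an export returns an empty list when every file compiles. This only catches syntax errors; import problems and engine-specific issues still need the Airflow and Dagster commands above.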

3. Use CI/CD

# .github/workflows/generate-dags.yml
name: Generate Orchestration Code

on:
  push:
    paths:
      - 'contracts/**'

jobs:
  generate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Install Fluid Forge
        run: pip install fluid-forge
      
      - name: Generate Airflow DAGs
        run: |
          fluid export contracts/*.yaml --engine airflow -o dags/
      
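      - name: Commit generated code
        run: |
          # A commit needs an author identity on a fresh runner
          git config user.name "github-actions[bot]"
          git config user.email "github-actions[bot]@users.noreply.github.com"
          git add dags/
          # Commit only when regeneration actually changed something
          git diff --cached --quiet || git commit -m "Regenerate DAGs from contracts"
          git push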

4. Monitor Generated Pipelines

All generated code includes logging:

# Airflow
context.log.info("Processing task...")

# Dagster
context.log.info("Processing op...")

# Prefect
print("Processing task...")  # Captured by Prefect

Troubleshooting

Export Fails

Error: ProviderError: Invalid contract: Contract missing 'orchestration'

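Solution: Add an orchestration section with at least one task (validation also rejects an empty task list):

orchestration:
  schedule: "@daily"
  tasks:
    - taskId: load_data
      action: bigquery_query
      config:
        query: "SELECT 1"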

Error: Circular dependencies detected in tasks: task_a, task_b

Solution: Fix dependency graph:

# Bad (circular)
tasks:
  - taskId: task_a
    dependsOn: [task_b]
  - taskId: task_b
    dependsOn: [task_a]

# Good (linear)
tasks:
  - taskId: task_a
  - taskId: task_b
    dependsOn: [task_a]

Generated Code Errors

Error: SyntaxError in generated DAG

Solution: Update to the latest Fluid Forge version:

pip install --upgrade fluid-forge

Error: ImportError: No module named 'airflow.providers...'

Solution: Install required provider packages:

pip install apache-airflow-providers-google
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-snowflake

Next Steps

  • Airflow DAG Deployment Guide
  • GCP Integration
  • CI/CD Setup
  • Provider Roadmap

Questions? Open an issue on GitHub

Last Updated: 4/4/26, 3:15 PM
Contributors: khanya_ai, fas89