Airflow DAG Patterns

Build production-ready Apache Airflow DAGs with battle-tested patterns



Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.

Tags: apache-airflow, data-engineering, etl, workflow-orchestration, dag-patterns, data-pipelines, batch-processing, task-scheduling

Example Interaction

User Prompt

Help me create an Airflow DAG that runs daily at 6 AM to extract customer data from S3, clean it, and load it to PostgreSQL with proper error handling


Agent Response

Complete DAG implementation with TaskFlow API, retry logic, and monitoring

Quick Start (3 Steps)

Get up and running in minutes

1. Install: `claude-code skill install airflow-dag-patterns`
2. Config
3. First trigger: `@airflow-dag-patterns help`

Commands

| Command | Description | Required Args |
|---------|-------------|---------------|
| `@airflow-dag-patterns creating-a-daily-etl-pipeline` | Build a production ETL pipeline that extracts data from S3, transforms it with pandas, and loads to a data warehouse | None |
| `@airflow-dag-patterns setting-up-dynamic-dags` | Generate multiple similar DAGs from configuration to process different data sources | None |
| `@airflow-dag-patterns implementing-complex-dependencies` | Design DAGs with branching logic, sensors, and external task dependencies | None |

Typical Use Cases

Creating a Daily ETL Pipeline

Build a production ETL pipeline that extracts data from S3, transforms it with pandas, and loads to a data warehouse

Setting Up Dynamic DAGs

Generate multiple similar DAGs from configuration to process different data sources

Implementing Complex Dependencies

Design DAGs with branching logic, sensors, and external task dependencies

Overview

Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

When to Use This Skill

  • Creating data pipeline orchestration with Airflow
  • Designing DAG structures and dependencies
  • Implementing custom operators and sensors
  • Testing Airflow DAGs locally
  • Setting up Airflow in production
  • Debugging failed DAG runs

Core Concepts

1. DAG Design Principles

| Principle | Description |
|-----------|-------------|
| Idempotent | Running twice produces the same result |
| Atomic | Tasks succeed or fail completely |
| Incremental | Process only new/changed data |
| Observable | Logs, metrics, alerts at every step |
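Idempotency is the principle most often violated in practice. Here is a minimal, framework-free sketch (the `warehouse` dict is a hypothetical stand-in for a real store): writes are keyed by the run's logical date, so a retry replaces that date's partition instead of appending duplicates.

```python
# Framework-free sketch of an idempotent load: output is keyed by the
# run's logical date (ds), so re-running overwrites the partition
# rather than appending duplicate rows. `warehouse` is a stand-in store.
warehouse = {}

def load_partition(ds: str, rows: list) -> None:
    # Overwrite the partition for this logical date; never append.
    warehouse[ds] = list(rows)

load_partition("2024-01-01", [10, 20, 30])
load_partition("2024-01-01", [10, 20, 30])  # retry: state is unchanged
```

The same delete-then-insert (or overwrite-partition) shape applies whether the target is S3, a warehouse table, or a local file.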

2. Task Dependencies

```python
# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Complex (diamond)
task1 >> task2 >> task4
task1 >> task3 >> task4
```

Quick Start

```python
# dags/example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```

Patterns

Pattern 1: TaskFlow API (Airflow 2.0+)

```python
# dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str, ds: str = None) -> dict:
        """Extract data from source (Airflow injects ds when declared)"""
        import pandas as pd

        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now().isoformat()  # keep XCom JSON-serializable
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds: str = None) -> int:
        """Load data to target"""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)

# Instantiate the DAG
taskflow_etl()
```

Note: Jinja templates like `{{ ds }}` are not rendered inside a task function's body; instead, declare context variables such as `ds` as parameters and Airflow injects them at runtime.

Pattern 2: Dynamic DAG Generation

```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]

def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""

    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag

# Generate DAGs at module level so the scheduler discovers them
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```

Pattern 3: Branching and Conditional Logic

```python
# dags/branching_example.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()
```

Pattern 4: Sensors and External Dependencies

```python
# dags/sensor_patterns.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,  # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',  # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same logical date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using the @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests

        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```

Pattern 5: Error Handling and Alerts

```python
# dags/error_handling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)

def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass

with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```

Pattern 6: Testing DAGs

```python
# tests/test_dags.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle

@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):
    """Test DAGs contain no cycles"""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)  # raises AirflowDagCycleException if a cycle exists

# Test individual task logic
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```

Project Structure

airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt
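The alerting logic in `common/callbacks.py` above can stay mostly framework-free. A hypothetical sketch: keep message formatting as a pure function so it unit-tests with a stub context (at runtime Airflow passes a real `TaskInstance` object with the same attributes).

```python
# Hypothetical sketch of dags/common/callbacks.py: message formatting
# is a pure function, so it can be unit tested with a stub context.
# At runtime Airflow supplies a real TaskInstance in context["task_instance"].
from types import SimpleNamespace

def format_failure_message(context) -> str:
    ti = context["task_instance"]
    return f"Task failed: dag={ti.dag_id} task={ti.task_id} ds={context['ds']}"

# Unit test with a stub standing in for Airflow's TaskInstance
stub_ti = SimpleNamespace(dag_id="example_etl", task_id="extract")
message = format_failure_message({"task_instance": stub_ti, "ds": "2024-01-01"})
```

The actual `on_failure_callback` would then just call this formatter and hand the string to Slack, PagerDuty, or email.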

Best Practices

Do’s

  • Use TaskFlow API - Cleaner code, automatic XCom
  • Set timeouts - Prevent zombie tasks
  • Use mode='reschedule' - For sensors, free up workers
  • Test DAGs - Unit tests and integration tests
  • Idempotent tasks - Safe to retry
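The timeout and retry advice above usually lands in a shared `default_args` dict applied to every task. A sketch with illustrative values (tune them to your workload):

```python
# Illustrative default_args applying the Do's above to every task:
# a hard per-task timeout plus bounded exponential-backoff retries.
from datetime import timedelta

default_args = {
    "execution_timeout": timedelta(minutes=30),  # kill stuck/zombie tasks
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),       # cap the backoff
}
```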

Don’ts

  • Don’t use depends_on_past=True - Creates bottlenecks
  • Don’t hardcode dates - Use {{ ds }} macros
  • Don’t use global state - Tasks should be stateless
  • Don’t skip catchup blindly - Understand implications
  • Don’t put heavy logic in DAG file - Import from modules
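"Don't hardcode dates" concretely means: derive every path and query from the run's logical date, never from the wall clock. A framework-free sketch (the path layout is hypothetical):

```python
# "Don't hardcode dates" in practice: derive paths from the run's
# logical date (which Airflow injects as `ds`), never from the wall
# clock, so backfills and retries read the correct partition.
from datetime import date

def input_key(ds: str) -> str:
    return f"raw/{ds}/data.parquet"  # parameterized by logical date

def bad_input_key() -> str:
    # Anti-pattern: a backfill for last month would read today's data.
    return f"raw/{date.today()}/data.parquet"

key = input_key("2024-01-01")
```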


Environment Matrix

Dependencies

Apache Airflow 2.0+
Python 3.8+
pandas (for data processing examples)
boto3 (for AWS S3 integration)

Framework Support

  • Apache Airflow 2.x ✓ (recommended)
  • AWS providers ✓
  • Google Cloud providers ✓
  • PostgreSQL providers ✓

Context Window

Token usage: ~3K-8K tokens for complex DAG patterns


Information

Author: wshobson
Updated: 2026-01-30
Category: productivity-tools