Airflow DAG Patterns
Build production-ready Apache Airflow DAGs with battle-tested patterns
Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
Example
User prompt: Help me create an Airflow DAG that runs daily at 6 AM to extract customer data from S3, clean it, and load it to PostgreSQL with proper error handling
Agent response: A complete DAG implementation with the TaskFlow API, retry logic, and monitoring
Quick Start (3 Steps)
Get up and running in minutes.
1. Install: `claude-code skill install airflow-dag-patterns`
2. First trigger: `@airflow-dag-patterns help`
3. Commands:
| Command | Description | Required Args |
|---|---|---|
| @airflow-dag-patterns creating-a-daily-etl-pipeline | Build a production ETL pipeline that extracts data from S3, transforms it with pandas, and loads to a data warehouse | None |
| @airflow-dag-patterns setting-up-dynamic-dags | Generate multiple similar DAGs from configuration to process different data sources | None |
| @airflow-dag-patterns implementing-complex-dependencies | Design DAGs with branching logic, sensors, and external task dependencies | None |
Overview
Apache Airflow DAG Patterns
Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.
When to Use This Skill
- Creating data pipeline orchestration with Airflow
- Designing DAG structures and dependencies
- Implementing custom operators and sensors
- Testing Airflow DAGs locally
- Setting up Airflow in production
- Debugging failed DAG runs
Core Concepts
1. DAG Design Principles
| Principle | Description |
|---|---|
| Idempotent | Running twice produces the same result |
| Atomic | Tasks succeed or fail completely |
| Incremental | Process only new/changed data |
| Observable | Logs, metrics, alerts at every step |
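The idempotent and incremental principles can be sketched in plain Python. Below is a minimal, hypothetical illustration of an idempotent partition load: the partition for the run date is replaced wholesale, so retrying or re-running the task for the same date converges to the same state instead of duplicating rows. The `warehouse` dict, table name, and `ds` values are illustrative stand-ins, not a real warehouse client.

```python
# Hypothetical in-memory "warehouse": {(table, ds): [rows]}
warehouse = {}

def load_partition(table: str, ds: str, rows: list) -> None:
    """Idempotent, incremental load: replace exactly one date partition.

    Replacing the partition (rather than appending to it) makes a retry,
    or a manual re-run for the same ds, produce the same final state.
    """
    warehouse[(table, ds)] = list(rows)  # delete-and-replace, not append

# Running the load twice for the same ds yields identical state
load_partition('customers', '2024-01-01', [{'id': 1}, {'id': 2}])
load_partition('customers', '2024-01-01', [{'id': 1}, {'id': 2}])
print(len(warehouse[('customers', '2024-01-01')]))  # 2, not 4
```

In SQL terms this is the familiar `DELETE WHERE ds = ... ; INSERT ...` (or partition overwrite) pattern, keyed by the logical date.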
2. Task Dependencies
```python
# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Complex (diamond)
task1 >> task2 >> task4
task1 >> task3 >> task4
```
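The "complex" shape above is a diamond, and an Airflow DAG is a DAG in the graph-theory sense: the scheduler must run each task only after all of its upstream tasks finish. Python's standard-library `graphlib` can illustrate the ordering this implies. This is an illustration of the dependency semantics only, not Airflow's actual scheduler code.

```python
from graphlib import TopologicalSorter

# Diamond: task1 >> task2 >> task4 and task1 >> task3 >> task4
ts = TopologicalSorter()
ts.add('task2', 'task1')            # task2 depends on task1
ts.add('task3', 'task1')            # task3 depends on task1
ts.add('task4', 'task2', 'task3')   # task4 depends on both branches

order = list(ts.static_order())
print(order)  # task1 comes first, task4 last; task2/task3 may run in parallel
```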
Quick Start
```python
# dags/example_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```
Patterns
Pattern 1: TaskFlow API (Airflow 2.0+)
```python
# dags/taskflow_example.py
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using the TaskFlow API"""

    @task()
    def extract(source: str, ds: str = None) -> dict:
        """Extract data from source (ds is injected from the task context)"""
        import pandas as pd

        # Jinja templates like {{ ds }} are not rendered inside Python
        # function bodies; declare ds as a parameter to have it injected
        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds: str = None) -> int:
        """Load data to target"""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)

# Instantiate the DAG
taskflow_etl()
```
Pattern 2: Dynamic DAG Generation
```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]

def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""

    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag

# Generate DAGs (module-level names so the scheduler discovers them)
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```
Pattern 3: Branching and Conditional Logic
```python
# dags/branching_example.py
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join

branching_pipeline()
```
Pattern 4: Sensors and External Dependencies
```python
# dags/sensor_patterns.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.base import PokeReturnValue
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for a file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,   # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',     # Free up the worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using the @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests

        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200

        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```
Pattern 5: Error Handling and Alerts
```python
# dags/error_handling.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def task_failure_callback(context):
    """Callback invoked on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc.
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)

def dag_failure_callback(context):
    """Callback invoked on DAG failure"""
    # Aggregate failures, send a summary
    pass

with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream tasks succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```
Pattern 6: Testing DAGs
```python
# tests/test_dags.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle

@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)

def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"

def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'

def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]

def test_dag_integrity(dagbag):
    """Test that no DAG contains a cycle"""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)  # raises AirflowDagCycleException if a cycle exists

# Test individual task logic
def test_extract_function():
    """Unit test for the extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```
Project Structure
```
airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt
```
Best Practices
Do's
- Use the TaskFlow API - cleaner code, automatic XCom passing
- Set timeouts - prevent zombie tasks
- Use `mode='reschedule'` for sensors - frees up worker slots
- Test DAGs - unit tests and integration tests
- Keep tasks idempotent - safe to retry
Don'ts
- Don't use `depends_on_past=True` - it creates bottlenecks
- Don't hardcode dates - use `{{ ds }}` and other macros
- Don't use global state - tasks should be stateless
- Don't skip catchup blindly - understand the implications
- Don't put heavy logic in the DAG file - import it from modules
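The last point deserves a sketch: shared logic such as alert formatting belongs in a module (for example `dags/common/callbacks.py` in the project layout above) that DAG files import, keeping the DAG files themselves thin and fast to parse. The helper below is hypothetical pure Python; its fields mirror the failure-callback message from Pattern 5.

```python
# dags/common/callbacks.py (hypothetical shared module)

def format_failure_message(dag_id: str, task_id: str, ds: str,
                           error: str, log_url: str) -> str:
    """Build alert text in one place so every DAG file stays thin."""
    return (
        f"Task Failed!\n"
        f"DAG: {dag_id}\n"
        f"Task: {task_id}\n"
        f"Execution Date: {ds}\n"
        f"Error: {error}\n"
        f"Log URL: {log_url}"
    )

msg = format_failure_message('example_etl', 'extract', '2024-01-01',
                             'ValueError', 'http://localhost:8080/log')
print(msg.splitlines()[0])  # Task Failed!
```

A failure callback in any DAG file can then import and call this helper, so message formatting is tested once rather than duplicated per pipeline.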
Information
- Author: wshobson
- Updated: 2026-01-30
- Category: productivity-tools