Saga Orchestration
Orchestrate distributed transactions with reliable compensating actions
Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.
See It In Action
User Prompt
I need to implement an order processing workflow that reserves inventory, charges payment, creates shipment, and sends confirmation. If any step fails, I need to undo the previous steps automatically.
Agent Response
Complete saga orchestrator with compensation logic for order fulfillment, including timeout handling and error recovery
Quick Start (3 Steps)
Install
claude-code skill install saga-orchestration
Config
First Trigger
@saga-orchestration help
Commands
| Command | Description | Required Args |
|---|---|---|
| @saga-orchestration order-fulfillment-pipeline | Coordinate inventory reservation, payment processing, and shipment creation across multiple services with automatic rollback on failures | None |
| @saga-orchestration multi-service-business-process | Build approval workflows or complex business processes spanning multiple bounded contexts with proper state management | None |
| @saga-orchestration long-running-transaction-recovery | Handle failure scenarios in distributed systems with proper compensation and state recovery mechanisms | None |
Overview
Saga Orchestration
Patterns for managing distributed transactions and long-running business processes.
When to Use This Skill
- Coordinating multi-service transactions
- Implementing compensating transactions
- Managing long-running business workflows
- Handling failures in distributed systems
- Building order fulfillment processes
- Implementing approval workflows
Core Concepts
1. Saga Types
```
Choreography                  Orchestration
┌─────┐  ┌─────┐  ┌─────┐     ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│     │ Orchestrator│
└─────┘  └─────┘  └─────┘     └──────┬──────┘
   │        │        │               │
   ▼        ▼        ▼         ┌─────┼─────┐
 Event    Event    Event       ▼     ▼     ▼
                             ┌────┐┌────┐┌────┐
                             │Svc1││Svc2││Svc3│
                             └────┘└────┘└────┘
```
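The orchestration side of the diagram can be illustrated without any messaging at all. Below is a minimal, synchronous, in-process sketch: a hypothetical `run_saga` helper calls each step's action in order and, if one raises, runs the compensations of the steps that already completed, newest first. The templates in this skill replace the direct calls with commands and replies, but follow the same order of execution and compensation.

```python
from typing import Callable, List, Tuple

# A step is a hypothetical (action, compensation) pair of zero-argument callables.
Step = Tuple[Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order.

    Returns True if every step succeeded, False if the saga was rolled back.
    """
    completed: List[Step] = []
    for action, compensation in steps:
        try:
            action()
        except Exception:
            # Undo only the steps that actually completed, newest first.
            for _, undo in reversed(completed):
                undo()
            return False
        completed.append((action, compensation))
    return True


# Usage: the third step fails, so steps 2 and 1 are compensated in that order.
log: List[str] = []


def fail() -> None:
    raise RuntimeError("shipping unavailable")


ok = run_saga([
    (lambda: log.append("reserve"), lambda: log.append("release")),
    (lambda: log.append("charge"), lambda: log.append("refund")),
    (fail, lambda: log.append("cancel_shipment")),
])
print(ok, log)  # False ['reserve', 'charge', 'refund', 'release']
```

The failed step itself is not compensated: it never completed, so there is nothing of its own to undo.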
2. Saga Execution States
| State | Description |
|---|---|
| Started | Saga initiated |
| Pending | Waiting for step completion |
| Compensating | Rolling back due to failure |
| Completed | All steps succeeded |
| Failed | Saga failed after compensation |
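The table names the states but not which moves between them are legal. One option is to encode them as an explicit transition map and reject anything else. The map below is an assumption derived from the transitions Template 1 performs (for example, a single-step saga goes straight from Started to Completed); `ALLOWED` and `transition` are hypothetical names.

```python
from enum import Enum
from typing import Dict, FrozenSet


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


# Transitions performed by the base orchestrator in Template 1.
# COMPLETED and FAILED are terminal: nothing may leave them.
ALLOWED: Dict[SagaState, FrozenSet[SagaState]] = {
    SagaState.STARTED: frozenset(
        {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING}
    ),
    SagaState.PENDING: frozenset(
        {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING}
    ),
    SagaState.COMPENSATING: frozenset({SagaState.FAILED}),
    SagaState.COMPLETED: frozenset(),
    SagaState.FAILED: frozenset(),
}


def transition(current: SagaState, target: SagaState) -> SagaState:
    """Return target if the move is legal, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal saga transition {current.value} -> {target.value}")
    return target
```

Calling `transition` wherever an orchestrator changes `saga.state` turns an out-of-order message, such as a late step-completed reply after compensation has started, into an explicit error instead of a silent regression to Pending.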
Templates
Template 1: Saga Orchestrator Base
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaStep:
    name: str
    action: str        # command published to perform the step
    compensation: str  # command published to undo the step
    # pending -> executing -> completed | failed
    # completed -> compensating -> compensated
    status: str = "pending"
    result: Optional[Dict] = None
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None


@dataclass
class Saga:
    saga_id: str
    saga_type: str
    state: SagaState
    data: Dict[str, Any]
    steps: List[SagaStep]
    current_step: int = 0
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)


class SagaOrchestrator(ABC):
    """Base class for saga orchestrators.

    saga_store needs async get(saga_id) and save(saga);
    event_publisher needs async publish(event_type, payload).
    """

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Define the saga steps."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique saga type identifier."""

    async def start(self, data: Dict) -> Saga:
        """Start a new saga."""
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data)
        )
        await self.saga_store.save(saga)
        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Handle successful step completion."""
        saga = await self.saga_store.get(saga_id)

        # Ignore duplicate or late replies: only the step currently executing
        # in a saga that is still moving forward may advance it.
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return
        if saga.current_step >= len(saga.steps):
            return
        step = saga.steps[saga.current_step]
        if step.name != step_name or step.status != "executing":
            return

        step.status = "completed"
        step.result = result
        step.executed_at = datetime.utcnow()
        saga.current_step += 1
        saga.updated_at = datetime.utcnow()

        # Check if saga is complete
        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Handle step failure - start compensation."""
        saga = await self.saga_store.get(saga_id)

        # A saga that already finished or is compensating cannot fail again
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return

        # Mark step as failed
        for step in saga.steps:
            if step.name == step_name:
                step.status = "failed"
                step.error = error
                break

        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        # Undo the steps that completed before the failure
        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Execute the next step in the saga."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)

        # Publish command to execute step
        await self.event_publisher.publish(
            step.action,
            {
                "saga_id": saga.saga_id,
                "step_name": step.name,
                **saga.data
            }
        )

    async def _compensate(self, saga: Saga):
        """Publish compensation commands for completed steps, newest first.

        Each compensation runs asynchronously in its own service, so the
        commands are issued in reverse order but may finish in any order.
        """
        to_compensate = [
            step for step in reversed(saga.steps[:saga.current_step])
            if step.status == "completed"
        ]
        if not to_compensate:
            # Nothing completed (e.g. the first step failed). No compensation
            # reply will ever arrive, so fail the saga right away.
            saga.state = SagaState.FAILED
            await self.saga_store.save(saga)
            await self._on_saga_failed(saga)
            return

        for step in to_compensate:
            step.status = "compensating"
        await self.saga_store.save(saga)

        for step in to_compensate:
            await self.event_publisher.publish(
                step.compensation,
                {
                    "saga_id": saga.saga_id,
                    "step_name": step.name,
                    "original_result": step.result,
                    **saga.data
                }
            )

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Handle compensation completion."""
        saga = await self.saga_store.get(saga_id)
        if saga.state != SagaState.COMPENSATING:
            return  # duplicate or late message

        for step in saga.steps:
            if step.name == step_name and step.status == "compensating":
                step.status = "compensated"
                step.compensated_at = datetime.utcnow()
                break

        # The saga has failed once no step is still awaiting compensation.
        # Concurrent replies for one saga can race on this read-modify-write,
        # so the store should serialize or version updates per saga.
        all_compensated = all(
            s.status in ("compensated", "pending", "failed")
            for s in saga.steps
        )
        saga.updated_at = datetime.utcnow()
        if all_compensated:
            saga.state = SagaState.FAILED
        # Persist before announcing the outcome
        await self.saga_store.save(saga)
        if all_compensated:
            await self._on_saga_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Called when saga completes successfully."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            {"saga_id": saga.saga_id, **saga.data}
        )

    async def _on_saga_failed(self, saga: Saga):
        """Called when saga fails after compensation."""
        # Report the first recorded step error, if any
        error = next((s.error for s in saga.steps if s.error), "Saga failed")
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            {"saga_id": saga.saga_id, "error": error, **saga.data}
        )
```
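Template 1 leaves `saga_store` abstract. Because replies for the same saga can arrive concurrently (two compensation confirmations, or a timeout racing a late reply), the store needs some form of concurrency control. Below is a minimal in-memory sketch using optimistic locking with a version counter. `InMemorySagaStore` and `ConcurrencyError` are hypothetical, and this `save` takes an extra `expected_version` argument, so using it with Template 1 would mean threading the version through its `get`/`save` calls.

```python
import asyncio
import copy
from typing import Any, Dict, Tuple


class ConcurrencyError(Exception):
    """Raised when a save is based on a stale read."""


class InMemorySagaStore:
    """Hypothetical in-memory saga store with optimistic concurrency control.

    Every record carries a version number. save() succeeds only if the caller
    read the latest version, so two handlers racing on the same saga cannot
    silently overwrite each other's changes.
    """

    def __init__(self) -> None:
        self._records: Dict[str, Tuple[int, Any]] = {}

    async def get(self, saga_id: str) -> Tuple[Any, int]:
        version, saga = self._records[saga_id]
        # Hand out a copy, as a real database would, so in-memory edits
        # are invisible to other handlers until save() succeeds.
        return copy.deepcopy(saga), version

    async def save(self, saga_id: str, saga: Any, expected_version: int) -> int:
        """Store saga if expected_version is current; return the new version.

        Pass expected_version=0 to create a new record. There is no await
        between the check and the write, so this is atomic on one event loop.
        """
        current = self._records.get(saga_id, (0, None))[0]
        if current != expected_version:
            raise ConcurrencyError(
                f"saga {saga_id}: expected v{expected_version}, found v{current}"
            )
        self._records[saga_id] = (current + 1, copy.deepcopy(saga))
        return current + 1


async def demo() -> str:
    store = InMemorySagaStore()
    await store.save("s1", {"state": "started"}, expected_version=0)
    first, v1 = await store.get("s1")
    second, v2 = await store.get("s1")  # a concurrent handler reads the same version
    await store.save("s1", {**first, "state": "pending"}, v1)
    try:
        await store.save("s1", {**second, "state": "compensating"}, v2)
    except ConcurrencyError:
        return "conflict detected"  # the stale writer must reload and retry
    return "no conflict"


print(asyncio.run(demo()))  # conflict detected
```

On `ConcurrencyError`, a handler would reload the saga and re-apply its change; a relational store typically gets the same effect with a `WHERE version = ?` condition on the update.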
Template 2: Order Fulfillment Saga
```python
from typing import Dict, List

# SagaOrchestrator and SagaStep come from Template 1.


class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation"
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment"
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment"
            ),
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                # Never triggered by Template 1: only completed steps that
                # precede a failure are compensated, and this is the last step.
                compensation="NotificationService.SendCancellationNotice"
            )
        ]


# Usage: saga_store and event_publisher are supplied by the application
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"]
    })


class InsufficientInventoryError(Exception):
    """Raised by InventoryService.reserve when stock is short."""


# Event handlers in each service
class InventoryService:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher

    async def reserve(self, items, order_id):
        """Reserve stock; should be idempotent per order_id (service-specific)."""
        raise NotImplementedError

    async def release_reservation(self, reservation_id):
        """Release a reservation; must be a no-op if already released (service-specific)."""
        raise NotImplementedError

    async def handle_reserve_items(self, command: Dict):
        try:
            # Reserve inventory
            reservation = await self.reserve(
                command["items"],
                command["order_id"]
            )
            # Report success
            await self.event_publisher.publish(
                "SagaStepCompleted",
                {
                    "saga_id": command["saga_id"],
                    "step_name": "reserve_inventory",
                    "result": {"reservation_id": reservation.id}
                }
            )
        except InsufficientInventoryError as e:
            await self.event_publisher.publish(
                "SagaStepFailed",
                {
                    "saga_id": command["saga_id"],
                    "step_name": "reserve_inventory",
                    "error": str(e)
                }
            )

    async def handle_release_reservation(self, command: Dict):
        # Compensating action: original_result is the result this service
        # reported when the reservation succeeded
        await self.release_reservation(
            command["original_result"]["reservation_id"]
        )
        await self.event_publisher.publish(
            "SagaCompensationCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory"
            }
        )
```
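Template 2 leaves `reserve` and `release_reservation` to the service. A compensation handler must tolerate redelivery, and even arriving before the action it undoes, so one common approach is to key reservations by an idempotency key such as `order_id` and to record releases. `InMemoryInventory` below is a hypothetical sketch of that idea, not the template's storage model.

```python
from typing import Dict, Set


class InsufficientInventoryError(Exception):
    """Raised when stock cannot cover a reservation."""


class InMemoryInventory:
    """Hypothetical inventory whose reserve/release are idempotent per order_id."""

    def __init__(self, stock: Dict[str, int]) -> None:
        self.stock = dict(stock)
        self.reservations: Dict[str, Dict[str, int]] = {}  # order_id -> {sku: qty}
        self.released: Set[str] = set()  # orders whose reservation was compensated

    def reserve(self, order_id: str, items: Dict[str, int]) -> str:
        if order_id in self.reservations:
            return order_id  # redelivered command: reuse the existing reservation
        if order_id in self.released:
            # The compensation overtook the action; reserving now would leak stock.
            raise ValueError(f"order {order_id} was already compensated")
        # All-or-nothing: check every line before touching any stock.
        if any(self.stock.get(sku, 0) < qty for sku, qty in items.items()):
            raise InsufficientInventoryError(f"cannot reserve {items}")
        for sku, qty in items.items():
            self.stock[sku] = self.stock.get(sku, 0) - qty
        self.reservations[order_id] = dict(items)
        return order_id

    def release(self, order_id: str) -> None:
        # Record the release even if nothing was reserved, so a late reserve fails.
        self.released.add(order_id)
        items = self.reservations.pop(order_id, None)
        if items is None:
            return  # already released, or never reserved: nothing to undo
        for sku, qty in items.items():
            self.stock[sku] += qty


inv = InMemoryInventory({"sku-1": 5})
inv.reserve("order-42", {"sku-1": 2})
inv.reserve("order-42", {"sku-1": 2})  # redelivered command: no second reservation
inv.release("order-42")
inv.release("order-42")                # redelivered compensation: no double restock
print(inv.stock)  # {'sku-1': 5}
```

Keying by `order_id` rather than a generated `reservation_id` also lets a compensation target a reservation whose result never reached the orchestrator, for example after a step timeout.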
Template 3: Choreography-Based Saga
```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class SagaContext:
    """Passed through choreographed saga events."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: list


class OrderChoreographySaga:
    """Choreography-based saga using events.

    No component stores saga state, so every service must echo saga_id,
    order_id and the ids from earlier steps (reservation_id, payment_id)
    in the events it publishes; the compensation handlers depend on them.
    """

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

        # Compensation handlers
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        """Step 1: Order created, reserve inventory."""
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],  # the order id doubles as the saga id
            "order_id": event["order_id"],
            "items": event["items"]
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: Inventory reserved, process payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"]
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: Payment done, create shipment."""
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"]
        })

    async def _on_shipment_created(self, event: Dict):
        """Step 4: Shipment created, mark the order fulfilled."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"]
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed - release inventory."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"]
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Payment failed"
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed - refund payment and release inventory."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"]
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"]
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Shipment failed"
        })
```
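Template 3 only assumes an `event_bus` with `subscribe(event_type, handler)` and an awaitable `publish(event_type, payload)`. For local tests, a hypothetical in-memory bus like the one below may be enough. It queues events and delivers them in FIFO order, so it does not model the retries, redelivery or reordering a real broker can introduce.

```python
import asyncio
from collections import defaultdict, deque
from typing import Any, Awaitable, Callable, Deque, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical in-process bus with the subscribe/publish calls used above.

    publish() only enqueues; run_until_idle() delivers events in FIFO order,
    so a handler that publishes follow-up events does not recurse.
    """

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)
        self._queue: Deque[Tuple[str, Dict[str, Any]]] = deque()
        self.published: List[str] = []  # event types in publish order, for inspection

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        self.published.append(event_type)
        self._queue.append((event_type, payload))

    async def run_until_idle(self) -> None:
        while self._queue:
            event_type, payload = self._queue.popleft()
            for handler in self._handlers.get(event_type, []):
                await handler(payload)


async def demo() -> List[str]:
    bus = InMemoryEventBus()

    async def on_order_created(event: Dict[str, Any]) -> None:
        await bus.publish("ReserveInventory", {"saga_id": event["order_id"], **event})

    bus.subscribe("OrderCreated", on_order_created)
    await bus.publish("OrderCreated", {"order_id": "o-1", "items": []})
    await bus.run_until_idle()
    return bus.published


print(asyncio.run(demo()))  # ['OrderCreated', 'ReserveInventory']
```

Because `OrderChoreographySaga.__init__` only calls `subscribe`, an instance of this bus can be passed to it directly, with stub services subscribed to commands such as `ReserveInventory`.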
Template 4: Saga with Timeouts
```python
from datetime import datetime, timedelta
from typing import Dict

# Saga, SagaState and SagaOrchestrator come from Template 1.


class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator with step timeouts."""

    def __init__(self, saga_store, event_publisher, scheduler,
                 step_timeout: timedelta = timedelta(minutes=5)):
        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler
        self.step_timeout = step_timeout

    async def _execute_next_step(self, saga: Saga):
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        # SagaStep has no timeout field, so the deadline stays local
        timeout_at = datetime.utcnow() + self.step_timeout
        await self.saga_store.save(saga)

        # Schedule timeout check
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=timeout_at
        )

        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data}
        )

    async def _check_timeout(self, data: Dict):
        """Fail the step if it is still executing when its deadline passes."""
        saga = await self.saga_store.get(data["saga_id"])
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return  # saga already finished or compensating

        step = next((s for s in saga.steps if s.name == data["step_name"]), None)
        if step is not None and step.status == "executing":
            # The service may still finish the step after this point; its side
            # effects are not undone, because compensation skips failed steps.
            await self.handle_step_failed(
                data["saga_id"],
                data["step_name"],
                "Step timed out"
            )
```
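Template 4 relies on a `scheduler` whose `schedule(job_id, callback, data, run_at=...)` call is not defined anywhere in the skill. A minimal in-process sketch on top of `asyncio` tasks could look like this; `AsyncioScheduler` and its `cancel` method are hypothetical.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Any, Awaitable, Callable, Dict

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


class AsyncioScheduler:
    """Hypothetical in-process scheduler matching Template 4's schedule() call.

    Jobs live only in memory and are lost on restart; a durable version would
    persist them, or rescan the saga store for overdue steps on startup.
    """

    def __init__(self) -> None:
        self._tasks: Dict[str, "asyncio.Task[None]"] = {}

    async def schedule(self, job_id: str, callback: Callback,
                       data: Dict[str, Any], run_at: datetime) -> None:
        self.cancel(job_id)  # rescheduling replaces an earlier job with the same id
        if run_at.tzinfo is None:
            # Template 4 passes naive UTC datetimes (datetime.utcnow()).
            now = datetime.now(timezone.utc).replace(tzinfo=None)
        else:
            now = datetime.now(timezone.utc)
        delay = max(0.0, (run_at - now).total_seconds())
        self._tasks[job_id] = asyncio.create_task(self._run(job_id, callback, data, delay))

    async def _run(self, job_id: str, callback: Callback,
                   data: Dict[str, Any], delay: float) -> None:
        await asyncio.sleep(delay)
        self._tasks.pop(job_id, None)  # the job is no longer cancellable once due
        await callback(data)

    def cancel(self, job_id: str) -> bool:
        """Cancel a pending job, e.g. when its step finishes before the deadline."""
        task = self._tasks.pop(job_id, None)
        if task is None:
            return False
        task.cancel()
        return True


async def demo() -> list:
    fired = []

    async def on_timeout(data: Dict[str, Any]) -> None:
        fired.append(data["step_name"])

    scheduler = AsyncioScheduler()
    soon = datetime.now(timezone.utc) + timedelta(milliseconds=10)
    await scheduler.schedule("t-a", on_timeout, {"step_name": "a"}, run_at=soon)
    await scheduler.schedule("t-b", on_timeout, {"step_name": "b"}, run_at=soon)
    scheduler.cancel("t-b")  # step b replied before its deadline
    await asyncio.sleep(0.1)
    return fired


print(asyncio.run(demo()))  # ['a']
```

A `cancel` method also makes it possible to drop the timeout job once a step replies in time, for instance by cancelling `f"saga_timeout_{saga_id}_{step_name}"` from a completion handler. The template as written lets the job fire anyway and relies on `_check_timeout` finding the step no longer executing.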
Best Practices
Do’s
- Make steps idempotent - Safe to retry
- Design compensations carefully - They must eventually succeed, so make them idempotent and retryable
- Use correlation IDs - For tracing across services
- Implement timeouts - Don’t wait forever
- Log every state transition with the saga ID - For debugging failures
Don’ts
- Don’t assume instant completion - Sagas take time
- Don’t skip compensation testing - Most critical part
- Don’t couple services - Use async messaging
- Don’t ignore partial failures - Handle gracefully
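"They must eventually succeed" in practice usually means a failed compensation is retried rather than abandoned. One widely used technique is exponential backoff with full jitter. The helper below is a hypothetical sketch (`retry_with_backoff` and its defaults are assumptions) and is only safe around idempotent operations, which is why making steps idempotent comes first in the list above.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
) -> T:
    """Retry an async operation with exponential backoff and full jitter.

    Only safe for idempotent operations: a retry may repeat work that
    actually succeeded before the error was observed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return await operation()
        except Exception:
            if attempt >= max_attempts:
                raise  # give up: surface the error for alerting
            # Cap grows 2x per attempt; sleep a random time up to the cap.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))
            attempt += 1


async def demo() -> int:
    calls = {"n": 0}

    async def flaky_refund() -> int:
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("payment service unavailable")
        return calls["n"]

    return await retry_with_backoff(flaky_refund, base_delay=0.001)


print(asyncio.run(demo()))  # 3
```

Once the attempts are exhausted the exception propagates; at that point the saga typically needs alerting or manual intervention rather than another automatic step.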
Information
- Author: wshobson
- Updated: 2026-01-30
- Category: architecture-patterns
Related Skills
Workflow Orchestration Patterns
Design durable workflows with Temporal for distributed systems. Covers workflow vs activity …