Saga Orchestration

Orchestrate distributed transactions with compensating actions for rollback


Implement saga patterns for distributed transactions and cross-aggregate workflows. Use when coordinating multi-step business processes, handling compensating transactions, or managing long-running workflows.

saga-pattern distributed-transactions microservices workflow-orchestration compensation event-driven resilience long-running-processes

User Prompt

I need to implement an order processing workflow that reserves inventory, charges payment, creates shipment, and sends confirmation. If any step fails, I need to undo the previous steps automatically.

Agent Response

Complete saga orchestrator with compensation logic for order fulfillment, including timeout handling and error recovery

Quick Start (3 Steps)

Get up and running in minutes

1. Install

   claude-code skill install saga-orchestration

2. Config

3. First Trigger

   @saga-orchestration help

Commands

| Command | Description | Required Args |
|---|---|---|
| @saga-orchestration order-fulfillment-pipeline | Coordinate inventory reservation, payment processing, and shipment creation across multiple services with automatic rollback on failures | None |
| @saga-orchestration multi-service-business-process | Build approval workflows or complex business processes spanning multiple bounded contexts with proper state management | None |
| @saga-orchestration long-running-transaction-recovery | Handle failure scenarios in distributed systems with proper compensation and state recovery mechanisms | None |

Typical Use Cases

Order Fulfillment Pipeline

Coordinate inventory reservation, payment processing, and shipment creation across multiple services with automatic rollback on failures

Multi-Service Business Process

Build approval workflows or complex business processes spanning multiple bounded contexts with proper state management

Long-Running Transaction Recovery

Handle failure scenarios in distributed systems with proper compensation and state recovery mechanisms

Overview

Patterns for managing distributed transactions and long-running business processes.

When to Use This Skill

  • Coordinating multi-service transactions
  • Implementing compensating transactions
  • Managing long-running business workflows
  • Handling failures in distributed systems
  • Building order fulfillment processes
  • Implementing approval workflows

Core Concepts

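1. Saga Types

In choreography, services react to each other's events with no central coordinator; in orchestration, a central orchestrator sends each service a command and tracks the replies.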

Choreography                    Orchestration
┌─────┐  ┌─────┐  ┌─────┐     ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│     │ Orchestrator│
└─────┘  └─────┘  └─────┘     └──────┬──────┘
   │        │        │               │
   ▼        ▼        ▼         ┌─────┼─────┐
 Event    Event    Event       ▼     ▼     ▼
                            ┌────┐┌────┐┌────┐
                            │Svc1││Svc2││Svc3│
                            └────┘└────┘└────┘

2. Saga Execution States

| State | Description |
|---|---|
| Started | Saga initiated |
| Pending | Waiting for step completion |
| Compensating | Rolling back due to failure |
| Completed | All steps succeeded |
| Failed | Saga failed after compensation |
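The table lists the states but not which moves between them are legal. The following is a minimal sketch of a transition guard. It assumes the transitions below, which are inferred from how the orchestrator template updates `saga.state`. `ALLOWED_TRANSITIONS` and `transition` are hypothetical names and are not part of the skill.

```python
from enum import Enum


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed legal moves, inferred from the orchestrator template.
# COMPLETED and FAILED are terminal: nothing may follow them.
ALLOWED_TRANSITIONS = {
    SagaState.STARTED: {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING},
    SagaState.PENDING: {SagaState.PENDING, SagaState.COMPLETED, SagaState.COMPENSATING},
    SagaState.COMPENSATING: {SagaState.FAILED},
    SagaState.COMPLETED: set(),
    SagaState.FAILED: set(),
}


def transition(current: SagaState, target: SagaState) -> SagaState:
    """Return target if moving there from current is legal, else raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal saga transition {current.name} -> {target.name}")
    return target
```

Calling `transition` before every assignment to `saga.state` turns a bug such as "restart a completed saga" into an immediate error rather than silent corruption.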

Templates

Template 1: Saga Orchestrator Base

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
import uuid


class SagaState(Enum):
    STARTED = "started"
    PENDING = "pending"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaStep:
    name: str
    action: str            # command published to run the step
    compensation: str      # command published to undo the step
    # pending -> executing -> completed | failed; completed -> compensating -> compensated
    status: str = "pending"
    result: Optional[Dict] = None
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    compensated_at: Optional[datetime] = None


@dataclass
class Saga:
    saga_id: str
    saga_type: str
    state: SagaState
    data: Dict[str, Any]
    steps: List[SagaStep]
    current_step: int = 0
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)


class SagaOrchestrator(ABC):
    """Base class for saga orchestrators.

    saga_store needs async save(saga) and get(saga_id);
    event_publisher needs async publish(topic, payload).
    """

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Define the saga steps."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique saga type identifier."""

    @staticmethod
    def _find_step(saga: Saga, step_name: str) -> SagaStep:
        for step in saga.steps:
            if step.name == step_name:
                return step
        raise KeyError(f"Saga {saga.saga_id} has no step {step_name!r}")

    async def start(self, data: Dict) -> Saga:
        """Start a new saga."""
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data),
        )
        await self.saga_store.save(saga)
        await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Handle successful step completion."""
        saga = await self.saga_store.get(saga_id)
        step = self._find_step(saga, step_name)
        if step.status != "executing":
            return  # duplicate or late reply (e.g. the step already timed out)

        step.status = "completed"
        step.result = result
        step.executed_at = datetime.utcnow()
        saga.current_step += 1
        saga.updated_at = datetime.utcnow()

        # Check if saga is complete
        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Handle step failure by starting compensation."""
        saga = await self.saga_store.get(saga_id)
        step = self._find_step(saga, step_name)
        if step.status != "executing":
            return  # duplicate or late failure report

        step.status = "failed"
        step.error = error
        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        # Undo the steps before the failed one, newest first
        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Execute the next step in the saga."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)

        # Publish command to execute step
        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data},
        )

    async def _compensate(self, saga: Saga):
        """Publish compensation commands for completed steps in reverse order."""
        to_undo = [s for s in reversed(saga.steps[: saga.current_step]) if s.status == "completed"]

        if not to_undo:
            # Nothing succeeded (e.g. the first step failed), so no compensation
            # reply will ever arrive: fail the saga now instead of hanging.
            saga.state = SagaState.FAILED
            await self.saga_store.save(saga)
            await self._on_saga_failed(saga)
            return

        # Mark and save every step before publishing, so no later save here can
        # overwrite a compensation reply that arrives while commands go out.
        for step in to_undo:
            step.status = "compensating"
        await self.saga_store.save(saga)

        # Commands go out newest-first but replies are not awaited one by one;
        # publish one at a time if a compensation depends on a later one finishing.
        for step in to_undo:
            await self.event_publisher.publish(
                step.compensation,
                {
                    "saga_id": saga.saga_id,
                    "step_name": step.name,
                    "original_result": step.result,
                    **saga.data,
                },
            )

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Handle compensation completion."""
        saga = await self.saga_store.get(saga_id)
        step = self._find_step(saga, step_name)
        if step.status != "compensating":
            return  # duplicate reply

        step.status = "compensated"
        step.compensated_at = datetime.utcnow()
        saga.updated_at = datetime.utcnow()

        # The saga has failed once no step is still waiting to be compensated
        all_compensated = all(
            s.status in ("compensated", "pending", "failed") for s in saga.steps
        )
        if all_compensated:
            saga.state = SagaState.FAILED
        await self.saga_store.save(saga)  # persist before announcing the outcome
        if all_compensated:
            await self._on_saga_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Called when saga completes successfully."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            {"saga_id": saga.saga_id, **saga.data},
        )

    async def _on_saga_failed(self, saga: Saga):
        """Called when saga fails after compensation."""
        # Report the failing step's error when there is one
        error = next((s.error for s in saga.steps if s.status == "failed"), "Saga failed")
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            {"saga_id": saga.saga_id, "error": error, **saga.data},
        )
```
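The base class depends only on a `saga_store` with async `save`/`get` and an `event_publisher` with async `publish`. For unit tests, a minimal in-memory sketch of both is enough. `InMemorySagaStore` and `RecordingPublisher` are hypothetical helpers, not part of the skill. Deep copies make `get` return an independent object, roughly as a database round trip would.

```python
import copy
from typing import Any, Dict, List, Tuple


class InMemorySagaStore:
    """Keeps saga snapshots in a dict; deep copies mimic a real database round trip."""

    def __init__(self) -> None:
        self._sagas: Dict[str, Any] = {}

    async def save(self, saga: Any) -> None:
        # Store a snapshot so later in-memory mutations don't leak into the store
        self._sagas[saga.saga_id] = copy.deepcopy(saga)

    async def get(self, saga_id: str) -> Any:
        # Return a fresh copy, as loading from a database would
        return copy.deepcopy(self._sagas[saga_id])


class RecordingPublisher:
    """Records every published (topic, payload) pair instead of sending it."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, Dict[str, Any]]] = []

    async def publish(self, topic: str, payload: Dict[str, Any]) -> None:
        self.published.append((topic, payload))
```

With the Order Fulfillment Saga, a test could create `OrderFulfillmentSaga(InMemorySagaStore(), RecordingPublisher())`, call `start(...)`, and inspect `published`. The first recorded topic should be `InventoryService.ReserveItems`. Replies can then be fed in by calling `handle_step_completed` or `handle_step_failed` directly.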

Template 2: Order Fulfillment Saga

```python
# Builds on SagaOrchestrator, SagaStep, Dict and List from Template 1.

class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestrates order fulfillment across services."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation",
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment",
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment",
            ),
            # As the last step, send_confirmation is never compensated by the base
            # class: compensation only covers steps completed before a failure.
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                compensation="NotificationService.SendCancellationNotice",
            ),
        ]


# Usage: saga_store and event_publisher are your implementations of the
# interfaces described in Template 1.
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"],
    })


class InsufficientInventoryError(Exception):
    """Raised when the requested items cannot be reserved."""


# Event handlers in each service
class InventoryService:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher

    async def reserve(self, items, order_id):
        # Domain logic: persist a reservation and return an object with an .id
        raise NotImplementedError

    async def release_reservation(self, reservation_id):
        # Domain logic: release the reservation; must tolerate repeated calls
        raise NotImplementedError

    async def handle_reserve_items(self, command: Dict):
        try:
            reservation = await self.reserve(command["items"], command["order_id"])
        except InsufficientInventoryError as e:
            # Report failure so the orchestrator starts compensation
            await self.event_publisher.publish(
                "SagaStepFailed",
                {
                    "saga_id": command["saga_id"],
                    "step_name": command["step_name"],
                    "error": str(e),
                },
            )
            return

        # Report success outside the try, so a publish error is not misreported as a step failure
        await self.event_publisher.publish(
            "SagaStepCompleted",
            {
                "saga_id": command["saga_id"],
                "step_name": command["step_name"],
                "result": {"reservation_id": reservation.id},
            },
        )

    async def handle_release_reservation(self, command: Dict):
        # Compensating action; must be safe to run more than once
        await self.release_reservation(command["original_result"]["reservation_id"])
        await self.event_publisher.publish(
            "SagaCompensationCompleted",
            {"saga_id": command["saga_id"], "step_name": command["step_name"]},
        )
```
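The services above publish `SagaStepCompleted`, `SagaStepFailed` and `SagaCompensationCompleted`, but the template does not show how those replies reach the orchestrator. The following is a minimal routing sketch. `SagaReplyRouter` is a hypothetical name. It assumes the reply payloads carry the `saga_id`, `step_name`, `result` and `error` keys used above, and that the orchestrator exposes the three handler methods from the base class.

```python
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class SagaReplyRouter:
    """Maps reply events from services onto an orchestrator's handler methods."""

    def __init__(self, orchestrator: Any) -> None:
        # Each lambda returns the coroutine from the matching orchestrator method
        self._routes: Dict[str, Handler] = {
            "SagaStepCompleted": lambda e: orchestrator.handle_step_completed(
                e["saga_id"], e["step_name"], e.get("result", {})
            ),
            "SagaStepFailed": lambda e: orchestrator.handle_step_failed(
                e["saga_id"], e["step_name"], e["error"]
            ),
            "SagaCompensationCompleted": lambda e: orchestrator.handle_compensation_completed(
                e["saga_id"], e["step_name"]
            ),
        }

    async def dispatch(self, event_type: str, event: Dict[str, Any]) -> bool:
        """Route one event; return False if the type is not a saga reply."""
        handler = self._routes.get(event_type)
        if handler is None:
            return False
        await handler(event)
        return True
```

A broker consumer would call `dispatch` for every message on the reply channel. Returning `False` lets it skip or log unrelated event types.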

Template 3: Choreography-Based Saga

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class SagaContext:
    """Optional envelope passed through choreographed saga events."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: List[str] = field(default_factory=list)


class OrderChoreographySaga:
    """Choreography-based saga using events.

    In full choreography each handler lives in the service that owns the step;
    they are grouped in one class here for readability. Services must echo the
    IDs (order_id, reservation_id, payment_id) that later handlers read.
    """

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.event_bus.subscribe("OrderCreated", self._on_order_created)
        self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)

        # Compensation handlers
        self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        """Step 1: Order created, reserve inventory."""
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],  # the order ID doubles as the saga ID
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        """Step 2: Inventory reserved, process payment."""
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],  # InventoryReserved must carry the order total
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        """Step 3: Payment done, create shipment."""
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
            # Carried forward so ShipmentFailed can also release the reservation
            "reservation_id": event["reservation_id"],
        })

    async def _on_shipment_created(self, event: Dict):
        """Step 4: Shipment created, mark the order fulfilled."""
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Compensation handlers
    async def _on_payment_failed(self, event: Dict):
        """Payment failed: release inventory and fail the order."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Payment failed",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Shipment failed: refund payment, release inventory, fail the order."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "reason": "Shipment failed",
        })
```
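The choreography template only needs an `event_bus` with `subscribe(event_type, handler)` and async `publish(event_type, payload)`. The following is a minimal single-process sketch of that interface for local experiments. `InMemoryEventBus` is a hypothetical name. Unlike a real broker, it delivers events synchronously and in nested order, with no persistence or retries.

```python
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Single-process bus: publish awaits every subscriber of the event type in order."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.log: List[str] = []  # every published event type, for inspection in tests

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._subscribers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        self.log.append(event_type)
        # Copy the list so a handler that subscribes during delivery doesn't disturb iteration
        for handler in list(self._subscribers[event_type]):
            await handler(payload)
```

If only `OrderChoreographySaga(InMemoryEventBus())` is registered, publishing `OrderCreated` adds a `ReserveInventory` entry to `log` and the chain stops there. Stub services that answer with `InventoryReserved` and later events are needed to drive the remaining steps.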

Template 4: Saga with Timeouts

```python
from datetime import datetime, timedelta
from typing import Dict

# Builds on Saga and SagaOrchestrator from Template 1. scheduler is any object
# with async schedule(job_id, callback, data, run_at=datetime).


class TimeoutSagaOrchestrator(SagaOrchestrator):
    """Saga orchestrator with step timeouts."""

    def __init__(self, saga_store, event_publisher, scheduler,
                 step_timeout: timedelta = timedelta(minutes=5)):
        super().__init__(saga_store, event_publisher)
        self.scheduler = scheduler
        self.step_timeout = step_timeout

    async def _execute_next_step(self, saga: Saga):
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        # Kept local: SagaStep has no timeout_at field; add one if it should be persisted
        timeout_at = datetime.utcnow() + self.step_timeout
        await self.saga_store.save(saga)

        # Schedule timeout check
        await self.scheduler.schedule(
            f"saga_timeout_{saga.saga_id}_{step.name}",
            self._check_timeout,
            {"saga_id": saga.saga_id, "step_name": step.name},
            run_at=timeout_at,
        )

        await self.event_publisher.publish(
            step.action,
            {"saga_id": saga.saga_id, "step_name": step.name, **saga.data},
        )

    async def _check_timeout(self, data: Dict):
        """Fail the step if it is still executing when the timeout fires."""
        saga = await self.saga_store.get(data["saga_id"])
        step = next((s for s in saga.steps if s.name == data["step_name"]), None)

        if step is not None and step.status == "executing":
            # Step timed out: fail it, which starts compensation
            await self.handle_step_failed(
                data["saga_id"],
                data["step_name"],
                "Step timed out",
            )
```
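The timeout template leaves the `scheduler` abstract. The following is a minimal in-process sketch with the `schedule(job_id, callback, data, run_at=...)` shape it calls. `AsyncioScheduler` and its `cancel` method are hypothetical. Jobs live only in memory and are lost on restart, so a production deployment would want a durable scheduler or the delayed-delivery feature of its message broker. `run_at` is treated as a naive UTC time to match `datetime.utcnow()` in the template.

```python
import asyncio
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict

Callback = Callable[[Dict[str, Any]], Awaitable[None]]


class AsyncioScheduler:
    """Runs callback(data) at run_at using asyncio tasks (in memory only)."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    async def schedule(self, job_id: str, callback: Callback,
                       data: Dict[str, Any], run_at: datetime) -> None:
        # Seconds until run_at; past times fire on the next loop iteration
        delay = max(0.0, (run_at - datetime.utcnow()).total_seconds())

        async def _run() -> None:
            await asyncio.sleep(delay)
            self._tasks.pop(job_id, None)  # job is no longer pending
            await callback(data)

        self.cancel(job_id)  # rescheduling the same id replaces the old job
        self._tasks[job_id] = asyncio.create_task(_run())

    def cancel(self, job_id: str) -> bool:
        """Cancel a pending job; return False if none was pending."""
        task = self._tasks.pop(job_id, None)
        if task is None:
            return False
        task.cancel()
        return True
```

The template does not cancel a timeout when its step completes. It relies on `_check_timeout` ignoring steps that are no longer executing, which is safe. A `cancel` call from `handle_step_completed` would only save the wasted wake-up.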

Best Practices

Do’s

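  • Make steps idempotent - Brokers can redeliver commands, so running a step twice must have the same effect as running it once
  • Design compensations carefully - A compensation must eventually succeed, so make it retryable and idempotent too
  • Use correlation IDs - Carry the saga_id on every command, event, and log line to trace a saga across services
  • Implement timeouts - Fail a step that never replies instead of waiting forever
  • Log every state transition - For debugging failures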
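Idempotency can be enforced at the service boundary by remembering which `(saga_id, step_name)` pairs have already been handled. The following is a minimal sketch. `IdempotentStepHandler` is a hypothetical wrapper. It keeps processed keys in memory, whereas a real service would record them in the same database transaction as the step's side effects. It also does not guard against two copies of the same command running concurrently.

```python
from typing import Any, Awaitable, Callable, Dict, Tuple


class IdempotentStepHandler:
    """Wraps a step handler so redelivered commands for the same step run only once."""

    def __init__(self, handler: Callable[[Dict[str, Any]], Awaitable[Any]]) -> None:
        self._handler = handler
        # (saga_id, step_name) -> result of the first successful run
        self._results: Dict[Tuple[str, str], Any] = {}

    async def __call__(self, command: Dict[str, Any]) -> Any:
        key = (command["saga_id"], command["step_name"])
        if key in self._results:
            return self._results[key]  # duplicate delivery: replay the stored result
        result = await self._handler(command)
        self._results[key] = result  # only recorded if the handler succeeded
        return result
```

Because the key includes `saga_id`, the same field also serves as the correlation ID for tracing and logging.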

Don’ts

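  • Don’t assume instant completion - Steps finish asynchronously, so callers can observe intermediate states such as Pending
  • Don’t skip compensation testing - It is the most critical part; test a failure at every step
  • Don’t couple services directly - Communicate through asynchronous messages
  • Don’t ignore partial failures - A compensation that fails leaves data inconsistent; retry it or escalate for manual repair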
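Compensation paths can be tested without any infrastructure by injecting a failure at each step in turn and checking which compensations ran. The following sketch uses a synchronous, in-process stand-in for the orchestrator (`run_saga` and `make_steps` are hypothetical test helpers, not the skill's API). It runs steps in order and, on failure, undoes completed steps in reverse.

```python
from typing import Callable, List, Optional, Tuple

Step = Tuple[str, Callable[[], None], Callable[[], None]]  # (name, action, compensation)


def run_saga(steps: List[Step]) -> List[str]:
    """Run steps in order; on failure, compensate completed steps newest first.

    Returns a log of what happened, for assertions in tests.
    """
    log: List[str] = []
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception:
            log.append(f"failed:{name}")
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"compensated:{done_name}")
            return log
        log.append(f"done:{name}")
        completed.append(step)
    return log


def make_steps(fail_at: Optional[int], names: List[str]) -> List[Step]:
    """Build no-op steps where the step at index fail_at raises."""
    def action_for(i: int) -> Callable[[], None]:
        def action() -> None:
            if i == fail_at:
                raise RuntimeError(f"injected failure at {names[i]}")
        return action
    return [(n, action_for(i), lambda: None) for i, n in enumerate(names)]
```

Looping `fail_at` over every index and asserting that exactly the earlier steps are compensated, in reverse, covers every rollback path of the saga.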

Environment Matrix

Dependencies

Python 3.8+
Async/await support
Event publishing system
Persistent saga store

Framework Support

  • AsyncIO ✓ (recommended)
  • Event-driven architectures ✓
  • Message brokers (RabbitMQ, Kafka) ✓
  • Database persistence layer ✓

Context Window

Token usage: ~3K-8K tokens for complete saga implementations

Information

Author: wshobson
Updated: 2026-01-30
Category: architecture-patterns