CQRS Implementation
Separate read and write models for scalable, high-performance applications
Implement Command Query Responsibility Segregation for scalable architectures. Use when separating read and write models, optimizing query performance, or building event-sourced systems.
See It In Action
User Prompt
I need to design an order management system that can handle 10,000 orders per hour while providing instant product search and inventory lookups. How should I implement CQRS?
Agent Response
Complete CQRS architecture with separate command handlers for orders, optimized read models for product queries, and event-driven synchronization between write and read sides
Quick Start (3 Steps)
Get up and running in minutes
Install
claude-code skill install cqrs-implementation
Config
First Trigger
@cqrs-implementation help
Commands
| Command | Description | Required Args |
|---|---|---|
| @cqrs-implementation high-volume-e-commerce-platform | Implement CQRS to handle thousands of order commands while maintaining fast product catalog queries | None |
| @cqrs-implementation real-time-analytics-dashboard | Build systems where complex reporting queries don't impact transactional performance | None |
| @cqrs-implementation event-sourced-financial-system | Create audit-compliant systems with complete event history and eventual consistency | None |
Overview
CQRS Implementation
Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.
When to Use This Skill
- Separating read and write concerns
- Scaling reads independently from writes
- Building event-sourced systems
- Optimizing complex query scenarios
- Using different data models for reads and writes
- Meeting high-performance reporting requirements
Core Concepts
1. CQRS Architecture
```
                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
                           │
              ┌────────────┴───────────┐
              │                        │
              ▼                        ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │     API     │          │     API     │
       └──────┬──────┘          └──────┬──────┘
              │                        │
              ▼                        ▼
       ┌─────────────┐          ┌─────────────┐
       │   Command   │          │    Query    │
       │  Handlers   │          │  Handlers   │
       └──────┬──────┘          └──────┬──────┘
              │                        │
              ▼                        ▼
       ┌─────────────┐          ┌─────────────┐
       │    Write    │─────────►│    Read     │
       │    Model    │  Events  │    Model    │
       └─────────────┘          └─────────────┘
```
2. Key Components
| Component | Responsibility |
|---|---|
| Command | Intent to change state |
| Command Handler | Validates and executes commands |
| Event | Record of state change |
| Query | Request for data |
| Query Handler | Retrieves data from read model |
| Projector | Updates read model from events |
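The roles in the table can be traced end to end in a few lines. The following is a minimal in-memory sketch, not part of the templates: the names `PlaceOrder`, `OrderPlaced`, `GetOrder`, and the handler classes are hypothetical, and a plain list and dict stand in for the event store and read model.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class PlaceOrder:  # Command: intent to change state
    order_id: str
    customer: str


@dataclass(frozen=True)
class OrderPlaced:  # Event: record of a state change
    order_id: str
    customer: str


@dataclass(frozen=True)
class GetOrder:  # Query: request for data
    order_id: str


class PlaceOrderHandler:
    """Command handler: validates the command, then records an event."""

    def __init__(self, event_log: List[OrderPlaced]) -> None:
        self.event_log = event_log

    def handle(self, command: PlaceOrder) -> OrderPlaced:
        if not command.customer:
            raise ValueError("customer is required")
        event = OrderPlaced(command.order_id, command.customer)
        self.event_log.append(event)
        return event


class OrderProjector:
    """Projector: updates the read model from events."""

    def __init__(self, read_model: Dict[str, dict]) -> None:
        self.read_model = read_model

    def apply(self, event: OrderPlaced) -> None:
        self.read_model[event.order_id] = {"customer": event.customer}


class GetOrderHandler:
    """Query handler: reads only from the read model."""

    def __init__(self, read_model: Dict[str, dict]) -> None:
        self.read_model = read_model

    def handle(self, query: GetOrder) -> Optional[dict]:
        return self.read_model.get(query.order_id)


def demo() -> Optional[dict]:
    event_log: List[OrderPlaced] = []
    read_model: Dict[str, dict] = {}
    event = PlaceOrderHandler(event_log).handle(PlaceOrder("o-1", "Ada"))
    OrderProjector(read_model).apply(event)  # done asynchronously in practice
    return GetOrderHandler(read_model).handle(GetOrder("o-1"))
```

Here the projector runs inline for brevity; in a real deployment it would typically run in a separate process, which is where eventual consistency comes from.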
Templates
Template 1: Command Infrastructure
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Generic, Type, TypeVar
import uuid


# Command base. kw_only=True (Python 3.10+) makes the defaulted metadata
# fields keyword-only, so subclasses can declare required fields after them.
@dataclass(kw_only=True)
class Command:
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


# Concrete commands
@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict


@dataclass
class AddOrderItem(Command):
    order_id: str
    product_id: str
    quantity: int
    price: float


@dataclass
class CancelOrder(Command):
    order_id: str
    reason: str


# Command handler base
T = TypeVar('T', bound=Command)


class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass


# Command bus: routes each command to the handler registered for its type
class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler.handle(command)


# Command handler implementation.
# `Order` is the write-side aggregate, assumed to be defined elsewhere; it
# must expose create(), id, and uncommitted_events.
class CreateOrderHandler(CommandHandler[CreateOrder]):
    def __init__(self, order_repository, event_store):
        self.order_repository = order_repository
        self.event_store = event_store

    async def handle(self, command: CreateOrder) -> str:
        # Validate
        if not command.items:
            raise ValueError("Order must have at least one item")

        # Create aggregate
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address
        )

        # Persist events
        await self.event_store.append_events(
            stream_id=f"Order-{order.id}",
            stream_type="Order",
            events=order.uncommitted_events
        )

        return order.id
```
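Template 1 calls `Order.create(...)`, reads `order.uncommitted_events`, and awaits `event_store.append_events(...)` without defining any of them. The sketch below shows one shape these collaborators could take. Everything in it is an assumption made for illustration: the `OrderCreated` event, the aggregate's fields, and the dict-backed `InMemoryEventStore` are hypothetical and only match the calls made in the template.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class OrderCreated:
    """Hypothetical event emitted when an order is created."""
    order_id: str
    customer_id: str
    items: list
    shipping_address: dict


@dataclass
class Order:
    """Hypothetical aggregate that records its changes as events."""
    id: str
    uncommitted_events: List[object] = field(default_factory=list)

    @classmethod
    def create(cls, customer_id: str, items: list, shipping_address: dict) -> "Order":
        order = cls(id=str(uuid.uuid4()))
        order.uncommitted_events.append(
            OrderCreated(order.id, customer_id, items, shipping_address)
        )
        return order


class InMemoryEventStore:
    """Hypothetical stand-in for the event store; keeps streams in a dict."""

    def __init__(self) -> None:
        self.streams: Dict[str, List[object]] = {}

    async def append_events(self, stream_id: str, stream_type: str, events: list) -> None:
        # stream_type is accepted only to match the call made in Template 1.
        self.streams.setdefault(stream_id, []).extend(events)


async def demo() -> int:
    store = InMemoryEventStore()
    order = Order.create("c-1", [{"sku": "A", "qty": 1}], {"city": "Oslo"})
    stream_id = f"Order-{order.id}"
    await store.append_events(stream_id, "Order", order.uncommitted_events)
    return len(store.streams[stream_id])
```

An in-memory store like this is mainly useful in handler unit tests; a production store would also need optimistic concurrency checks on the stream version.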
Template 2: Query Infrastructure
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar


# Query base
@dataclass
class Query:
    pass


# Type variables, defined before first use:
# Q = query type, R = handler result type, T = paginated item type
Q = TypeVar('Q', bound=Query)
R = TypeVar('R')
T = TypeVar('T')


# Concrete queries
@dataclass
class GetOrderById(Query):
    order_id: str


@dataclass
class GetCustomerOrders(Query):
    customer_id: str
    status: Optional[str] = None
    page: int = 1
    page_size: int = 20


@dataclass
class SearchOrders(Query):
    query: str
    filters: Optional[dict] = None
    sort_by: str = "created_at"
    sort_order: str = "desc"


# Query result types
@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
    shipped_at: Optional[datetime] = None


@dataclass
class PaginatedResult(Generic[T]):
    items: List[T]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        # Ceiling division without floats
        return (self.total + self.page_size - 1) // self.page_size


# Query handler base
class QueryHandler(ABC, Generic[Q, R]):
    @abstractmethod
    async def handle(self, query: Q) -> R:
        pass


# Query bus: routes each query to the handler registered for its type
class QueryBus:
    def __init__(self):
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler):
        self._handlers[query_type] = handler

    async def dispatch(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler for {type(query).__name__}")
        return await handler.handle(query)


# Query handler implementations. `read_db` is assumed to be an asyncpg-style
# connection pool (acquire(), fetchrow(), fetchval(), fetch()).
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        async with self.read_db.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT order_id, customer_id, status, total_amount,
                       item_count, created_at, shipped_at
                FROM order_views
                WHERE order_id = $1
                """,
                query.order_id
            )
            if row:
                return OrderView(**dict(row))
            return None


class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
        async with self.read_db.acquire() as conn:
            # Build query with optional status filter. Only fixed SQL text is
            # interpolated; user values are always passed as parameters.
            where_clause = "customer_id = $1"
            params = [query.customer_id]

            if query.status:
                where_clause += " AND status = $2"
                params.append(query.status)

            # Get total count
            total = await conn.fetchval(
                f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
                *params
            )

            # Get paginated results
            offset = (query.page - 1) * query.page_size
            rows = await conn.fetch(
                f"""
                SELECT order_id, customer_id, status, total_amount,
                       item_count, created_at, shipped_at
                FROM order_views
                WHERE {where_clause}
                ORDER BY created_at DESC
                LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
                """,
                *params, query.page_size, offset
            )

            return PaginatedResult(
                items=[OrderView(**dict(row)) for row in rows],
                total=total,
                page=query.page,
                page_size=query.page_size
            )
```
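The pagination arithmetic and the numbered-placeholder bookkeeping in `GetCustomerOrdersHandler` are easy to get off by one, so it can help to check them in isolation. This is a small sketch with hypothetical helper names (`total_pages`, `page_offset`, `build_where`); it mirrors the logic of the template rather than replacing it, and derives each placeholder number from the current parameter count.

```python
from typing import List, Optional, Tuple


def total_pages(total: int, page_size: int) -> int:
    """Ceiling division without floats, as in PaginatedResult.total_pages."""
    return (total + page_size - 1) // page_size


def page_offset(page: int, page_size: int) -> int:
    """Rows to skip for a 1-based page number."""
    return (page - 1) * page_size


def build_where(customer_id: str, status: Optional[str]) -> Tuple[str, List[str]]:
    """Build the WHERE clause, numbering placeholders from the parameter count."""
    params = [customer_id]
    where = "customer_id = $1"
    if status:
        params.append(status)
        where += f" AND status = ${len(params)}"
    return where, params


# 45 matching rows at 20 per page need 3 pages; page 3 starts at row 40.
pages = total_pages(45, 20)
offset = page_offset(3, 20)

# With a status filter, LIMIT and OFFSET take the next two placeholders.
where, params = build_where("c-42", "shipped")
limit_placeholder = f"${len(params) + 1}"
offset_placeholder = f"${len(params) + 2}"
```

Deriving placeholder numbers from `len(params)` keeps the clause correct if more optional filters are added later.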
Template 3: FastAPI CQRS Application
```python
from datetime import datetime
from typing import List, Optional

from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel

# CommandBus, QueryBus, and the command and query classes used below are the
# ones defined in Templates 1 and 2.

app = FastAPI()


# Request/Response models
class CreateOrderRequest(BaseModel):
    customer_id: str
    items: List[dict]
    shipping_address: dict


class OrderResponse(BaseModel):
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime


# Dependency injection: the buses are expected on app.state at startup
def get_command_bus() -> CommandBus:
    return app.state.command_bus


def get_query_bus() -> QueryBus:
    return app.state.query_bus


# Command endpoints (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
    request: CreateOrderRequest,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address
    )
    order_id = await command_bus.dispatch(command)
    return {"order_id": order_id}


# product_id, quantity, and price arrive as query parameters here
@app.post("/orders/{order_id}/items")
async def add_item(
    order_id: str,
    product_id: str,
    quantity: int,
    price: float,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = AddOrderItem(
        order_id=order_id,
        product_id=product_id,
        quantity=quantity,
        price=price
    )
    await command_bus.dispatch(command)
    return {"status": "item_added"}


@app.delete("/orders/{order_id}")
async def cancel_order(
    order_id: str,
    reason: str,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = CancelOrder(order_id=order_id, reason=reason)
    await command_bus.dispatch(command)
    return {"status": "cancelled"}


# Query endpoints (GET)
# /orders/search is declared before /orders/{order_id}: FastAPI matches
# routes in declaration order, so the parameterized path would otherwise
# capture "search" as an order_id.
@app.get("/orders/search")
async def search_orders(
    q: str,
    sort_by: str = "created_at",
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = SearchOrders(query=q, sort_by=sort_by)
    return await query_bus.dispatch(query)


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: str,
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = GetOrderById(order_id=order_id)
    result = await query_bus.dispatch(query)
    if not result:
        raise HTTPException(status_code=404, detail="Order not found")
    return result


@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: str,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = GetCustomerOrders(
        customer_id=customer_id,
        status=status,
        page=page,
        page_size=page_size
    )
    return await query_bus.dispatch(query)
```
Template 4: Read Model Synchronization
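```python
import asyncio
import logging
from typing import List

logger = logging.getLogger(__name__)


# `Projection` is assumed to be defined elsewhere, with a `name` attribute
# and handles(), apply(event), and clear() methods.
class ReadModelSynchronizer:
    """Keeps read models in sync with events."""

    def __init__(self, event_store, read_db, projections: List["Projection"]):
        self.event_store = event_store
        self.read_db = read_db
        self.projections = {p.name: p for p in projections}

    async def run(self):
        """Continuously sync read models."""
        while True:
            for name, projection in self.projections.items():
                await self._sync_projection(projection)
            await asyncio.sleep(0.1)

    async def _sync_projection(self, projection: "Projection"):
        checkpoint = await self._get_checkpoint(projection.name)

        events = await self.event_store.read_all(
            from_position=checkpoint,
            limit=100
        )

        for event in events:
            if event.event_type in projection.handles():
                try:
                    await projection.apply(event)
                except Exception:
                    # Stop here so the checkpoint never advances past an
                    # unapplied event; it is retried on the next pass. Add a
                    # retry limit or dead-letter handling for events that can
                    # never be applied.
                    logger.exception(
                        "Projection %s failed at position %s",
                        projection.name, event.global_position
                    )
                    break

            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch."""
        projection = self.projections[projection_name]

        # Clear existing data
        await projection.clear()

        # Reset checkpoint
        await self._save_checkpoint(projection_name, 0)

        # Rebuild in batches until the event store is exhausted
        while True:
            checkpoint = await self._get_checkpoint(projection_name)
            events = await self.event_store.read_all(checkpoint, 1000)

            if not events:
                break

            for event in events:
                if event.event_type in projection.handles():
                    await projection.apply(event)

            await self._save_checkpoint(
                projection_name,
                events[-1].global_position
            )

    # Checkpoint storage is not part of the original template. The helpers
    # below assume a hypothetical projection_checkpoints table in PostgreSQL
    # with a unique projection_name column, accessed through an asyncpg pool.
    async def _get_checkpoint(self, projection_name: str) -> int:
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT position FROM projection_checkpoints WHERE projection_name = $1",
                projection_name
            ) or 0

    async def _save_checkpoint(self, projection_name: str, position: int):
        async with self.read_db.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO projection_checkpoints (projection_name, position)
                VALUES ($1, $2)
                ON CONFLICT (projection_name) DO UPDATE SET position = $2
                """,
                projection_name, position
            )
```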
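Template 4 relies on a `Projection` type that is not shown. The sketch below is one way such a projection might look, assuming only the interface the synchronizer actually uses (`name`, `handles()`, `apply()`, `clear()`). The `StoredEvent` envelope, the event type names, and the "pending"/"cancelled" statuses are hypothetical, and a dict stands in for the read database.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class StoredEvent:
    """Hypothetical event envelope with the attributes Template 4 reads."""
    event_type: str
    global_position: int
    data: dict


class OrderSummaryProjection:
    """Hypothetical projection that keeps one summary row per order."""

    name = "order_summary"

    def __init__(self) -> None:
        self.rows: Dict[str, dict] = {}

    def handles(self) -> Set[str]:
        return {"OrderCreated", "OrderCancelled"}

    async def apply(self, event: StoredEvent) -> None:
        order_id = event.data["order_id"]
        if event.event_type == "OrderCreated":
            # Writing the whole row by key keeps replays idempotent.
            self.rows[order_id] = {
                "status": "pending",
                "item_count": len(event.data["items"]),
            }
        elif event.event_type == "OrderCancelled" and order_id in self.rows:
            self.rows[order_id]["status"] = "cancelled"

    async def clear(self) -> None:
        self.rows.clear()


async def demo() -> Dict[str, dict]:
    projection = OrderSummaryProjection()
    events = [
        StoredEvent("OrderCreated", 1, {"order_id": "o-1", "items": ["a", "b"]}),
        StoredEvent("OrderShipped", 2, {"order_id": "o-1"}),  # not handled
        StoredEvent("OrderCancelled", 3, {"order_id": "o-1"}),
    ]
    # Same filter-then-apply loop the synchronizer runs per batch.
    for event in events:
        if event.event_type in projection.handles():
            await projection.apply(event)
    return projection.rows
```

Keeping `apply` idempotent matters because a checkpointing synchronizer may deliver the same event more than once after a failure or restart.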
Template 5: Eventual Consistency Handling
```python
import asyncio
import time


# `Query` is the base class from Template 2.
class ConsistentQueryHandler:
    """Query handler that can wait for consistency."""

    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: "Query",
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0
    ):
        """
        Execute query, ensuring read model is at expected version.
        Used for read-your-writes consistency.
        """
        # monotonic() is unaffected by system clock adjustments
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            # Check if read model is caught up
            projection_version = await self._get_projection_version(stream_id)

            if projection_version >= expected_version:
                return await self.execute_query(query)

            # Wait a bit and retry
            await asyncio.sleep(0.1)

        # Timeout - return stale data with warning
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def execute_query(self, query: "Query"):
        """Run the query against the read model; implement in a subclass."""
        raise NotImplementedError

    async def _get_projection_version(self, stream_id: str) -> int:
        """Get the last processed event version for a stream."""
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0
```
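The polling loop in Template 5 can be exercised without a database. The following sketch assumes a hypothetical `InMemoryProjectionState` in place of the `projection_state` table and a free function `wait_for_version` in place of the handler method; it only illustrates the wait-until-caught-up-or-timeout behavior.

```python
import asyncio
import time
from typing import Dict, Tuple


class InMemoryProjectionState:
    """Hypothetical stand-in for the projection_state table."""

    def __init__(self) -> None:
        self.versions: Dict[str, int] = {}

    def version_of(self, stream_id: str) -> int:
        return self.versions.get(stream_id, 0)


async def wait_for_version(
    state: InMemoryProjectionState,
    stream_id: str,
    expected_version: int,
    timeout: float = 1.0,
    poll_interval: float = 0.01,
) -> bool:
    """Return True once the projection reaches expected_version, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if state.version_of(stream_id) >= expected_version:
            return True
        await asyncio.sleep(poll_interval)
    return False


async def demo() -> Tuple[bool, bool]:
    state = InMemoryProjectionState()

    async def projector() -> None:
        await asyncio.sleep(0.05)  # simulated projection lag
        state.versions["Order-1"] = 3

    task = asyncio.create_task(projector())
    # The projector catches up well inside the 1 second timeout.
    caught_up = await wait_for_version(state, "Order-1", expected_version=3)
    # Nothing ever projects Order-2, so this wait times out.
    timed_out = await wait_for_version(state, "Order-2", expected_version=1, timeout=0.05)
    await task
    return caught_up, timed_out
```

Polling is the simplest option; a notification from the projector (for example an `asyncio.Event` or a database LISTEN/NOTIFY channel) would avoid the repeated lookups at the cost of more moving parts.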
Best Practices
Do’s
- Separate command and query models - Reads and writes have different needs
- Use eventual consistency - Accept a propagation delay between the write and read models
- Validate in command handlers - Reject invalid commands before any state change
- Denormalize read models - Shape them for the queries they serve
- Version your events - Stored events must survive schema evolution
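Event versioning is often handled by "upcasting" old payloads to the current schema when they are read. The sketch below shows the idea under stated assumptions: the `UPCASTERS` registry, the `upcast` function, and the example schema change (a `currency` field added in version 2, with legacy events assumed to be in USD) are all hypothetical.

```python
from typing import Callable, Dict, Tuple

Upcaster = Callable[[dict], dict]


def _order_created_v1_to_v2(data: dict) -> dict:
    # Hypothetical change: v2 added a currency field; legacy events are
    # assumed to have been priced in USD.
    return {**data, "currency": "USD"}


# Upcasters keyed by (event_type, version they upgrade from).
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("OrderCreated", 1): _order_created_v1_to_v2,
}


def upcast(event_type: str, version: int, data: dict) -> Tuple[int, dict]:
    """Apply upcasters in sequence until no newer schema version is registered."""
    while (event_type, version) in UPCASTERS:
        data = UPCASTERS[(event_type, version)](data)
        version += 1
    return version, data


current_version, payload = upcast("OrderCreated", 1, {"order_id": "o-1"})
```

Because stored events are never rewritten, projections and aggregates only ever need to understand the latest version of each event.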
Don’ts
- Don’t query in commands - Commands only write; serve reads through the query side
- Don’t couple read/write schemas - Let each evolve independently
- Don’t over-engineer - Start simple
- Don’t ignore consistency SLAs - Define the acceptable read-model lag
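A consistency SLA is only useful if the lag is measured. One simple measure is how many events a projection's checkpoint trails the head of the event store. This is a minimal sketch with hypothetical names (`LagReport`, `check_projection_lag`) and an assumed SLA expressed as a maximum number of events behind; a time-based SLA would compare event timestamps instead.

```python
from dataclasses import dataclass


@dataclass
class LagReport:
    events_behind: int
    within_sla: bool


def check_projection_lag(
    head_position: int, checkpoint: int, max_events_behind: int
) -> LagReport:
    """Compare a projection checkpoint with the latest event store position."""
    behind = max(head_position - checkpoint, 0)
    return LagReport(events_behind=behind, within_sla=behind <= max_events_behind)


healthy = check_projection_lag(head_position=1050, checkpoint=1000, max_events_behind=100)
lagging = check_projection_lag(head_position=1500, checkpoint=1000, max_events_behind=100)
```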
Information
- Author: wshobson
- Updated: 2026-01-30
- Category: architecture-patterns