Cqrs Implementation

Separate read and write models for scalable, high-performance applications

✨ The solution you've been looking for

Verified
Tested and verified by our team
25450 Stars

Implement Command Query Responsibility Segregation for scalable architectures. Use when separating read and write models, optimizing query performance, or building event-sourced systems.

cqrs architecture event-sourcing scalability microservices command-query read-models backend
Repository

See It In Action

Interactive preview & real-world examples

Live Demo
Skill Demo Animation

AI Conversation Simulator

See how users interact with this skill

User Prompt

I need to design an order management system that can handle 10,000 orders per hour while providing instant product search and inventory lookups. How should I implement CQRS?

Skill Processing

Analyzing request...

Agent Response

Complete CQRS architecture with separate command handlers for orders, optimized read models for product queries, and event-driven synchronization between write and read sides

Quick Start (3 Steps)

Get up and running in minutes

1

Install

claude-code skill install cqrs-implementation

claude-code skill install cqrs-implementation
2

Config

3

First Trigger

@cqrs-implementation help

Commands

CommandDescriptionRequired Args
@cqrs-implementation high-volume-e-commerce-platformImplement CQRS to handle thousands of order commands while maintaining fast product catalog queriesNone
@cqrs-implementation real-time-analytics-dashboardBuild systems where complex reporting queries don't impact transactional performanceNone
@cqrs-implementation event-sourced-financial-systemCreate audit-compliant systems with complete event history and eventual consistencyNone

Typical Use Cases

High-Volume E-commerce Platform

Implement CQRS to handle thousands of order commands while maintaining fast product catalog queries

Real-time Analytics Dashboard

Build systems where complex reporting queries don't impact transactional performance

Event-Sourced Financial System

Create audit-compliant systems with complete event history and eventual consistency

Overview

CQRS Implementation

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.

When to Use This Skill

  • Separating read and write concerns
  • Scaling reads independently from writes
  • Building event-sourced systems
  • Optimizing complex query scenarios
  • Different read/write data models needed
  • High-performance reporting requirements

Core Concepts

1. CQRS Architecture

                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
                           │
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Command    │          │   Query     │
       │  Handlers   │          │  Handlers   │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘

2. Key Components

ComponentResponsibility
CommandIntent to change state
Command HandlerValidates and executes commands
EventRecord of state change
QueryRequest for data
Query HandlerRetrieves data from read model
ProjectorUpdates read model from events

Templates

Template 1: Command Infrastructure

 1from abc import ABC, abstractmethod
 2from dataclasses import dataclass
 3from typing import TypeVar, Generic, Dict, Any, Type
 4from datetime import datetime
 5import uuid
 6
 7# Command base
 8@dataclass
 9class Command:
10    command_id: str = None
11    timestamp: datetime = None
12
13    def __post_init__(self):
14        self.command_id = self.command_id or str(uuid.uuid4())
15        self.timestamp = self.timestamp or datetime.utcnow()
16
17
18# Concrete commands
19@dataclass
20class CreateOrder(Command):
21    customer_id: str
22    items: list
23    shipping_address: dict
24
25
26@dataclass
27class AddOrderItem(Command):
28    order_id: str
29    product_id: str
30    quantity: int
31    price: float
32
33
34@dataclass
35class CancelOrder(Command):
36    order_id: str
37    reason: str
38
39
40# Command handler base
41T = TypeVar('T', bound=Command)
42
43class CommandHandler(ABC, Generic[T]):
44    @abstractmethod
45    async def handle(self, command: T) -> Any:
46        pass
47
48
49# Command bus
50class CommandBus:
51    def __init__(self):
52        self._handlers: Dict[Type[Command], CommandHandler] = {}
53
54    def register(self, command_type: Type[Command], handler: CommandHandler):
55        self._handlers[command_type] = handler
56
57    async def dispatch(self, command: Command) -> Any:
58        handler = self._handlers.get(type(command))
59        if not handler:
60            raise ValueError(f"No handler for {type(command).__name__}")
61        return await handler.handle(command)
62
63
64# Command handler implementation
65class CreateOrderHandler(CommandHandler[CreateOrder]):
66    def __init__(self, order_repository, event_store):
67        self.order_repository = order_repository
68        self.event_store = event_store
69
70    async def handle(self, command: CreateOrder) -> str:
71        # Validate
72        if not command.items:
73            raise ValueError("Order must have at least one item")
74
75        # Create aggregate
76        order = Order.create(
77            customer_id=command.customer_id,
78            items=command.items,
79            shipping_address=command.shipping_address
80        )
81
82        # Persist events
83        await self.event_store.append_events(
84            stream_id=f"Order-{order.id}",
85            stream_type="Order",
86            events=order.uncommitted_events
87        )
88
89        return order.id

Template 2: Query Infrastructure

  1from abc import ABC, abstractmethod
  2from dataclasses import dataclass
  3from typing import TypeVar, Generic, List, Optional
  4
  5# Query base
  6@dataclass
  7class Query:
  8    pass
  9
 10
 11# Concrete queries
 12@dataclass
 13class GetOrderById(Query):
 14    order_id: str
 15
 16
 17@dataclass
 18class GetCustomerOrders(Query):
 19    customer_id: str
 20    status: Optional[str] = None
 21    page: int = 1
 22    page_size: int = 20
 23
 24
 25@dataclass
 26class SearchOrders(Query):
 27    query: str
 28    filters: dict = None
 29    sort_by: str = "created_at"
 30    sort_order: str = "desc"
 31
 32
 33# Query result types
 34@dataclass
 35class OrderView:
 36    order_id: str
 37    customer_id: str
 38    status: str
 39    total_amount: float
 40    item_count: int
 41    created_at: datetime
 42    shipped_at: Optional[datetime] = None
 43
 44
 45@dataclass
 46class PaginatedResult(Generic[T]):
 47    items: List[T]
 48    total: int
 49    page: int
 50    page_size: int
 51
 52    @property
 53    def total_pages(self) -> int:
 54        return (self.total + self.page_size - 1) // self.page_size
 55
 56
 57# Query handler base
 58T = TypeVar('T', bound=Query)
 59R = TypeVar('R')
 60
 61class QueryHandler(ABC, Generic[T, R]):
 62    @abstractmethod
 63    async def handle(self, query: T) -> R:
 64        pass
 65
 66
 67# Query bus
 68class QueryBus:
 69    def __init__(self):
 70        self._handlers: Dict[Type[Query], QueryHandler] = {}
 71
 72    def register(self, query_type: Type[Query], handler: QueryHandler):
 73        self._handlers[query_type] = handler
 74
 75    async def dispatch(self, query: Query) -> Any:
 76        handler = self._handlers.get(type(query))
 77        if not handler:
 78            raise ValueError(f"No handler for {type(query).__name__}")
 79        return await handler.handle(query)
 80
 81
 82# Query handler implementation
 83class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
 84    def __init__(self, read_db):
 85        self.read_db = read_db
 86
 87    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
 88        async with self.read_db.acquire() as conn:
 89            row = await conn.fetchrow(
 90                """
 91                SELECT order_id, customer_id, status, total_amount,
 92                       item_count, created_at, shipped_at
 93                FROM order_views
 94                WHERE order_id = $1
 95                """,
 96                query.order_id
 97            )
 98            if row:
 99                return OrderView(**dict(row))
100            return None
101
102
103class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
104    def __init__(self, read_db):
105        self.read_db = read_db
106
107    async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
108        async with self.read_db.acquire() as conn:
109            # Build query with optional status filter
110            where_clause = "customer_id = $1"
111            params = [query.customer_id]
112
113            if query.status:
114                where_clause += " AND status = $2"
115                params.append(query.status)
116
117            # Get total count
118            total = await conn.fetchval(
119                f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
120                *params
121            )
122
123            # Get paginated results
124            offset = (query.page - 1) * query.page_size
125            rows = await conn.fetch(
126                f"""
127                SELECT order_id, customer_id, status, total_amount,
128                       item_count, created_at, shipped_at
129                FROM order_views
130                WHERE {where_clause}
131                ORDER BY created_at DESC
132                LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
133                """,
134                *params, query.page_size, offset
135            )
136
137            return PaginatedResult(
138                items=[OrderView(**dict(row)) for row in rows],
139                total=total,
140                page=query.page,
141                page_size=query.page_size
142            )

Template 3: FastAPI CQRS Application

  1from fastapi import FastAPI, HTTPException, Depends
  2from pydantic import BaseModel
  3from typing import List, Optional
  4
  5app = FastAPI()
  6
  7# Request/Response models
  8class CreateOrderRequest(BaseModel):
  9    customer_id: str
 10    items: List[dict]
 11    shipping_address: dict
 12
 13
 14class OrderResponse(BaseModel):
 15    order_id: str
 16    customer_id: str
 17    status: str
 18    total_amount: float
 19    item_count: int
 20    created_at: datetime
 21
 22
 23# Dependency injection
 24def get_command_bus() -> CommandBus:
 25    return app.state.command_bus
 26
 27
 28def get_query_bus() -> QueryBus:
 29    return app.state.query_bus
 30
 31
 32# Command endpoints (POST, PUT, DELETE)
 33@app.post("/orders", response_model=dict)
 34async def create_order(
 35    request: CreateOrderRequest,
 36    command_bus: CommandBus = Depends(get_command_bus)
 37):
 38    command = CreateOrder(
 39        customer_id=request.customer_id,
 40        items=request.items,
 41        shipping_address=request.shipping_address
 42    )
 43    order_id = await command_bus.dispatch(command)
 44    return {"order_id": order_id}
 45
 46
 47@app.post("/orders/{order_id}/items")
 48async def add_item(
 49    order_id: str,
 50    product_id: str,
 51    quantity: int,
 52    price: float,
 53    command_bus: CommandBus = Depends(get_command_bus)
 54):
 55    command = AddOrderItem(
 56        order_id=order_id,
 57        product_id=product_id,
 58        quantity=quantity,
 59        price=price
 60    )
 61    await command_bus.dispatch(command)
 62    return {"status": "item_added"}
 63
 64
 65@app.delete("/orders/{order_id}")
 66async def cancel_order(
 67    order_id: str,
 68    reason: str,
 69    command_bus: CommandBus = Depends(get_command_bus)
 70):
 71    command = CancelOrder(order_id=order_id, reason=reason)
 72    await command_bus.dispatch(command)
 73    return {"status": "cancelled"}
 74
 75
 76# Query endpoints (GET)
 77@app.get("/orders/{order_id}", response_model=OrderResponse)
 78async def get_order(
 79    order_id: str,
 80    query_bus: QueryBus = Depends(get_query_bus)
 81):
 82    query = GetOrderById(order_id=order_id)
 83    result = await query_bus.dispatch(query)
 84    if not result:
 85        raise HTTPException(status_code=404, detail="Order not found")
 86    return result
 87
 88
 89@app.get("/customers/{customer_id}/orders")
 90async def get_customer_orders(
 91    customer_id: str,
 92    status: Optional[str] = None,
 93    page: int = 1,
 94    page_size: int = 20,
 95    query_bus: QueryBus = Depends(get_query_bus)
 96):
 97    query = GetCustomerOrders(
 98        customer_id=customer_id,
 99        status=status,
100        page=page,
101        page_size=page_size
102    )
103    return await query_bus.dispatch(query)
104
105
106@app.get("/orders/search")
107async def search_orders(
108    q: str,
109    sort_by: str = "created_at",
110    query_bus: QueryBus = Depends(get_query_bus)
111):
112    query = SearchOrders(query=q, sort_by=sort_by)
113    return await query_bus.dispatch(query)

Template 4: Read Model Synchronization

 1class ReadModelSynchronizer:
 2    """Keeps read models in sync with events."""
 3
 4    def __init__(self, event_store, read_db, projections: List[Projection]):
 5        self.event_store = event_store
 6        self.read_db = read_db
 7        self.projections = {p.name: p for p in projections}
 8
 9    async def run(self):
10        """Continuously sync read models."""
11        while True:
12            for name, projection in self.projections.items():
13                await self._sync_projection(projection)
14            await asyncio.sleep(0.1)
15
16    async def _sync_projection(self, projection: Projection):
17        checkpoint = await self._get_checkpoint(projection.name)
18
19        events = await self.event_store.read_all(
20            from_position=checkpoint,
21            limit=100
22        )
23
24        for event in events:
25            if event.event_type in projection.handles():
26                try:
27                    await projection.apply(event)
28                except Exception as e:
29                    # Log error, possibly retry or skip
30                    logger.error(f"Projection error: {e}")
31                    continue
32
33            await self._save_checkpoint(projection.name, event.global_position)
34
35    async def rebuild_projection(self, projection_name: str):
36        """Rebuild a projection from scratch."""
37        projection = self.projections[projection_name]
38
39        # Clear existing data
40        await projection.clear()
41
42        # Reset checkpoint
43        await self._save_checkpoint(projection_name, 0)
44
45        # Rebuild
46        while True:
47            checkpoint = await self._get_checkpoint(projection_name)
48            events = await self.event_store.read_all(checkpoint, 1000)
49
50            if not events:
51                break
52
53            for event in events:
54                if event.event_type in projection.handles():
55                    await projection.apply(event)
56
57            await self._save_checkpoint(
58                projection_name,
59                events[-1].global_position
60            )

Template 5: Eventual Consistency Handling

 1class ConsistentQueryHandler:
 2    """Query handler that can wait for consistency."""
 3
 4    def __init__(self, read_db, event_store):
 5        self.read_db = read_db
 6        self.event_store = event_store
 7
 8    async def query_after_command(
 9        self,
10        query: Query,
11        expected_version: int,
12        stream_id: str,
13        timeout: float = 5.0
14    ):
15        """
16        Execute query, ensuring read model is at expected version.
17        Used for read-your-writes consistency.
18        """
19        start_time = time.time()
20
21        while time.time() - start_time < timeout:
22            # Check if read model is caught up
23            projection_version = await self._get_projection_version(stream_id)
24
25            if projection_version >= expected_version:
26                return await self.execute_query(query)
27
28            # Wait a bit and retry
29            await asyncio.sleep(0.1)
30
31        # Timeout - return stale data with warning
32        return {
33            "data": await self.execute_query(query),
34            "_warning": "Data may be stale"
35        }
36
37    async def _get_projection_version(self, stream_id: str) -> int:
38        """Get the last processed event version for a stream."""
39        async with self.read_db.acquire() as conn:
40            return await conn.fetchval(
41                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
42                stream_id
43            ) or 0

Best Practices

Do’s

  • Separate command and query models - Different needs
  • Use eventual consistency - Accept propagation delay
  • Validate in command handlers - Before state change
  • Denormalize read models - Optimize for queries
  • Version your events - For schema evolution

Don’ts

  • Don’t query in commands - Use only for writes
  • Don’t couple read/write schemas - Independent evolution
  • Don’t over-engineer - Start simple
  • Don’t ignore consistency SLAs - Define acceptable lag

Resources

What Users Are Saying

Real feedback from the community

Environment Matrix

Dependencies

Python 3.8+
FastAPI or similar async web framework
PostgreSQL or similar RDBMS
asyncio support

Framework Support

FastAPI ✓ (recommended) Django ✓ Flask ✓ SQLAlchemy ✓ asyncpg ✓

Context Window

Token Usage ~3K-8K tokens for complete CQRS implementations

Security & Privacy

Information

Author
wshobson
Updated
2026-01-30
Category
architecture-patterns