Event Store Design
Design robust event stores for scalable event-sourced systems
✨ The solution you've been looking for
Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.
See It In Action
Interactive preview & real-world examples
AI Conversation Simulator
See how users interact with this skill
User Prompt
I need to implement event sourcing for a high-volume e-commerce platform. Should I use EventStoreDB, PostgreSQL, or DynamoDB?
Skill Processing
Analyzing request...
Agent Response
Detailed comparison with pros/cons, capacity considerations, and implementation complexity analysis for each option
Quick Start (3 Steps)
Get up and running in minutes
Install
claude-code skill install event-store-design
claude-code skill install event-store-designConfig
First Trigger
@event-store-design helpCommands
| Command | Description | Required Args |
|---|---|---|
| @event-store-design technology-selection | Choose the right event store technology for your specific requirements and constraints | None |
| @event-store-design custom-event-store-implementation | Build a custom event store using existing databases with proper schema design and concurrency control | None |
| @event-store-design performance-optimization | Optimize event store performance for high-throughput scenarios with proper indexing and scaling strategies | None |
Typical Use Cases
Technology Selection
Choose the right event store technology for your specific requirements and constraints
Custom Event Store Implementation
Build a custom event store using existing databases with proper schema design and concurrency control
Performance Optimization
Optimize event store performance for high-throughput scenarios with proper indexing and scaling strategies
Overview
Event Store Design
Comprehensive guide to designing event stores for event-sourced applications.
When to Use This Skill
- Designing event sourcing infrastructure
- Choosing between event store technologies
- Implementing custom event stores
- Optimizing event storage and retrieval
- Setting up event store schemas
- Planning for event store scaling
Core Concepts
1. Event Store Architecture
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
2. Event Store Requirements
| Requirement | Description |
|---|---|
| Append-only | Events are immutable, only appends |
| Ordered | Per-stream and global ordering |
| Versioned | Optimistic concurrency control |
| Subscriptions | Real-time event notifications |
| Idempotent | Handle duplicate writes safely |
Technology Comparison
| Technology | Best For | Limitations |
|---|---|---|
| EventStoreDB | Pure event sourcing | Single-purpose |
| PostgreSQL | Existing Postgres stack | Manual implementation |
| Kafka | High-throughput streaming | Not ideal for per-stream queries |
| DynamoDB | Serverless, AWS-native | Query limitations |
| Marten | .NET ecosystems | .NET specific |
Templates
Template 1: PostgreSQL Event Store Schema
1-- Events table
2CREATE TABLE events (
3 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
4 stream_id VARCHAR(255) NOT NULL,
5 stream_type VARCHAR(255) NOT NULL,
6 event_type VARCHAR(255) NOT NULL,
7 event_data JSONB NOT NULL,
8 metadata JSONB DEFAULT '{}',
9 version BIGINT NOT NULL,
10 global_position BIGSERIAL,
11 created_at TIMESTAMPTZ DEFAULT NOW(),
12
13 CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
14);
15
16-- Index for stream queries
17CREATE INDEX idx_events_stream_id ON events(stream_id, version);
18
19-- Index for global subscription
20CREATE INDEX idx_events_global_position ON events(global_position);
21
22-- Index for event type queries
23CREATE INDEX idx_events_event_type ON events(event_type);
24
25-- Index for time-based queries
26CREATE INDEX idx_events_created_at ON events(created_at);
27
28-- Snapshots table
29CREATE TABLE snapshots (
30 stream_id VARCHAR(255) PRIMARY KEY,
31 stream_type VARCHAR(255) NOT NULL,
32 snapshot_data JSONB NOT NULL,
33 version BIGINT NOT NULL,
34 created_at TIMESTAMPTZ DEFAULT NOW()
35);
36
37-- Subscriptions checkpoint table
38CREATE TABLE subscription_checkpoints (
39 subscription_id VARCHAR(255) PRIMARY KEY,
40 last_position BIGINT NOT NULL DEFAULT 0,
41 updated_at TIMESTAMPTZ DEFAULT NOW()
42);
Template 2: Python Event Store Implementation
1from dataclasses import dataclass, field
2from datetime import datetime
3from typing import Any, Optional, List
4from uuid import UUID, uuid4
5import json
6import asyncpg
7
8@dataclass
9class Event:
10 stream_id: str
11 event_type: str
12 data: dict
13 metadata: dict = field(default_factory=dict)
14 event_id: UUID = field(default_factory=uuid4)
15 version: Optional[int] = None
16 global_position: Optional[int] = None
17 created_at: datetime = field(default_factory=datetime.utcnow)
18
19
20class EventStore:
21 def __init__(self, pool: asyncpg.Pool):
22 self.pool = pool
23
24 async def append_events(
25 self,
26 stream_id: str,
27 stream_type: str,
28 events: List[Event],
29 expected_version: Optional[int] = None
30 ) -> List[Event]:
31 """Append events to a stream with optimistic concurrency."""
32 async with self.pool.acquire() as conn:
33 async with conn.transaction():
34 # Check expected version
35 if expected_version is not None:
36 current = await conn.fetchval(
37 "SELECT MAX(version) FROM events WHERE stream_id = $1",
38 stream_id
39 )
40 current = current or 0
41 if current != expected_version:
42 raise ConcurrencyError(
43 f"Expected version {expected_version}, got {current}"
44 )
45
46 # Get starting version
47 start_version = await conn.fetchval(
48 "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
49 stream_id
50 )
51
52 # Insert events
53 saved_events = []
54 for i, event in enumerate(events):
55 event.version = start_version + i
56 row = await conn.fetchrow(
57 """
58 INSERT INTO events (id, stream_id, stream_type, event_type,
59 event_data, metadata, version, created_at)
60 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
61 RETURNING global_position
62 """,
63 event.event_id,
64 stream_id,
65 stream_type,
66 event.event_type,
67 json.dumps(event.data),
68 json.dumps(event.metadata),
69 event.version,
70 event.created_at
71 )
72 event.global_position = row['global_position']
73 saved_events.append(event)
74
75 return saved_events
76
77 async def read_stream(
78 self,
79 stream_id: str,
80 from_version: int = 0,
81 limit: int = 1000
82 ) -> List[Event]:
83 """Read events from a stream."""
84 async with self.pool.acquire() as conn:
85 rows = await conn.fetch(
86 """
87 SELECT id, stream_id, event_type, event_data, metadata,
88 version, global_position, created_at
89 FROM events
90 WHERE stream_id = $1 AND version >= $2
91 ORDER BY version
92 LIMIT $3
93 """,
94 stream_id, from_version, limit
95 )
96 return [self._row_to_event(row) for row in rows]
97
98 async def read_all(
99 self,
100 from_position: int = 0,
101 limit: int = 1000
102 ) -> List[Event]:
103 """Read all events globally."""
104 async with self.pool.acquire() as conn:
105 rows = await conn.fetch(
106 """
107 SELECT id, stream_id, event_type, event_data, metadata,
108 version, global_position, created_at
109 FROM events
110 WHERE global_position > $1
111 ORDER BY global_position
112 LIMIT $2
113 """,
114 from_position, limit
115 )
116 return [self._row_to_event(row) for row in rows]
117
118 async def subscribe(
119 self,
120 subscription_id: str,
121 handler,
122 from_position: int = 0,
123 batch_size: int = 100
124 ):
125 """Subscribe to all events from a position."""
126 # Get checkpoint
127 async with self.pool.acquire() as conn:
128 checkpoint = await conn.fetchval(
129 """
130 SELECT last_position FROM subscription_checkpoints
131 WHERE subscription_id = $1
132 """,
133 subscription_id
134 )
135 position = checkpoint or from_position
136
137 while True:
138 events = await self.read_all(position, batch_size)
139 if not events:
140 await asyncio.sleep(1) # Poll interval
141 continue
142
143 for event in events:
144 await handler(event)
145 position = event.global_position
146
147 # Save checkpoint
148 async with self.pool.acquire() as conn:
149 await conn.execute(
150 """
151 INSERT INTO subscription_checkpoints (subscription_id, last_position)
152 VALUES ($1, $2)
153 ON CONFLICT (subscription_id)
154 DO UPDATE SET last_position = $2, updated_at = NOW()
155 """,
156 subscription_id, position
157 )
158
159 def _row_to_event(self, row) -> Event:
160 return Event(
161 event_id=row['id'],
162 stream_id=row['stream_id'],
163 event_type=row['event_type'],
164 data=json.loads(row['event_data']),
165 metadata=json.loads(row['metadata']),
166 version=row['version'],
167 global_position=row['global_position'],
168 created_at=row['created_at']
169 )
170
171
172class ConcurrencyError(Exception):
173 """Raised when optimistic concurrency check fails."""
174 pass
Template 3: EventStoreDB Usage
1from esdbclient import EventStoreDBClient, NewEvent, StreamState
2import json
3
4# Connect
5client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
6
7# Append events
8def append_events(stream_name: str, events: list, expected_revision=None):
9 new_events = [
10 NewEvent(
11 type=event['type'],
12 data=json.dumps(event['data']).encode(),
13 metadata=json.dumps(event.get('metadata', {})).encode()
14 )
15 for event in events
16 ]
17
18 if expected_revision is None:
19 state = StreamState.ANY
20 elif expected_revision == -1:
21 state = StreamState.NO_STREAM
22 else:
23 state = expected_revision
24
25 return client.append_to_stream(
26 stream_name=stream_name,
27 events=new_events,
28 current_version=state
29 )
30
31# Read stream
32def read_stream(stream_name: str, from_revision: int = 0):
33 events = client.get_stream(
34 stream_name=stream_name,
35 stream_position=from_revision
36 )
37 return [
38 {
39 'type': event.type,
40 'data': json.loads(event.data),
41 'metadata': json.loads(event.metadata) if event.metadata else {},
42 'stream_position': event.stream_position,
43 'commit_position': event.commit_position
44 }
45 for event in events
46 ]
47
48# Subscribe to all
49async def subscribe_to_all(handler, from_position: int = 0):
50 subscription = client.subscribe_to_all(commit_position=from_position)
51 async for event in subscription:
52 await handler({
53 'type': event.type,
54 'data': json.loads(event.data),
55 'stream_id': event.stream_name,
56 'position': event.commit_position
57 })
58
59# Category projection ($ce-Category)
60def read_category(category: str):
61 """Read all events for a category using system projection."""
62 return read_stream(f"$ce-{category}")
Template 4: DynamoDB Event Store
1import boto3
2from boto3.dynamodb.conditions import Key
3from datetime import datetime
4import json
5import uuid
6
7class DynamoEventStore:
8 def __init__(self, table_name: str):
9 self.dynamodb = boto3.resource('dynamodb')
10 self.table = self.dynamodb.Table(table_name)
11
12 def append_events(self, stream_id: str, events: list, expected_version: int = None):
13 """Append events with conditional write for concurrency."""
14 with self.table.batch_writer() as batch:
15 for i, event in enumerate(events):
16 version = (expected_version or 0) + i + 1
17 item = {
18 'PK': f"STREAM#{stream_id}",
19 'SK': f"VERSION#{version:020d}",
20 'GSI1PK': 'EVENTS',
21 'GSI1SK': datetime.utcnow().isoformat(),
22 'event_id': str(uuid.uuid4()),
23 'stream_id': stream_id,
24 'event_type': event['type'],
25 'event_data': json.dumps(event['data']),
26 'version': version,
27 'created_at': datetime.utcnow().isoformat()
28 }
29 batch.put_item(Item=item)
30 return events
31
32 def read_stream(self, stream_id: str, from_version: int = 0):
33 """Read events from a stream."""
34 response = self.table.query(
35 KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
36 Key('SK').gte(f"VERSION#{from_version:020d}")
37 )
38 return [
39 {
40 'event_type': item['event_type'],
41 'data': json.loads(item['event_data']),
42 'version': item['version']
43 }
44 for item in response['Items']
45 ]
46
47# Table definition (CloudFormation/Terraform)
48"""
49DynamoDB Table:
50 - PK (Partition Key): String
51 - SK (Sort Key): String
52 - GSI1PK, GSI1SK for global ordering
53
54Capacity: On-demand or provisioned based on throughput needs
55"""
Best Practices
Do’s
- Use stream IDs that include aggregate type -
Order-{uuid} - Include correlation/causation IDs - For tracing
- Version events from day one - Plan for schema evolution
- Implement idempotency - Use event IDs for deduplication
- Index appropriately - For your query patterns
Don’ts
- Don’t update or delete events - They’re immutable facts
- Don’t store large payloads - Keep events small
- Don’t skip optimistic concurrency - Prevents data corruption
- Don’t ignore backpressure - Handle slow consumers
Resources
What Users Are Saying
Real feedback from the community
Environment Matrix
Dependencies
Framework Support
Context Window
Security & Privacy
Information
- Author
- wshobson
- Updated
- 2026-01-30
- Category
- architecture-patterns
Related Skills
Event Store Design
Design and implement event stores for event-sourced systems. Use when building event sourcing …
View Details →Cqrs Implementation
Implement Command Query Responsibility Segregation for scalable architectures. Use when separating …
View Details →Cqrs Implementation
Implement Command Query Responsibility Segregation for scalable architectures. Use when separating …
View Details →