Event Store Design

Design robust event stores for scalable event-sourced systems

Design and implement event stores for event-sourced systems. Use when building event sourcing infrastructure, choosing event store technologies, or implementing event persistence patterns.

event-sourcing event-store architecture database-design postgresql eventstore nosql streaming

Example Interaction

User prompt: "I need to implement event sourcing for a high-volume e-commerce platform. Should I use EventStoreDB, PostgreSQL, or DynamoDB?"

Agent response: a detailed comparison with pros and cons, capacity considerations, and implementation complexity analysis for each option.

Quick Start (3 Steps)

Get up and running in minutes

1. Install

   claude-code skill install event-store-design

2. Config

3. First Trigger

   @event-store-design help

Commands

| Command | Description | Required Args |
|---|---|---|
| `@event-store-design technology-selection` | Choose the right event store technology for your specific requirements and constraints | None |
| `@event-store-design custom-event-store-implementation` | Build a custom event store using existing databases with proper schema design and concurrency control | None |
| `@event-store-design performance-optimization` | Optimize event store performance for high-throughput scenarios with proper indexing and scaling strategies | None |

Typical Use Cases

Technology Selection

Choose the right event store technology for your specific requirements and constraints

Custom Event Store Implementation

Build a custom event store using existing databases with proper schema design and concurrency control

Performance Optimization

Optimize event store performance for high-throughput scenarios with proper indexing and scaling strategies

Overview

Comprehensive guide to designing event stores for event-sourced applications.

When to Use This Skill

  • Designing event sourcing infrastructure
  • Choosing between event store technologies
  • Implementing custom event stores
  • Optimizing event storage and retrieval
  • Setting up event store schemas
  • Planning for event store scaling

Core Concepts

1. Event Store Architecture

┌─────────────────────────────────────────────────────┐
│                     Event Store                     │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Stream 1   │  │  Stream 2   │  │  Stream 3   │  │
│  │ (Aggregate) │  │ (Aggregate) │  │ (Aggregate) │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │  │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │  │
│  │ Event 3     │  │ ...         │  │ Event 3     │  │
│  │ ...         │  │             │  │ Event 4     │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...       │
└─────────────────────────────────────────────────────┘

2. Event Store Requirements

| Requirement | Description |
|---|---|
| Append-only | Events are immutable; writes are appends only |
| Ordered | Per-stream and global ordering |
| Versioned | Optimistic concurrency control |
| Subscriptions | Real-time event notifications |
| Idempotent | Handle duplicate writes safely |
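These requirements can be sketched with a minimal in-memory store. This is an illustration only (the class and field names are invented for the sketch), not production code:

```python
import itertools

class InMemoryEventStore:
    """Toy store illustrating append-only ordering, versioning, and idempotency."""

    def __init__(self):
        self.streams = {}            # stream_id -> ordered list of events
        self.seen_event_ids = set()  # enables idempotent writes
        self.global_log = []         # global ordering across all streams
        self._position = itertools.count(1)

    def append(self, stream_id, event_id, data, expected_version=None):
        stream = self.streams.setdefault(stream_id, [])
        # Idempotent: a duplicate event ID is ignored rather than re-appended
        if event_id in self.seen_event_ids:
            return None
        # Versioned: optimistic concurrency check against the stream head
        if expected_version is not None and len(stream) != expected_version:
            raise ValueError(f"expected version {expected_version}, got {len(stream)}")
        event = {
            "event_id": event_id,
            "stream_id": stream_id,
            "version": len(stream) + 1,               # per-stream ordering
            "global_position": next(self._position),  # global ordering
            "data": data,
        }
        stream.append(event)        # append-only: events are never updated or deleted
        self.global_log.append(event)
        self.seen_event_ids.add(event_id)
        return event
```

A real store enforces the same invariants with database constraints (see the PostgreSQL template below), but the control flow is the same.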

Technology Comparison

| Technology | Best For | Limitations |
|---|---|---|
| EventStoreDB | Pure event sourcing | Single-purpose |
| PostgreSQL | Existing Postgres stacks | Manual implementation |
| Kafka | High-throughput streaming | Not ideal for per-stream queries |
| DynamoDB | Serverless, AWS-native | Query limitations |
| Marten | .NET ecosystems | .NET specific |

Templates

Template 1: PostgreSQL Event Store Schema

```sql
-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

Template 2: Python Event Store Implementation

```python
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from uuid import UUID, uuid4

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    created_at: datetime = field(default_factory=datetime.utcnow)


class ConcurrencyError(Exception):
    """Raised when the optimistic concurrency check fails."""


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Check expected version
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events WHERE stream_id = $1",
                        stream_id
                    )
                    current = current or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                # Get starting version
                start_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                    stream_id
                )

                # Insert events
                saved_events = []
                for i, event in enumerate(events):
                    event.version = start_version + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                            event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read events from a stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Read all events globally."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """Subscribe to all events from a position."""
        # Get checkpoint
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
            position = checkpoint or from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Poll interval
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Save checkpoint
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )
```
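When a concurrency check fails, the usual response is to re-read the stream, rebuild the events, and retry. A minimal sketch of that pattern follows; the `append` and `build_events` callables are assumptions standing in for your command handler, and `ConcurrencyError` is redefined here only to keep the sketch self-contained:

```python
import asyncio


class ConcurrencyError(Exception):
    """Same exception the event store raises on a version conflict."""


async def append_with_retry(append, build_events, max_attempts=3):
    """Load-append-retry loop around an optimistic-concurrency conflict.

    `build_events()` re-reads the stream and returns (events, expected_version);
    `append(events, expected_version)` raises ConcurrencyError on conflict.
    """
    for attempt in range(max_attempts):
        events, expected_version = await build_events()
        try:
            return await append(events, expected_version)
        except ConcurrencyError:
            if attempt == max_attempts - 1:
                raise  # give up after max_attempts conflicts
            await asyncio.sleep(0.05 * (attempt + 1))  # brief backoff, then re-read
```

Re-reading before each attempt matters: retrying with the stale expected version would simply fail again.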

Template 3: EventStoreDB Usage

```python
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

# Connect
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

# Append events
def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]

    if expected_revision is None:
        state = StreamState.ANY
    elif expected_revision == -1:
        state = StreamState.NO_STREAM
    else:
        state = expected_revision

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=state
    )

# Read stream
def read_stream(stream_name: str, from_revision: int = 0):
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in events
    ]

# Subscribe to all (the synchronous client blocks and yields events as they arrive)
def subscribe_to_all(handler, from_position: int = 0):
    subscription = client.subscribe_to_all(commit_position=from_position)
    for event in subscription:
        handler({
            'type': event.type,
            'data': json.loads(event.data),
            'stream_id': event.stream_name,
            'position': event.commit_position
        })

# Category projection ($ce-Category)
def read_category(category: str):
    """Read all events for a category using the system projection."""
    return read_stream(f"$ce-{category}")
```

Template 4: DynamoDB Event Store

```python
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json
import uuid

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Append events with conditional writes for concurrency."""
        for i, event in enumerate(events):
            version = (expected_version or 0) + i + 1
            item = {
                'PK': f"STREAM#{stream_id}",
                'SK': f"VERSION#{version:020d}",
                'GSI1PK': 'EVENTS',
                'GSI1SK': datetime.utcnow().isoformat(),
                'event_id': str(uuid.uuid4()),
                'stream_id': stream_id,
                'event_type': event['type'],
                'event_data': json.dumps(event['data']),
                'version': version,
                'created_at': datetime.utcnow().isoformat()
            }
            # Conditional put enforces optimistic concurrency: the write fails
            # if an item with this (PK, SK) already exists. Batch writes are
            # not used here because they cannot carry condition expressions.
            self.table.put_item(
                Item=item,
                ConditionExpression='attribute_not_exists(PK)'
            )
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Read events from a stream."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                                   Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': item['version']
            }
            for item in response['Items']
        ]

# Table definition (CloudFormation/Terraform)
"""
DynamoDB Table:
  - PK (Partition Key): String
  - SK (Sort Key): String
  - GSI1PK, GSI1SK for global ordering

Capacity: On-demand or provisioned based on throughput needs
"""
```
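The `VERSION#{version:020d}` sort key relies on zero-padding: DynamoDB sorts string keys lexicographically, so without padding version 10 would sort before version 2. A quick demonstration:

```python
# Unpadded sort keys sort lexicographically, which breaks numeric ordering
unpadded = [f"VERSION#{v}" for v in (1, 2, 10)]
assert sorted(unpadded) == ["VERSION#1", "VERSION#10", "VERSION#2"]  # wrong order

# Zero-padding to a fixed width makes lexicographic order match numeric order
padded = [f"VERSION#{v:020d}" for v in (1, 2, 10)]
assert sorted(padded) == [f"VERSION#{v:020d}" for v in (1, 2, 10)]  # correct order
```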

Best Practices

Do’s

  • Use stream IDs that include aggregate type - Order-{uuid}
  • Include correlation/causation IDs - For tracing
  • Version events from day one - Plan for schema evolution
  • Implement idempotency - Use event IDs for deduplication
  • Index appropriately - For your query patterns
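
The naming, tracing, versioning, and idempotency practices above can be combined into one event envelope. A sketch, with illustrative field names (the helper and its parameters are not part of any library):

```python
import uuid
from datetime import datetime, timezone

def make_event(aggregate_type, aggregate_id, event_type, data,
               correlation_id=None, causation_id=None):
    """Build an event envelope following the conventions above."""
    event_id = str(uuid.uuid4())
    return {
        "event_id": event_id,                             # used for deduplication
        "stream_id": f"{aggregate_type}-{aggregate_id}",  # type-prefixed stream ID
        "event_type": event_type,
        "event_version": 1,                               # schema version, from day one
        "data": data,
        "metadata": {
            # correlation: the original request; causation: the message that led here
            "correlation_id": correlation_id or event_id,
            "causation_id": causation_id or event_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
        },
    }
```

A follow-up event then carries the same correlation ID and names its predecessor as causation, which is what makes end-to-end tracing possible.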

Don’ts

  • Don’t update or delete events - They’re immutable facts
  • Don’t store large payloads - Keep events small
  • Don’t skip optimistic concurrency - Prevents data corruption
  • Don’t ignore backpressure - Handle slow consumers


Environment Matrix

Dependencies

Database system (PostgreSQL, EventStoreDB, DynamoDB, etc.)
Python 3.8+ (for examples)
asyncpg (for PostgreSQL implementation)
esdbclient (for EventStoreDB)
boto3 (for DynamoDB implementation)

Framework Support

PostgreSQL ✓ (recommended for existing stacks)
EventStoreDB ✓ (recommended for pure event sourcing)
DynamoDB ✓ (for AWS serverless)
Marten ✓ (.NET ecosystems)
Apache Kafka ✓ (high-throughput streaming)

Context Window

Token Usage: ~3K-8K tokens for schema design and implementation patterns

Information

Author: wshobson
Updated: 2026-01-30
Category: architecture-patterns