Async Python Patterns

Master Python asyncio for high-performance concurrent applications

Master Python asyncio, concurrent programming, and async/await patterns for high-performance applications. Use when building async APIs, concurrent systems, or I/O-bound applications requiring non-blocking operations.

python asyncio concurrency async-await performance non-blocking web-apis microservices

Quick Start (3 Steps)

Get up and running in minutes

  1. Install

     claude-code skill install async-python-patterns

  2. Config

  3. First Trigger

     @async-python-patterns help

Commands

Command | Description | Required Args
@async-python-patterns building-high-performance-web-apis | Create non-blocking REST APIs and microservices that handle thousands of concurrent requests efficiently | None
@async-python-patterns concurrent-data-processing | Process large datasets, scrape websites, or handle multiple database operations simultaneously without blocking | None
@async-python-patterns real-time-system-development | Build WebSocket servers, chat applications, or event-driven systems that handle real-time communication | None

Typical Use Cases

Building High-Performance Web APIs

Create non-blocking REST APIs and microservices that handle thousands of concurrent requests efficiently

Concurrent Data Processing

Process large datasets, scrape websites, or handle multiple database operations simultaneously without blocking

Real-time System Development

Build WebSocket servers, chat applications, or event-driven systems that handle real-time communication

Overview

Async Python Patterns

Comprehensive guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.

When to Use This Skill

  • Building async web APIs (FastAPI, aiohttp, Sanic)
  • Implementing concurrent I/O operations (database, file, network)
  • Creating web scrapers with concurrent requests
  • Developing real-time applications (WebSocket servers, chat systems)
  • Processing multiple independent tasks simultaneously
  • Building microservices with async communication
  • Optimizing I/O-bound workloads
  • Implementing async background tasks and queues

Core Concepts

1. Event Loop

The event loop is the heart of asyncio, managing and scheduling asynchronous tasks.

Key characteristics:

  • Single-threaded cooperative multitasking
  • Schedules coroutines for execution
  • Handles I/O operations without blocking
  • Manages callbacks and futures
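
To make "cooperative multitasking" concrete, here is a minimal sketch in which two coroutines share one thread and hand control back to the event loop at each await. The names worker and run_interleaved are illustrative, not part of asyncio.

```python
import asyncio
from typing import List


async def worker(name: str, steps: int, log: List[str]) -> None:
    """Record each step, then yield control back to the event loop."""
    for step in range(steps):
        log.append(f"{name}:{step}")
        # The await is the only place this coroutine can be paused
        await asyncio.sleep(0)


async def run_interleaved() -> List[str]:
    """Run two workers on the same loop and return the order of their steps."""
    log: List[str] = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_interleaved()))
```

Because neither worker blocks, the second worker gets to run before the first one finishes, even though only one thread is involved.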

2. Coroutines

Functions defined with async def that can be paused and resumed.

Syntax:

async def my_coroutine():
    result = await some_async_operation()
    return result

3. Tasks

Scheduled coroutines that run concurrently on the event loop.

4. Futures

Low-level objects representing eventual results of async operations.
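
Application code rarely creates futures directly, but a small sketch shows what "eventual result" means. Here a future is created on the running loop and resolved later by a plain callback; wait_for_future is a hypothetical name used only for this illustration.

```python
import asyncio


async def wait_for_future() -> str:
    """Create a future, resolve it from a scheduled callback, and await it."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Resolve the future 50 ms from now; awaiting it suspends until then
    loop.call_later(0.05, future.set_result, "done")
    return await future


if __name__ == "__main__":
    print(asyncio.run(wait_for_future()))
```

Tasks are a subclass of futures, which is why both can be awaited the same way.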

5. Async Context Managers

Resources that support async with for proper cleanup.

6. Async Iterators

Objects that support async for for iterating over async data sources.

Quick Start

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+
asyncio.run(main())

Fundamental Patterns

Pattern 1: Basic Async/Await

import asyncio

async def fetch_data(url: str) -> dict:
    """Fetch data from URL asynchronously."""
    await asyncio.sleep(1)  # Simulate I/O
    return {"url": url, "data": "result"}

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())

Pattern 2: Concurrent Execution with gather()

import asyncio
from typing import List

async def fetch_user(user_id: int) -> dict:
    """Fetch user data."""
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    """Fetch multiple users concurrently."""
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    user_ids = [1, 2, 3, 4, 5]
    users = await fetch_all_users(user_ids)
    print(f"Fetched {len(users)} users")

asyncio.run(main())
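
gather() returns results in the order the awaitables were passed in. When results should be handled as soon as each one finishes, asyncio.as_completed is an alternative. The sketch below assumes a hypothetical fetch_user_with_delay whose delay shrinks as the id grows, so later ids finish first.

```python
import asyncio
from typing import List


async def fetch_user_with_delay(user_id: int) -> dict:
    """Simulated fetch: higher ids complete sooner (illustrative only)."""
    await asyncio.sleep(0.05 * (4 - user_id))
    return {"id": user_id}


async def fetch_in_completion_order(user_ids: List[int]) -> List[int]:
    """Return user ids in the order their fetches completed."""
    finished: List[int] = []
    pending = [fetch_user_with_delay(uid) for uid in user_ids]
    for next_done in asyncio.as_completed(pending):
        user = await next_done
        finished.append(user["id"])
    return finished


if __name__ == "__main__":
    print(asyncio.run(fetch_in_completion_order([1, 2, 3])))
```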

Pattern 3: Task Creation and Management

import asyncio

async def background_task(name: str, delay: int):
    """Long-running background task."""
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} completed")
    return f"Result from {name}"

async def main():
    # Create tasks
    task1 = asyncio.create_task(background_task("Task 1", 2))
    task2 = asyncio.create_task(background_task("Task 2", 1))

    # Do other work
    print("Main: doing other work")
    await asyncio.sleep(0.5)

    # Wait for tasks
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())

Pattern 4: Error Handling in Async Code

import asyncio
from typing import List, Optional

async def risky_operation(item_id: int) -> dict:
    """Operation that might fail."""
    await asyncio.sleep(0.1)
    if item_id % 3 == 0:
        raise ValueError(f"Item {item_id} failed")
    return {"id": item_id, "status": "success"}

async def safe_operation(item_id: int) -> Optional[dict]:
    """Wrapper with error handling."""
    try:
        return await risky_operation(item_id)
    except ValueError as e:
        print(f"Error: {e}")
        return None

async def process_items(item_ids: List[int]) -> List[dict]:
    """Process multiple items with error handling."""
    tasks = [safe_operation(iid) for iid in item_ids]
    # return_exceptions=True captures anything safe_operation did not handle
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # A handled ValueError comes back as None; an unhandled error comes back
    # as an exception instance. Both count as failures.
    successful = [r for r in results if r is not None and not isinstance(r, Exception)]
    failed = [r for r in results if r is None or isinstance(r, Exception)]

    print(f"Success: {len(successful)}, Failed: {len(failed)}")
    return successful

asyncio.run(process_items([1, 2, 3, 4, 5, 6]))
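
If the exceptions themselves are needed (for logging or retries), one option is to skip the wrapper and let gather(return_exceptions=True) hand them back alongside the results. This is a sketch under that assumption; risky and partition_results are hypothetical names.

```python
import asyncio
from typing import List, Tuple


async def risky(item_id: int) -> dict:
    """Simulated operation that fails for every third item."""
    await asyncio.sleep(0.01)
    if item_id % 3 == 0:
        raise ValueError(f"Item {item_id} failed")
    return {"id": item_id}


async def partition_results(
    item_ids: List[int],
) -> Tuple[List[dict], List[BaseException]]:
    """Run all items concurrently and split successes from raised exceptions."""
    results = await asyncio.gather(
        *(risky(i) for i in item_ids), return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return ok, errors


if __name__ == "__main__":
    ok, errors = asyncio.run(partition_results([1, 2, 3, 4, 5, 6]))
    print(f"Success: {len(ok)}, Failed: {len(errors)}")
```

With this approach one failing item does not cancel or hide the others, and each error object stays available to the caller.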

Pattern 5: Timeout Handling

import asyncio

async def slow_operation(delay: int) -> str:
    """Operation that takes time."""
    await asyncio.sleep(delay)
    return f"Completed after {delay}s"

async def with_timeout():
    """Execute operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())

Advanced Patterns

Pattern 6: Async Context Managers

import asyncio
from typing import Optional

class AsyncDatabaseConnection:
    """Async database connection context manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.connection: Optional[object] = None

    async def __aenter__(self):
        print("Opening connection")
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = {"dsn": self.dsn, "connected": True}
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connection = None

async def query_database():
    """Use async context manager."""
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        print(f"Using connection: {conn}")
        await asyncio.sleep(0.2)  # Simulate query
        return {"rows": 10}

asyncio.run(query_database())
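
For simple cases, the same open/cleanup behaviour can be written as a generator with contextlib.asynccontextmanager instead of a class. The sketch below is one way to do that; open_connection and the events list are illustrative names, and the "connection" is still a simulated dict.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import List


@asynccontextmanager
async def open_connection(dsn: str, events: List[str]):
    """Open a simulated connection and always close it on exit."""
    events.append("open")
    await asyncio.sleep(0.01)  # Simulate connecting
    conn = {"dsn": dsn, "connected": True}
    try:
        yield conn
    finally:
        await asyncio.sleep(0.01)  # Simulate cleanup
        conn["connected"] = False
        events.append("close")


async def use_connection() -> List[str]:
    """Record the order of open, query, and close."""
    events: List[str] = []
    async with open_connection("postgresql://localhost", events) as conn:
        events.append(f"query:{conn['connected']}")
    return events


if __name__ == "__main__":
    print(asyncio.run(use_connection()))
```

The try/finally around the yield plays the role of __aexit__: cleanup runs whether the body finishes normally or raises.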

Pattern 7: Async Iterators and Generators

import asyncio
from typing import AsyncIterator

async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
    """Async generator that yields numbers with delay."""
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i

async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
    """Fetch paginated data asynchronously."""
    for page in range(1, max_pages + 1):
        await asyncio.sleep(0.2)  # Simulate API call
        yield {
            "page": page,
            "url": f"{url}?page={page}",
            "data": [f"item_{page}_{i}" for i in range(5)]
        }

async def consume_async_iterator():
    """Consume async iterator."""
    async for number in async_range(1, 5):
        print(f"Number: {number}")

    print("\nFetching pages:")
    async for page_data in fetch_pages("https://api.example.com/items", 3):
        print(f"Page {page_data['page']}: {len(page_data['data'])} items")

asyncio.run(consume_async_iterator())

Pattern 8: Producer-Consumer Pattern

import asyncio
from asyncio import Queue

async def producer(queue: Queue, producer_id: int, num_items: int):
    """Produce items and put them in queue."""
    for i in range(num_items):
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(queue: Queue, consumer_id: int):
    """Consume items from queue until cancelled."""
    while True:
        item = await queue.get()
        try:
            print(f"Consumer {consumer_id} processing: {item}")
            await asyncio.sleep(0.2)  # Simulate work
        finally:
            # Always mark the item done so queue.join() can return
            queue.task_done()

async def producer_consumer_example():
    """Run producer-consumer pattern."""
    queue = Queue(maxsize=10)

    # Create tasks
    producers = [
        asyncio.create_task(producer(queue, i, 5))
        for i in range(2)
    ]

    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]

    # Wait for producers
    await asyncio.gather(*producers)

    # Wait until every queued item has been processed
    await queue.join()

    # Consumers loop forever, so cancel them and wait for the cancellations
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)

asyncio.run(producer_consumer_example())

Pattern 9: Semaphore for Rate Limiting

import asyncio
from typing import List

async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
    """Make API call; the semaphore caps how many calls run at once."""
    async with semaphore:
        print(f"Calling {url}")
        await asyncio.sleep(0.5)  # Simulate API call
        return {"url": url, "status": 200}

async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
    """Make multiple requests with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [api_call(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    results = await rate_limited_requests(urls, max_concurrent=3)
    print(f"Completed {len(results)} requests")

asyncio.run(main())
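
A semaphore bounds the number of calls in flight at once rather than the number per second. One way to check that bound is to track the peak number of coroutines inside the semaphore, as in this sketch; ConcurrencyProbe, limited_call, and measure_peak are hypothetical helpers.

```python
import asyncio


class ConcurrencyProbe:
    """Tracks how many coroutines are inside the guarded section."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0


async def limited_call(sem: asyncio.Semaphore, probe: ConcurrencyProbe) -> None:
    """Simulated call that records concurrency while holding the semaphore."""
    async with sem:
        probe.in_flight += 1
        probe.peak = max(probe.peak, probe.in_flight)
        await asyncio.sleep(0.01)  # Simulate the API call
        probe.in_flight -= 1


async def measure_peak(total: int, max_concurrent: int) -> int:
    """Run `total` calls and return the highest concurrency observed."""
    sem = asyncio.Semaphore(max_concurrent)
    probe = ConcurrencyProbe()
    await asyncio.gather(*(limited_call(sem, probe) for _ in range(total)))
    return probe.peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak(20, 3)))
```

If a service enforces a requests-per-second quota, a semaphore alone may not be enough, since short calls can still exceed the quota while staying under the concurrency cap.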

Pattern 10: Async Locks and Synchronization

import asyncio

class AsyncCounter:
    """Coroutine-safe counter (asyncio.Lock does not protect against other threads)."""

    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        """Safely increment counter."""
        async with self.lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work
            self.value = current + 1

    async def get_value(self) -> int:
        """Get current value."""
        async with self.lock:
            return self.value

async def worker(counter: AsyncCounter, worker_id: int):
    """Worker that increments counter."""
    for _ in range(10):
        await counter.increment()
        print(f"Worker {worker_id} incremented")

async def test_counter():
    """Test concurrent counter."""
    counter = AsyncCounter()

    workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
    await asyncio.gather(*workers)

    final_value = await counter.get_value()
    print(f"Final counter value: {final_value}")

asyncio.run(test_counter())
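
The lock matters because the read and the write are separated by an await. The sketch below contrasts the same read-await-write sequence with and without a lock; the function names and the dict-based state are illustrative only.

```python
import asyncio
from typing import Dict, Tuple


async def unsafe_increment(state: Dict[str, int]) -> None:
    """Read, yield to the loop, then write: other coroutines can interleave."""
    current = state["value"]
    await asyncio.sleep(0)
    state["value"] = current + 1


async def safe_increment(state: Dict[str, int], lock: asyncio.Lock) -> None:
    """Same steps, but the lock keeps the read and write together."""
    async with lock:
        current = state["value"]
        await asyncio.sleep(0)
        state["value"] = current + 1


async def compare(n: int = 50) -> Tuple[int, int]:
    """Return (count without lock, count with lock) after n increments each."""
    unsafe = {"value": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["value"], safe["value"]


if __name__ == "__main__":
    print(asyncio.run(compare()))
```

Without the lock, updates are lost because several coroutines read the same stale value before any of them writes.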

Real-World Applications

Web Scraping with aiohttp

import asyncio
import aiohttp
from typing import List, Dict

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch single URL."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            text = await response.text()
            return {
                "url": url,
                "status": response.status,
                "length": len(text)
            }
    except Exception as e:
        return {"url": url, "error": str(e)}

async def scrape_urls(urls: List[str]) -> List[Dict]:
    """Scrape multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/404",
    ]

    results = await scrape_urls(urls)
    for result in results:
        print(result)

asyncio.run(main())

Async Database Operations

import asyncio
from typing import List, Optional

# Simulated async database client
class AsyncDB:
    """Simulated async database."""

    async def execute(self, query: str) -> List[dict]:
        """Execute query."""
        await asyncio.sleep(0.1)
        return [{"id": 1, "name": "Example"}]

    async def fetch_one(self, query: str) -> Optional[dict]:
        """Fetch single row."""
        await asyncio.sleep(0.1)
        return {"id": 1, "name": "Example"}

async def get_user_data(db: AsyncDB, user_id: int) -> dict:
    """Fetch user and related data concurrently."""
    # The f-strings are for the simulation only; with a real driver, pass
    # user_id as a bound query parameter to avoid SQL injection.
    # These are coroutine objects; they start running when gather() schedules them.
    user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
    orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
    profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")

    user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)

    return {
        "user": user,
        "orders": orders,
        "profile": profile
    }

async def main():
    db = AsyncDB()
    user_data = await get_user_data(db, 1)
    print(user_data)

asyncio.run(main())

WebSocket Server

import asyncio
from typing import Set

# Simulated WebSocket connection
class WebSocket:
    """Simulated WebSocket."""

    def __init__(self, client_id: str):
        self.client_id = client_id

    async def send(self, message: str):
        """Send message."""
        print(f"Sending to {self.client_id}: {message}")
        await asyncio.sleep(0.01)

    async def recv(self) -> str:
        """Receive message."""
        await asyncio.sleep(1)
        return f"Message from {self.client_id}"

class WebSocketServer:
    """Simple WebSocket server."""

    def __init__(self):
        self.clients: Set[WebSocket] = set()

    async def register(self, websocket: WebSocket):
        """Register new client."""
        self.clients.add(websocket)
        print(f"Client {websocket.client_id} connected")

    async def unregister(self, websocket: WebSocket):
        """Unregister client."""
        self.clients.remove(websocket)
        print(f"Client {websocket.client_id} disconnected")

    async def broadcast(self, message: str):
        """Broadcast message to all clients."""
        if self.clients:
            tasks = [client.send(message) for client in self.clients]
            await asyncio.gather(*tasks)

    async def handle_client(self, websocket: WebSocket):
        """Handle individual client connection."""
        await self.register(websocket)
        try:
            async for message in self.message_iterator(websocket):
                await self.broadcast(f"{websocket.client_id}: {message}")
        finally:
            await self.unregister(websocket)

    async def message_iterator(self, websocket: WebSocket):
        """Iterate over messages from client."""
        for _ in range(3):  # Simulate 3 messages
            yield await websocket.recv()

Performance Best Practices

1. Use Connection Pools

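import asyncio
import aiohttp

async def fetch_status(session: aiohttp.ClientSession, url: str) -> int:
    """Fetch one URL and release its connection back to the pool."""
    async with session.get(url) as response:
        await response.read()
        return response.status

async def with_connection_pool():
    """Use connection pool for efficiency."""
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

    async with aiohttp.ClientSession(connector=connector) as session:
        urls = [f"https://api.example.com/item/{i}" for i in range(50)]
        # Consume each response inside the session; returning raw response
        # objects would leave them unread once the session has closed
        statuses = await asyncio.gather(*(fetch_status(session, url) for url in urls))
        return statuses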

2. Batch Operations

import asyncio
from typing import List

async def batch_process(items: List[str], batch_size: int = 10):
    """Process items in batches."""
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)
        print(f"Processed batch {i // batch_size + 1}")

async def process_item(item: str):
    """Process single item."""
    await asyncio.sleep(0.1)
    return f"Processed: {item}"

3. Avoid Blocking Operations

import asyncio
import time
from typing import Any

def blocking_operation(data: Any) -> Any:
    """Blocking operation (simulated with time.sleep)."""
    time.sleep(1)
    return data * 2

async def run_in_executor(data: Any) -> Any:
    """Run blocking operation in the event loop's default thread pool."""
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor, shared across calls.
    # For CPU-bound work, a concurrent.futures.ProcessPoolExecutor is usually
    # the better choice, because threads still contend for the GIL.
    return await loop.run_in_executor(None, blocking_operation, data)

async def main():
    results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
    print(results)

asyncio.run(main())
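
On Python 3.9 and later, asyncio.to_thread offers a shorter way to push a blocking call onto a worker thread. This sketch assumes that minimum version, which is higher than the 3.7+ baseline used elsewhere in this guide; blocking_io and run_all are illustrative names.

```python
import asyncio
import time
from typing import List


def blocking_io(n: int) -> int:
    """Blocking call (simulated with time.sleep)."""
    time.sleep(0.1)
    return n * 2


async def run_all(values: List[int]) -> List[int]:
    """Run each blocking call in a worker thread and collect results in order."""
    return await asyncio.gather(*(asyncio.to_thread(blocking_io, v) for v in values))


if __name__ == "__main__":
    print(asyncio.run(run_all([0, 1, 2])))
```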

Common Pitfalls

1. Forgetting await

# Wrong - returns coroutine object, doesn't execute
result = async_function()

# Correct
result = await async_function()
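
A quick way to see the difference is to inspect what the un-awaited call returns. In this sketch, compute is a hypothetical coroutine function; the call produces a coroutine object, and the body runs only once it is awaited.

```python
import asyncio
import inspect
from typing import Tuple


async def compute() -> int:
    """Trivial coroutine used for the demonstration."""
    return 42


async def inspect_then_await() -> Tuple[bool, int]:
    """Show that calling compute() yields a coroutine object until awaited."""
    pending = compute()  # Nothing has executed yet
    is_coroutine = inspect.iscoroutine(pending)
    value = await pending  # Now the body runs
    return is_coroutine, value


if __name__ == "__main__":
    print(asyncio.run(inspect_then_await()))
```

Python also emits a "coroutine ... was never awaited" RuntimeWarning when such an object is garbage-collected without being awaited, which is a useful hint during debugging.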

2. Blocking the Event Loop

import asyncio
import time

# Wrong - blocks event loop
async def bad():
    time.sleep(1)  # Blocks!

# Correct
async def good():
    await asyncio.sleep(1)  # Non-blocking

3. Not Handling Cancellation

import asyncio

async def cancelable_task():
    """Task that handles cancellation."""
    try:
        while True:
            await asyncio.sleep(1)
            print("Working...")
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # Perform cleanup
        raise  # Re-raise to propagate cancellation
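
The handler above only runs when something cancels the task. A minimal sketch of the caller's side follows: cancel the task, then await it so the cleanup has finished before moving on. The names cancelable_worker and cancel_demo are illustrative.

```python
import asyncio
from typing import List


async def cancelable_worker(events: List[str]) -> None:
    """Loop until cancelled, record cleanup, then propagate the cancellation."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("cleanup")
        raise


async def cancel_demo() -> List[str]:
    """Start the worker, cancel it, and wait for it to finish cleaning up."""
    events: List[str] = []
    task = asyncio.create_task(cancelable_worker(events))
    await asyncio.sleep(0.03)  # Let the worker run for a moment
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return events


if __name__ == "__main__":
    print(asyncio.run(cancel_demo()))
```

Awaiting the cancelled task is what guarantees the cleanup code has run; calling cancel() alone only requests cancellation.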

4. Mixing Sync and Async Code

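import asyncio

# Wrong - can't call async from sync directly
def sync_function():
    result = await async_function()  # SyntaxError!

# Correct - but only when no event loop is already running in this thread;
# asyncio.run() raises RuntimeError if called from inside a running loop
def sync_function():
    result = asyncio.run(async_function())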

Testing Async Code

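import asyncio
import pytest

# Functions under test (copied from Patterns 1 and 5 so the tests are self-contained)
async def fetch_data(url: str) -> dict:
    await asyncio.sleep(1)  # Simulate I/O
    return {"url": url, "data": "result"}

async def slow_operation(delay: int) -> str:
    await asyncio.sleep(delay)
    return f"Completed after {delay}s"

# Using pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await fetch_data("https://api.example.com")
    assert result is not None

@pytest.mark.asyncio
async def test_with_timeout():
    """Test with timeout."""
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(5), timeout=1.0)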
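
Where adding a pytest plugin is not an option, the standard library's unittest.IsolatedAsyncioTestCase (Python 3.8+) can run async test methods directly. The sketch below is an alternative to the pytest-asyncio tests, not a replacement for them; slow_operation is redefined here with a float delay so the block is self-contained.

```python
import asyncio
import unittest


async def slow_operation(delay: float) -> str:
    """Operation that takes `delay` seconds (simulated)."""
    await asyncio.sleep(delay)
    return f"Completed after {delay}s"


class SlowOperationTests(unittest.IsolatedAsyncioTestCase):
    """Each async test method runs on an event loop managed by unittest."""

    async def test_returns_message(self) -> None:
        self.assertEqual(await slow_operation(0.01), "Completed after 0.01s")

    async def test_times_out(self) -> None:
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(slow_operation(1), timeout=0.01)
```

Saved as a test module, this can be run with python -m unittest.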

Best Practices Summary

  1. Use asyncio.run() for entry point (Python 3.7+)
  2. Always await coroutines to execute them
  3. Use gather() for concurrent execution of multiple tasks
  4. Implement proper error handling with try/except
  5. Use timeouts to prevent hanging operations
  6. Pool connections for better performance
  7. Avoid blocking operations in async code
  8. Use semaphores to cap concurrent operations (a simple form of rate limiting)
  9. Handle task cancellation properly
  10. Test async code with pytest-asyncio

Environment Matrix

Dependencies

Python 3.7+
aiohttp (for HTTP operations)
pytest-asyncio (for testing)

Framework Support

  • FastAPI ✓ (recommended)
  • aiohttp ✓
  • Sanic ✓
  • asyncpg ✓ (PostgreSQL)
  • motor ✓ (MongoDB)

Context Window

Token Usage ~3K-8K tokens depending on pattern complexity and number of examples

Information

Author: wshobson
Updated: 2026-01-30
Category: scripting