Rust Async Patterns

Master async Rust with Tokio patterns and concurrent programming techniques

✨ The solution you've been looking for

Verified
Tested and verified by our team
25450 Stars

Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.

rust async-programming tokio concurrency futures systems-programming performance networking
Repository

See It In Action

Interactive preview & real-world examples

Live Demo
Skill Demo Animation

AI Conversation Simulator

See how users interact with this skill

User Prompt

Help me build an async HTTP server in Rust that can handle 1000+ concurrent connections with proper error handling and graceful shutdown

Skill Processing

Analyzing request...

Agent Response

Complete async server implementation with Tokio, proper channel communication, error propagation, and shutdown patterns

Quick Start (3 Steps)

Get up and running in minutes

1

Install

claude-code skill install rust-async-patterns

claude-code skill install rust-async-patterns
2

Config

3

First Trigger

@rust-async-patterns help

Commands

CommandDescriptionRequired Args
@rust-async-patterns building-concurrent-web-servicesImplement high-performance async web services with proper error handling and graceful shutdownNone
@rust-async-patterns async-data-processing-pipelineCreate efficient data processing pipelines using streams and channels for concurrent operationsNone
@rust-async-patterns debugging-async-performance-issuesIdentify and resolve common async performance bottlenecks and deadlocksNone

Typical Use Cases

Building Concurrent Web Services

Implement high-performance async web services with proper error handling and graceful shutdown

Async Data Processing Pipeline

Create efficient data processing pipelines using streams and channels for concurrent operations

Debugging Async Performance Issues

Identify and resolve common async performance bottlenecks and deadlocks

Overview

Rust Async Patterns

Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.

When to Use This Skill

  • Building async Rust applications
  • Implementing concurrent network services
  • Using Tokio for async I/O
  • Handling async errors properly
  • Debugging async code issues
  • Optimizing async performance

Core Concepts

1. Async Execution Model

Future (lazy) → poll() → Ready(value) | Pending
                ↑           ↓
              Waker ← Runtime schedules

2. Key Abstractions

ConceptPurpose
FutureLazy computation that may complete later
async fnFunction returning impl Future
awaitSuspend until future completes
TaskSpawned future running concurrently
RuntimeExecutor that polls futures

Quick Start

1# Cargo.toml
2[dependencies]
3tokio = { version = "1", features = ["full"] }
4futures = "0.3"
5async-trait = "0.1"
6anyhow = "1.0"
7tracing = "0.1"
8tracing-subscriber = "0.3"
 1use tokio::time::{sleep, Duration};
 2use anyhow::Result;
 3
 4#[tokio::main]
 5async fn main() -> Result<()> {
 6    // Initialize tracing
 7    tracing_subscriber::fmt::init();
 8
 9    // Async operations
10    let result = fetch_data("https://api.example.com").await?;
11    println!("Got: {}", result);
12
13    Ok(())
14}
15
16async fn fetch_data(url: &str) -> Result<String> {
17    // Simulated async operation
18    sleep(Duration::from_millis(100)).await;
19    Ok(format!("Data from {}", url))
20}

Patterns

Pattern 1: Concurrent Task Execution

 1use tokio::task::JoinSet;
 2use anyhow::Result;
 3
 4// Spawn multiple concurrent tasks
 5async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
 6    let mut set = JoinSet::new();
 7
 8    for url in urls {
 9        set.spawn(async move {
10            fetch_data(&url).await
11        });
12    }
13
14    let mut results = Vec::new();
15    while let Some(res) = set.join_next().await {
16        match res {
17            Ok(Ok(data)) => results.push(data),
18            Ok(Err(e)) => tracing::error!("Task failed: {}", e),
19            Err(e) => tracing::error!("Join error: {}", e),
20        }
21    }
22
23    Ok(results)
24}
25
26// With concurrency limit
27use futures::stream::{self, StreamExt};
28
29async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
30    stream::iter(urls)
31        .map(|url| async move { fetch_data(&url).await })
32        .buffer_unordered(limit) // Max concurrent tasks
33        .collect()
34        .await
35}
36
37// Select first to complete
38use tokio::select;
39
40async fn race_requests(url1: &str, url2: &str) -> Result<String> {
41    select! {
42        result = fetch_data(url1) => result,
43        result = fetch_data(url2) => result,
44    }
45}

Pattern 2: Channels for Communication

 1use tokio::sync::{mpsc, broadcast, oneshot, watch};
 2
 3// Multi-producer, single-consumer
 4async fn mpsc_example() {
 5    let (tx, mut rx) = mpsc::channel::<String>(100);
 6
 7    // Spawn producer
 8    let tx2 = tx.clone();
 9    tokio::spawn(async move {
10        tx2.send("Hello".to_string()).await.unwrap();
11    });
12
13    // Consume
14    while let Some(msg) = rx.recv().await {
15        println!("Got: {}", msg);
16    }
17}
18
19// Broadcast: multi-producer, multi-consumer
20async fn broadcast_example() {
21    let (tx, _) = broadcast::channel::<String>(100);
22
23    let mut rx1 = tx.subscribe();
24    let mut rx2 = tx.subscribe();
25
26    tx.send("Event".to_string()).unwrap();
27
28    // Both receivers get the message
29    let _ = rx1.recv().await;
30    let _ = rx2.recv().await;
31}
32
33// Oneshot: single value, single use
34async fn oneshot_example() -> String {
35    let (tx, rx) = oneshot::channel::<String>();
36
37    tokio::spawn(async move {
38        tx.send("Result".to_string()).unwrap();
39    });
40
41    rx.await.unwrap()
42}
43
44// Watch: single producer, multi-consumer, latest value
45async fn watch_example() {
46    let (tx, mut rx) = watch::channel("initial".to_string());
47
48    tokio::spawn(async move {
49        loop {
50            // Wait for changes
51            rx.changed().await.unwrap();
52            println!("New value: {}", *rx.borrow());
53        }
54    });
55
56    tx.send("updated".to_string()).unwrap();
57}

Pattern 3: Async Error Handling

 1use anyhow::{Context, Result, bail};
 2use thiserror::Error;
 3
 4#[derive(Error, Debug)]
 5pub enum ServiceError {
 6    #[error("Network error: {0}")]
 7    Network(#[from] reqwest::Error),
 8
 9    #[error("Database error: {0}")]
10    Database(#[from] sqlx::Error),
11
12    #[error("Not found: {0}")]
13    NotFound(String),
14
15    #[error("Timeout after {0:?}")]
16    Timeout(std::time::Duration),
17}
18
19// Using anyhow for application errors
20async fn process_request(id: &str) -> Result<Response> {
21    let data = fetch_data(id)
22        .await
23        .context("Failed to fetch data")?;
24
25    let parsed = parse_response(&data)
26        .context("Failed to parse response")?;
27
28    Ok(parsed)
29}
30
31// Using custom errors for library code
32async fn get_user(id: &str) -> Result<User, ServiceError> {
33    let result = db.query(id).await?;
34
35    match result {
36        Some(user) => Ok(user),
37        None => Err(ServiceError::NotFound(id.to_string())),
38    }
39}
40
41// Timeout wrapper
42use tokio::time::timeout;
43
44async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
45where
46    F: std::future::Future<Output = Result<T, ServiceError>>,
47{
48    timeout(duration, future)
49        .await
50        .map_err(|_| ServiceError::Timeout(duration))?
51}

Pattern 4: Graceful Shutdown

 1use tokio::signal;
 2use tokio::sync::broadcast;
 3use tokio_util::sync::CancellationToken;
 4
 5async fn run_server() -> Result<()> {
 6    // Method 1: CancellationToken
 7    let token = CancellationToken::new();
 8    let token_clone = token.clone();
 9
10    // Spawn task that respects cancellation
11    tokio::spawn(async move {
12        loop {
13            tokio::select! {
14                _ = token_clone.cancelled() => {
15                    tracing::info!("Task shutting down");
16                    break;
17                }
18                _ = do_work() => {}
19            }
20        }
21    });
22
23    // Wait for shutdown signal
24    signal::ctrl_c().await?;
25    tracing::info!("Shutdown signal received");
26
27    // Cancel all tasks
28    token.cancel();
29
30    // Give tasks time to cleanup
31    tokio::time::sleep(Duration::from_secs(5)).await;
32
33    Ok(())
34}
35
36// Method 2: Broadcast channel for shutdown
37async fn run_with_broadcast() -> Result<()> {
38    let (shutdown_tx, _) = broadcast::channel::<()>(1);
39
40    let mut rx = shutdown_tx.subscribe();
41    tokio::spawn(async move {
42        tokio::select! {
43            _ = rx.recv() => {
44                tracing::info!("Received shutdown");
45            }
46            _ = async { loop { do_work().await } } => {}
47        }
48    });
49
50    signal::ctrl_c().await?;
51    let _ = shutdown_tx.send(());
52
53    Ok(())
54}

Pattern 5: Async Traits

 1use async_trait::async_trait;
 2
 3#[async_trait]
 4pub trait Repository {
 5    async fn get(&self, id: &str) -> Result<Entity>;
 6    async fn save(&self, entity: &Entity) -> Result<()>;
 7    async fn delete(&self, id: &str) -> Result<()>;
 8}
 9
10pub struct PostgresRepository {
11    pool: sqlx::PgPool,
12}
13
14#[async_trait]
15impl Repository for PostgresRepository {
16    async fn get(&self, id: &str) -> Result<Entity> {
17        sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
18            .fetch_one(&self.pool)
19            .await
20            .map_err(Into::into)
21    }
22
23    async fn save(&self, entity: &Entity) -> Result<()> {
24        sqlx::query!(
25            "INSERT INTO entities (id, data) VALUES ($1, $2)
26             ON CONFLICT (id) DO UPDATE SET data = $2",
27            entity.id,
28            entity.data
29        )
30        .execute(&self.pool)
31        .await?;
32        Ok(())
33    }
34
35    async fn delete(&self, id: &str) -> Result<()> {
36        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
37            .execute(&self.pool)
38            .await?;
39        Ok(())
40    }
41}
42
43// Trait object usage
44async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
45    let entity = repo.get(id).await?;
46    // Process...
47    repo.save(&entity).await
48}

Pattern 6: Streams and Async Iteration

 1use futures::stream::{self, Stream, StreamExt};
 2use async_stream::stream;
 3
 4// Create stream from async iterator
 5fn numbers_stream() -> impl Stream<Item = i32> {
 6    stream! {
 7        for i in 0..10 {
 8            tokio::time::sleep(Duration::from_millis(100)).await;
 9            yield i;
10        }
11    }
12}
13
14// Process stream
15async fn process_stream() {
16    let stream = numbers_stream();
17
18    // Map and filter
19    let processed: Vec<_> = stream
20        .filter(|n| futures::future::ready(*n % 2 == 0))
21        .map(|n| n * 2)
22        .collect()
23        .await;
24
25    println!("{:?}", processed);
26}
27
28// Chunked processing
29async fn process_in_chunks() {
30    let stream = numbers_stream();
31
32    let mut chunks = stream.chunks(3);
33
34    while let Some(chunk) = chunks.next().await {
35        println!("Processing chunk: {:?}", chunk);
36    }
37}
38
39// Merge multiple streams
40async fn merge_streams() {
41    let stream1 = numbers_stream();
42    let stream2 = numbers_stream();
43
44    let merged = stream::select(stream1, stream2);
45
46    merged
47        .for_each(|n| async move {
48            println!("Got: {}", n);
49        })
50        .await;
51}

Pattern 7: Resource Management

 1use std::sync::Arc;
 2use tokio::sync::{Mutex, RwLock, Semaphore};
 3
 4// Shared state with RwLock (prefer for read-heavy)
 5struct Cache {
 6    data: RwLock<HashMap<String, String>>,
 7}
 8
 9impl Cache {
10    async fn get(&self, key: &str) -> Option<String> {
11        self.data.read().await.get(key).cloned()
12    }
13
14    async fn set(&self, key: String, value: String) {
15        self.data.write().await.insert(key, value);
16    }
17}
18
19// Connection pool with semaphore
20struct Pool {
21    semaphore: Semaphore,
22    connections: Mutex<Vec<Connection>>,
23}
24
25impl Pool {
26    fn new(size: usize) -> Self {
27        Self {
28            semaphore: Semaphore::new(size),
29            connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
30        }
31    }
32
33    async fn acquire(&self) -> PooledConnection<'_> {
34        let permit = self.semaphore.acquire().await.unwrap();
35        let conn = self.connections.lock().await.pop().unwrap();
36        PooledConnection { pool: self, conn: Some(conn), _permit: permit }
37    }
38}
39
40struct PooledConnection<'a> {
41    pool: &'a Pool,
42    conn: Option<Connection>,
43    _permit: tokio::sync::SemaphorePermit<'a>,
44}
45
46impl Drop for PooledConnection<'_> {
47    fn drop(&mut self) {
48        if let Some(conn) = self.conn.take() {
49            let pool = self.pool;
50            tokio::spawn(async move {
51                pool.connections.lock().await.push(conn);
52            });
53        }
54    }
55}

Debugging Tips

 1// Enable tokio-console for runtime debugging
 2// Cargo.toml: tokio = { features = ["tracing"] }
 3// Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
 4// Then: tokio-console
 5
 6// Instrument async functions
 7use tracing::instrument;
 8
 9#[instrument(skip(pool))]
10async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
11    tracing::debug!("Fetching user");
12    // ...
13}
14
15// Track task spawning
16let span = tracing::info_span!("worker", id = %worker_id);
17tokio::spawn(async move {
18    // Enters span when polled
19}.instrument(span));

Best Practices

Do’s

  • Use tokio::select! - For racing futures
  • Prefer channels - Over shared state when possible
  • Use JoinSet - For managing multiple tasks
  • Instrument with tracing - For debugging async code
  • Handle cancellation - Check CancellationToken

Don’ts

  • Don’t block - Never use std::thread::sleep in async
  • Don’t hold locks across awaits - Causes deadlocks
  • Don’t spawn unboundedly - Use semaphores for limits
  • Don’t ignore errors - Propagate with ? or log
  • Don’t forget Send bounds - For spawned futures

Resources

What Users Are Saying

Real feedback from the community

Environment Matrix

Dependencies

Rust 1.70+
tokio 1.x with full features
futures 0.3+
async-trait 0.1+
anyhow 1.0+ for error handling
tracing 0.1+ for instrumentation

Framework Support

Tokio ✓ (recommended) async-std ✓ smol ✓ sqlx ✓ for async database operations reqwest ✓ for HTTP clients tonic ✓ for gRPC services

Context Window

Token Usage ~3K-8K tokens depending on pattern complexity

Security & Privacy

Information

Author
wshobson
Updated
2026-01-30
Category
architecture-patterns