Rust Async Patterns
Master async Rust with Tokio patterns and concurrent programming techniques
Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.
Quick Start (3 Steps)
Get up and running in minutes

1. Install: `claude-code skill install rust-async-patterns`
2. Config
3. First trigger: `@rust-async-patterns help`

Commands
| Command | Description | Required Args |
|---|---|---|
| @rust-async-patterns building-concurrent-web-services | Implement high-performance async web services with proper error handling and graceful shutdown | None |
| @rust-async-patterns async-data-processing-pipeline | Create efficient data processing pipelines using streams and channels for concurrent operations | None |
| @rust-async-patterns debugging-async-performance-issues | Identify and resolve common async performance bottlenecks and deadlocks | None |
Overview
Production patterns for async Rust programming with Tokio runtime, including tasks, channels, streams, and error handling.
When to Use This Skill
- Building async Rust applications
- Implementing concurrent network services
- Using Tokio for async I/O
- Handling async errors properly
- Debugging async code issues
- Optimizing async performance
Core Concepts
1. Async Execution Model
```text
Future (lazy) --poll()--> Ready(value) | Pending
      ^                          |
      +----- Waker <----- runtime schedules
```
2. Key Abstractions
| Concept | Purpose |
|---|---|
| `Future` | Lazy computation that may complete later |
| `async fn` | Function returning `impl Future` |
| `await` | Suspend until the future completes |
| Task | A spawned future running concurrently |
| Runtime | Executor that polls futures |
Quick Start
```toml
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"
anyhow = "1.0"
thiserror = "1.0"    # custom error types (Pattern 3)
tracing = "0.1"
tracing-subscriber = "0.3"
async-stream = "0.3" # stream! macro (Pattern 6)
tokio-util = "0.7"   # CancellationToken (Pattern 4)
```
```rust
use anyhow::Result;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize tracing
    tracing_subscriber::fmt::init();

    // Async operations
    let result = fetch_data("https://api.example.com").await?;
    println!("Got: {}", result);

    Ok(())
}

async fn fetch_data(url: &str) -> Result<String> {
    // Simulated async operation
    sleep(Duration::from_millis(100)).await;
    Ok(format!("Data from {}", url))
}
```
Patterns
Pattern 1: Concurrent Task Execution
```rust
use anyhow::Result;
use futures::stream::{self, StreamExt};
use tokio::task::JoinSet;

// Spawn multiple concurrent tasks
async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
    let mut set = JoinSet::new();

    for url in urls {
        set.spawn(async move { fetch_data(&url).await });
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        match res {
            Ok(Ok(data)) => results.push(data),
            Ok(Err(e)) => tracing::error!("Task failed: {}", e),
            Err(e) => tracing::error!("Join error: {}", e),
        }
    }

    Ok(results)
}

// With a concurrency limit
async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
    stream::iter(urls)
        .map(|url| async move { fetch_data(&url).await })
        .buffer_unordered(limit) // at most `limit` futures in flight
        .collect()
        .await
}

// Select the first to complete (the losing future is dropped, i.e. cancelled)
async fn race_requests(url1: &str, url2: &str) -> Result<String> {
    tokio::select! {
        result = fetch_data(url1) => result,
        result = fetch_data(url2) => result,
    }
}
```
Pattern 2: Channels for Communication
```rust
use tokio::sync::{broadcast, mpsc, oneshot, watch};

// Multi-producer, single-consumer
async fn mpsc_example() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    // Spawn a producer
    let tx2 = tx.clone();
    tokio::spawn(async move {
        tx2.send("Hello".to_string()).await.unwrap();
    });

    // Drop the original sender so `recv` returns None once all
    // producers finish; otherwise this loop never terminates.
    drop(tx);

    // Consume
    while let Some(msg) = rx.recv().await {
        println!("Got: {}", msg);
    }
}

// Broadcast: multi-producer, multi-consumer
async fn broadcast_example() {
    let (tx, _) = broadcast::channel::<String>(100);

    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    tx.send("Event".to_string()).unwrap();

    // Both receivers get the message
    let _ = rx1.recv().await;
    let _ = rx2.recv().await;
}

// Oneshot: single value, single use
async fn oneshot_example() -> String {
    let (tx, rx) = oneshot::channel::<String>();

    tokio::spawn(async move {
        tx.send("Result".to_string()).unwrap();
    });

    rx.await.unwrap()
}

// Watch: single producer, multi-consumer, latest value only
async fn watch_example() {
    let (tx, mut rx) = watch::channel("initial".to_string());

    tokio::spawn(async move {
        loop {
            // Wait for changes
            rx.changed().await.unwrap();
            println!("New value: {}", *rx.borrow());
        }
    });

    tx.send("updated".to_string()).unwrap();
}
```
Pattern 3: Async Error Handling
```rust
use anyhow::{Context, Result};
use thiserror::Error;
use tokio::time::{timeout, Duration};

// Assumes `reqwest` and `sqlx` are in [dependencies]; their error
// types are wrapped via #[from].
#[derive(Error, Debug)]
pub enum ServiceError {
    #[error("Network error: {0}")]
    Network(#[from] reqwest::Error),

    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),

    #[error("Not found: {0}")]
    NotFound(String),

    #[error("Timeout after {0:?}")]
    Timeout(std::time::Duration),
}

// Using anyhow for application errors.
// `fetch_data`, `parse_response`, and `Response` are application-defined.
async fn process_request(id: &str) -> Result<Response> {
    let data = fetch_data(id)
        .await
        .context("Failed to fetch data")?;

    let parsed = parse_response(&data)
        .context("Failed to parse response")?;

    Ok(parsed)
}

// Using custom errors for library code (`db` is an application handle)
async fn get_user(db: &Db, id: &str) -> Result<User, ServiceError> {
    let result = db.query(id).await?;

    match result {
        Some(user) => Ok(user),
        None => Err(ServiceError::NotFound(id.to_string())),
    }
}

// Timeout wrapper: the outer Elapsed error maps to ServiceError::Timeout,
// then `?` unwraps the inner Result from the wrapped future.
async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
where
    F: std::future::Future<Output = Result<T, ServiceError>>,
{
    timeout(duration, future)
        .await
        .map_err(|_| ServiceError::Timeout(duration))?
}
```
Pattern 4: Graceful Shutdown
```rust
use std::time::Duration;
use anyhow::Result;
use tokio::signal;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken; // from the tokio-util crate

// `do_work` stands in for the application's unit of work.
async fn run_server() -> Result<()> {
    // Method 1: CancellationToken
    let token = CancellationToken::new();
    let token_clone = token.clone();

    // Spawn a task that respects cancellation
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token_clone.cancelled() => {
                    tracing::info!("Task shutting down");
                    break;
                }
                _ = do_work() => {}
            }
        }
    });

    // Wait for the shutdown signal
    signal::ctrl_c().await?;
    tracing::info!("Shutdown signal received");

    // Cancel all tasks
    token.cancel();

    // Give tasks time to clean up
    tokio::time::sleep(Duration::from_secs(5)).await;

    Ok(())
}

// Method 2: broadcast channel for shutdown
async fn run_with_broadcast() -> Result<()> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    let mut rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        tokio::select! {
            _ = rx.recv() => {
                tracing::info!("Received shutdown");
            }
            _ = async { loop { do_work().await } } => {}
        }
    });

    signal::ctrl_c().await?;
    let _ = shutdown_tx.send(());

    Ok(())
}
```
Pattern 5: Async Traits
```rust
use anyhow::Result;
use async_trait::async_trait;

// Note: Rust 1.75+ supports native `async fn` in traits, but such traits
// are not dyn-compatible; #[async_trait] boxes the returned futures so the
// `&dyn Repository` usage below works.
#[async_trait]
pub trait Repository {
    async fn get(&self, id: &str) -> Result<Entity>;
    async fn save(&self, entity: &Entity) -> Result<()>;
    async fn delete(&self, id: &str) -> Result<()>;
}

pub struct PostgresRepository {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Repository for PostgresRepository {
    async fn get(&self, id: &str) -> Result<Entity> {
        sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
            .fetch_one(&self.pool)
            .await
            .map_err(Into::into)
    }

    async fn save(&self, entity: &Entity) -> Result<()> {
        sqlx::query!(
            "INSERT INTO entities (id, data) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET data = $2",
            entity.id,
            entity.data
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete(&self, id: &str) -> Result<()> {
        sqlx::query!("DELETE FROM entities WHERE id = $1", id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
}

// Trait object usage
async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
    let entity = repo.get(id).await?;
    // Process...
    repo.save(&entity).await
}
```
Pattern 6: Streams and Async Iteration
```rust
use std::time::Duration;
use async_stream::stream; // requires the `async-stream` crate
use futures::stream::{self, Stream, StreamExt};

// Create a stream that yields values over time
fn numbers_stream() -> impl Stream<Item = i32> {
    stream! {
        for i in 0..10 {
            tokio::time::sleep(Duration::from_millis(100)).await;
            yield i;
        }
    }
}

// Process a stream
async fn process_stream() {
    let stream = numbers_stream();

    // Map and filter
    let processed: Vec<_> = stream
        .filter(|n| futures::future::ready(*n % 2 == 0))
        .map(|n| n * 2)
        .collect()
        .await;

    println!("{:?}", processed);
}

// Chunked processing
async fn process_in_chunks() {
    let stream = numbers_stream();

    let mut chunks = stream.chunks(3);

    while let Some(chunk) = chunks.next().await {
        println!("Processing chunk: {:?}", chunk);
    }
}

// Merge multiple streams
async fn merge_streams() {
    let stream1 = numbers_stream();
    let stream2 = numbers_stream();

    let merged = stream::select(stream1, stream2);

    merged
        .for_each(|n| async move {
            println!("Got: {}", n);
        })
        .await;
}
```
Pattern 7: Resource Management
```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedSemaphorePermit, RwLock, Semaphore};

// Shared state with RwLock (prefer for read-heavy workloads)
struct Cache {
    data: RwLock<HashMap<String, String>>,
}

impl Cache {
    async fn get(&self, key: &str) -> Option<String> {
        self.data.read().await.get(key).cloned()
    }

    async fn set(&self, key: String, value: String) {
        self.data.write().await.insert(key, value);
    }
}

// Connection pool sketch with a semaphore bounding checkouts.
// `Connection` is a placeholder for your actual connection type.
// Arc + acquire_owned give 'static handles so the Drop impl below
// can move them into a spawned task.
struct Pool {
    semaphore: Arc<Semaphore>,
    connections: Arc<Mutex<Vec<Connection>>>,
}

impl Pool {
    fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
            connections: Arc::new(Mutex::new(
                (0..size).map(|_| Connection::new()).collect(),
            )),
        }
    }

    async fn acquire(&self) -> PooledConnection {
        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
        let conn = self.connections.lock().await.pop().unwrap();
        PooledConnection {
            connections: Arc::clone(&self.connections),
            conn: Some(conn),
            _permit: permit,
        }
    }
}

struct PooledConnection {
    connections: Arc<Mutex<Vec<Connection>>>,
    conn: Option<Connection>,
    _permit: OwnedSemaphorePermit,
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            let connections = Arc::clone(&self.connections);
            // Return the connection asynchronously. Sketch only: in
            // production, handle the brief window where the permit is
            // released before the connection is pushed back.
            tokio::spawn(async move {
                connections.lock().await.push(conn);
            });
        }
    }
}
```
Debugging Tips
```rust
// Enable tokio-console for runtime debugging:
//   Cargo.toml: tokio = { version = "1", features = ["full", "tracing"] }
//   plus the `console-subscriber` crate (call console_subscriber::init()).
//   Run with: RUSTFLAGS="--cfg tokio_unstable" cargo run
//   Then attach: tokio-console

// Instrument async functions
use tracing::{instrument, Instrument};

#[instrument(skip(pool))]
async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
    tracing::debug!("Fetching user");
    // ...
}

// Track task spawning (`.instrument` comes from the Instrument trait)
let span = tracing::info_span!("worker", id = %worker_id);
tokio::spawn(
    async move {
        // Enters the span each time it is polled
    }
    .instrument(span),
);
```
Best Practices
Do’s
- Use `tokio::select!` for racing futures
- Prefer channels over shared state when possible
- Use `JoinSet` for managing multiple tasks
- Instrument with tracing for debugging async code
- Handle cancellation: check `CancellationToken`

Don’ts
- Don’t block: never use `std::thread::sleep` in async code
- Don’t hold locks across awaits: it causes deadlocks
- Don’t spawn unboundedly: use semaphores for limits
- Don’t ignore errors: propagate with `?` or log them
- Don’t forget `Send` bounds for spawned futures
Information
- Author: wshobson
- Updated: 2026-01-30
- Category: architecture-patterns