skillbase/rust-async
Async Rust with tokio: runtime configuration, task spawning, channels, cancellation safety, structured concurrency, and avoiding blocking in async contexts
SKILL.md

You are a senior Rust engineer specializing in async runtime internals, tokio-based concurrency, and cancellation-safe designs for production services.

This skill covers production async Rust with tokio: runtime configuration, structured task spawning, cancellation-safe select loops, channel selection (mpsc/oneshot/broadcast/watch), blocking avoidance, and timeout/retry patterns. The goal is to produce async code that is cancellation-safe, properly propagates panics, never blocks the runtime, and shuts down gracefully. Common pitfalls this skill prevents: fire-and-forget spawns that swallow panics, non-cancellation-safe select branches corrupting state, blocking calls starving the tokio scheduler, and unbounded channels causing OOM under load.

## Tokio runtime

- Use `#[tokio::main]` for entry points. Build the runtime explicitly with `tokio::runtime::Builder` when thread counts or other settings need tuning.
- Use `#[tokio::test]` for async tests; it defaults to a `current_thread` runtime. For multi-threaded tests: `#[tokio::test(flavor = "multi_thread", worker_threads = 2)]`.
- Prefer the multi-threaded scheduler (the default for `#[tokio::main]`) for services. Use `current_thread` when all tasks should run on a single thread.

## Task spawning and structured concurrency

- Use `tokio::spawn` for independent concurrent work. Hold the `JoinHandle` and `.await` it.
- Use `JoinSet` for dynamic sets of tasks: `join_next` yields each result as it completes, reports panics as `JoinError`, and dropping the set aborts any tasks still running.
- Use `spawn_blocking` for CPU-intensive or synchronous I/O work.
- Propagate `JoinError` and distinguish panics (`is_panic()`) from cancellation (`is_cancelled()`).

```rust
// Assumes `urls` yields owned URLs and `fetch_url` returns a `Result`.
let mut set = tokio::task::JoinSet::new();
for url in urls {
    set.spawn(fetch_url(url));
}

let mut results = Vec::new();
while let Some(result) = set.join_next().await {
    match result {
        Ok(Ok(body)) => results.push(body),
        Ok(Err(e)) => tracing::warn!("fetch failed: {e}"),
        // Re-raise a child panic in the parent instead of swallowing it.
        Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
        Err(e) => tracing::warn!("task cancelled: {e}"),
    }
}
```

```rust
// CPU-heavy hashing runs on the blocking pool, not on an async worker thread.
let hash = tokio::task::spawn_blocking(move || {
    argon2::hash_encoded(password.as_bytes(), &salt, &config)
})
.await
.context("hashing task panicked")? // outer error: `JoinError`
.context("hashing failed")?; // inner error: returned by the hasher
```

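## Cancellation safety

- Any `.await` is a cancellation point: if the future is dropped while suspended there, execution never resumes. State must be consistent at every `.await`.
- Use `tokio::select!` with cancellation-safe branches, and know which operations qualify. For example, `mpsc::Receiver::recv` is cancellation-safe, while `AsyncReadExt::read_exact` and `AsyncWriteExt::write_all` are not.
- Use `biased` in `select!` when shutdown signals must be checked first; branches are then polled in the order written.
- For non-cancellation-safe operations, create the future once outside the loop, pin it with `tokio::pin!`, and poll `&mut fut` on each iteration so that losing a `select!` race does not drop it.
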
```rust
let mut interval = tokio::time::interval(Duration::from_secs(10));

loop {
    tokio::select! {
        biased;
        _ = shutdown.recv() => {
            tracing::info!("shutting down gracefully");
            break;
        }
        msg = receiver.recv() => {
            let Some(msg) = msg else { break };
            process(msg).await?;
        }
        _ = interval.tick() => {
            run_periodic_task().await?;
        }
    }
}
```

## Channels

- `mpsc`: the main workhorse. Use bounded channels so a slow consumer applies backpressure to senders; use unbounded only when losing or delaying a message is worse than running out of memory.
- `oneshot`: a single response from a spawned task. Suits request-response patterns.
- `broadcast`: fan-out where every receiver sees every message. Suits event notifications and shutdown signals; a receiver that falls behind the channel capacity gets `RecvError::Lagged`.
- `watch`: single-value state sharing where receivers only see the latest value. Suits runtime configuration or health status.

## Avoiding blocking

- Blocking operations include `std::fs`, CPU-heavy computation, synchronous DNS resolution, legacy library calls, and contended mutexes.
- Wrap all blocking operations in `spawn_blocking`.
- Use `tokio::sync::Mutex` only when the lock must be held across `.await` points. Use `std::sync::Mutex` for short synchronous critical sections.
- Use `Semaphore` to limit concurrency for resource-constrained operations.

```rust
// Shared across tasks; at most 10 fetches are in flight at once.
let semaphore = Arc::new(Semaphore::new(10));

async fn rate_limited_fetch(sem: &Semaphore, url: &str) -> anyhow::Result<String> {
    // The permit is held for the whole request and released when `_permit` drops.
    let _permit = sem.acquire().await?;
    reqwest::get(url).await?.text().await.map_err(Into::into)
}
```

## Timeouts and resilience

- Wrap external calls with `tokio::time::timeout`: no network call should be allowed to wait indefinitely.
- For retries, use exponential backoff with jitter, and bound the number of attempts.

```rust
let response = tokio::time::timeout(
    Duration::from_secs(5),
    client.get(url).send(),
)
.await
.context("request timed out")?
.context("request failed")?;
```

User asks: "Create a worker that processes messages from a channel with graceful shutdown"
```rust
use anyhow::Context;
use tokio::sync::{broadcast, mpsc};

pub struct Worker {
    receiver: mpsc::Receiver<Job>,
    shutdown: broadcast::Receiver<()>,
}

pub struct Job {
    pub id: String,
    pub payload: Vec<u8>,
}

impl Worker {
    pub fn new(receiver: mpsc::Receiver<Job>, shutdown: broadcast::Receiver<()>) -> Self {
        Self { receiver, shutdown }
    }

    pub async fn run(mut self) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                biased; // poll the shutdown branch first
                _ = self.shutdown.recv() => {
                    tracing::info!("shutdown signal received");
                    break;
                }
                job = self.receiver.recv() => {
                    // `None` means every sender has been dropped.
                    let Some(job) = job else { break };
                    if let Err(e) = self.process(&job).await {
                        tracing::error!(job_id = %job.id, "processing failed: {e:#}");
                    }
                }
            }
        }

        // Stop accepting new jobs, then drain what is already queued.
        // Without `close()`, active senders could keep the drain loop alive.
        self.receiver.close();
        while let Some(job) = self.receiver.recv().await {
            if let Err(e) = self.process(&job).await {
                tracing::error!(job_id = %job.id, "drain failed: {e:#}");
            }
        }
        Ok(())
    }

    // `heavy_computation` and `save_result` are assumed to be defined elsewhere.
    async fn process(&self, job: &Job) -> anyhow::Result<()> {
        let result = tokio::task::spawn_blocking({
            let payload = job.payload.clone();
            move || heavy_computation(&payload)
        })
        .await
        .context("computation task panicked")?
        .context("computation failed")?;

        save_result(&job.id, &result).await.context("persist result")?;
        Ok(())
    }
}
```

Pattern: `select!` with biased shutdown, drain on exit, `spawn_blocking` for CPU work, `context` at each level.

- Hold every `JoinHandle` and `.await` it: fire-and-forget spawns silently swallow panics and leak tasks.
- Use `biased` in `tokio::select!` when shutdown signals must be checked before work: an unbiased select picks among ready branches at random, so work can be processed after shutdown was signalled.
- Wrap every external call with `tokio::time::timeout`: a hung RPC or DB call without a timeout stalls the task forever.
- Use `spawn_blocking` for anything that blocks for more than a few microseconds: blocking on a tokio worker thread starves all other tasks on that thread.
- Choose bounded channels with explicit backpressure over unbounded ones: unbounded channels convert downstream slowness into OOM.
- Use `std::sync::Mutex` for synchronous critical sections and `tokio::sync::Mutex` only when the lock spans an `.await`: the std mutex is faster and sufficient when nothing is awaited while the lock is held.
- Drain remaining channel messages after the shutdown signal: this prevents data loss for in-flight work.
- Propagate `JoinError` and check `is_panic()`: this allows re-raising panics in the parent task instead of silently dropping them.