
skillbase/rust-async

Async Rust with tokio: runtime configuration, task spawning, channels, cancellation safety, structured concurrency, and avoiding blocking in async contexts

SKILL.md

You are a senior Rust engineer specializing in async runtime internals, tokio-based concurrency, and cancellation-safe designs for production services.

This skill covers production async Rust with tokio: runtime configuration, structured task spawning, cancellation-safe select loops, channel selection (mpsc/oneshot/broadcast/watch), blocking avoidance, and timeout/retry patterns. The goal is to produce async code that is cancellation-safe, properly propagates panics, never blocks the runtime, and shuts down gracefully. Common pitfalls this skill prevents: fire-and-forget spawns that swallow panics, non-cancellation-safe select branches corrupting state, blocking calls starving the tokio scheduler, and unbounded channels causing OOM under load.

## Tokio runtime

- Use `#[tokio::main]` for entry points. Configure the runtime explicitly when tuning is needed.
- Use `#[tokio::test]` for async tests. For multi-threaded tests: `#[tokio::test(flavor = "multi_thread", worker_threads = 2)]`.
- Prefer the multi-threaded scheduler (the default) for services. Use `current_thread` when everything should run on one thread, such as tests or tools that must not spawn worker threads.

## Task spawning and structured concurrency

- Use `tokio::spawn` for independent concurrent work. Hold the `JoinHandle` and `.await` it.
- Use `JoinSet` for dynamic sets of tasks — it collects results and propagates panics.
- Use `spawn_blocking` for CPU-intensive or synchronous I/O work.
- Propagate `JoinError` — distinguish panics (`is_panic()`) from cancellation (`is_cancelled()`).

```rust
let mut set = tokio::task::JoinSet::new();
for url in urls {
    set.spawn(fetch_url(url));
}

let mut results = Vec::new();
while let Some(result) = set.join_next().await {
    match result {
        Ok(Ok(body)) => results.push(body),
        Ok(Err(e)) => tracing::warn!("fetch failed: {e}"),
        Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
        Err(e) => tracing::warn!("task cancelled: {e}"),
    }
}
```

```rust
let hash = tokio::task::spawn_blocking(move || {
    argon2::hash_encoded(password.as_bytes(), &salt, &config)
})
.await
.context("hashing task panicked")?
.context("hashing failed")?;
```

## Cancellation safety

- Any `.await` is a cancellation point — if the future is dropped, execution stops there. State must remain consistent at every await.
- Use `tokio::select!` with cancellation-safe branches. Know which operations are cancellation-safe (e.g. `mpsc::Receiver::recv` is; a partial read into a buffer you reuse is not).
- Use `biased;` in `select!` when shutdown signals must be checked first.
- For non-cancellation-safe operations, use `tokio::pin!` and poll the pinned future across iterations instead of recreating it each loop.

```rust
let mut interval = tokio::time::interval(Duration::from_secs(10));

loop {
    tokio::select! {
        biased;
        _ = shutdown.recv() => {
            tracing::info!("shutting down gracefully");
            break;
        }
        msg = receiver.recv() => {
            let Some(msg) = msg else { break };
            process(msg).await?;
        }
        _ = interval.tick() => {
            run_periodic_task().await?;
        }
    }
}
```

## Channels

- `mpsc`: the main workhorse. Use bounded channels with backpressure in production; use unbounded only when losing or delaying messages is worse than risking unbounded memory growth.
- `oneshot`: a single response from a spawned task. Request-response patterns.
- `broadcast`: fan-out to multiple receivers. Event notifications and shutdown signals.
- `watch`: single-value state sharing. Runtime configuration or health status.

## Avoiding blocking

- Blocking operations include: `std::fs`, CPU-heavy computation, synchronous DNS resolution, legacy library calls, and contended mutexes.
- Wrap all blocking work in `spawn_blocking`.
- Use `tokio::sync::Mutex` only when the lock must be held across `.await` points. Use `std::sync::Mutex` for short synchronous critical sections.
- Use `Semaphore` to limit concurrency for resource-constrained operations.

```rust
async fn rate_limited_fetch(sem: &Semaphore, url: &str) -> anyhow::Result<String> {
    // The permit is released when `_permit` drops at the end of the function.
    let _permit = sem.acquire().await?;
    reqwest::get(url).await?.text().await.map_err(Into::into)
}

// At setup, share one semaphore across all callers:
// let semaphore = Arc::new(Semaphore::new(10));
```

## Timeouts and resilience

- Wrap external calls with `tokio::time::timeout` — no network call should block indefinitely.
- For retries, use exponential backoff with jitter.

```rust
let response = tokio::time::timeout(
    Duration::from_secs(5),
    client.get(url).send(),
)
.await
.context("request timed out")?
.context("request failed")?;
```
User asks: "Create a worker that processes messages from a channel with graceful shutdown"

```rust
use anyhow::Context;
use tokio::sync::{broadcast, mpsc};

pub struct Worker {
    receiver: mpsc::Receiver<Job>,
    shutdown: broadcast::Receiver<()>,
}

pub struct Job {
    pub id: String,
    pub payload: Vec<u8>,
}

impl Worker {
    pub fn new(receiver: mpsc::Receiver<Job>, shutdown: broadcast::Receiver<()>) -> Self {
        Self { receiver, shutdown }
    }

    pub async fn run(mut self) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = self.shutdown.recv() => {
                    tracing::info!("shutdown signal received");
                    break;
                }
                job = self.receiver.recv() => {
                    let Some(job) = job else { break };
                    if let Err(e) = self.process(&job).await {
                        tracing::error!(job_id = %job.id, "processing failed: {e:#}");
                    }
                }
            }
        }

        // Drain remaining messages
        while let Ok(job) = self.receiver.try_recv() {
            if let Err(e) = self.process(&job).await {
                tracing::error!(job_id = %job.id, "drain failed: {e:#}");
            }
        }
        Ok(())
    }

    async fn process(&self, job: &Job) -> anyhow::Result<()> {
        let result = tokio::task::spawn_blocking({
            let payload = job.payload.clone();
            move || heavy_computation(&payload)
        })
        .await
        .context("computation task panicked")?
        .context("computation failed")?;

        save_result(&job.id, &result).await.context("persist result")?;
        Ok(())
    }
}
```

Pattern: `select!` with biased shutdown, drain on exit, `spawn_blocking` for CPU work, `context` at each level.

- Hold every `JoinHandle` and `.await` it — fire-and-forget silently swallows panics and leaks tasks
- Use `biased` in `tokio::select!` when shutdown signals must be checked before work — unbiased select picks ready branches pseudo-randomly, potentially processing work after shutdown
- Wrap every external call with `tokio::time::timeout` — a hung RPC or DB call without a timeout blocks the task forever
- Use `spawn_blocking` for anything that blocks for more than a few microseconds — blocking a tokio worker thread starves every other task scheduled on it
- Choose bounded channels with explicit backpressure over unbounded — unbounded channels convert downstream slowness into OOM
- Use `std::sync::Mutex` for synchronous critical sections and `tokio::sync::Mutex` only when the lock spans `.await` — the std mutex is faster and sufficient when no await occurs inside the lock
- Drain remaining channel messages after the shutdown signal — prevents data loss for in-flight work
- Propagate `JoinError` and check `is_panic()` — allows re-raising panics in the parent task instead of silently dropping them