Rust streams and timeouts gotcha
15 points by laplab
Sometimes one of these tasks will take a particularly long time before getting to the next
.await
and releasing control to the runtime.
That’s not how async works! You’re not supposed to be blocking the runtime, and the time between .await
points should always be negligible.
tokio::spawn(task1())
If task1()
is blocking, then this is bad advice. It just dumps the latency-spiking blocking poll onto some other random victim task that will be sharing the thread/queue with it.
The correct solution is to avoid having long-running blocking code in async
functions. When you have to block from inside async code, then spawn_blocking()
is the right solution. If spawn_blocking()
is too difficult to use (due to borrowing), then at minimum wrap the blocking code in block_in_place()
to warn tokio
about the blocking, and give it a chance to partially mitigate the impact.
What an insane footgun. It’s absolutely unintuitive that this would be the case. I assumed the polling would just have some slight backoff as the block continues for longer. I definitely need to read more about spawn_blocking and block_in_place now lol. Or maybe just use Go.
It’s the same in most async runtimes in most languages (C, JS, etc.): if you block, no polling happens on your thread. It’s not a slight backoff: nothing else happens while the thread is busy running your slow code. (There are a few green thread runtimes that can interrupt a slow function, e.g. Erlang, Golang, but they are relatively rare.)
Jesus christ, what a comment.
If another thread receives a potentially blocking task, tasks from this thread will be stolen by others. This is why tokio is called a work stealing executor.
If there are a lot of tasks like this, all threads will be blocked and spawn_blocking could be the solution.
If another thread receives a potentially blocking task, tasks from this thread will be stolen by others. This is why tokio is called a work stealing executor.
There are two problems with this: firstly, stealing can’t happen on a single-threaded runtime; and secondly, tokio’s “LIFO slot” optimization prevents one task per thread from being stolen by other workers. That can cause deadlocks if you have made the mistake of blocking a worker thread but require all async tasks to make progress concurrently (ask me how I know!)
Thanks, I didn’t know about “LIFO slot” optimisation! Turns out there is even an issue to make this slot stealable. In this case, spawning the blocking task definitely isn’t the solution, I will update the post
I think the most correct thing is what the top comment mentioned, spawn_blocking
or block_in_place
which are called out in the tokio docs for this use case.
Don’t be caught up in small mistakes. The important part is that you are not supposed to do blocking work in async tasks.
We can even add StreamExt::buffered(n) to make fetching happen in parallel at the expense of some over-fetching in the end.
It’s not entirely relevant to the point of the article, but I fail to see how this causes over-fetching?
Buffering maintains up to N futures internally. If you have a list of 30 files, construct a stream with .buffered(5)
and consume only 10 elements, there are five more waiting in the buffer, potentially ready. These items will never be consumed, but we fetched them anyway
construct stream with .buffered(5) and consume only 10 elements
Well yes, but your loop clearly consumes all of them, hence my confusion:
while let Some(footer) = footers.next().await {
// TODO: Process footer.
}
You’re right, this is confusing. In the actual code it’s quite different, but in the context of the article that’s misleading. I will update the wording
I should mention that I was puzzling through whether there was something about the async-ness that was causing this, I’m not just being a total pedant! Thanks for the article