blocking/lib.rs
1//! A thread pool for isolating blocking I/O in async programs.
2//!
3//! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
4//! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
5//! solutions, they're not always available or ideal.
6//!
7//! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
8//! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
9//! current number of running I/O jobs.
10//!
11//! Note that there is a limit on the number of active threads. Once that limit is hit, a running
12//! job has to finish before others get a chance to run. When a thread is idle, it waits for the
13//! next job or shuts down after a certain timeout.
14//!
15//! The default number of threads (set to 500) can be altered by setting BLOCKING_MAX_THREADS environment
16//! variable with value between 1 and 10000.
17//!
18//! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
19//! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
20//! [io_uring]: https://lwn.net/Articles/776703
21//!
22//! # Examples
23//!
24//! Read the contents of a file:
25//!
26//! ```no_run
27//! use blocking::unblock;
28//! use std::fs;
29//!
30//! # futures_lite::future::block_on(async {
31//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
32//! println!("{}", contents);
33//! # std::io::Result::Ok(()) });
34//! ```
35//!
36//! Read a file and pipe its contents to stdout:
37//!
38//! ```no_run
39//! use blocking::{unblock, Unblock};
40//! use futures_lite::io;
41//! use std::fs::File;
42//!
43//! # futures_lite::future::block_on(async {
44//! let input = unblock(|| File::open("file.txt")).await?;
45//! let input = Unblock::new(input);
46//! let mut output = Unblock::new(std::io::stdout());
47//!
48//! io::copy(input, &mut output).await?;
49//! # std::io::Result::Ok(()) });
50//! ```
51//!
52//! Iterate over the contents of a directory:
53//!
54//! ```no_run
55//! use blocking::Unblock;
56//! use futures_lite::prelude::*;
57//! use std::fs;
58//!
59//! # futures_lite::future::block_on(async {
60//! let mut dir = Unblock::new(fs::read_dir(".")?);
61//! while let Some(item) = dir.next().await {
62//! println!("{}", item?.file_name().to_string_lossy());
63//! }
64//! # std::io::Result::Ok(()) });
65//! ```
66//!
67//! Spawn a process:
68//!
69//! ```no_run
70//! use blocking::unblock;
71//! use std::process::Command;
72//!
73//! # futures_lite::future::block_on(async {
74//! let out = unblock(|| Command::new("dir").output()).await?;
75//! # std::io::Result::Ok(()) });
76//! ```
77
78#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
79#![forbid(unsafe_code)]
80#![doc(
81 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
82)]
83#![doc(
84 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
85)]
86
87use std::any::Any;
88use std::collections::VecDeque;
89use std::fmt;
90use std::io::{self, Read, Seek, SeekFrom, Write};
91use std::num::NonZeroUsize;
92use std::panic;
93use std::pin::Pin;
94use std::sync::atomic::{AtomicUsize, Ordering};
95use std::sync::{Condvar, Mutex, MutexGuard};
96use std::task::{Context, Poll};
97use std::thread;
98use std::time::Duration;
99
100#[cfg(not(target_family = "wasm"))]
101use std::env;
102
103use async_channel::{bounded, Receiver};
104use async_task::Runnable;
105use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
106use futures_lite::{
107 future::{self, Future},
108 ready,
109 stream::Stream,
110};
111use piper::{pipe, Reader, Writer};
112
113#[doc(no_inline)]
114pub use async_task::Task;
115
116/// Default value for max threads that Executor can grow to
117#[cfg(not(target_family = "wasm"))]
118const DEFAULT_MAX_THREADS: NonZeroUsize = {
119 if let Some(size) = NonZeroUsize::new(500) {
120 size
121 } else {
122 panic!("DEFAULT_MAX_THREADS is non-zero");
123 }
124};
125
126/// Minimum value for max threads config
127#[cfg(not(target_family = "wasm"))]
128const MIN_MAX_THREADS: usize = 1;
129
130/// Maximum value for max threads config
131#[cfg(not(target_family = "wasm"))]
132const MAX_MAX_THREADS: usize = 10000;
133
134/// Env variable that allows to override default value for max threads.
135#[cfg(not(target_family = "wasm"))]
136const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";
137
138/// The blocking executor.
139struct Executor {
140 /// Inner state of the executor.
141 inner: Mutex<Inner>,
142
143 /// Used to put idle threads to sleep and wake them up when new work comes in.
144 cvar: Condvar,
145}
146
147/// Inner state of the blocking executor.
148struct Inner {
149 /// Number of idle threads in the pool.
150 ///
151 /// Idle threads are sleeping, waiting to get a task to run.
152 idle_count: usize,
153
154 /// Total number of threads in the pool.
155 ///
156 /// This is the number of idle threads + the number of active threads.
157 thread_count: usize,
158
159 // TODO: The option is only used for const-initialization. This can be replaced with
160 // a normal VecDeque when the MSRV can be bumped passed
161 /// The queue of blocking tasks.
162 queue: Option<VecDeque<Runnable>>,
163
164 /// Maximum number of threads in the pool
165 thread_limit: Option<NonZeroUsize>,
166}
167
168impl Inner {
169 #[inline]
170 fn queue(&mut self) -> &mut VecDeque<Runnable> {
171 self.queue.get_or_insert_with(VecDeque::new)
172 }
173}
174
175impl Executor {
176 #[cfg(not(target_family = "wasm"))]
177 fn max_threads() -> NonZeroUsize {
178 match env::var(MAX_THREADS_ENV) {
179 Ok(v) => v
180 .parse::<usize>()
181 .ok()
182 .and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS)))
183 .unwrap_or(DEFAULT_MAX_THREADS),
184 Err(_) => DEFAULT_MAX_THREADS,
185 }
186 }
187
188 #[cfg(target_family = "wasm")]
189 fn max_threads() -> NonZeroUsize {
190 NonZeroUsize::new(1).unwrap()
191 }
192
193 /// Get a reference to the global executor.
194 #[inline]
195 fn get() -> &'static Self {
196 #[cfg(not(target_family = "wasm"))]
197 {
198 static EXECUTOR: Executor = Executor {
199 inner: Mutex::new(Inner {
200 idle_count: 0,
201 thread_count: 0,
202 queue: None,
203 thread_limit: None,
204 }),
205 cvar: Condvar::new(),
206 };
207
208 &EXECUTOR
209 }
210
211 #[cfg(target_family = "wasm")]
212 panic!("cannot spawn a blocking task on WASM")
213 }
214
215 /// Spawns a future onto this executor.
216 ///
217 /// Returns a [`Task`] handle for the spawned task.
218 fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
219 let (runnable, task) = async_task::Builder::new().propagate_panic(true).spawn(
220 move |()| future,
221 |r| {
222 // Initialize the executor if we haven't already.
223 let executor = Self::get();
224
225 // Schedule the task on our executor.
226 executor.schedule(r)
227 },
228 );
229 runnable.schedule();
230 task
231 }
232
233 /// Runs the main loop on the current thread.
234 ///
235 /// This function runs blocking tasks until it becomes idle and times out.
236 fn main_loop(&'static self) {
237 #[cfg(feature = "tracing")]
238 let _span = tracing::trace_span!("blocking::main_loop").entered();
239
240 let mut inner = self.inner.lock().unwrap();
241 loop {
242 // This thread is not idle anymore because it's going to run tasks.
243 inner.idle_count -= 1;
244
245 // Run tasks in the queue.
246 while let Some(runnable) = inner.queue().pop_front() {
247 // We have found a task - grow the pool if needed.
248 self.grow_pool(inner);
249
250 // Run the task.
251 panic::catch_unwind(|| runnable.run()).ok();
252
253 // Re-lock the inner state and continue.
254 inner = self.inner.lock().unwrap();
255 }
256
257 // This thread is now becoming idle.
258 inner.idle_count += 1;
259
260 // Put the thread to sleep until another task is scheduled.
261 let timeout = Duration::from_millis(500);
262 #[cfg(feature = "tracing")]
263 tracing::trace!(?timeout, "going to sleep");
264 let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
265 inner = lock;
266
267 // If there are no tasks after a while, stop this thread.
268 if res.timed_out() && inner.queue().is_empty() {
269 inner.idle_count -= 1;
270 inner.thread_count -= 1;
271 break;
272 }
273
274 #[cfg(feature = "tracing")]
275 tracing::trace!("notified");
276 }
277
278 #[cfg(feature = "tracing")]
279 tracing::trace!("shutting down due to lack of tasks");
280 }
281
282 /// Schedules a runnable task for execution.
283 fn schedule(&'static self, runnable: Runnable) {
284 let mut inner = self.inner.lock().unwrap();
285 inner.queue().push_back(runnable);
286
287 // Notify a sleeping thread and spawn more threads if needed.
288 self.cvar.notify_one();
289 self.grow_pool(inner);
290 }
291
292 /// Spawns more blocking threads if the pool is overloaded with work.
293 fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
294 #[cfg(feature = "tracing")]
295 let _span = tracing::trace_span!(
296 "grow_pool",
297 queue_len = inner.queue().len(),
298 idle_count = inner.idle_count,
299 thread_count = inner.thread_count,
300 )
301 .entered();
302
303 let thread_limit = inner
304 .thread_limit
305 .get_or_insert_with(Self::max_threads)
306 .get();
307
308 // If runnable tasks greatly outnumber idle threads and there aren't too many threads
309 // already, then be aggressive: wake all idle threads and spawn one more thread.
310 while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit {
311 #[cfg(feature = "tracing")]
312 tracing::trace!("spawning a new thread to handle blocking tasks");
313
314 // The new thread starts in idle state.
315 inner.idle_count += 1;
316 inner.thread_count += 1;
317
318 // Notify all existing idle threads because we need to hurry up.
319 self.cvar.notify_all();
320
321 // Generate a new thread ID.
322 static ID: AtomicUsize = AtomicUsize::new(1);
323 let id = ID.fetch_add(1, Ordering::Relaxed);
324
325 // Spawn the new thread.
326 if let Err(_e) = thread::Builder::new()
327 .name(format!("blocking-{}", id))
328 .spawn(move || self.main_loop())
329 {
330 // We were unable to spawn the thread, so we need to undo the state changes.
331 #[cfg(feature = "tracing")]
332 tracing::error!("failed to spawn a blocking thread: {}", _e);
333 inner.idle_count -= 1;
334 inner.thread_count -= 1;
335
336 // The current number of threads is likely to be the system's upper limit, so update
337 // thread_limit accordingly.
338 inner.thread_limit = {
339 let new_limit = inner.thread_count;
340
341 // If the limit is about to be set to zero, set it to one instead so that if,
342 // in the future, we are able to spawn more threads, we will be able to do so.
343 Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| {
344 #[cfg(feature = "tracing")]
345 tracing::warn!(
346 "attempted to lower thread_limit to zero; setting to one instead"
347 );
348 NonZeroUsize::new(1).unwrap()
349 }))
350 };
351 }
352 }
353 }
354}
355
356/// Runs blocking code on a thread pool.
357///
358/// # Examples
359///
360/// Read the contents of a file:
361///
362/// ```no_run
363/// use blocking::unblock;
364/// use std::fs;
365///
366/// # futures_lite::future::block_on(async {
367/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
368/// # std::io::Result::Ok(()) });
369/// ```
370///
371/// Spawn a process:
372///
373/// ```no_run
374/// use blocking::unblock;
375/// use std::process::Command;
376///
377/// # futures_lite::future::block_on(async {
378/// let out = unblock(|| Command::new("dir").output()).await?;
379/// # std::io::Result::Ok(()) });
380/// ```
381pub fn unblock<T, F>(f: F) -> Task<T>
382where
383 F: FnOnce() -> T + Send + 'static,
384 T: Send + 'static,
385{
386 Executor::spawn(async move { f() })
387}
388
389/// Runs blocking I/O on a thread pool.
390///
391/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
392/// special thread pool while exposing a familiar async interface.
393///
394/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
395/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
396///
397/// # Caveats
398///
399/// [`Unblock`] is a low-level primitive, and as such it comes with some caveats.
400///
401/// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or
402/// [`async-process`] (on Windows).
403///
404/// [`async-fs`]: https://github.com/smol-rs/async-fs
405/// [`async-process`]: https://github.com/smol-rs/async-process
406///
407/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
408/// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading
409/// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it
410/// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them
411/// into the I/O handle.
412///
413/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
414///
415/// ### Reading
416///
417/// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it,
418/// and then drop it, a blocked read operation may keep hanging on the thread pool. The next
419/// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult
420/// problem to solve, so make sure you only use a single stdin handle for the duration of the
421/// entire program.
422///
423/// ### Writing
424///
425/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
426/// [`Unblock`] handle or some buffered data might get lost.
427///
428/// ### Seeking
429///
430/// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single
431/// read operation may move the file cursor farther than is the span of the operation. In fact,
432/// reading just keeps going in the background until the pipe gets full. Keep this mind when
433/// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets.
434///
435/// # Examples
436///
437/// ```
438/// use blocking::Unblock;
439/// use futures_lite::prelude::*;
440///
441/// # futures_lite::future::block_on(async {
442/// let mut stdout = Unblock::new(std::io::stdout());
443/// stdout.write_all(b"Hello world!").await?;
444/// stdout.flush().await?;
445/// # std::io::Result::Ok(()) });
446/// ```
447pub struct Unblock<T> {
448 state: State<T>,
449 cap: Option<usize>,
450}
451
452impl<T> Unblock<T> {
453 /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
454 ///
455 /// # Examples
456 ///
457 /// ```no_run
458 /// use blocking::Unblock;
459 ///
460 /// let stdin = Unblock::new(std::io::stdin());
461 /// ```
462 pub fn new(io: T) -> Unblock<T> {
463 Unblock {
464 state: State::Idle(Some(Box::new(io))),
465 cap: None,
466 }
467 }
468
469 /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
470 /// capacity.
471 ///
472 /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data
473 /// transferred between blocking and async code goes through a buffer of limited capacity. This
474 /// constructor configures that capacity.
475 ///
476 /// The default capacity is:
477 ///
478 /// * For [`Iterator`] types: 8192 items.
479 /// * For [`Read`]/[`Write`] types: 8 MB.
480 ///
481 /// # Examples
482 ///
483 /// ```no_run
484 /// use blocking::Unblock;
485 ///
486 /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout());
487 /// ```
488 pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
489 Unblock {
490 state: State::Idle(Some(Box::new(io))),
491 cap: Some(cap),
492 }
493 }
494
495 /// Gets a mutable reference to the blocking I/O handle.
496 ///
497 /// This is an async method because the I/O handle might be on the thread pool and needs to
498 /// be moved onto the current thread before we can get a reference to it.
499 ///
500 /// # Examples
501 ///
502 /// ```no_run
503 /// use blocking::{unblock, Unblock};
504 /// use std::fs::File;
505 ///
506 /// # futures_lite::future::block_on(async {
507 /// let file = unblock(|| File::create("file.txt")).await?;
508 /// let mut file = Unblock::new(file);
509 ///
510 /// let metadata = file.get_mut().await.metadata()?;
511 /// # std::io::Result::Ok(()) });
512 /// ```
513 pub async fn get_mut(&mut self) -> &mut T {
514 // Wait for the running task to stop and ignore I/O errors if there are any.
515 future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
516
517 // Assume idle state and get a reference to the inner value.
518 match &mut self.state {
519 State::Idle(t) => t.as_mut().expect("inner value was taken out"),
520 State::WithMut(..)
521 | State::Streaming(..)
522 | State::Reading(..)
523 | State::Writing(..)
524 | State::Seeking(..) => {
525 unreachable!("when stopped, the state machine must be in idle state");
526 }
527 }
528 }
529
530 /// Performs a blocking operation on the I/O handle.
531 ///
532 /// # Examples
533 ///
534 /// ```no_run
535 /// use blocking::{unblock, Unblock};
536 /// use std::fs::File;
537 ///
538 /// # futures_lite::future::block_on(async {
539 /// let file = unblock(|| File::create("file.txt")).await?;
540 /// let mut file = Unblock::new(file);
541 ///
542 /// let metadata = file.with_mut(|f| f.metadata()).await?;
543 /// # std::io::Result::Ok(()) });
544 /// ```
545 pub async fn with_mut<R, F>(&mut self, op: F) -> R
546 where
547 F: FnOnce(&mut T) -> R + Send + 'static,
548 R: Send + 'static,
549 T: Send + 'static,
550 {
551 // Wait for the running task to stop and ignore I/O errors if there are any.
552 future::poll_fn(|cx| self.poll_stop(cx)).await.ok();
553
554 // Assume idle state and take out the inner value.
555 let mut t = match &mut self.state {
556 State::Idle(t) => t.take().expect("inner value was taken out"),
557 State::WithMut(..)
558 | State::Streaming(..)
559 | State::Reading(..)
560 | State::Writing(..)
561 | State::Seeking(..) => {
562 unreachable!("when stopped, the state machine must be in idle state");
563 }
564 };
565
566 let (sender, receiver) = bounded(1);
567 let task = Executor::spawn(async move {
568 sender.try_send(op(&mut t)).ok();
569 t
570 });
571 self.state = State::WithMut(task);
572
573 receiver
574 .recv()
575 .await
576 .expect("`Unblock::with_mut()` operation has panicked")
577 }
578
579 /// Extracts the inner blocking I/O handle.
580 ///
581 /// This is an async method because the I/O handle might be on the thread pool and needs to
582 /// be moved onto the current thread before we can extract it.
583 ///
584 /// # Examples
585 ///
586 /// ```no_run
587 /// use blocking::{unblock, Unblock};
588 /// use futures_lite::prelude::*;
589 /// use std::fs::File;
590 ///
591 /// # futures_lite::future::block_on(async {
592 /// let file = unblock(|| File::create("file.txt")).await?;
593 /// let file = Unblock::new(file);
594 ///
595 /// let file = file.into_inner().await;
596 /// # std::io::Result::Ok(()) });
597 /// ```
598 pub async fn into_inner(self) -> T {
599 // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
600 // bind `self` to a local mutable variable.
601 let mut this = self;
602
603 // Wait for the running task to stop and ignore I/O errors if there are any.
604 future::poll_fn(|cx| this.poll_stop(cx)).await.ok();
605
606 // Assume idle state and extract the inner value.
607 match &mut this.state {
608 State::Idle(t) => *t.take().expect("inner value was taken out"),
609 State::WithMut(..)
610 | State::Streaming(..)
611 | State::Reading(..)
612 | State::Writing(..)
613 | State::Seeking(..) => {
614 unreachable!("when stopped, the state machine must be in idle state");
615 }
616 }
617 }
618
619 /// Waits for the running task to stop.
620 ///
621 /// On success, the state machine is moved into the idle state.
622 fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
623 loop {
624 match &mut self.state {
625 State::Idle(_) => return Poll::Ready(Ok(())),
626
627 State::WithMut(task) => {
628 // Poll the task to wait for it to finish.
629 let io = ready!(Pin::new(task).poll(cx));
630 self.state = State::Idle(Some(io));
631 }
632
633 State::Streaming(any, task) => {
634 // Drop the receiver to close the channel. This stops the `send()` operation in
635 // the task, after which the task returns the iterator back.
636 any.take();
637
638 // Poll the task to retrieve the iterator.
639 let iter = ready!(Pin::new(task).poll(cx));
640 self.state = State::Idle(Some(iter));
641 }
642
643 State::Reading(reader, task) => {
644 // Drop the reader to close the pipe. This stops copying inside the task, after
645 // which the task returns the I/O handle back.
646 reader.take();
647
648 // Poll the task to retrieve the I/O handle.
649 let (res, io) = ready!(Pin::new(task).poll(cx));
650 // Make sure to move into the idle state before reporting errors.
651 self.state = State::Idle(Some(io));
652 res?;
653 }
654
655 State::Writing(writer, task) => {
656 // Drop the writer to close the pipe. This stops copying inside the task, after
657 // which the task flushes the I/O handle and
658 writer.take();
659
660 // Poll the task to retrieve the I/O handle.
661 let (res, io) = ready!(Pin::new(task).poll(cx));
662 // Make sure to move into the idle state before reporting errors.
663 self.state = State::Idle(Some(io));
664 res?;
665 }
666
667 State::Seeking(task) => {
668 // Poll the task to wait for it to finish.
669 let (_, res, io) = ready!(Pin::new(task).poll(cx));
670 // Make sure to move into the idle state before reporting errors.
671 self.state = State::Idle(Some(io));
672 res?;
673 }
674 }
675 }
676 }
677}
678
679impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
680 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681 struct Closed;
682 impl fmt::Debug for Closed {
683 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684 f.write_str("<closed>")
685 }
686 }
687
688 struct Blocked;
689 impl fmt::Debug for Blocked {
690 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
691 f.write_str("<blocked>")
692 }
693 }
694
695 match &self.state {
696 State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
697 State::Idle(Some(io)) => {
698 let io: &T = io;
699 f.debug_struct("Unblock").field("io", io).finish()
700 }
701 State::WithMut(..)
702 | State::Streaming(..)
703 | State::Reading(..)
704 | State::Writing(..)
705 | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
706 }
707 }
708}
709
710/// Current state of a blocking task.
711enum State<T> {
712 /// There is no blocking task.
713 ///
714 /// The inner value is readily available, unless it has already been extracted. The value is
715 /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting
716 /// [`Unblock`].
717 Idle(Option<Box<T>>),
718
719 /// A [`Unblock::with_mut()`] closure was spawned and is still running.
720 WithMut(Task<Box<T>>),
721
722 /// The inner value is an [`Iterator`] currently iterating in a task.
723 ///
724 /// The `dyn Any` value here is a `Pin<Box<Receiver<<T as Iterator>::Item>>>`.
725 Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>),
726
727 /// The inner value is a [`Read`] currently reading in a task.
728 Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),
729
730 /// The inner value is a [`Write`] currently writing in a task.
731 Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),
732
733 /// The inner value is a [`Seek`] currently seeking in a task.
734 Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
735}
736
737impl<T: Iterator + Send + 'static> Stream for Unblock<T>
738where
739 T::Item: Send + 'static,
740{
741 type Item = T::Item;
742
743 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
744 loop {
745 match &mut self.state {
746 // If not in idle or active streaming state, stop the running task.
747 State::WithMut(..)
748 | State::Streaming(None, _)
749 | State::Reading(..)
750 | State::Writing(..)
751 | State::Seeking(..) => {
752 // Wait for the running task to stop.
753 ready!(self.poll_stop(cx)).ok();
754 }
755
756 // If idle, start a streaming task.
757 State::Idle(iter) => {
758 // Take the iterator out to run it on a blocking task.
759 let mut iter = iter.take().expect("inner iterator was taken out");
760
761 // This channel capacity seems to work well in practice. If it's too low, there
762 // will be too much synchronization between tasks. If too high, memory
763 // consumption increases.
764 let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items
765
766 // Spawn a blocking task that runs the iterator and returns it when done.
767 let task = Executor::spawn(async move {
768 for item in &mut iter {
769 if sender.send(item).await.is_err() {
770 break;
771 }
772 }
773 iter
774 });
775
776 // Move into the busy state and poll again.
777 self.state = State::Streaming(Some(Box::new(Box::pin(receiver))), task);
778 }
779
780 // If streaming, receive an item.
781 State::Streaming(Some(any), task) => {
782 let receiver = any.downcast_mut::<Pin<Box<Receiver<T::Item>>>>().unwrap();
783
784 // Poll the channel.
785 let opt = ready!(receiver.as_mut().poll_next(cx));
786
787 // If the channel is closed, retrieve the iterator back from the blocking task.
788 // This is not really a required step, but it's cleaner to drop the iterator on
789 // the same thread that created it.
790 if opt.is_none() {
791 // Poll the task to retrieve the iterator.
792 let iter = ready!(Pin::new(task).poll(cx));
793 self.state = State::Idle(Some(iter));
794 }
795
796 return Poll::Ready(opt);
797 }
798 }
799 }
800 }
801}
802
803impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
804 fn poll_read(
805 mut self: Pin<&mut Self>,
806 cx: &mut Context<'_>,
807 buf: &mut [u8],
808 ) -> Poll<io::Result<usize>> {
809 loop {
810 match &mut self.state {
811 // If not in idle or active reading state, stop the running task.
812 State::WithMut(..)
813 | State::Reading(None, _)
814 | State::Streaming(..)
815 | State::Writing(..)
816 | State::Seeking(..) => {
817 // Wait for the running task to stop.
818 ready!(self.poll_stop(cx))?;
819 }
820
821 // If idle, start a reading task.
822 State::Idle(io) => {
823 // Take the I/O handle out to read it on a blocking task.
824 let mut io = io.take().expect("inner value was taken out");
825
826 // This pipe capacity seems to work well in practice. If it's too low, there
827 // will be too much synchronization between tasks. If too high, memory
828 // consumption increases.
829 let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
830
831 // Spawn a blocking task that reads and returns the I/O handle when done.
832 let task = Executor::spawn(async move {
833 // Copy bytes from the I/O handle into the pipe until the pipe is closed or
834 // an error occurs.
835 loop {
836 match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await {
837 Ok(0) => return (Ok(()), io),
838 Ok(_) => {}
839 Err(err) => return (Err(err), io),
840 }
841 }
842 });
843
844 // Move into the busy state and poll again.
845 self.state = State::Reading(Some(reader), task);
846 }
847
848 // If reading, read bytes from the pipe.
849 State::Reading(Some(reader), task) => {
850 // Poll the pipe.
851 let n = ready!(reader.poll_drain(cx, buf))?;
852
853 // If the pipe is closed, retrieve the I/O handle back from the blocking task.
854 // This is not really a required step, but it's cleaner to drop the handle on
855 // the same thread that created it.
856 if n == 0 {
857 // Poll the task to retrieve the I/O handle.
858 let (res, io) = ready!(Pin::new(task).poll(cx));
859 // Make sure to move into the idle state before reporting errors.
860 self.state = State::Idle(Some(io));
861 res?;
862 }
863
864 return Poll::Ready(Ok(n));
865 }
866 }
867 }
868 }
869}
870
871impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
872 fn poll_write(
873 mut self: Pin<&mut Self>,
874 cx: &mut Context<'_>,
875 buf: &[u8],
876 ) -> Poll<io::Result<usize>> {
877 loop {
878 match &mut self.state {
879 // If not in idle or active writing state, stop the running task.
880 State::WithMut(..)
881 | State::Writing(None, _)
882 | State::Streaming(..)
883 | State::Reading(..)
884 | State::Seeking(..) => {
885 // Wait for the running task to stop.
886 ready!(self.poll_stop(cx))?;
887 }
888
889 // If idle, start the writing task.
890 State::Idle(io) => {
891 // Take the I/O handle out to write on a blocking task.
892 let mut io = io.take().expect("inner value was taken out");
893
894 // This pipe capacity seems to work well in practice. If it's too low, there will
895 // be too much synchronization between tasks. If too high, memory consumption
896 // increases.
897 let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB
898
899 // Spawn a blocking task that writes and returns the I/O handle when done.
900 let task = Executor::spawn(async move {
901 // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
902 // error occurs. Flush the I/O handle at the end.
903 loop {
904 match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await {
905 Ok(0) => return (io.flush(), io),
906 Ok(_) => {}
907 Err(err) => {
908 io.flush().ok();
909 return (Err(err), io);
910 }
911 }
912 }
913 });
914
915 // Move into the busy state and poll again.
916 self.state = State::Writing(Some(writer), task);
917 }
918
919 // If writing, write more bytes into the pipe.
920 State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf),
921 }
922 }
923 }
924
925 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
926 loop {
927 match &mut self.state {
928 // If not in idle state, stop the running task.
929 State::WithMut(..)
930 | State::Streaming(..)
931 | State::Writing(..)
932 | State::Reading(..)
933 | State::Seeking(..) => {
934 // Wait for the running task to stop.
935 ready!(self.poll_stop(cx))?;
936 }
937
938 // Idle implies flushed.
939 State::Idle(_) => return Poll::Ready(Ok(())),
940 }
941 }
942 }
943
944 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
945 // First, make sure the I/O handle is flushed.
946 ready!(Pin::new(&mut self).poll_flush(cx))?;
947
948 // Then move into the idle state with no I/O handle, thus dropping it.
949 self.state = State::Idle(None);
950 Poll::Ready(Ok(()))
951 }
952}
953
954impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
955 fn poll_seek(
956 mut self: Pin<&mut Self>,
957 cx: &mut Context<'_>,
958 pos: SeekFrom,
959 ) -> Poll<io::Result<u64>> {
960 loop {
961 match &mut self.state {
962 // If not in idle state, stop the running task.
963 State::WithMut(..)
964 | State::Streaming(..)
965 | State::Reading(..)
966 | State::Writing(..) => {
967 // Wait for the running task to stop.
968 ready!(self.poll_stop(cx))?;
969 }
970
971 State::Idle(io) => {
972 // Take the I/O handle out to seek on a blocking task.
973 let mut io = io.take().expect("inner value was taken out");
974
975 let task = Executor::spawn(async move {
976 let res = io.seek(pos);
977 (pos, res, io)
978 });
979 self.state = State::Seeking(task);
980 }
981
982 State::Seeking(task) => {
983 // Poll the task to wait for it to finish.
984 let (original_pos, res, io) = ready!(Pin::new(task).poll(cx));
985 // Make sure to move into the idle state before reporting errors.
986 self.state = State::Idle(Some(io));
987 let current = res?;
988
989 // If the `pos` argument matches the original one, return the result.
990 if original_pos == pos {
991 return Poll::Ready(Ok(current));
992 }
993 }
994 }
995 }
996 }
997}
998
999#[cfg(all(test, not(target_family = "wasm")))]
1000mod tests {
1001 use super::*;
1002
1003 #[test]
1004 fn test_max_threads() {
1005 // properly set env var
1006 env::set_var(MAX_THREADS_ENV, "100");
1007 assert_eq!(100, Executor::max_threads().get());
1008
1009 // passed value below minimum, so we set it to minimum
1010 env::set_var(MAX_THREADS_ENV, "0");
1011 assert_eq!(1, Executor::max_threads().get());
1012
1013 // passed value above maximum, so we set to allowed maximum
1014 env::set_var(MAX_THREADS_ENV, "50000");
1015 assert_eq!(10000, Executor::max_threads().get());
1016
1017 // no env var, use default
1018 env::set_var(MAX_THREADS_ENV, "");
1019 assert_eq!(500, Executor::max_threads().get());
1020
1021 // not a number, use default
1022 env::set_var(MAX_THREADS_ENV, "NOTINT");
1023 assert_eq!(500, Executor::max_threads().get());
1024 }
1025}