blocking/
lib.rs

//! A thread pool for isolating blocking I/O in async programs.
//!
//! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async
//! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible
//! solutions, they're not always available or ideal.
//!
//! Since blocking is not allowed inside futures, we must move blocking I/O onto a special thread
//! pool provided by this crate. The pool dynamically spawns and stops threads depending on the
//! current number of running I/O jobs.
//!
//! Note that there is a limit on the number of active threads. Once that limit is hit, a running
//! job has to finish before others get a chance to run. When a thread is idle, it waits for the
//! next job or shuts down after a certain timeout.
//!
//! The default number of threads (500) can be overridden by setting the `BLOCKING_MAX_THREADS`
//! environment variable to a value between 1 and 10000.
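//!
//! The variable is read once, when the pool schedules its first task, so it must be set before any
//! blocking work is spawned. A minimal sketch of overriding the limit from inside the program
//! (setting it in the shell works just as well):
//!
//! ```no_run
//! // Must run before the first `unblock` call or `Unblock` handle is used.
//! std::env::set_var("BLOCKING_MAX_THREADS", "1000");
//!
//! # futures_lite::future::block_on(async {
//! let sum = blocking::unblock(|| 1 + 1).await;
//! assert_eq!(sum, 2);
//! # });
//! ```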
//!
//! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port
//! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html
//! [io_uring]: https://lwn.net/Articles/776703
//!
//! # Examples
//!
//! Read the contents of a file:
//!
//! ```no_run
//! use blocking::unblock;
//! use std::fs;
//!
//! # futures_lite::future::block_on(async {
//! let contents = unblock(|| fs::read_to_string("file.txt")).await?;
//! println!("{}", contents);
//! # std::io::Result::Ok(()) });
//! ```
//!
//! Read a file and pipe its contents to stdout:
//!
//! ```no_run
//! use blocking::{unblock, Unblock};
//! use futures_lite::io;
//! use std::fs::File;
//!
//! # futures_lite::future::block_on(async {
//! let input = unblock(|| File::open("file.txt")).await?;
//! let input = Unblock::new(input);
//! let mut output = Unblock::new(std::io::stdout());
//!
//! io::copy(input, &mut output).await?;
//! # std::io::Result::Ok(()) });
//! ```
//!
//! Iterate over the contents of a directory:
//!
//! ```no_run
//! use blocking::Unblock;
//! use futures_lite::prelude::*;
//! use std::fs;
//!
//! # futures_lite::future::block_on(async {
//! let mut dir = Unblock::new(fs::read_dir(".")?);
//! while let Some(item) = dir.next().await {
//!     println!("{}", item?.file_name().to_string_lossy());
//! }
//! # std::io::Result::Ok(()) });
//! ```
//!
//! Spawn a process:
//!
//! ```no_run
//! use blocking::unblock;
//! use std::process::Command;
//!
//! # futures_lite::future::block_on(async {
//! let out = unblock(|| Command::new("dir").output()).await?;
//! # std::io::Result::Ok(()) });
//! ```

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![forbid(unsafe_code)]
#![doc(
    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::num::NonZeroUsize;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex, MutexGuard};
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

#[cfg(not(target_family = "wasm"))]
use std::env;

use async_channel::{bounded, Receiver};
use async_task::Runnable;
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_lite::{
    future::{self, Future},
    ready,
    stream::Stream,
};
use piper::{pipe, Reader, Writer};

#[doc(no_inline)]
pub use async_task::Task;

/// Default value for max threads that Executor can grow to
#[cfg(not(target_family = "wasm"))]
const DEFAULT_MAX_THREADS: NonZeroUsize = {
    if let Some(size) = NonZeroUsize::new(500) {
        size
    } else {
        panic!("DEFAULT_MAX_THREADS is non-zero");
    }
};

/// Minimum value for max threads config
#[cfg(not(target_family = "wasm"))]
const MIN_MAX_THREADS: usize = 1;

/// Maximum value for max threads config
#[cfg(not(target_family = "wasm"))]
const MAX_MAX_THREADS: usize = 10000;

/// Env variable that allows overriding the default value for max threads.
#[cfg(not(target_family = "wasm"))]
const MAX_THREADS_ENV: &str = "BLOCKING_MAX_THREADS";

/// The blocking executor.
struct Executor {
    /// Inner state of the executor.
    inner: Mutex<Inner>,

    /// Used to put idle threads to sleep and wake them up when new work comes in.
    cvar: Condvar,
}

/// Inner state of the blocking executor.
struct Inner {
    /// Number of idle threads in the pool.
    ///
    /// Idle threads are sleeping, waiting to get a task to run.
    idle_count: usize,

    /// Total number of threads in the pool.
    ///
    /// This is the number of idle threads + the number of active threads.
    thread_count: usize,

    // TODO: The option is only used for const-initialization. This can be replaced with
    // a normal VecDeque when the MSRV can be bumped past
    /// The queue of blocking tasks.
    queue: Option<VecDeque<Runnable>>,

    /// Maximum number of threads in the pool
    thread_limit: Option<NonZeroUsize>,
}

impl Inner {
    #[inline]
    fn queue(&mut self) -> &mut VecDeque<Runnable> {
        self.queue.get_or_insert_with(VecDeque::new)
    }
}

impl Executor {
    #[cfg(not(target_family = "wasm"))]
    fn max_threads() -> NonZeroUsize {
        match env::var(MAX_THREADS_ENV) {
            Ok(v) => v
                .parse::<usize>()
                .ok()
                .and_then(|v| NonZeroUsize::new(v.clamp(MIN_MAX_THREADS, MAX_MAX_THREADS)))
                .unwrap_or(DEFAULT_MAX_THREADS),
            Err(_) => DEFAULT_MAX_THREADS,
        }
    }

    #[cfg(target_family = "wasm")]
    fn max_threads() -> NonZeroUsize {
        NonZeroUsize::new(1).unwrap()
    }

    /// Get a reference to the global executor.
    #[inline]
    fn get() -> &'static Self {
        #[cfg(not(target_family = "wasm"))]
        {
            static EXECUTOR: Executor = Executor {
                inner: Mutex::new(Inner {
                    idle_count: 0,
                    thread_count: 0,
                    queue: None,
                    thread_limit: None,
                }),
                cvar: Condvar::new(),
            };

            &EXECUTOR
        }

        #[cfg(target_family = "wasm")]
        panic!("cannot spawn a blocking task on WASM")
    }

    /// Spawns a future onto this executor.
    ///
    /// Returns a [`Task`] handle for the spawned task.
    fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
        let (runnable, task) = async_task::Builder::new().propagate_panic(true).spawn(
            move |()| future,
            |r| {
                // Initialize the executor if we haven't already.
                let executor = Self::get();

                // Schedule the task on our executor.
                executor.schedule(r)
            },
        );
        runnable.schedule();
        task
    }

    /// Runs the main loop on the current thread.
    ///
    /// This function runs blocking tasks until it becomes idle and times out.
    fn main_loop(&'static self) {
        #[cfg(feature = "tracing")]
        let _span = tracing::trace_span!("blocking::main_loop").entered();

        let mut inner = self.inner.lock().unwrap();
        loop {
            // This thread is not idle anymore because it's going to run tasks.
            inner.idle_count -= 1;

            // Run tasks in the queue.
            while let Some(runnable) = inner.queue().pop_front() {
                // We have found a task - grow the pool if needed.
                self.grow_pool(inner);

                // Run the task.
                panic::catch_unwind(|| runnable.run()).ok();

                // Re-lock the inner state and continue.
                inner = self.inner.lock().unwrap();
            }

            // This thread is now becoming idle.
            inner.idle_count += 1;

            // Put the thread to sleep until another task is scheduled.
            let timeout = Duration::from_millis(500);
            #[cfg(feature = "tracing")]
            tracing::trace!(?timeout, "going to sleep");
            let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap();
            inner = lock;

            // If there are no tasks after a while, stop this thread.
            if res.timed_out() && inner.queue().is_empty() {
                inner.idle_count -= 1;
                inner.thread_count -= 1;
                break;
            }

            #[cfg(feature = "tracing")]
            tracing::trace!("notified");
        }

        #[cfg(feature = "tracing")]
        tracing::trace!("shutting down due to lack of tasks");
    }

    /// Schedules a runnable task for execution.
    fn schedule(&'static self, runnable: Runnable) {
        let mut inner = self.inner.lock().unwrap();
        inner.queue().push_back(runnable);

        // Notify a sleeping thread and spawn more threads if needed.
        self.cvar.notify_one();
        self.grow_pool(inner);
    }

    /// Spawns more blocking threads if the pool is overloaded with work.
    fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
        #[cfg(feature = "tracing")]
        let _span = tracing::trace_span!(
            "grow_pool",
            queue_len = inner.queue().len(),
            idle_count = inner.idle_count,
            thread_count = inner.thread_count,
        )
        .entered();

        let thread_limit = inner
            .thread_limit
            .get_or_insert_with(Self::max_threads)
            .get();

        // If runnable tasks greatly outnumber idle threads and there aren't too many threads
        // already, then be aggressive: wake all idle threads and spawn one more thread.
        while inner.queue().len() > inner.idle_count * 5 && inner.thread_count < thread_limit {
            #[cfg(feature = "tracing")]
            tracing::trace!("spawning a new thread to handle blocking tasks");

            // The new thread starts in idle state.
            inner.idle_count += 1;
            inner.thread_count += 1;

            // Notify all existing idle threads because we need to hurry up.
            self.cvar.notify_all();

            // Generate a new thread ID.
            static ID: AtomicUsize = AtomicUsize::new(1);
            let id = ID.fetch_add(1, Ordering::Relaxed);

            // Spawn the new thread.
            if let Err(_e) = thread::Builder::new()
                .name(format!("blocking-{}", id))
                .spawn(move || self.main_loop())
            {
                // We were unable to spawn the thread, so we need to undo the state changes.
                #[cfg(feature = "tracing")]
                tracing::error!("failed to spawn a blocking thread: {}", _e);
                inner.idle_count -= 1;
                inner.thread_count -= 1;

                // The current number of threads is likely to be the system's upper limit, so update
                // thread_limit accordingly.
                inner.thread_limit = {
                    let new_limit = inner.thread_count;

                    // If the limit is about to be set to zero, set it to one instead so that if,
                    // in the future, we are able to spawn more threads, we will be able to do so.
                    Some(NonZeroUsize::new(new_limit).unwrap_or_else(|| {
                        #[cfg(feature = "tracing")]
                        tracing::warn!(
                            "attempted to lower thread_limit to zero; setting to one instead"
                        );
                        NonZeroUsize::new(1).unwrap()
                    }))
                };
            }
        }
    }
}

/// Runs blocking code on a thread pool.
///
/// # Examples
///
/// Read the contents of a file:
///
/// ```no_run
/// use blocking::unblock;
/// use std::fs;
///
/// # futures_lite::future::block_on(async {
/// let contents = unblock(|| fs::read_to_string("file.txt")).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Spawn a process:
///
/// ```no_run
/// use blocking::unblock;
/// use std::process::Command;
///
/// # futures_lite::future::block_on(async {
/// let out = unblock(|| Command::new("dir").output()).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn unblock<T, F>(f: F) -> Task<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    Executor::spawn(async move { f() })
}

/// Runs blocking I/O on a thread pool.
///
/// Blocking I/O must be isolated from async code. This type moves blocking I/O operations onto a
/// special thread pool while exposing a familiar async interface.
///
/// This type implements traits [`Stream`], [`AsyncRead`], [`AsyncWrite`], or [`AsyncSeek`] if the
/// inner type implements [`Iterator`], [`Read`], [`Write`], or [`Seek`], respectively.
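///
/// For example, wrapping a plain iterator turns it into a [`Stream`] whose items are produced on
/// the thread pool (a minimal sketch using a `Vec` iterator):
///
/// ```
/// use blocking::Unblock;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// let mut numbers = Unblock::new(vec![1, 2, 3].into_iter());
/// while let Some(n) = numbers.next().await {
///     println!("{}", n);
/// }
/// # });
/// ```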
///
/// # Caveats
///
/// [`Unblock`] is a low-level primitive, and as such it comes with some caveats.
///
/// For higher-level primitives built on top of [`Unblock`], look into [`async-fs`] or
/// [`async-process`] (on Windows).
///
/// [`async-fs`]: https://github.com/smol-rs/async-fs
/// [`async-process`]: https://github.com/smol-rs/async-process
///
/// [`Unblock`] communicates with I/O operations on the thread pool through a pipe. That means an
/// async read/write operation simply receives/sends some bytes from/into the pipe. When in reading
/// mode, the thread pool reads bytes from the I/O handle and forwards them into the pipe until it
/// becomes full. When in writing mode, the thread pool reads bytes from the pipe and forwards them
/// into the I/O handle.
///
/// Use [`Unblock::with_capacity()`] to configure the capacity of the pipe.
///
/// ### Reading
///
/// If you create an [`Unblock`]`<`[`Stdin`][`std::io::Stdin`]`>`, read some bytes from it,
/// and then drop it, a blocked read operation may keep hanging on the thread pool. The next
/// attempt to read from stdin will lose bytes read by the hanging operation. This is a difficult
/// problem to solve, so make sure you only use a single stdin handle for the duration of the
/// entire program.
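///
/// A sketch of the recommended pattern: create the handle once and reuse it for the whole
/// program (the surrounding program structure here is hypothetical):
///
/// ```no_run
/// use blocking::Unblock;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// // One stdin handle, kept alive for the duration of the program.
/// let mut stdin = Unblock::new(std::io::stdin());
///
/// let mut input = String::new();
/// stdin.read_to_string(&mut input).await?;
/// # std::io::Result::Ok(()) });
/// ```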
///
/// ### Writing
///
/// If writing data through the [`AsyncWrite`] trait, make sure to flush before dropping the
/// [`Unblock`] handle or some buffered data might get lost.
///
/// ### Seeking
///
/// Because of buffering in the pipe, if [`Unblock`] wraps a [`File`][`std::fs::File`], a single
/// read operation may move the file cursor farther than the span of that operation. In fact,
/// reading just keeps going in the background until the pipe gets full. Keep this in mind when
/// using [`AsyncSeek`] with [relative][`SeekFrom::Current`] offsets.
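///
/// A minimal sketch of sidestepping this by seeking to an absolute position instead (assuming a
/// `file.txt` exists):
///
/// ```no_run
/// use blocking::{unblock, Unblock};
/// use futures_lite::prelude::*;
/// use std::fs::File;
/// use std::io::SeekFrom;
///
/// # futures_lite::future::block_on(async {
/// let file = unblock(|| File::open("file.txt")).await?;
/// let mut file = Unblock::new(file);
///
/// // An absolute offset is not affected by how far background reading has advanced the cursor.
/// file.seek(SeekFrom::Start(0)).await?;
/// # std::io::Result::Ok(()) });
/// ```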
///
/// # Examples
///
/// ```
/// use blocking::Unblock;
/// use futures_lite::prelude::*;
///
/// # futures_lite::future::block_on(async {
/// let mut stdout = Unblock::new(std::io::stdout());
/// stdout.write_all(b"Hello world!").await?;
/// stdout.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub struct Unblock<T> {
    state: State<T>,
    cap: Option<usize>,
}

impl<T> Unblock<T> {
    /// Wraps a blocking I/O handle into the async [`Unblock`] interface.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::Unblock;
    ///
    /// let stdin = Unblock::new(std::io::stdin());
    /// ```
    pub fn new(io: T) -> Unblock<T> {
        Unblock {
            state: State::Idle(Some(Box::new(io))),
            cap: None,
        }
    }

    /// Wraps a blocking I/O handle into the async [`Unblock`] interface with a custom buffer
    /// capacity.
    ///
    /// When communicating with the inner [`Stream`]/[`Read`]/[`Write`] type from async code, data
    /// transferred between blocking and async code goes through a buffer of limited capacity. This
    /// constructor configures that capacity.
    ///
    /// The default capacity is:
    ///
    /// * For [`Iterator`] types: 8192 items.
    /// * For [`Read`]/[`Write`] types: 8 MB.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::Unblock;
    ///
    /// let stdout = Unblock::with_capacity(64 * 1024, std::io::stdout());
    /// ```
    pub fn with_capacity(cap: usize, io: T) -> Unblock<T> {
        Unblock {
            state: State::Idle(Some(Box::new(io))),
            cap: Some(cap),
        }
    }

    /// Gets a mutable reference to the blocking I/O handle.
    ///
    /// This is an async method because the I/O handle might be on the thread pool and needs to
    /// be moved onto the current thread before we can get a reference to it.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::{unblock, Unblock};
    /// use std::fs::File;
    ///
    /// # futures_lite::future::block_on(async {
    /// let file = unblock(|| File::create("file.txt")).await?;
    /// let mut file = Unblock::new(file);
    ///
    /// let metadata = file.get_mut().await.metadata()?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn get_mut(&mut self) -> &mut T {
        // Wait for the running task to stop and ignore I/O errors if there are any.
        future::poll_fn(|cx| self.poll_stop(cx)).await.ok();

        // Assume idle state and get a reference to the inner value.
        match &mut self.state {
            State::Idle(t) => t.as_mut().expect("inner value was taken out"),
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => {
                unreachable!("when stopped, the state machine must be in idle state");
            }
        }
    }

    /// Performs a blocking operation on the I/O handle.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::{unblock, Unblock};
    /// use std::fs::File;
    ///
    /// # futures_lite::future::block_on(async {
    /// let file = unblock(|| File::create("file.txt")).await?;
    /// let mut file = Unblock::new(file);
    ///
    /// let metadata = file.with_mut(|f| f.metadata()).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn with_mut<R, F>(&mut self, op: F) -> R
    where
        F: FnOnce(&mut T) -> R + Send + 'static,
        R: Send + 'static,
        T: Send + 'static,
    {
        // Wait for the running task to stop and ignore I/O errors if there are any.
        future::poll_fn(|cx| self.poll_stop(cx)).await.ok();

        // Assume idle state and take out the inner value.
        let mut t = match &mut self.state {
            State::Idle(t) => t.take().expect("inner value was taken out"),
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => {
                unreachable!("when stopped, the state machine must be in idle state");
            }
        };

        let (sender, receiver) = bounded(1);
        let task = Executor::spawn(async move {
            sender.try_send(op(&mut t)).ok();
            t
        });
        self.state = State::WithMut(task);

        receiver
            .recv()
            .await
            .expect("`Unblock::with_mut()` operation has panicked")
    }

    /// Extracts the inner blocking I/O handle.
    ///
    /// This is an async method because the I/O handle might be on the thread pool and needs to
    /// be moved onto the current thread before we can extract it.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use blocking::{unblock, Unblock};
    /// use futures_lite::prelude::*;
    /// use std::fs::File;
    ///
    /// # futures_lite::future::block_on(async {
    /// let file = unblock(|| File::create("file.txt")).await?;
    /// let file = Unblock::new(file);
    ///
    /// let file = file.into_inner().await;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub async fn into_inner(self) -> T {
        // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
        // bind `self` to a local mutable variable.
        let mut this = self;

        // Wait for the running task to stop and ignore I/O errors if there are any.
        future::poll_fn(|cx| this.poll_stop(cx)).await.ok();

        // Assume idle state and extract the inner value.
        match &mut this.state {
            State::Idle(t) => *t.take().expect("inner value was taken out"),
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => {
                unreachable!("when stopped, the state machine must be in idle state");
            }
        }
    }

    /// Waits for the running task to stop.
    ///
    /// On success, the state machine is moved into the idle state.
    fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            match &mut self.state {
                State::Idle(_) => return Poll::Ready(Ok(())),

                State::WithMut(task) => {
                    // Poll the task to wait for it to finish.
                    let io = ready!(Pin::new(task).poll(cx));
                    self.state = State::Idle(Some(io));
                }

                State::Streaming(any, task) => {
                    // Drop the receiver to close the channel. This stops the `send()` operation in
                    // the task, after which the task returns the iterator back.
                    any.take();

                    // Poll the task to retrieve the iterator.
                    let iter = ready!(Pin::new(task).poll(cx));
                    self.state = State::Idle(Some(iter));
                }

                State::Reading(reader, task) => {
                    // Drop the reader to close the pipe. This stops copying inside the task, after
                    // which the task returns the I/O handle back.
                    reader.take();

                    // Poll the task to retrieve the I/O handle.
                    let (res, io) = ready!(Pin::new(task).poll(cx));
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    res?;
                }

                State::Writing(writer, task) => {
                    // Drop the writer to close the pipe. This stops copying inside the task, after
                    // which the task flushes the I/O handle and returns it back.
                    writer.take();

                    // Poll the task to retrieve the I/O handle.
                    let (res, io) = ready!(Pin::new(task).poll(cx));
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    res?;
                }

                State::Seeking(task) => {
                    // Poll the task to wait for it to finish.
                    let (_, res, io) = ready!(Pin::new(task).poll(cx));
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    res?;
                }
            }
        }
    }
}

impl<T: fmt::Debug> fmt::Debug for Unblock<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        struct Closed;
        impl fmt::Debug for Closed {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<closed>")
            }
        }

        struct Blocked;
        impl fmt::Debug for Blocked {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str("<blocked>")
            }
        }

        match &self.state {
            State::Idle(None) => f.debug_struct("Unblock").field("io", &Closed).finish(),
            State::Idle(Some(io)) => {
                let io: &T = io;
                f.debug_struct("Unblock").field("io", io).finish()
            }
            State::WithMut(..)
            | State::Streaming(..)
            | State::Reading(..)
            | State::Writing(..)
            | State::Seeking(..) => f.debug_struct("Unblock").field("io", &Blocked).finish(),
        }
    }
}

/// Current state of a blocking task.
enum State<T> {
    /// There is no blocking task.
    ///
    /// The inner value is readily available, unless it has already been extracted. The value is
    /// extracted out by [`Unblock::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting
    /// [`Unblock`].
    Idle(Option<Box<T>>),

    /// A [`Unblock::with_mut()`] closure was spawned and is still running.
    WithMut(Task<Box<T>>),

    /// The inner value is an [`Iterator`] currently iterating in a task.
    ///
    /// The `dyn Any` value here is a `Pin<Box<Receiver<<T as Iterator>::Item>>>`.
    Streaming(Option<Box<dyn Any + Send + Sync>>, Task<Box<T>>),

    /// The inner value is a [`Read`] currently reading in a task.
    Reading(Option<Reader>, Task<(io::Result<()>, Box<T>)>),

    /// The inner value is a [`Write`] currently writing in a task.
    Writing(Option<Writer>, Task<(io::Result<()>, Box<T>)>),

    /// The inner value is a [`Seek`] currently seeking in a task.
    Seeking(Task<(SeekFrom, io::Result<u64>, Box<T>)>),
}

impl<T: Iterator + Send + 'static> Stream for Unblock<T>
where
    T::Item: Send + 'static,
{
    type Item = T::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
        loop {
            match &mut self.state {
                // If not in idle or active streaming state, stop the running task.
                State::WithMut(..)
                | State::Streaming(None, _)
                | State::Reading(..)
                | State::Writing(..)
                | State::Seeking(..) => {
                    // Wait for the running task to stop.
                    ready!(self.poll_stop(cx)).ok();
                }

                // If idle, start a streaming task.
                State::Idle(iter) => {
                    // Take the iterator out to run it on a blocking task.
                    let mut iter = iter.take().expect("inner iterator was taken out");

                    // This channel capacity seems to work well in practice. If it's too low, there
                    // will be too much synchronization between tasks. If too high, memory
                    // consumption increases.
                    let (sender, receiver) = bounded(self.cap.unwrap_or(8 * 1024)); // 8192 items

                    // Spawn a blocking task that runs the iterator and returns it when done.
                    let task = Executor::spawn(async move {
                        for item in &mut iter {
                            if sender.send(item).await.is_err() {
                                break;
                            }
                        }
                        iter
                    });

                    // Move into the busy state and poll again.
                    self.state = State::Streaming(Some(Box::new(Box::pin(receiver))), task);
                }

                // If streaming, receive an item.
                State::Streaming(Some(any), task) => {
                    let receiver = any.downcast_mut::<Pin<Box<Receiver<T::Item>>>>().unwrap();

                    // Poll the channel.
                    let opt = ready!(receiver.as_mut().poll_next(cx));

                    // If the channel is closed, retrieve the iterator back from the blocking task.
                    // This is not really a required step, but it's cleaner to drop the iterator on
                    // the same thread that created it.
                    if opt.is_none() {
                        // Poll the task to retrieve the iterator.
                        let iter = ready!(Pin::new(task).poll(cx));
                        self.state = State::Idle(Some(iter));
                    }

                    return Poll::Ready(opt);
                }
            }
        }
    }
}

impl<T: Read + Send + 'static> AsyncRead for Unblock<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            match &mut self.state {
                // If not in idle or active reading state, stop the running task.
                State::WithMut(..)
                | State::Reading(None, _)
                | State::Streaming(..)
                | State::Writing(..)
                | State::Seeking(..) => {
                    // Wait for the running task to stop.
                    ready!(self.poll_stop(cx))?;
                }

                // If idle, start a reading task.
                State::Idle(io) => {
                    // Take the I/O handle out to read it on a blocking task.
                    let mut io = io.take().expect("inner value was taken out");

                    // This pipe capacity seems to work well in practice. If it's too low, there
                    // will be too much synchronization between tasks. If too high, memory
                    // consumption increases.
                    let (reader, mut writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB

                    // Spawn a blocking task that reads and returns the I/O handle when done.
                    let task = Executor::spawn(async move {
                        // Copy bytes from the I/O handle into the pipe until the pipe is closed or
                        // an error occurs.
                        loop {
                            match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await {
                                Ok(0) => return (Ok(()), io),
                                Ok(_) => {}
                                Err(err) => return (Err(err), io),
                            }
                        }
                    });

                    // Move into the busy state and poll again.
                    self.state = State::Reading(Some(reader), task);
                }

                // If reading, read bytes from the pipe.
                State::Reading(Some(reader), task) => {
                    // Poll the pipe.
                    let n = ready!(reader.poll_drain(cx, buf))?;

                    // If the pipe is closed, retrieve the I/O handle back from the blocking task.
                    // This is not really a required step, but it's cleaner to drop the handle on
                    // the same thread that created it.
                    if n == 0 {
                        // Poll the task to retrieve the I/O handle.
                        let (res, io) = ready!(Pin::new(task).poll(cx));
                        // Make sure to move into the idle state before reporting errors.
                        self.state = State::Idle(Some(io));
                        res?;
                    }

                    return Poll::Ready(Ok(n));
                }
            }
        }
    }
}

impl<T: Write + Send + 'static> AsyncWrite for Unblock<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        loop {
            match &mut self.state {
                // If not in idle or active writing state, stop the running task.
                State::WithMut(..)
                | State::Writing(None, _)
                | State::Streaming(..)
                | State::Reading(..)
                | State::Seeking(..) => {
                    // Wait for the running task to stop.
                    ready!(self.poll_stop(cx))?;
                }

                // If idle, start the writing task.
                State::Idle(io) => {
                    // Take the I/O handle out to write on a blocking task.
                    let mut io = io.take().expect("inner value was taken out");

                    // This pipe capacity seems to work well in practice. If it's too low, there will
                    // be too much synchronization between tasks. If too high, memory consumption
                    // increases.
                    let (mut reader, writer) = pipe(self.cap.unwrap_or(8 * 1024 * 1024)); // 8 MB

                    // Spawn a blocking task that writes and returns the I/O handle when done.
                    let task = Executor::spawn(async move {
                        // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
                        // error occurs. Flush the I/O handle at the end.
                        loop {
                            match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await {
                                Ok(0) => return (io.flush(), io),
                                Ok(_) => {}
                                Err(err) => {
                                    io.flush().ok();
                                    return (Err(err), io);
                                }
                            }
                        }
                    });

                    // Move into the busy state and poll again.
                    self.state = State::Writing(Some(writer), task);
                }

                // If writing, write more bytes into the pipe.
                State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf),
            }
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        loop {
            match &mut self.state {
                // If not in idle state, stop the running task.
                State::WithMut(..)
                | State::Streaming(..)
                | State::Writing(..)
                | State::Reading(..)
                | State::Seeking(..) => {
                    // Wait for the running task to stop.
                    ready!(self.poll_stop(cx))?;
                }

                // Idle implies flushed.
                State::Idle(_) => return Poll::Ready(Ok(())),
            }
        }
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // First, make sure the I/O handle is flushed.
        ready!(Pin::new(&mut self).poll_flush(cx))?;

        // Then move into the idle state with no I/O handle, thus dropping it.
        self.state = State::Idle(None);
        Poll::Ready(Ok(()))
    }
}

impl<T: Seek + Send + 'static> AsyncSeek for Unblock<T> {
    fn poll_seek(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<io::Result<u64>> {
        loop {
            match &mut self.state {
                // If not in idle state, stop the running task.
                State::WithMut(..)
                | State::Streaming(..)
                | State::Reading(..)
                | State::Writing(..) => {
                    // Wait for the running task to stop.
                    ready!(self.poll_stop(cx))?;
                }

                State::Idle(io) => {
                    // Take the I/O handle out to seek on a blocking task.
                    let mut io = io.take().expect("inner value was taken out");

                    let task = Executor::spawn(async move {
                        let res = io.seek(pos);
                        (pos, res, io)
                    });
                    self.state = State::Seeking(task);
                }

                State::Seeking(task) => {
                    // Poll the task to wait for it to finish.
                    let (original_pos, res, io) = ready!(Pin::new(task).poll(cx));
                    // Make sure to move into the idle state before reporting errors.
                    self.state = State::Idle(Some(io));
                    let current = res?;

                    // If the `pos` argument matches the original one, return the result.
                    if original_pos == pos {
                        return Poll::Ready(Ok(current));
                    }
                }
            }
        }
    }
}

#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use super::*;

    #[test]
    fn test_max_threads() {
        // properly set env var
        env::set_var(MAX_THREADS_ENV, "100");
        assert_eq!(100, Executor::max_threads().get());

        // passed value below minimum, so we set it to minimum
        env::set_var(MAX_THREADS_ENV, "0");
        assert_eq!(1, Executor::max_threads().get());

        // passed value above maximum, so we set to allowed maximum
        env::set_var(MAX_THREADS_ENV, "50000");
        assert_eq!(10000, Executor::max_threads().get());

        // empty value fails to parse, so fall back to the default
        env::set_var(MAX_THREADS_ENV, "");
        assert_eq!(500, Executor::max_threads().get());

        // not a number, use default
        env::set_var(MAX_THREADS_ENV, "NOTINT");
        assert_eq!(500, Executor::max_threads().get());
    }
}