async_executor/
lib.rs

1//! Async executors.
2//!
3//! This crate provides two reference executors that trade performance for
4//! functionality. They should be considered reference executors that are "good
5//! enough" for most use cases. For more specialized use cases, consider writing
6//! your own executor on top of [`async-task`].
7//!
8//! [`async-task`]: https://crates.io/crates/async-task
9//!
10//! # Examples
11//!
12//! ```
13//! use async_executor::Executor;
14//! use futures_lite::future;
15//!
16//! // Create a new executor.
17//! let ex = Executor::new();
18//!
19//! // Spawn a task.
20//! let task = ex.spawn(async {
21//!     println!("Hello world");
22//! });
23//!
24//! // Run the executor until the task completes.
25//! future::block_on(ex.run(task));
26//! ```
27
28#![warn(
29    missing_docs,
30    missing_debug_implementations,
31    rust_2018_idioms,
32    clippy::undocumented_unsafe_blocks
33)]
34#![doc(
35    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
41
42use std::fmt;
43use std::marker::PhantomData;
44use std::panic::{RefUnwindSafe, UnwindSafe};
45use std::rc::Rc;
46use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
47use std::sync::{Arc, Mutex, RwLock, TryLockError};
48use std::task::{Poll, Waker};
49
50use async_task::{Builder, Runnable};
51use concurrent_queue::ConcurrentQueue;
52use futures_lite::{future, prelude::*};
53use slab::Slab;
54
55#[cfg(feature = "static")]
56mod static_executors;
57
58#[doc(no_inline)]
59pub use async_task::{FallibleTask, Task};
60#[cfg(feature = "static")]
61#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
62pub use static_executors::*;
63
64/// An async executor.
65///
66/// # Examples
67///
68/// A multi-threaded executor:
69///
70/// ```
71/// use async_channel::unbounded;
72/// use async_executor::Executor;
73/// use easy_parallel::Parallel;
74/// use futures_lite::future;
75///
76/// let ex = Executor::new();
77/// let (signal, shutdown) = unbounded::<()>();
78///
79/// Parallel::new()
80///     // Run four executor threads.
81///     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
82///     // Run the main future on the current thread.
83///     .finish(|| future::block_on(async {
84///         println!("Hello world!");
85///         drop(signal);
86///     }));
87/// ```
88pub struct Executor<'a> {
89    /// The executor state.
90    state: AtomicPtr<State>,
91
92    /// Makes the `'a` lifetime invariant.
93    _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
94}
95
96// SAFETY: Executor stores no thread local state that can be accessed via other thread.
97unsafe impl Send for Executor<'_> {}
98// SAFETY: Executor internally synchronizes all of it's operations internally.
99unsafe impl Sync for Executor<'_> {}
100
101impl UnwindSafe for Executor<'_> {}
102impl RefUnwindSafe for Executor<'_> {}
103
104impl fmt::Debug for Executor<'_> {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        debug_executor(self, "Executor", f)
107    }
108}
109
110impl<'a> Executor<'a> {
111    /// Creates a new executor.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use async_executor::Executor;
117    ///
118    /// let ex = Executor::new();
119    /// ```
120    pub const fn new() -> Executor<'a> {
121        Executor {
122            state: AtomicPtr::new(std::ptr::null_mut()),
123            _marker: PhantomData,
124        }
125    }
126
127    /// Returns `true` if there are no unfinished tasks.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use async_executor::Executor;
133    ///
134    /// let ex = Executor::new();
135    /// assert!(ex.is_empty());
136    ///
137    /// let task = ex.spawn(async {
138    ///     println!("Hello world");
139    /// });
140    /// assert!(!ex.is_empty());
141    ///
142    /// assert!(ex.try_tick());
143    /// assert!(ex.is_empty());
144    /// ```
145    pub fn is_empty(&self) -> bool {
146        self.state().active.lock().unwrap().is_empty()
147    }
148
149    /// Spawns a task onto the executor.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use async_executor::Executor;
155    ///
156    /// let ex = Executor::new();
157    ///
158    /// let task = ex.spawn(async {
159    ///     println!("Hello world");
160    /// });
161    /// ```
162    pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
163        let mut active = self.state().active.lock().unwrap();
164
165        // SAFETY: `T` and the future are `Send`.
166        unsafe { self.spawn_inner(future, &mut active) }
167    }
168
169    /// Spawns many tasks onto the executor.
170    ///
171    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
172    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
173    /// contention.
174    ///
175    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
176    /// prevent runner thread starvation. It is assumed that the iterator provided does not
177    /// block; blocking iterators can lock up the internal mutex and therefore the entire
178    /// executor.
179    ///
180    /// ## Example
181    ///
182    /// ```
183    /// use async_executor::Executor;
184    /// use futures_lite::{stream, prelude::*};
185    /// use std::future::ready;
186    ///
187    /// # futures_lite::future::block_on(async {
188    /// let mut ex = Executor::new();
189    ///
190    /// let futures = [
191    ///     ready(1),
192    ///     ready(2),
193    ///     ready(3)
194    /// ];
195    ///
196    /// // Spawn all of the futures onto the executor at once.
197    /// let mut tasks = vec![];
198    /// ex.spawn_many(futures, &mut tasks);
199    ///
200    /// // Await all of them.
201    /// let results = ex.run(async move {
202    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
203    /// }).await;
204    /// assert_eq!(results, [1, 2, 3]);
205    /// # });
206    /// ```
207    ///
208    /// [`spawn`]: Executor::spawn
209    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
210        &self,
211        futures: impl IntoIterator<Item = F>,
212        handles: &mut impl Extend<Task<F::Output>>,
213    ) {
214        let mut active = Some(self.state().active.lock().unwrap());
215
216        // Convert the futures into tasks.
217        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
218            // SAFETY: `T` and the future are `Send`.
219            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
220
221            // Yield the lock every once in a while to ease contention.
222            if i.wrapping_sub(1) % 500 == 0 {
223                drop(active.take());
224                active = Some(self.state().active.lock().unwrap());
225            }
226
227            task
228        });
229
230        // Push the tasks to the user's collection.
231        handles.extend(tasks);
232    }
233
234    /// Spawn a future while holding the inner lock.
235    ///
236    /// # Safety
237    ///
238    /// If this is an `Executor`, `F` and `T` must be `Send`.
239    unsafe fn spawn_inner<T: 'a>(
240        &self,
241        future: impl Future<Output = T> + 'a,
242        active: &mut Slab<Waker>,
243    ) -> Task<T> {
244        // Remove the task from the set of active tasks when the future finishes.
245        let entry = active.vacant_entry();
246        let index = entry.key();
247        let state = self.state_as_arc();
248        let future = async move {
249            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
250            future.await
251        };
252
253        // Create the task and register it in the set of active tasks.
254        //
255        // SAFETY:
256        //
257        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
258        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
259        // `try_tick`, `tick` and `run` can only be called from the origin
260        // thread of the `LocalExecutor`. Similarly, `spawn` can only  be called
261        // from the origin thread, ensuring that `future` and the executor share
262        // the same origin thread. The `Runnable` can be scheduled from other
263        // threads, but because of the above `Runnable` can only be called or
264        // dropped on the origin thread.
265        //
266        // `future` is not `'static`, but we make sure that the `Runnable` does
267        // not outlive `'a`. When the executor is dropped, the `active` field is
268        // drained and all of the `Waker`s are woken. Then, the queue inside of
269        // the `Executor` is drained of all of its runnables. This ensures that
270        // runnables are dropped and this precondition is satisfied.
271        //
272        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
273        // Therefore we do not need to worry about what is done with the
274        // `Waker`.
275        let (runnable, task) = Builder::new()
276            .propagate_panic(true)
277            .spawn_unchecked(|()| future, self.schedule());
278        entry.insert(runnable.waker());
279
280        runnable.schedule();
281        task
282    }
283
284    /// Attempts to run a task if at least one is scheduled.
285    ///
286    /// Running a scheduled task means simply polling its future once.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use async_executor::Executor;
292    ///
293    /// let ex = Executor::new();
294    /// assert!(!ex.try_tick()); // no tasks to run
295    ///
296    /// let task = ex.spawn(async {
297    ///     println!("Hello world");
298    /// });
299    /// assert!(ex.try_tick()); // a task was found
300    /// ```
301    pub fn try_tick(&self) -> bool {
302        self.state().try_tick()
303    }
304
305    /// Runs a single task.
306    ///
307    /// Running a task means simply polling its future once.
308    ///
309    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use async_executor::Executor;
315    /// use futures_lite::future;
316    ///
317    /// let ex = Executor::new();
318    ///
319    /// let task = ex.spawn(async {
320    ///     println!("Hello world");
321    /// });
322    /// future::block_on(ex.tick()); // runs the task
323    /// ```
324    pub async fn tick(&self) {
325        self.state().tick().await;
326    }
327
328    /// Runs the executor until the given future completes.
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// use async_executor::Executor;
334    /// use futures_lite::future;
335    ///
336    /// let ex = Executor::new();
337    ///
338    /// let task = ex.spawn(async { 1 + 2 });
339    /// let res = future::block_on(ex.run(async { task.await * 2 }));
340    ///
341    /// assert_eq!(res, 6);
342    /// ```
343    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
344        self.state().run(future).await
345    }
346
347    /// Returns a function that schedules a runnable task when it gets woken up.
348    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
349        let state = self.state_as_arc();
350
351        // TODO: If possible, push into the current local queue and notify the ticker.
352        move |runnable| {
353            state.queue.push(runnable).unwrap();
354            state.notify();
355        }
356    }
357
358    /// Returns a pointer to the inner state.
359    #[inline]
360    fn state_ptr(&self) -> *const State {
361        #[cold]
362        fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
363            let state = Arc::new(State::new());
364            // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
365            let ptr = Arc::into_raw(state) as *mut State;
366            if let Err(actual) = atomic_ptr.compare_exchange(
367                std::ptr::null_mut(),
368                ptr,
369                Ordering::AcqRel,
370                Ordering::Acquire,
371            ) {
372                // SAFETY: This was just created from Arc::into_raw.
373                drop(unsafe { Arc::from_raw(ptr) });
374                actual
375            } else {
376                ptr
377            }
378        }
379
380        let mut ptr = self.state.load(Ordering::Acquire);
381        if ptr.is_null() {
382            ptr = alloc_state(&self.state);
383        }
384        ptr
385    }
386
387    /// Returns a reference to the inner state.
388    #[inline]
389    fn state(&self) -> &State {
390        // SAFETY: So long as an Executor lives, it's state pointer will always be valid
391        // when accessed through state_ptr.
392        unsafe { &*self.state_ptr() }
393    }
394
395    // Clones the inner state Arc
396    #[inline]
397    fn state_as_arc(&self) -> Arc<State> {
398        // SAFETY: So long as an Executor lives, it's state pointer will always be a valid
399        // Arc when accessed through state_ptr.
400        let arc = unsafe { Arc::from_raw(self.state_ptr()) };
401        let clone = arc.clone();
402        std::mem::forget(arc);
403        clone
404    }
405}
406
407impl Drop for Executor<'_> {
408    fn drop(&mut self) {
409        let ptr = *self.state.get_mut();
410        if ptr.is_null() {
411            return;
412        }
413
414        // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
415        // via Arc::into_raw in state_ptr.
416        let state = unsafe { Arc::from_raw(ptr) };
417
418        let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
419        for w in active.drain() {
420            w.wake();
421        }
422        drop(active);
423
424        while state.queue.pop().is_ok() {}
425    }
426}
427
428impl<'a> Default for Executor<'a> {
429    fn default() -> Executor<'a> {
430        Executor::new()
431    }
432}
433
434/// A thread-local executor.
435///
436/// The executor can only be run on the thread that created it.
437///
438/// # Examples
439///
440/// ```
441/// use async_executor::LocalExecutor;
442/// use futures_lite::future;
443///
444/// let local_ex = LocalExecutor::new();
445///
446/// future::block_on(local_ex.run(async {
447///     println!("Hello world!");
448/// }));
449/// ```
450pub struct LocalExecutor<'a> {
451    /// The inner executor.
452    inner: Executor<'a>,
453
454    /// Makes the type `!Send` and `!Sync`.
455    _marker: PhantomData<Rc<()>>,
456}
457
458impl UnwindSafe for LocalExecutor<'_> {}
459impl RefUnwindSafe for LocalExecutor<'_> {}
460
461impl fmt::Debug for LocalExecutor<'_> {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463        debug_executor(&self.inner, "LocalExecutor", f)
464    }
465}
466
467impl<'a> LocalExecutor<'a> {
468    /// Creates a single-threaded executor.
469    ///
470    /// # Examples
471    ///
472    /// ```
473    /// use async_executor::LocalExecutor;
474    ///
475    /// let local_ex = LocalExecutor::new();
476    /// ```
477    pub const fn new() -> LocalExecutor<'a> {
478        LocalExecutor {
479            inner: Executor::new(),
480            _marker: PhantomData,
481        }
482    }
483
484    /// Returns `true` if there are no unfinished tasks.
485    ///
486    /// # Examples
487    ///
488    /// ```
489    /// use async_executor::LocalExecutor;
490    ///
491    /// let local_ex = LocalExecutor::new();
492    /// assert!(local_ex.is_empty());
493    ///
494    /// let task = local_ex.spawn(async {
495    ///     println!("Hello world");
496    /// });
497    /// assert!(!local_ex.is_empty());
498    ///
499    /// assert!(local_ex.try_tick());
500    /// assert!(local_ex.is_empty());
501    /// ```
502    pub fn is_empty(&self) -> bool {
503        self.inner().is_empty()
504    }
505
506    /// Spawns a task onto the executor.
507    ///
508    /// # Examples
509    ///
510    /// ```
511    /// use async_executor::LocalExecutor;
512    ///
513    /// let local_ex = LocalExecutor::new();
514    ///
515    /// let task = local_ex.spawn(async {
516    ///     println!("Hello world");
517    /// });
518    /// ```
519    pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
520        let mut active = self.inner().state().active.lock().unwrap();
521
522        // SAFETY: This executor is not thread safe, so the future and its result
523        //         cannot be sent to another thread.
524        unsafe { self.inner().spawn_inner(future, &mut active) }
525    }
526
527    /// Spawns many tasks onto the executor.
528    ///
529    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
530    /// spawns all of the tasks in one go. With large amounts of tasks this can improve
531    /// contention.
532    ///
533    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
534    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
535    /// mutex is not released, as there are no other threads that can poll this executor.
536    ///
537    /// ## Example
538    ///
539    /// ```
540    /// use async_executor::LocalExecutor;
541    /// use futures_lite::{stream, prelude::*};
542    /// use std::future::ready;
543    ///
544    /// # futures_lite::future::block_on(async {
545    /// let mut ex = LocalExecutor::new();
546    ///
547    /// let futures = [
548    ///     ready(1),
549    ///     ready(2),
550    ///     ready(3)
551    /// ];
552    ///
553    /// // Spawn all of the futures onto the executor at once.
554    /// let mut tasks = vec![];
555    /// ex.spawn_many(futures, &mut tasks);
556    ///
557    /// // Await all of them.
558    /// let results = ex.run(async move {
559    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
560    /// }).await;
561    /// assert_eq!(results, [1, 2, 3]);
562    /// # });
563    /// ```
564    ///
565    /// [`spawn`]: LocalExecutor::spawn
566    /// [`Executor::spawn_many`]: Executor::spawn_many
567    pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
568        &self,
569        futures: impl IntoIterator<Item = F>,
570        handles: &mut impl Extend<Task<F::Output>>,
571    ) {
572        let mut active = self.inner().state().active.lock().unwrap();
573
574        // Convert all of the futures to tasks.
575        let tasks = futures.into_iter().map(|future| {
576            // SAFETY: This executor is not thread safe, so the future and its result
577            //         cannot be sent to another thread.
578            unsafe { self.inner().spawn_inner(future, &mut active) }
579
580            // As only one thread can spawn or poll tasks at a time, there is no need
581            // to release lock contention here.
582        });
583
584        // Push them to the user's collection.
585        handles.extend(tasks);
586    }
587
588    /// Attempts to run a task if at least one is scheduled.
589    ///
590    /// Running a scheduled task means simply polling its future once.
591    ///
592    /// # Examples
593    ///
594    /// ```
595    /// use async_executor::LocalExecutor;
596    ///
597    /// let ex = LocalExecutor::new();
598    /// assert!(!ex.try_tick()); // no tasks to run
599    ///
600    /// let task = ex.spawn(async {
601    ///     println!("Hello world");
602    /// });
603    /// assert!(ex.try_tick()); // a task was found
604    /// ```
605    pub fn try_tick(&self) -> bool {
606        self.inner().try_tick()
607    }
608
609    /// Runs a single task.
610    ///
611    /// Running a task means simply polling its future once.
612    ///
613    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
614    ///
615    /// # Examples
616    ///
617    /// ```
618    /// use async_executor::LocalExecutor;
619    /// use futures_lite::future;
620    ///
621    /// let ex = LocalExecutor::new();
622    ///
623    /// let task = ex.spawn(async {
624    ///     println!("Hello world");
625    /// });
626    /// future::block_on(ex.tick()); // runs the task
627    /// ```
628    pub async fn tick(&self) {
629        self.inner().tick().await
630    }
631
632    /// Runs the executor until the given future completes.
633    ///
634    /// # Examples
635    ///
636    /// ```
637    /// use async_executor::LocalExecutor;
638    /// use futures_lite::future;
639    ///
640    /// let local_ex = LocalExecutor::new();
641    ///
642    /// let task = local_ex.spawn(async { 1 + 2 });
643    /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
644    ///
645    /// assert_eq!(res, 6);
646    /// ```
647    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
648        self.inner().run(future).await
649    }
650
651    /// Returns a reference to the inner executor.
652    fn inner(&self) -> &Executor<'a> {
653        &self.inner
654    }
655}
656
657impl<'a> Default for LocalExecutor<'a> {
658    fn default() -> LocalExecutor<'a> {
659        LocalExecutor::new()
660    }
661}
662
663/// The state of a executor.
664struct State {
665    /// The global queue.
666    queue: ConcurrentQueue<Runnable>,
667
668    /// Local queues created by runners.
669    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
670
671    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
672    notified: AtomicBool,
673
674    /// A list of sleeping tickers.
675    sleepers: Mutex<Sleepers>,
676
677    /// Currently active tasks.
678    active: Mutex<Slab<Waker>>,
679}
680
681impl State {
682    /// Creates state for a new executor.
683    const fn new() -> State {
684        State {
685            queue: ConcurrentQueue::unbounded(),
686            local_queues: RwLock::new(Vec::new()),
687            notified: AtomicBool::new(true),
688            sleepers: Mutex::new(Sleepers {
689                count: 0,
690                wakers: Vec::new(),
691                free_ids: Vec::new(),
692            }),
693            active: Mutex::new(Slab::new()),
694        }
695    }
696
697    /// Notifies a sleeping ticker.
698    #[inline]
699    fn notify(&self) {
700        if self
701            .notified
702            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
703            .is_ok()
704        {
705            let waker = self.sleepers.lock().unwrap().notify();
706            if let Some(w) = waker {
707                w.wake();
708            }
709        }
710    }
711
712    pub(crate) fn try_tick(&self) -> bool {
713        match self.queue.pop() {
714            Err(_) => false,
715            Ok(runnable) => {
716                // Notify another ticker now to pick up where this ticker left off, just in case
717                // running the task takes a long time.
718                self.notify();
719
720                // Run the task.
721                runnable.run();
722                true
723            }
724        }
725    }
726
727    pub(crate) async fn tick(&self) {
728        let runnable = Ticker::new(self).runnable().await;
729        runnable.run();
730    }
731
732    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
733        let mut runner = Runner::new(self);
734        let mut rng = fastrand::Rng::new();
735
736        // A future that runs tasks forever.
737        let run_forever = async {
738            loop {
739                for _ in 0..200 {
740                    let runnable = runner.runnable(&mut rng).await;
741                    runnable.run();
742                }
743                future::yield_now().await;
744            }
745        };
746
747        // Run `future` and `run_forever` concurrently until `future` completes.
748        future.or(run_forever).await
749    }
750}
751
752/// A list of sleeping tickers.
753struct Sleepers {
754    /// Number of sleeping tickers (both notified and unnotified).
755    count: usize,
756
757    /// IDs and wakers of sleeping unnotified tickers.
758    ///
759    /// A sleeping ticker is notified when its waker is missing from this list.
760    wakers: Vec<(usize, Waker)>,
761
762    /// Reclaimed IDs.
763    free_ids: Vec<usize>,
764}
765
766impl Sleepers {
767    /// Inserts a new sleeping ticker.
768    fn insert(&mut self, waker: &Waker) -> usize {
769        let id = match self.free_ids.pop() {
770            Some(id) => id,
771            None => self.count + 1,
772        };
773        self.count += 1;
774        self.wakers.push((id, waker.clone()));
775        id
776    }
777
778    /// Re-inserts a sleeping ticker's waker if it was notified.
779    ///
780    /// Returns `true` if the ticker was notified.
781    fn update(&mut self, id: usize, waker: &Waker) -> bool {
782        for item in &mut self.wakers {
783            if item.0 == id {
784                item.1.clone_from(waker);
785                return false;
786            }
787        }
788
789        self.wakers.push((id, waker.clone()));
790        true
791    }
792
793    /// Removes a previously inserted sleeping ticker.
794    ///
795    /// Returns `true` if the ticker was notified.
796    fn remove(&mut self, id: usize) -> bool {
797        self.count -= 1;
798        self.free_ids.push(id);
799
800        for i in (0..self.wakers.len()).rev() {
801            if self.wakers[i].0 == id {
802                self.wakers.remove(i);
803                return false;
804            }
805        }
806        true
807    }
808
809    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
810    fn is_notified(&self) -> bool {
811        self.count == 0 || self.count > self.wakers.len()
812    }
813
814    /// Returns notification waker for a sleeping ticker.
815    ///
816    /// If a ticker was notified already or there are no tickers, `None` will be returned.
817    fn notify(&mut self) -> Option<Waker> {
818        if self.wakers.len() == self.count {
819            self.wakers.pop().map(|item| item.1)
820        } else {
821            None
822        }
823    }
824}
825
826/// Runs task one by one.
827struct Ticker<'a> {
828    /// The executor state.
829    state: &'a State,
830
831    /// Set to a non-zero sleeper ID when in sleeping state.
832    ///
833    /// States a ticker can be in:
834    /// 1) Woken.
835    ///    2a) Sleeping and unnotified.
836    ///    2b) Sleeping and notified.
837    sleeping: usize,
838}
839
840impl Ticker<'_> {
841    /// Creates a ticker.
842    fn new(state: &State) -> Ticker<'_> {
843        Ticker { state, sleeping: 0 }
844    }
845
846    /// Moves the ticker into sleeping and unnotified state.
847    ///
848    /// Returns `false` if the ticker was already sleeping and unnotified.
849    fn sleep(&mut self, waker: &Waker) -> bool {
850        let mut sleepers = self.state.sleepers.lock().unwrap();
851
852        match self.sleeping {
853            // Move to sleeping state.
854            0 => {
855                self.sleeping = sleepers.insert(waker);
856            }
857
858            // Already sleeping, check if notified.
859            id => {
860                if !sleepers.update(id, waker) {
861                    return false;
862                }
863            }
864        }
865
866        self.state
867            .notified
868            .store(sleepers.is_notified(), Ordering::Release);
869
870        true
871    }
872
873    /// Moves the ticker into woken state.
874    fn wake(&mut self) {
875        if self.sleeping != 0 {
876            let mut sleepers = self.state.sleepers.lock().unwrap();
877            sleepers.remove(self.sleeping);
878
879            self.state
880                .notified
881                .store(sleepers.is_notified(), Ordering::Release);
882        }
883        self.sleeping = 0;
884    }
885
886    /// Waits for the next runnable task to run.
887    async fn runnable(&mut self) -> Runnable {
888        self.runnable_with(|| self.state.queue.pop().ok()).await
889    }
890
891    /// Waits for the next runnable task to run, given a function that searches for a task.
892    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
893        future::poll_fn(|cx| {
894            loop {
895                match search() {
896                    None => {
897                        // Move to sleeping and unnotified state.
898                        if !self.sleep(cx.waker()) {
899                            // If already sleeping and unnotified, return.
900                            return Poll::Pending;
901                        }
902                    }
903                    Some(r) => {
904                        // Wake up.
905                        self.wake();
906
907                        // Notify another ticker now to pick up where this ticker left off, just in
908                        // case running the task takes a long time.
909                        self.state.notify();
910
911                        return Poll::Ready(r);
912                    }
913                }
914            }
915        })
916        .await
917    }
918}
919
920impl Drop for Ticker<'_> {
921    fn drop(&mut self) {
922        // If this ticker is in sleeping state, it must be removed from the sleepers list.
923        if self.sleeping != 0 {
924            let mut sleepers = self.state.sleepers.lock().unwrap();
925            let notified = sleepers.remove(self.sleeping);
926
927            self.state
928                .notified
929                .store(sleepers.is_notified(), Ordering::Release);
930
931            // If this ticker was notified, then notify another ticker.
932            if notified {
933                drop(sleepers);
934                self.state.notify();
935            }
936        }
937    }
938}
939
940/// A worker in a work-stealing executor.
941///
942/// This is just a ticker that also has an associated local queue for improved cache locality.
943struct Runner<'a> {
944    /// The executor state.
945    state: &'a State,
946
947    /// Inner ticker.
948    ticker: Ticker<'a>,
949
950    /// The local queue.
951    local: Arc<ConcurrentQueue<Runnable>>,
952
953    /// Bumped every time a runnable task is found.
954    ticks: usize,
955}
956
957impl Runner<'_> {
958    /// Creates a runner and registers it in the executor state.
959    fn new(state: &State) -> Runner<'_> {
960        let runner = Runner {
961            state,
962            ticker: Ticker::new(state),
963            local: Arc::new(ConcurrentQueue::bounded(512)),
964            ticks: 0,
965        };
966        state
967            .local_queues
968            .write()
969            .unwrap()
970            .push(runner.local.clone());
971        runner
972    }
973
974    /// Waits for the next runnable task to run.
975    async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
976        let runnable = self
977            .ticker
978            .runnable_with(|| {
979                // Try the local queue.
980                if let Ok(r) = self.local.pop() {
981                    return Some(r);
982                }
983
984                // Try stealing from the global queue.
985                if let Ok(r) = self.state.queue.pop() {
986                    steal(&self.state.queue, &self.local);
987                    return Some(r);
988                }
989
990                // Try stealing from other runners.
991                let local_queues = self.state.local_queues.read().unwrap();
992
993                // Pick a random starting point in the iterator list and rotate the list.
994                let n = local_queues.len();
995                let start = rng.usize(..n);
996                let iter = local_queues
997                    .iter()
998                    .chain(local_queues.iter())
999                    .skip(start)
1000                    .take(n);
1001
1002                // Remove this runner's local queue.
1003                let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1004
1005                // Try stealing from each local queue in the list.
1006                for local in iter {
1007                    steal(local, &self.local);
1008                    if let Ok(r) = self.local.pop() {
1009                        return Some(r);
1010                    }
1011                }
1012
1013                None
1014            })
1015            .await;
1016
1017        // Bump the tick counter.
1018        self.ticks = self.ticks.wrapping_add(1);
1019
1020        if self.ticks % 64 == 0 {
1021            // Steal tasks from the global queue to ensure fair task scheduling.
1022            steal(&self.state.queue, &self.local);
1023        }
1024
1025        runnable
1026    }
1027}
1028
1029impl Drop for Runner<'_> {
1030    fn drop(&mut self) {
1031        // Remove the local queue.
1032        self.state
1033            .local_queues
1034            .write()
1035            .unwrap()
1036            .retain(|local| !Arc::ptr_eq(local, &self.local));
1037
1038        // Re-schedule remaining tasks in the local queue.
1039        while let Ok(r) = self.local.pop() {
1040            r.schedule();
1041        }
1042    }
1043}
1044
1045/// Steals some items from one queue into another.
1046fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
1047    // Half of `src`'s length rounded up.
1048    let mut count = (src.len() + 1) / 2;
1049
1050    if count > 0 {
1051        // Don't steal more than fits into the queue.
1052        if let Some(cap) = dest.capacity() {
1053            count = count.min(cap - dest.len());
1054        }
1055
1056        // Steal tasks.
1057        for _ in 0..count {
1058            if let Ok(t) = src.pop() {
1059                assert!(dest.push(t).is_ok());
1060            } else {
1061                break;
1062            }
1063        }
1064    }
1065}
1066
1067/// Debug implementation for `Executor` and `LocalExecutor`.
1068fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1069    // Get a reference to the state.
1070    let ptr = executor.state.load(Ordering::Acquire);
1071    if ptr.is_null() {
1072        // The executor has not been initialized.
1073        struct Uninitialized;
1074
1075        impl fmt::Debug for Uninitialized {
1076            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1077                f.write_str("<uninitialized>")
1078            }
1079        }
1080
1081        return f.debug_tuple(name).field(&Uninitialized).finish();
1082    }
1083
1084    // SAFETY: If the state pointer is not null, it must have been
1085    // allocated properly by Arc::new and converted via Arc::into_raw
1086    // in state_ptr.
1087    let state = unsafe { &*ptr };
1088
1089    debug_state(state, name, f)
1090}
1091
1092/// Debug implementation for `Executor` and `LocalExecutor`.
1093fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094    /// Debug wrapper for the number of active tasks.
1095    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
1096
1097    impl fmt::Debug for ActiveTasks<'_> {
1098        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1099            match self.0.try_lock() {
1100                Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
1101                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1102                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1103            }
1104        }
1105    }
1106
1107    /// Debug wrapper for the local runners.
1108    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
1109
1110    impl fmt::Debug for LocalRunners<'_> {
1111        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1112            match self.0.try_read() {
1113                Ok(lock) => f
1114                    .debug_list()
1115                    .entries(lock.iter().map(|queue| queue.len()))
1116                    .finish(),
1117                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1118                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1119            }
1120        }
1121    }
1122
1123    /// Debug wrapper for the sleepers.
1124    struct SleepCount<'a>(&'a Mutex<Sleepers>);
1125
1126    impl fmt::Debug for SleepCount<'_> {
1127        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1128            match self.0.try_lock() {
1129                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
1130                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1131                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1132            }
1133        }
1134    }
1135
1136    f.debug_struct(name)
1137        .field("active", &ActiveTasks(&state.active))
1138        .field("global_tasks", &state.queue.len())
1139        .field("local_runners", &LocalRunners(&state.local_queues))
1140        .field("sleepers", &SleepCount(&state.sleepers))
1141        .finish()
1142}
1143
1144/// Runs a closure when dropped.
1145struct CallOnDrop<F: FnMut()>(F);
1146
1147impl<F: FnMut()> Drop for CallOnDrop<F> {
1148    fn drop(&mut self) {
1149        (self.0)();
1150    }
1151}
1152
1153fn _ensure_send_and_sync() {
1154    use futures_lite::future::pending;
1155
1156    fn is_send<T: Send>(_: T) {}
1157    fn is_sync<T: Sync>(_: T) {}
1158    fn is_static<T: 'static>(_: T) {}
1159
1160    is_send::<Executor<'_>>(Executor::new());
1161    is_sync::<Executor<'_>>(Executor::new());
1162
1163    let ex = Executor::new();
1164    is_send(ex.run(pending::<()>()));
1165    is_sync(ex.run(pending::<()>()));
1166    is_send(ex.tick());
1167    is_sync(ex.tick());
1168    is_send(ex.schedule());
1169    is_sync(ex.schedule());
1170    is_static(ex.schedule());
1171
1172    /// ```compile_fail
1173    /// use async_executor::LocalExecutor;
1174    /// use futures_lite::future::pending;
1175    ///
1176    /// fn is_send<T: Send>(_: T) {}
1177    /// fn is_sync<T: Sync>(_: T) {}
1178    ///
1179    /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
1180    /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
1181    ///
1182    /// let ex = LocalExecutor::new();
1183    /// is_send(ex.run(pending::<()>()));
1184    /// is_sync(ex.run(pending::<()>()));
1185    /// is_send(ex.tick());
1186    /// is_sync(ex.tick());
1187    /// ```
1188    fn _negative_test() {}
1189}