async_executor/lib.rs
1//! Async executors.
2//!
3//! This crate provides two reference executors that trade performance for
4//! functionality. They should be considered reference executors that are "good
5//! enough" for most use cases. For more specialized use cases, consider writing
6//! your own executor on top of [`async-task`].
7//!
8//! [`async-task`]: https://crates.io/crates/async-task
9//!
10//! # Examples
11//!
12//! ```
13//! use async_executor::Executor;
14//! use futures_lite::future;
15//!
16//! // Create a new executor.
17//! let ex = Executor::new();
18//!
19//! // Spawn a task.
20//! let task = ex.spawn(async {
21//! println!("Hello world");
22//! });
23//!
24//! // Run the executor until the task completes.
25//! future::block_on(ex.run(task));
26//! ```
27
28#![warn(
29 missing_docs,
30 missing_debug_implementations,
31 rust_2018_idioms,
32 clippy::undocumented_unsafe_blocks
33)]
34#![doc(
35 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
36)]
37#![doc(
38 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
39)]
40#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
41
42use std::fmt;
43use std::marker::PhantomData;
44use std::panic::{RefUnwindSafe, UnwindSafe};
45use std::rc::Rc;
46use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
47use std::sync::{Arc, Mutex, RwLock, TryLockError};
48use std::task::{Poll, Waker};
49
50use async_task::{Builder, Runnable};
51use concurrent_queue::ConcurrentQueue;
52use futures_lite::{future, prelude::*};
53use slab::Slab;
54
55#[cfg(feature = "static")]
56mod static_executors;
57
58#[doc(no_inline)]
59pub use async_task::{FallibleTask, Task};
60#[cfg(feature = "static")]
61#[cfg_attr(docsrs, doc(cfg(any(feature = "static"))))]
62pub use static_executors::*;
63
64/// An async executor.
65///
66/// # Examples
67///
68/// A multi-threaded executor:
69///
70/// ```
71/// use async_channel::unbounded;
72/// use async_executor::Executor;
73/// use easy_parallel::Parallel;
74/// use futures_lite::future;
75///
76/// let ex = Executor::new();
77/// let (signal, shutdown) = unbounded::<()>();
78///
79/// Parallel::new()
80/// // Run four executor threads.
81/// .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
82/// // Run the main future on the current thread.
83/// .finish(|| future::block_on(async {
84/// println!("Hello world!");
85/// drop(signal);
86/// }));
87/// ```
88pub struct Executor<'a> {
89 /// The executor state.
90 state: AtomicPtr<State>,
91
92 /// Makes the `'a` lifetime invariant.
93 _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
94}
95
96// SAFETY: Executor stores no thread local state that can be accessed via other thread.
97unsafe impl Send for Executor<'_> {}
98// SAFETY: Executor internally synchronizes all of it's operations internally.
99unsafe impl Sync for Executor<'_> {}
100
101impl UnwindSafe for Executor<'_> {}
102impl RefUnwindSafe for Executor<'_> {}
103
104impl fmt::Debug for Executor<'_> {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 debug_executor(self, "Executor", f)
107 }
108}
109
110impl<'a> Executor<'a> {
111 /// Creates a new executor.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use async_executor::Executor;
117 ///
118 /// let ex = Executor::new();
119 /// ```
120 pub const fn new() -> Executor<'a> {
121 Executor {
122 state: AtomicPtr::new(std::ptr::null_mut()),
123 _marker: PhantomData,
124 }
125 }
126
127 /// Returns `true` if there are no unfinished tasks.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// use async_executor::Executor;
133 ///
134 /// let ex = Executor::new();
135 /// assert!(ex.is_empty());
136 ///
137 /// let task = ex.spawn(async {
138 /// println!("Hello world");
139 /// });
140 /// assert!(!ex.is_empty());
141 ///
142 /// assert!(ex.try_tick());
143 /// assert!(ex.is_empty());
144 /// ```
145 pub fn is_empty(&self) -> bool {
146 self.state().active.lock().unwrap().is_empty()
147 }
148
149 /// Spawns a task onto the executor.
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use async_executor::Executor;
155 ///
156 /// let ex = Executor::new();
157 ///
158 /// let task = ex.spawn(async {
159 /// println!("Hello world");
160 /// });
161 /// ```
162 pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
163 let mut active = self.state().active.lock().unwrap();
164
165 // SAFETY: `T` and the future are `Send`.
166 unsafe { self.spawn_inner(future, &mut active) }
167 }
168
169 /// Spawns many tasks onto the executor.
170 ///
171 /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
172 /// spawns all of the tasks in one go. With large amounts of tasks this can improve
173 /// contention.
174 ///
175 /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
176 /// prevent runner thread starvation. It is assumed that the iterator provided does not
177 /// block; blocking iterators can lock up the internal mutex and therefore the entire
178 /// executor.
179 ///
180 /// ## Example
181 ///
182 /// ```
183 /// use async_executor::Executor;
184 /// use futures_lite::{stream, prelude::*};
185 /// use std::future::ready;
186 ///
187 /// # futures_lite::future::block_on(async {
188 /// let mut ex = Executor::new();
189 ///
190 /// let futures = [
191 /// ready(1),
192 /// ready(2),
193 /// ready(3)
194 /// ];
195 ///
196 /// // Spawn all of the futures onto the executor at once.
197 /// let mut tasks = vec![];
198 /// ex.spawn_many(futures, &mut tasks);
199 ///
200 /// // Await all of them.
201 /// let results = ex.run(async move {
202 /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
203 /// }).await;
204 /// assert_eq!(results, [1, 2, 3]);
205 /// # });
206 /// ```
207 ///
208 /// [`spawn`]: Executor::spawn
209 pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
210 &self,
211 futures: impl IntoIterator<Item = F>,
212 handles: &mut impl Extend<Task<F::Output>>,
213 ) {
214 let mut active = Some(self.state().active.lock().unwrap());
215
216 // Convert the futures into tasks.
217 let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
218 // SAFETY: `T` and the future are `Send`.
219 let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
220
221 // Yield the lock every once in a while to ease contention.
222 if i.wrapping_sub(1) % 500 == 0 {
223 drop(active.take());
224 active = Some(self.state().active.lock().unwrap());
225 }
226
227 task
228 });
229
230 // Push the tasks to the user's collection.
231 handles.extend(tasks);
232 }
233
234 /// Spawn a future while holding the inner lock.
235 ///
236 /// # Safety
237 ///
238 /// If this is an `Executor`, `F` and `T` must be `Send`.
239 unsafe fn spawn_inner<T: 'a>(
240 &self,
241 future: impl Future<Output = T> + 'a,
242 active: &mut Slab<Waker>,
243 ) -> Task<T> {
244 // Remove the task from the set of active tasks when the future finishes.
245 let entry = active.vacant_entry();
246 let index = entry.key();
247 let state = self.state_as_arc();
248 let future = async move {
249 let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
250 future.await
251 };
252
253 // Create the task and register it in the set of active tasks.
254 //
255 // SAFETY:
256 //
257 // If `future` is not `Send`, this must be a `LocalExecutor` as per this
258 // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
259 // `try_tick`, `tick` and `run` can only be called from the origin
260 // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
261 // from the origin thread, ensuring that `future` and the executor share
262 // the same origin thread. The `Runnable` can be scheduled from other
263 // threads, but because of the above `Runnable` can only be called or
264 // dropped on the origin thread.
265 //
266 // `future` is not `'static`, but we make sure that the `Runnable` does
267 // not outlive `'a`. When the executor is dropped, the `active` field is
268 // drained and all of the `Waker`s are woken. Then, the queue inside of
269 // the `Executor` is drained of all of its runnables. This ensures that
270 // runnables are dropped and this precondition is satisfied.
271 //
272 // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
273 // Therefore we do not need to worry about what is done with the
274 // `Waker`.
275 let (runnable, task) = Builder::new()
276 .propagate_panic(true)
277 .spawn_unchecked(|()| future, self.schedule());
278 entry.insert(runnable.waker());
279
280 runnable.schedule();
281 task
282 }
283
284 /// Attempts to run a task if at least one is scheduled.
285 ///
286 /// Running a scheduled task means simply polling its future once.
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// use async_executor::Executor;
292 ///
293 /// let ex = Executor::new();
294 /// assert!(!ex.try_tick()); // no tasks to run
295 ///
296 /// let task = ex.spawn(async {
297 /// println!("Hello world");
298 /// });
299 /// assert!(ex.try_tick()); // a task was found
300 /// ```
301 pub fn try_tick(&self) -> bool {
302 self.state().try_tick()
303 }
304
305 /// Runs a single task.
306 ///
307 /// Running a task means simply polling its future once.
308 ///
309 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use async_executor::Executor;
315 /// use futures_lite::future;
316 ///
317 /// let ex = Executor::new();
318 ///
319 /// let task = ex.spawn(async {
320 /// println!("Hello world");
321 /// });
322 /// future::block_on(ex.tick()); // runs the task
323 /// ```
324 pub async fn tick(&self) {
325 self.state().tick().await;
326 }
327
328 /// Runs the executor until the given future completes.
329 ///
330 /// # Examples
331 ///
332 /// ```
333 /// use async_executor::Executor;
334 /// use futures_lite::future;
335 ///
336 /// let ex = Executor::new();
337 ///
338 /// let task = ex.spawn(async { 1 + 2 });
339 /// let res = future::block_on(ex.run(async { task.await * 2 }));
340 ///
341 /// assert_eq!(res, 6);
342 /// ```
343 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
344 self.state().run(future).await
345 }
346
347 /// Returns a function that schedules a runnable task when it gets woken up.
348 fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
349 let state = self.state_as_arc();
350
351 // TODO: If possible, push into the current local queue and notify the ticker.
352 move |runnable| {
353 state.queue.push(runnable).unwrap();
354 state.notify();
355 }
356 }
357
358 /// Returns a pointer to the inner state.
359 #[inline]
360 fn state_ptr(&self) -> *const State {
361 #[cold]
362 fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
363 let state = Arc::new(State::new());
364 // TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
365 let ptr = Arc::into_raw(state) as *mut State;
366 if let Err(actual) = atomic_ptr.compare_exchange(
367 std::ptr::null_mut(),
368 ptr,
369 Ordering::AcqRel,
370 Ordering::Acquire,
371 ) {
372 // SAFETY: This was just created from Arc::into_raw.
373 drop(unsafe { Arc::from_raw(ptr) });
374 actual
375 } else {
376 ptr
377 }
378 }
379
380 let mut ptr = self.state.load(Ordering::Acquire);
381 if ptr.is_null() {
382 ptr = alloc_state(&self.state);
383 }
384 ptr
385 }
386
387 /// Returns a reference to the inner state.
388 #[inline]
389 fn state(&self) -> &State {
390 // SAFETY: So long as an Executor lives, it's state pointer will always be valid
391 // when accessed through state_ptr.
392 unsafe { &*self.state_ptr() }
393 }
394
395 // Clones the inner state Arc
396 #[inline]
397 fn state_as_arc(&self) -> Arc<State> {
398 // SAFETY: So long as an Executor lives, it's state pointer will always be a valid
399 // Arc when accessed through state_ptr.
400 let arc = unsafe { Arc::from_raw(self.state_ptr()) };
401 let clone = arc.clone();
402 std::mem::forget(arc);
403 clone
404 }
405}
406
407impl Drop for Executor<'_> {
408 fn drop(&mut self) {
409 let ptr = *self.state.get_mut();
410 if ptr.is_null() {
411 return;
412 }
413
414 // SAFETY: As ptr is not null, it was allocated via Arc::new and converted
415 // via Arc::into_raw in state_ptr.
416 let state = unsafe { Arc::from_raw(ptr) };
417
418 let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
419 for w in active.drain() {
420 w.wake();
421 }
422 drop(active);
423
424 while state.queue.pop().is_ok() {}
425 }
426}
427
428impl<'a> Default for Executor<'a> {
429 fn default() -> Executor<'a> {
430 Executor::new()
431 }
432}
433
434/// A thread-local executor.
435///
436/// The executor can only be run on the thread that created it.
437///
438/// # Examples
439///
440/// ```
441/// use async_executor::LocalExecutor;
442/// use futures_lite::future;
443///
444/// let local_ex = LocalExecutor::new();
445///
446/// future::block_on(local_ex.run(async {
447/// println!("Hello world!");
448/// }));
449/// ```
450pub struct LocalExecutor<'a> {
451 /// The inner executor.
452 inner: Executor<'a>,
453
454 /// Makes the type `!Send` and `!Sync`.
455 _marker: PhantomData<Rc<()>>,
456}
457
458impl UnwindSafe for LocalExecutor<'_> {}
459impl RefUnwindSafe for LocalExecutor<'_> {}
460
461impl fmt::Debug for LocalExecutor<'_> {
462 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463 debug_executor(&self.inner, "LocalExecutor", f)
464 }
465}
466
467impl<'a> LocalExecutor<'a> {
468 /// Creates a single-threaded executor.
469 ///
470 /// # Examples
471 ///
472 /// ```
473 /// use async_executor::LocalExecutor;
474 ///
475 /// let local_ex = LocalExecutor::new();
476 /// ```
477 pub const fn new() -> LocalExecutor<'a> {
478 LocalExecutor {
479 inner: Executor::new(),
480 _marker: PhantomData,
481 }
482 }
483
484 /// Returns `true` if there are no unfinished tasks.
485 ///
486 /// # Examples
487 ///
488 /// ```
489 /// use async_executor::LocalExecutor;
490 ///
491 /// let local_ex = LocalExecutor::new();
492 /// assert!(local_ex.is_empty());
493 ///
494 /// let task = local_ex.spawn(async {
495 /// println!("Hello world");
496 /// });
497 /// assert!(!local_ex.is_empty());
498 ///
499 /// assert!(local_ex.try_tick());
500 /// assert!(local_ex.is_empty());
501 /// ```
502 pub fn is_empty(&self) -> bool {
503 self.inner().is_empty()
504 }
505
506 /// Spawns a task onto the executor.
507 ///
508 /// # Examples
509 ///
510 /// ```
511 /// use async_executor::LocalExecutor;
512 ///
513 /// let local_ex = LocalExecutor::new();
514 ///
515 /// let task = local_ex.spawn(async {
516 /// println!("Hello world");
517 /// });
518 /// ```
519 pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
520 let mut active = self.inner().state().active.lock().unwrap();
521
522 // SAFETY: This executor is not thread safe, so the future and its result
523 // cannot be sent to another thread.
524 unsafe { self.inner().spawn_inner(future, &mut active) }
525 }
526
527 /// Spawns many tasks onto the executor.
528 ///
529 /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
530 /// spawns all of the tasks in one go. With large amounts of tasks this can improve
531 /// contention.
532 ///
533 /// It is assumed that the iterator provided does not block; blocking iterators can lock up
534 /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
535 /// mutex is not released, as there are no other threads that can poll this executor.
536 ///
537 /// ## Example
538 ///
539 /// ```
540 /// use async_executor::LocalExecutor;
541 /// use futures_lite::{stream, prelude::*};
542 /// use std::future::ready;
543 ///
544 /// # futures_lite::future::block_on(async {
545 /// let mut ex = LocalExecutor::new();
546 ///
547 /// let futures = [
548 /// ready(1),
549 /// ready(2),
550 /// ready(3)
551 /// ];
552 ///
553 /// // Spawn all of the futures onto the executor at once.
554 /// let mut tasks = vec![];
555 /// ex.spawn_many(futures, &mut tasks);
556 ///
557 /// // Await all of them.
558 /// let results = ex.run(async move {
559 /// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
560 /// }).await;
561 /// assert_eq!(results, [1, 2, 3]);
562 /// # });
563 /// ```
564 ///
565 /// [`spawn`]: LocalExecutor::spawn
566 /// [`Executor::spawn_many`]: Executor::spawn_many
567 pub fn spawn_many<T: 'a, F: Future<Output = T> + 'a>(
568 &self,
569 futures: impl IntoIterator<Item = F>,
570 handles: &mut impl Extend<Task<F::Output>>,
571 ) {
572 let mut active = self.inner().state().active.lock().unwrap();
573
574 // Convert all of the futures to tasks.
575 let tasks = futures.into_iter().map(|future| {
576 // SAFETY: This executor is not thread safe, so the future and its result
577 // cannot be sent to another thread.
578 unsafe { self.inner().spawn_inner(future, &mut active) }
579
580 // As only one thread can spawn or poll tasks at a time, there is no need
581 // to release lock contention here.
582 });
583
584 // Push them to the user's collection.
585 handles.extend(tasks);
586 }
587
588 /// Attempts to run a task if at least one is scheduled.
589 ///
590 /// Running a scheduled task means simply polling its future once.
591 ///
592 /// # Examples
593 ///
594 /// ```
595 /// use async_executor::LocalExecutor;
596 ///
597 /// let ex = LocalExecutor::new();
598 /// assert!(!ex.try_tick()); // no tasks to run
599 ///
600 /// let task = ex.spawn(async {
601 /// println!("Hello world");
602 /// });
603 /// assert!(ex.try_tick()); // a task was found
604 /// ```
605 pub fn try_tick(&self) -> bool {
606 self.inner().try_tick()
607 }
608
609 /// Runs a single task.
610 ///
611 /// Running a task means simply polling its future once.
612 ///
613 /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// use async_executor::LocalExecutor;
619 /// use futures_lite::future;
620 ///
621 /// let ex = LocalExecutor::new();
622 ///
623 /// let task = ex.spawn(async {
624 /// println!("Hello world");
625 /// });
626 /// future::block_on(ex.tick()); // runs the task
627 /// ```
628 pub async fn tick(&self) {
629 self.inner().tick().await
630 }
631
632 /// Runs the executor until the given future completes.
633 ///
634 /// # Examples
635 ///
636 /// ```
637 /// use async_executor::LocalExecutor;
638 /// use futures_lite::future;
639 ///
640 /// let local_ex = LocalExecutor::new();
641 ///
642 /// let task = local_ex.spawn(async { 1 + 2 });
643 /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
644 ///
645 /// assert_eq!(res, 6);
646 /// ```
647 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
648 self.inner().run(future).await
649 }
650
651 /// Returns a reference to the inner executor.
652 fn inner(&self) -> &Executor<'a> {
653 &self.inner
654 }
655}
656
657impl<'a> Default for LocalExecutor<'a> {
658 fn default() -> LocalExecutor<'a> {
659 LocalExecutor::new()
660 }
661}
662
663/// The state of a executor.
664struct State {
665 /// The global queue.
666 queue: ConcurrentQueue<Runnable>,
667
668 /// Local queues created by runners.
669 local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
670
671 /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
672 notified: AtomicBool,
673
674 /// A list of sleeping tickers.
675 sleepers: Mutex<Sleepers>,
676
677 /// Currently active tasks.
678 active: Mutex<Slab<Waker>>,
679}
680
681impl State {
682 /// Creates state for a new executor.
683 const fn new() -> State {
684 State {
685 queue: ConcurrentQueue::unbounded(),
686 local_queues: RwLock::new(Vec::new()),
687 notified: AtomicBool::new(true),
688 sleepers: Mutex::new(Sleepers {
689 count: 0,
690 wakers: Vec::new(),
691 free_ids: Vec::new(),
692 }),
693 active: Mutex::new(Slab::new()),
694 }
695 }
696
697 /// Notifies a sleeping ticker.
698 #[inline]
699 fn notify(&self) {
700 if self
701 .notified
702 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
703 .is_ok()
704 {
705 let waker = self.sleepers.lock().unwrap().notify();
706 if let Some(w) = waker {
707 w.wake();
708 }
709 }
710 }
711
712 pub(crate) fn try_tick(&self) -> bool {
713 match self.queue.pop() {
714 Err(_) => false,
715 Ok(runnable) => {
716 // Notify another ticker now to pick up where this ticker left off, just in case
717 // running the task takes a long time.
718 self.notify();
719
720 // Run the task.
721 runnable.run();
722 true
723 }
724 }
725 }
726
727 pub(crate) async fn tick(&self) {
728 let runnable = Ticker::new(self).runnable().await;
729 runnable.run();
730 }
731
732 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
733 let mut runner = Runner::new(self);
734 let mut rng = fastrand::Rng::new();
735
736 // A future that runs tasks forever.
737 let run_forever = async {
738 loop {
739 for _ in 0..200 {
740 let runnable = runner.runnable(&mut rng).await;
741 runnable.run();
742 }
743 future::yield_now().await;
744 }
745 };
746
747 // Run `future` and `run_forever` concurrently until `future` completes.
748 future.or(run_forever).await
749 }
750}
751
752/// A list of sleeping tickers.
753struct Sleepers {
754 /// Number of sleeping tickers (both notified and unnotified).
755 count: usize,
756
757 /// IDs and wakers of sleeping unnotified tickers.
758 ///
759 /// A sleeping ticker is notified when its waker is missing from this list.
760 wakers: Vec<(usize, Waker)>,
761
762 /// Reclaimed IDs.
763 free_ids: Vec<usize>,
764}
765
766impl Sleepers {
767 /// Inserts a new sleeping ticker.
768 fn insert(&mut self, waker: &Waker) -> usize {
769 let id = match self.free_ids.pop() {
770 Some(id) => id,
771 None => self.count + 1,
772 };
773 self.count += 1;
774 self.wakers.push((id, waker.clone()));
775 id
776 }
777
778 /// Re-inserts a sleeping ticker's waker if it was notified.
779 ///
780 /// Returns `true` if the ticker was notified.
781 fn update(&mut self, id: usize, waker: &Waker) -> bool {
782 for item in &mut self.wakers {
783 if item.0 == id {
784 item.1.clone_from(waker);
785 return false;
786 }
787 }
788
789 self.wakers.push((id, waker.clone()));
790 true
791 }
792
793 /// Removes a previously inserted sleeping ticker.
794 ///
795 /// Returns `true` if the ticker was notified.
796 fn remove(&mut self, id: usize) -> bool {
797 self.count -= 1;
798 self.free_ids.push(id);
799
800 for i in (0..self.wakers.len()).rev() {
801 if self.wakers[i].0 == id {
802 self.wakers.remove(i);
803 return false;
804 }
805 }
806 true
807 }
808
809 /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
810 fn is_notified(&self) -> bool {
811 self.count == 0 || self.count > self.wakers.len()
812 }
813
814 /// Returns notification waker for a sleeping ticker.
815 ///
816 /// If a ticker was notified already or there are no tickers, `None` will be returned.
817 fn notify(&mut self) -> Option<Waker> {
818 if self.wakers.len() == self.count {
819 self.wakers.pop().map(|item| item.1)
820 } else {
821 None
822 }
823 }
824}
825
826/// Runs task one by one.
827struct Ticker<'a> {
828 /// The executor state.
829 state: &'a State,
830
831 /// Set to a non-zero sleeper ID when in sleeping state.
832 ///
833 /// States a ticker can be in:
834 /// 1) Woken.
835 /// 2a) Sleeping and unnotified.
836 /// 2b) Sleeping and notified.
837 sleeping: usize,
838}
839
840impl Ticker<'_> {
841 /// Creates a ticker.
842 fn new(state: &State) -> Ticker<'_> {
843 Ticker { state, sleeping: 0 }
844 }
845
846 /// Moves the ticker into sleeping and unnotified state.
847 ///
848 /// Returns `false` if the ticker was already sleeping and unnotified.
849 fn sleep(&mut self, waker: &Waker) -> bool {
850 let mut sleepers = self.state.sleepers.lock().unwrap();
851
852 match self.sleeping {
853 // Move to sleeping state.
854 0 => {
855 self.sleeping = sleepers.insert(waker);
856 }
857
858 // Already sleeping, check if notified.
859 id => {
860 if !sleepers.update(id, waker) {
861 return false;
862 }
863 }
864 }
865
866 self.state
867 .notified
868 .store(sleepers.is_notified(), Ordering::Release);
869
870 true
871 }
872
873 /// Moves the ticker into woken state.
874 fn wake(&mut self) {
875 if self.sleeping != 0 {
876 let mut sleepers = self.state.sleepers.lock().unwrap();
877 sleepers.remove(self.sleeping);
878
879 self.state
880 .notified
881 .store(sleepers.is_notified(), Ordering::Release);
882 }
883 self.sleeping = 0;
884 }
885
886 /// Waits for the next runnable task to run.
887 async fn runnable(&mut self) -> Runnable {
888 self.runnable_with(|| self.state.queue.pop().ok()).await
889 }
890
891 /// Waits for the next runnable task to run, given a function that searches for a task.
892 async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
893 future::poll_fn(|cx| {
894 loop {
895 match search() {
896 None => {
897 // Move to sleeping and unnotified state.
898 if !self.sleep(cx.waker()) {
899 // If already sleeping and unnotified, return.
900 return Poll::Pending;
901 }
902 }
903 Some(r) => {
904 // Wake up.
905 self.wake();
906
907 // Notify another ticker now to pick up where this ticker left off, just in
908 // case running the task takes a long time.
909 self.state.notify();
910
911 return Poll::Ready(r);
912 }
913 }
914 }
915 })
916 .await
917 }
918}
919
920impl Drop for Ticker<'_> {
921 fn drop(&mut self) {
922 // If this ticker is in sleeping state, it must be removed from the sleepers list.
923 if self.sleeping != 0 {
924 let mut sleepers = self.state.sleepers.lock().unwrap();
925 let notified = sleepers.remove(self.sleeping);
926
927 self.state
928 .notified
929 .store(sleepers.is_notified(), Ordering::Release);
930
931 // If this ticker was notified, then notify another ticker.
932 if notified {
933 drop(sleepers);
934 self.state.notify();
935 }
936 }
937 }
938}
939
940/// A worker in a work-stealing executor.
941///
942/// This is just a ticker that also has an associated local queue for improved cache locality.
943struct Runner<'a> {
944 /// The executor state.
945 state: &'a State,
946
947 /// Inner ticker.
948 ticker: Ticker<'a>,
949
950 /// The local queue.
951 local: Arc<ConcurrentQueue<Runnable>>,
952
953 /// Bumped every time a runnable task is found.
954 ticks: usize,
955}
956
957impl Runner<'_> {
958 /// Creates a runner and registers it in the executor state.
959 fn new(state: &State) -> Runner<'_> {
960 let runner = Runner {
961 state,
962 ticker: Ticker::new(state),
963 local: Arc::new(ConcurrentQueue::bounded(512)),
964 ticks: 0,
965 };
966 state
967 .local_queues
968 .write()
969 .unwrap()
970 .push(runner.local.clone());
971 runner
972 }
973
974 /// Waits for the next runnable task to run.
975 async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
976 let runnable = self
977 .ticker
978 .runnable_with(|| {
979 // Try the local queue.
980 if let Ok(r) = self.local.pop() {
981 return Some(r);
982 }
983
984 // Try stealing from the global queue.
985 if let Ok(r) = self.state.queue.pop() {
986 steal(&self.state.queue, &self.local);
987 return Some(r);
988 }
989
990 // Try stealing from other runners.
991 let local_queues = self.state.local_queues.read().unwrap();
992
993 // Pick a random starting point in the iterator list and rotate the list.
994 let n = local_queues.len();
995 let start = rng.usize(..n);
996 let iter = local_queues
997 .iter()
998 .chain(local_queues.iter())
999 .skip(start)
1000 .take(n);
1001
1002 // Remove this runner's local queue.
1003 let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
1004
1005 // Try stealing from each local queue in the list.
1006 for local in iter {
1007 steal(local, &self.local);
1008 if let Ok(r) = self.local.pop() {
1009 return Some(r);
1010 }
1011 }
1012
1013 None
1014 })
1015 .await;
1016
1017 // Bump the tick counter.
1018 self.ticks = self.ticks.wrapping_add(1);
1019
1020 if self.ticks % 64 == 0 {
1021 // Steal tasks from the global queue to ensure fair task scheduling.
1022 steal(&self.state.queue, &self.local);
1023 }
1024
1025 runnable
1026 }
1027}
1028
1029impl Drop for Runner<'_> {
1030 fn drop(&mut self) {
1031 // Remove the local queue.
1032 self.state
1033 .local_queues
1034 .write()
1035 .unwrap()
1036 .retain(|local| !Arc::ptr_eq(local, &self.local));
1037
1038 // Re-schedule remaining tasks in the local queue.
1039 while let Ok(r) = self.local.pop() {
1040 r.schedule();
1041 }
1042 }
1043}
1044
1045/// Steals some items from one queue into another.
1046fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
1047 // Half of `src`'s length rounded up.
1048 let mut count = (src.len() + 1) / 2;
1049
1050 if count > 0 {
1051 // Don't steal more than fits into the queue.
1052 if let Some(cap) = dest.capacity() {
1053 count = count.min(cap - dest.len());
1054 }
1055
1056 // Steal tasks.
1057 for _ in 0..count {
1058 if let Ok(t) = src.pop() {
1059 assert!(dest.push(t).is_ok());
1060 } else {
1061 break;
1062 }
1063 }
1064 }
1065}
1066
1067/// Debug implementation for `Executor` and `LocalExecutor`.
1068fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1069 // Get a reference to the state.
1070 let ptr = executor.state.load(Ordering::Acquire);
1071 if ptr.is_null() {
1072 // The executor has not been initialized.
1073 struct Uninitialized;
1074
1075 impl fmt::Debug for Uninitialized {
1076 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1077 f.write_str("<uninitialized>")
1078 }
1079 }
1080
1081 return f.debug_tuple(name).field(&Uninitialized).finish();
1082 }
1083
1084 // SAFETY: If the state pointer is not null, it must have been
1085 // allocated properly by Arc::new and converted via Arc::into_raw
1086 // in state_ptr.
1087 let state = unsafe { &*ptr };
1088
1089 debug_state(state, name, f)
1090}
1091
1092/// Debug implementation for `Executor` and `LocalExecutor`.
1093fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094 /// Debug wrapper for the number of active tasks.
1095 struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
1096
1097 impl fmt::Debug for ActiveTasks<'_> {
1098 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1099 match self.0.try_lock() {
1100 Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
1101 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1102 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1103 }
1104 }
1105 }
1106
1107 /// Debug wrapper for the local runners.
1108 struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
1109
1110 impl fmt::Debug for LocalRunners<'_> {
1111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1112 match self.0.try_read() {
1113 Ok(lock) => f
1114 .debug_list()
1115 .entries(lock.iter().map(|queue| queue.len()))
1116 .finish(),
1117 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1118 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1119 }
1120 }
1121 }
1122
1123 /// Debug wrapper for the sleepers.
1124 struct SleepCount<'a>(&'a Mutex<Sleepers>);
1125
1126 impl fmt::Debug for SleepCount<'_> {
1127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1128 match self.0.try_lock() {
1129 Ok(lock) => fmt::Debug::fmt(&lock.count, f),
1130 Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
1131 Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
1132 }
1133 }
1134 }
1135
1136 f.debug_struct(name)
1137 .field("active", &ActiveTasks(&state.active))
1138 .field("global_tasks", &state.queue.len())
1139 .field("local_runners", &LocalRunners(&state.local_queues))
1140 .field("sleepers", &SleepCount(&state.sleepers))
1141 .finish()
1142}
1143
1144/// Runs a closure when dropped.
1145struct CallOnDrop<F: FnMut()>(F);
1146
1147impl<F: FnMut()> Drop for CallOnDrop<F> {
1148 fn drop(&mut self) {
1149 (self.0)();
1150 }
1151}
1152
1153fn _ensure_send_and_sync() {
1154 use futures_lite::future::pending;
1155
1156 fn is_send<T: Send>(_: T) {}
1157 fn is_sync<T: Sync>(_: T) {}
1158 fn is_static<T: 'static>(_: T) {}
1159
1160 is_send::<Executor<'_>>(Executor::new());
1161 is_sync::<Executor<'_>>(Executor::new());
1162
1163 let ex = Executor::new();
1164 is_send(ex.run(pending::<()>()));
1165 is_sync(ex.run(pending::<()>()));
1166 is_send(ex.tick());
1167 is_sync(ex.tick());
1168 is_send(ex.schedule());
1169 is_sync(ex.schedule());
1170 is_static(ex.schedule());
1171
1172 /// ```compile_fail
1173 /// use async_executor::LocalExecutor;
1174 /// use futures_lite::future::pending;
1175 ///
1176 /// fn is_send<T: Send>(_: T) {}
1177 /// fn is_sync<T: Sync>(_: T) {}
1178 ///
1179 /// is_send::<LocalExecutor<'_>>(LocalExecutor::new());
1180 /// is_sync::<LocalExecutor<'_>>(LocalExecutor::new());
1181 ///
1182 /// let ex = LocalExecutor::new();
1183 /// is_send(ex.run(pending::<()>()));
1184 /// is_sync(ex.run(pending::<()>()));
1185 /// is_send(ex.tick());
1186 /// is_sync(ex.tick());
1187 /// ```
1188 fn _negative_test() {}
1189}