bevy_tasks/
thread_executor.rs

1use core::marker::PhantomData;
2use std::thread::{self, ThreadId};
3
4use crate::executor::Executor;
5use async_task::Task;
6use futures_lite::Future;
7
8/// An executor that can only be ticked on the thread it was instantiated on. But
9/// can spawn `Send` tasks from other threads.
10///
11/// # Example
12/// ```
13/// # use std::sync::{Arc, atomic::{AtomicI32, Ordering}};
14/// use bevy_tasks::ThreadExecutor;
15///
16/// let thread_executor = ThreadExecutor::new();
17/// let count = Arc::new(AtomicI32::new(0));
18///
19/// // create some owned values that can be moved into another thread
20/// let count_clone = count.clone();
21///
22/// std::thread::scope(|scope| {
23///     scope.spawn(|| {
24///         // we cannot get the ticker from another thread
25///         let not_thread_ticker = thread_executor.ticker();
26///         assert!(not_thread_ticker.is_none());
27///         
28///         // but we can spawn tasks from another thread
29///         thread_executor.spawn(async move {
30///             count_clone.fetch_add(1, Ordering::Relaxed);
31///         }).detach();
32///     });
33/// });
34///
35/// // the tasks do not make progress unless the executor is manually ticked
36/// assert_eq!(count.load(Ordering::Relaxed), 0);
37///
38/// // tick the ticker until task finishes
39/// let thread_ticker = thread_executor.ticker().unwrap();
40/// thread_ticker.try_tick();
41/// assert_eq!(count.load(Ordering::Relaxed), 1);
42/// ```
43#[derive(Debug)]
44pub struct ThreadExecutor<'task> {
45    executor: Executor<'task>,
46    thread_id: ThreadId,
47}
48
49impl<'task> Default for ThreadExecutor<'task> {
50    fn default() -> Self {
51        Self {
52            executor: Executor::new(),
53            thread_id: thread::current().id(),
54        }
55    }
56}
57
58impl<'task> ThreadExecutor<'task> {
59    /// create a new [`ThreadExecutor`]
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    /// Spawn a task on the thread executor
65    pub fn spawn<T: Send + 'task>(
66        &self,
67        future: impl Future<Output = T> + Send + 'task,
68    ) -> Task<T> {
69        self.executor.spawn(future)
70    }
71
72    /// Gets the [`ThreadExecutorTicker`] for this executor.
73    /// Use this to tick the executor.
74    /// It only returns the ticker if it's on the thread the executor was created on
75    /// and returns `None` otherwise.
76    pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
77        if thread::current().id() == self.thread_id {
78            return Some(ThreadExecutorTicker {
79                executor: self,
80                _marker: PhantomData,
81            });
82        }
83        None
84    }
85
86    /// Returns true if `self` and `other`'s executor is same
87    pub fn is_same(&self, other: &Self) -> bool {
88        core::ptr::eq(self, other)
89    }
90}
91
92/// Used to tick the [`ThreadExecutor`]. The executor does not
93/// make progress unless it is manually ticked on the thread it was
94/// created on.
95#[derive(Debug)]
96pub struct ThreadExecutorTicker<'task, 'ticker> {
97    executor: &'ticker ThreadExecutor<'task>,
98    // make type not send or sync
99    _marker: PhantomData<*const ()>,
100}
101impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
102    /// Tick the thread executor.
103    pub async fn tick(&self) {
104        self.executor.executor.tick().await;
105    }
106
107    /// Synchronously try to tick a task on the executor.
108    /// Returns false if does not find a task to tick.
109    pub fn try_tick(&self) -> bool {
110        self.executor.executor.try_tick()
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use alloc::sync::Arc;
118
119    #[test]
120    fn test_ticker() {
121        let executor = Arc::new(ThreadExecutor::new());
122        let ticker = executor.ticker();
123        assert!(ticker.is_some());
124
125        thread::scope(|s| {
126            s.spawn(|| {
127                let ticker = executor.ticker();
128                assert!(ticker.is_none());
129            });
130        });
131    }
132}