bevy_tasks/
single_threaded_task_pool.rs

1use alloc::{rc::Rc, sync::Arc};
2use core::{cell::RefCell, future::Future, marker::PhantomData, mem};
3
4use crate::Task;
5
6thread_local! {
7    static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
8}
9
/// Builder for a [`TaskPool`].
///
/// It stores no configuration: the struct has no fields.
#[derive(Clone, Debug, Default)]
pub struct TaskPoolBuilder {}
13
/// Placeholder type that lets the single threaded task pool expose the same API as the
/// multithreaded task pool.
///
/// In the multithreaded task pool this struct is used to spawn tasks on a specific thread.
/// This pool runs everything on one thread, so the [`ThreadExecutor`] holds nothing but a
/// lifetime marker and does nothing.
#[derive(Default)]
pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);

impl<'a> ThreadExecutor<'a> {
    /// Creates a new `ThreadExecutor`.
    pub fn new() -> Self {
        Self(PhantomData)
    }
}
27
28impl TaskPoolBuilder {
29    /// Creates a new `TaskPoolBuilder` instance
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// No op on the single threaded task pool
35    pub fn num_threads(self, _num_threads: usize) -> Self {
36        self
37    }
38
39    /// No op on the single threaded task pool
40    pub fn stack_size(self, _stack_size: usize) -> Self {
41        self
42    }
43
44    /// No op on the single threaded task pool
45    pub fn thread_name(self, _thread_name: String) -> Self {
46        self
47    }
48
49    /// Creates a new [`TaskPool`]
50    pub fn build(self) -> TaskPool {
51        TaskPool::new_internal()
52    }
53}
54
/// A pool for executing tasks, where tasks are futures that the pool drives automatically.
///
/// This is the single threaded flavor: tasks only ever run on the main thread, and the
/// struct itself carries no state.
#[derive(Clone, Debug, Default)]
pub struct TaskPool {}
59
60impl TaskPool {
61    /// Just create a new `ThreadExecutor` for wasm
62    pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
63        Arc::new(ThreadExecutor::new())
64    }
65
66    /// Create a `TaskPool` with the default configuration.
67    pub fn new() -> Self {
68        TaskPoolBuilder::new().build()
69    }
70
71    fn new_internal() -> Self {
72        Self {}
73    }
74
75    /// Return the number of threads owned by the task pool
76    pub fn thread_num(&self) -> usize {
77        1
78    }
79
80    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
81    /// passing a scope object into it. The scope object provided to the callback can be used
82    /// to spawn tasks. This function will await the completion of all tasks before returning.
83    ///
84    /// This is similar to `rayon::scope` and `crossbeam::scope`
85    pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
86    where
87        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
88        T: Send + 'static,
89    {
90        self.scope_with_executor(false, None, f)
91    }
92
93    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
94    /// passing a scope object into it. The scope object provided to the callback can be used
95    /// to spawn tasks. This function will await the completion of all tasks before returning.
96    ///
97    /// This is similar to `rayon::scope` and `crossbeam::scope`
98    #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
99    pub fn scope_with_executor<'env, F, T>(
100        &self,
101        _tick_task_pool_executor: bool,
102        _thread_executor: Option<&ThreadExecutor>,
103        f: F,
104    ) -> Vec<T>
105    where
106        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
107        T: Send + 'static,
108    {
109        // SAFETY: This safety comment applies to all references transmuted to 'env.
110        // Any futures spawned with these references need to return before this function completes.
111        // This is guaranteed because we drive all the futures spawned onto the Scope
112        // to completion in this function. However, rust has no way of knowing this so we
113        // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
114        // Any usages of the references passed into `Scope` must be accessed through
115        // the transmuted reference for the rest of this function.
116
117        let executor = &async_executor::LocalExecutor::new();
118        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
119        let executor: &'env async_executor::LocalExecutor<'env> =
120            unsafe { mem::transmute(executor) };
121
122        let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
123        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
124        let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
125            unsafe { mem::transmute(&results) };
126
127        let mut scope = Scope {
128            executor,
129            results,
130            scope: PhantomData,
131            env: PhantomData,
132        };
133
134        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
135        let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
136
137        f(scope_ref);
138
139        // Loop until all tasks are done
140        while executor.try_tick() {}
141
142        let results = scope.results.borrow();
143        results
144            .iter()
145            .map(|result| result.borrow_mut().take().unwrap())
146            .collect()
147    }
148
149    /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
150    /// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
151    /// It can also be "detached", allowing it to continue running without having to be polled by the
152    /// end-user.
153    ///
154    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
155    pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
156    where
157        T: 'static,
158    {
159        #[cfg(target_arch = "wasm32")]
160        return Task::wrap_future(future);
161
162        #[cfg(not(target_arch = "wasm32"))]
163        {
164            LOCAL_EXECUTOR.with(|executor| {
165                let task = executor.spawn(future);
166                // Loop until all tasks are done
167                while executor.try_tick() {}
168
169                Task::new(task)
170            })
171        }
172    }
173
174    /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
175    pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
176    where
177        T: 'static,
178    {
179        self.spawn(future)
180    }
181
182    /// Runs a function with the local executor. Typically used to tick
183    /// the local executor on the main thread as it needs to share time with
184    /// other things.
185    ///
186    /// ```
187    /// use bevy_tasks::TaskPool;
188    ///
189    /// TaskPool::new().with_local_executor(|local_executor| {
190    ///     local_executor.try_tick();
191    /// });
192    /// ```
193    pub fn with_local_executor<F, R>(&self, f: F) -> R
194    where
195        F: FnOnce(&async_executor::LocalExecutor) -> R,
196    {
197        LOCAL_EXECUTOR.with(f)
198    }
199}
200
201/// A `TaskPool` scope for running one or more non-`'static` futures.
202///
203/// For more information, see [`TaskPool::scope`].
204#[derive(Debug)]
205pub struct Scope<'scope, 'env: 'scope, T> {
206    executor: &'scope async_executor::LocalExecutor<'scope>,
207    // Vector to gather results of all futures spawned during scope run
208    results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,
209
210    // make `Scope` invariant over 'scope and 'env
211    scope: PhantomData<&'scope mut &'scope ()>,
212    env: PhantomData<&'env mut &'env ()>,
213}
214
215impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
216    /// Spawns a scoped future onto the executor. The scope *must* outlive
217    /// the provided future. The results of the future will be returned as a part of
218    /// [`TaskPool::scope`]'s return value.
219    ///
220    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
221    ///
222    /// For more information, see [`TaskPool::scope`].
223    pub fn spawn<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
224        self.spawn_on_scope(f);
225    }
226
227    /// Spawns a scoped future onto the executor. The scope *must* outlive
228    /// the provided future. The results of the future will be returned as a part of
229    /// [`TaskPool::scope`]'s return value.
230    ///
231    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
232    ///
233    /// For more information, see [`TaskPool::scope`].
234    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
235        self.spawn_on_scope(f);
236    }
237
238    /// Spawns a scoped future that runs on the thread the scope called from. The
239    /// scope *must* outlive the provided future. The results of the future will be
240    /// returned as a part of [`TaskPool::scope`]'s return value.
241    ///
242    /// For more information, see [`TaskPool::scope`].
243    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
244        let result = Rc::new(RefCell::new(None));
245        self.results.borrow_mut().push(result.clone());
246        let f = async move {
247            let temp_result = f.await;
248            result.borrow_mut().replace(temp_result);
249        };
250        self.executor.spawn(f).detach();
251    }
252}