bevy_tasks/
single_threaded_task_pool.rs

1use alloc::{string::String, vec::Vec};
2use bevy_platform::sync::Arc;
3use core::{cell::RefCell, future::Future, marker::PhantomData, mem};
4
5use crate::Task;
6
7#[cfg(feature = "std")]
8use std::thread_local;
9
10#[cfg(not(feature = "std"))]
11use bevy_platform::sync::{Mutex, PoisonError};
12
13#[cfg(feature = "std")]
14use crate::executor::LocalExecutor;
15
16#[cfg(not(feature = "std"))]
17use crate::executor::Executor as LocalExecutor;
18
19#[cfg(feature = "std")]
20thread_local! {
21    static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
22}
23
24#[cfg(not(feature = "std"))]
25static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
26
27#[cfg(feature = "std")]
28type ScopeResult<T> = alloc::rc::Rc<RefCell<Option<T>>>;
29
30#[cfg(not(feature = "std"))]
31type ScopeResult<T> = Arc<Mutex<Option<T>>>;
32
33/// Used to create a [`TaskPool`].
34#[derive(Debug, Default, Clone)]
35pub struct TaskPoolBuilder {}
36
37/// This is a dummy struct for wasm support to provide the same api as with the multithreaded
38/// task pool. In the case of the multithreaded task pool this struct is used to spawn
39/// tasks on a specific thread. But the wasm task pool just calls
40/// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread
41/// and so the [`ThreadExecutor`] does nothing.
42#[derive(Default)]
43pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);
44impl<'a> ThreadExecutor<'a> {
45    /// Creates a new `ThreadExecutor`
46    pub fn new() -> Self {
47        Self::default()
48    }
49}
50
51impl TaskPoolBuilder {
52    /// Creates a new `TaskPoolBuilder` instance
53    pub fn new() -> Self {
54        Self::default()
55    }
56
57    /// No op on the single threaded task pool
58    pub fn num_threads(self, _num_threads: usize) -> Self {
59        self
60    }
61
62    /// No op on the single threaded task pool
63    pub fn stack_size(self, _stack_size: usize) -> Self {
64        self
65    }
66
67    /// No op on the single threaded task pool
68    pub fn thread_name(self, _thread_name: String) -> Self {
69        self
70    }
71
72    /// No op on the single threaded task pool
73    pub fn on_thread_spawn(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
74        self
75    }
76
77    /// No op on the single threaded task pool
78    pub fn on_thread_destroy(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
79        self
80    }
81
82    /// Creates a new [`TaskPool`]
83    pub fn build(self) -> TaskPool {
84        TaskPool::new_internal()
85    }
86}
87
88/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
89/// the pool on threads owned by the pool. In this case - main thread only.
90#[derive(Debug, Default, Clone)]
91pub struct TaskPool {}
92
93impl TaskPool {
94    /// Just create a new `ThreadExecutor` for wasm
95    pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
96        Arc::new(ThreadExecutor::new())
97    }
98
99    /// Create a `TaskPool` with the default configuration.
100    pub fn new() -> Self {
101        TaskPoolBuilder::new().build()
102    }
103
104    fn new_internal() -> Self {
105        Self {}
106    }
107
108    /// Return the number of threads owned by the task pool
109    pub fn thread_num(&self) -> usize {
110        1
111    }
112
113    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
114    /// passing a scope object into it. The scope object provided to the callback can be used
115    /// to spawn tasks. This function will await the completion of all tasks before returning.
116    ///
117    /// This is similar to `rayon::scope` and `crossbeam::scope`
118    pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
119    where
120        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
121        T: Send + 'static,
122    {
123        self.scope_with_executor(false, None, f)
124    }
125
126    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
127    /// passing a scope object into it. The scope object provided to the callback can be used
128    /// to spawn tasks. This function will await the completion of all tasks before returning.
129    ///
130    /// This is similar to `rayon::scope` and `crossbeam::scope`
131    #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
132    pub fn scope_with_executor<'env, F, T>(
133        &self,
134        _tick_task_pool_executor: bool,
135        _thread_executor: Option<&ThreadExecutor>,
136        f: F,
137    ) -> Vec<T>
138    where
139        F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
140        T: Send + 'static,
141    {
142        // SAFETY: This safety comment applies to all references transmuted to 'env.
143        // Any futures spawned with these references need to return before this function completes.
144        // This is guaranteed because we drive all the futures spawned onto the Scope
145        // to completion in this function. However, rust has no way of knowing this so we
146        // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
147        // Any usages of the references passed into `Scope` must be accessed through
148        // the transmuted reference for the rest of this function.
149
150        let executor = &LocalExecutor::new();
151        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
152        let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) };
153
154        let results: RefCell<Vec<ScopeResult<T>>> = RefCell::new(Vec::new());
155        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
156        let results: &'env RefCell<Vec<ScopeResult<T>>> = unsafe { mem::transmute(&results) };
157
158        let mut scope = Scope {
159            executor,
160            results,
161            scope: PhantomData,
162            env: PhantomData,
163        };
164
165        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
166        let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
167
168        f(scope_ref);
169
170        // Loop until all tasks are done
171        while executor.try_tick() {}
172
173        let results = scope.results.borrow();
174        results
175            .iter()
176            .map(|result| {
177                #[cfg(feature = "std")]
178                return result.borrow_mut().take().unwrap();
179
180                #[cfg(not(feature = "std"))]
181                {
182                    let mut lock = result.lock().unwrap_or_else(PoisonError::into_inner);
183                    lock.take().unwrap()
184                }
185            })
186            .collect()
187    }
188
189    /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
190    /// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
191    /// It can also be "detached", allowing it to continue running without having to be polled by the
192    /// end-user.
193    ///
194    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
195    pub fn spawn<T>(
196        &self,
197        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
198    ) -> Task<T>
199    where
200        T: 'static + MaybeSend + MaybeSync,
201    {
202        cfg_if::cfg_if! {
203            if #[cfg(all(target_arch = "wasm32", feature = "web"))] {
204                Task::wrap_future(future)
205            } else if #[cfg(feature = "std")] {
206                LOCAL_EXECUTOR.with(|executor| {
207                    let task = executor.spawn(future);
208                    // Loop until all tasks are done
209                    while executor.try_tick() {}
210
211                    Task::new(task)
212                })
213            } else {
214                {
215                    let task = LOCAL_EXECUTOR.spawn(future);
216                    // Loop until all tasks are done
217                    while LOCAL_EXECUTOR.try_tick() {}
218
219                    Task::new(task)
220                }
221            }
222        }
223    }
224
225    /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
226    pub fn spawn_local<T>(
227        &self,
228        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
229    ) -> Task<T>
230    where
231        T: 'static + MaybeSend + MaybeSync,
232    {
233        self.spawn(future)
234    }
235
236    /// Runs a function with the local executor. Typically used to tick
237    /// the local executor on the main thread as it needs to share time with
238    /// other things.
239    ///
240    /// ```
241    /// use bevy_tasks::TaskPool;
242    ///
243    /// TaskPool::new().with_local_executor(|local_executor| {
244    ///     local_executor.try_tick();
245    /// });
246    /// ```
247    pub fn with_local_executor<F, R>(&self, f: F) -> R
248    where
249        F: FnOnce(&LocalExecutor) -> R,
250    {
251        #[cfg(feature = "std")]
252        return LOCAL_EXECUTOR.with(f);
253
254        #[cfg(not(feature = "std"))]
255        return f(&LOCAL_EXECUTOR);
256    }
257}
258
259/// A `TaskPool` scope for running one or more non-`'static` futures.
260///
261/// For more information, see [`TaskPool::scope`].
262#[derive(Debug)]
263pub struct Scope<'scope, 'env: 'scope, T> {
264    executor: &'scope LocalExecutor<'scope>,
265    // Vector to gather results of all futures spawned during scope run
266    results: &'env RefCell<Vec<ScopeResult<T>>>,
267
268    // make `Scope` invariant over 'scope and 'env
269    scope: PhantomData<&'scope mut &'scope ()>,
270    env: PhantomData<&'env mut &'env ()>,
271}
272
273impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
274    /// Spawns a scoped future onto the executor. The scope *must* outlive
275    /// the provided future. The results of the future will be returned as a part of
276    /// [`TaskPool::scope`]'s return value.
277    ///
278    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
279    ///
280    /// For more information, see [`TaskPool::scope`].
281    pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
282        self.spawn_on_scope(f);
283    }
284
285    /// Spawns a scoped future onto the executor. The scope *must* outlive
286    /// the provided future. The results of the future will be returned as a part of
287    /// [`TaskPool::scope`]'s return value.
288    ///
289    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
290    ///
291    /// For more information, see [`TaskPool::scope`].
292    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
293        self.spawn_on_scope(f);
294    }
295
296    /// Spawns a scoped future that runs on the thread the scope called from. The
297    /// scope *must* outlive the provided future. The results of the future will be
298    /// returned as a part of [`TaskPool::scope`]'s return value.
299    ///
300    /// For more information, see [`TaskPool::scope`].
301    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
302        let result = ScopeResult::<T>::default();
303        self.results.borrow_mut().push(result.clone());
304        let f = async move {
305            let temp_result = f.await;
306
307            #[cfg(feature = "std")]
308            result.borrow_mut().replace(temp_result);
309
310            #[cfg(not(feature = "std"))]
311            {
312                let mut lock = result.lock().unwrap_or_else(PoisonError::into_inner);
313                *lock = Some(temp_result);
314            }
315        };
316        self.executor.spawn(f).detach();
317    }
318}
319
320#[cfg(feature = "std")]
321mod send_sync_bounds {
322    pub trait MaybeSend {}
323    impl<T> MaybeSend for T {}
324
325    pub trait MaybeSync {}
326    impl<T> MaybeSync for T {}
327}
328
329#[cfg(not(feature = "std"))]
330mod send_sync_bounds {
331    pub trait MaybeSend: Send {}
332    impl<T: Send> MaybeSend for T {}
333
334    pub trait MaybeSync: Sync {}
335    impl<T: Sync> MaybeSync for T {}
336}
337
338use send_sync_bounds::{MaybeSend, MaybeSync};