bevy_tasks/single_threaded_task_pool.rs
1use alloc::{rc::Rc, sync::Arc};
2use core::{cell::RefCell, future::Future, marker::PhantomData, mem};
3
4use crate::Task;
5
thread_local! {
    // Per-thread executor for `'static` futures. `TaskPool::spawn` spawns onto it (and ticks
    // it) on non-wasm targets, and `TaskPool::with_local_executor` hands it to the caller.
    // The initializer is wrapped in a `const { .. }` block.
    static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() };
}
9
/// Used to create a [`TaskPool`].
///
/// The builder holds no state: on the single threaded task pool its configuration
/// methods are no-ops, and [`TaskPoolBuilder::build`] simply returns a new [`TaskPool`].
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {}
13
/// Placeholder type that lets the single threaded (wasm) task pool offer the same API as
/// the multithreaded task pool.
///
/// The multithreaded task pool uses this struct to spawn tasks on a specific thread. The
/// wasm task pool just calls `wasm_bindgen_futures::spawn_local` for spawning, which runs
/// tasks on the main thread, so this [`ThreadExecutor`] does nothing.
#[derive(Default)]
pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);

impl<'a> ThreadExecutor<'a> {
    /// Creates a new `ThreadExecutor`
    pub fn new() -> Self {
        // The only field is a zero-sized lifetime marker, so build it directly.
        Self(PhantomData)
    }
}
27
28impl TaskPoolBuilder {
29 /// Creates a new `TaskPoolBuilder` instance
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 /// No op on the single threaded task pool
35 pub fn num_threads(self, _num_threads: usize) -> Self {
36 self
37 }
38
39 /// No op on the single threaded task pool
40 pub fn stack_size(self, _stack_size: usize) -> Self {
41 self
42 }
43
44 /// No op on the single threaded task pool
45 pub fn thread_name(self, _thread_name: String) -> Self {
46 self
47 }
48
49 /// Creates a new [`TaskPool`]
50 pub fn build(self) -> TaskPool {
51 TaskPool::new_internal()
52 }
53}
54
/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
/// the pool on threads owned by the pool. In this case - main thread only.
///
/// The struct has no fields, so cloning it or creating another instance is free.
#[derive(Debug, Default, Clone)]
pub struct TaskPool {}
59
60impl TaskPool {
61 /// Just create a new `ThreadExecutor` for wasm
62 pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
63 Arc::new(ThreadExecutor::new())
64 }
65
66 /// Create a `TaskPool` with the default configuration.
67 pub fn new() -> Self {
68 TaskPoolBuilder::new().build()
69 }
70
71 fn new_internal() -> Self {
72 Self {}
73 }
74
75 /// Return the number of threads owned by the task pool
76 pub fn thread_num(&self) -> usize {
77 1
78 }
79
80 /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
81 /// passing a scope object into it. The scope object provided to the callback can be used
82 /// to spawn tasks. This function will await the completion of all tasks before returning.
83 ///
84 /// This is similar to `rayon::scope` and `crossbeam::scope`
85 pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
86 where
87 F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
88 T: Send + 'static,
89 {
90 self.scope_with_executor(false, None, f)
91 }
92
93 /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
94 /// passing a scope object into it. The scope object provided to the callback can be used
95 /// to spawn tasks. This function will await the completion of all tasks before returning.
96 ///
97 /// This is similar to `rayon::scope` and `crossbeam::scope`
98 #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
99 pub fn scope_with_executor<'env, F, T>(
100 &self,
101 _tick_task_pool_executor: bool,
102 _thread_executor: Option<&ThreadExecutor>,
103 f: F,
104 ) -> Vec<T>
105 where
106 F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
107 T: Send + 'static,
108 {
109 // SAFETY: This safety comment applies to all references transmuted to 'env.
110 // Any futures spawned with these references need to return before this function completes.
111 // This is guaranteed because we drive all the futures spawned onto the Scope
112 // to completion in this function. However, rust has no way of knowing this so we
113 // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
114 // Any usages of the references passed into `Scope` must be accessed through
115 // the transmuted reference for the rest of this function.
116
117 let executor = &async_executor::LocalExecutor::new();
118 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
119 let executor: &'env async_executor::LocalExecutor<'env> =
120 unsafe { mem::transmute(executor) };
121
122 let results: RefCell<Vec<Rc<RefCell<Option<T>>>>> = RefCell::new(Vec::new());
123 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
124 let results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>> =
125 unsafe { mem::transmute(&results) };
126
127 let mut scope = Scope {
128 executor,
129 results,
130 scope: PhantomData,
131 env: PhantomData,
132 };
133
134 // SAFETY: As above, all futures must complete in this function so we can change the lifetime
135 let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };
136
137 f(scope_ref);
138
139 // Loop until all tasks are done
140 while executor.try_tick() {}
141
142 let results = scope.results.borrow();
143 results
144 .iter()
145 .map(|result| result.borrow_mut().take().unwrap())
146 .collect()
147 }
148
149 /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
150 /// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
151 /// It can also be "detached", allowing it to continue running without having to be polled by the
152 /// end-user.
153 ///
154 /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
155 pub fn spawn<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
156 where
157 T: 'static,
158 {
159 #[cfg(target_arch = "wasm32")]
160 return Task::wrap_future(future);
161
162 #[cfg(not(target_arch = "wasm32"))]
163 {
164 LOCAL_EXECUTOR.with(|executor| {
165 let task = executor.spawn(future);
166 // Loop until all tasks are done
167 while executor.try_tick() {}
168
169 Task::new(task)
170 })
171 }
172 }
173
174 /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
175 pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> Task<T>
176 where
177 T: 'static,
178 {
179 self.spawn(future)
180 }
181
182 /// Runs a function with the local executor. Typically used to tick
183 /// the local executor on the main thread as it needs to share time with
184 /// other things.
185 ///
186 /// ```
187 /// use bevy_tasks::TaskPool;
188 ///
189 /// TaskPool::new().with_local_executor(|local_executor| {
190 /// local_executor.try_tick();
191 /// });
192 /// ```
193 pub fn with_local_executor<F, R>(&self, f: F) -> R
194 where
195 F: FnOnce(&async_executor::LocalExecutor) -> R,
196 {
197 LOCAL_EXECUTOR.with(f)
198 }
199}
200
/// A `TaskPool` scope for running one or more non-`'static` futures.
///
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
    // Executor that futures spawned through this scope are placed on.
    executor: &'scope async_executor::LocalExecutor<'scope>,
    // One shared slot per spawned future, pushed in spawn order. Each slot starts as
    // `None` and is filled with the future's output when that future finishes.
    results: &'env RefCell<Vec<Rc<RefCell<Option<T>>>>>,

    // Zero-sized markers that make `Scope` invariant over 'scope and 'env.
    scope: PhantomData<&'scope mut &'scope ()>,
    env: PhantomData<&'env mut &'env ()>,
}
214
215impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
216 /// Spawns a scoped future onto the executor. The scope *must* outlive
217 /// the provided future. The results of the future will be returned as a part of
218 /// [`TaskPool::scope`]'s return value.
219 ///
220 /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
221 ///
222 /// For more information, see [`TaskPool::scope`].
223 pub fn spawn<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
224 self.spawn_on_scope(f);
225 }
226
227 /// Spawns a scoped future onto the executor. The scope *must* outlive
228 /// the provided future. The results of the future will be returned as a part of
229 /// [`TaskPool::scope`]'s return value.
230 ///
231 /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
232 ///
233 /// For more information, see [`TaskPool::scope`].
234 pub fn spawn_on_external<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
235 self.spawn_on_scope(f);
236 }
237
238 /// Spawns a scoped future that runs on the thread the scope called from. The
239 /// scope *must* outlive the provided future. The results of the future will be
240 /// returned as a part of [`TaskPool::scope`]'s return value.
241 ///
242 /// For more information, see [`TaskPool::scope`].
243 pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
244 let result = Rc::new(RefCell::new(None));
245 self.results.borrow_mut().push(result.clone());
246 let f = async move {
247 let temp_result = f.await;
248 result.borrow_mut().replace(temp_result);
249 };
250 self.executor.spawn(f).detach();
251 }
252}