bevy_ecs/query/par_iter.rs
1use crate::{
2 batching::BatchingStrategy,
3 change_detection::Tick,
4 entity::{EntityEquivalent, UniqueEntityEquivalentVec},
5 world::unsafe_world_cell::UnsafeWorldCell,
6};
7
8use super::{IterQueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};
9
10use alloc::vec::Vec;
11
12/// A parallel iterator over query results of a [`Query`](crate::system::Query).
13///
14/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
15/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
16pub struct QueryParIter<'w, 's, D: IterQueryData, F: QueryFilter> {
17 pub(crate) world: UnsafeWorldCell<'w>,
18 pub(crate) state: &'s QueryState<D, F>,
19 pub(crate) last_run: Tick,
20 pub(crate) this_run: Tick,
21 pub(crate) batching_strategy: BatchingStrategy,
22}
23
24impl<'w, 's, D: IterQueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
25 /// Changes the batching strategy used when iterating.
26 ///
27 /// For more information on how this affects the resultant iteration, see
28 /// [`BatchingStrategy`].
29 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
30 self.batching_strategy = strategy;
31 self
32 }
33
34 /// Runs `func` on each query result in parallel.
35 ///
36 /// # Panics
37 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
38 /// initialized and run from the ECS scheduler, this should never panic.
39 ///
40 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
41 #[inline]
42 pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
43 self.for_each_init(|| {}, |_, item| func(item));
44 }
45
46 /// Runs `func` on each query result in parallel on a value returned by `init`.
47 ///
48 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
49 /// Callers should avoid using this function as if it were a parallel version
50 /// of [`Iterator::fold`].
51 ///
52 /// # Example
53 ///
54 /// ```
55 /// use bevy_utils::Parallel;
56 /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
57 /// #[derive(Component)]
58 /// struct T;
59 /// fn system(query: Query<&T>){
60 /// let mut queue: Parallel<usize> = Parallel::default();
61 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
62 /// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
63 /// **local_queue += 1;
64 /// });
65 ///
66 /// // collect value from every thread
67 /// let entity_count: usize = queue.iter_mut().map(|v| *v).sum();
68 /// }
69 /// ```
70 ///
71 /// # Panics
72 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
73 /// initialized and run from the ECS scheduler, this should never panic.
74 ///
75 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
76 #[inline]
77 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
78 where
79 FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
80 INIT: Fn() -> T + Sync + Send + Clone,
81 {
82 let func = |mut init, item| {
83 func(&mut init, item);
84 init
85 };
86 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
87 {
88 let init = init();
89 // SAFETY:
90 // This method can only be called once per instance of QueryParIter,
91 // which ensures that mutable queries cannot be executed multiple times at once.
92 // Mutable instances of QueryParIter can only be created via an exclusive borrow of a
93 // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
94 // at the same time.
95 unsafe {
96 self.state
97 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
98 .into_iter()
99 .fold(init, func);
100 }
101 }
102 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
103 {
104 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
105 if thread_count <= 1 {
106 let init = init();
107 // SAFETY: See the safety comment above.
108 unsafe {
109 self.state
110 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
111 .into_iter()
112 .fold(init, func);
113 }
114 } else {
115 // Need a batch size of at least 1.
116 let batch_size = self.get_batch_size(thread_count).max(1);
117 // SAFETY: See the safety comment above.
118 unsafe {
119 self.state.par_fold_init_unchecked_manual(
120 init,
121 self.world,
122 batch_size,
123 func,
124 self.last_run,
125 self.this_run,
126 );
127 }
128 }
129 }
130 }
131
132 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
133 fn get_batch_size(&self, thread_count: usize) -> u32 {
134 let max_items = || {
135 let id_iter = self.state.matched_storage_ids.iter();
136 if self.state.is_dense {
137 // SAFETY: We only access table metadata.
138 let tables = unsafe { &self.world.world_metadata().storages().tables };
139 id_iter
140 // SAFETY: The if check ensures that matched_storage_ids stores TableIds
141 .map(|id| unsafe { tables[id.table_id].entity_count() })
142 .max()
143 } else {
144 let archetypes = &self.world.archetypes();
145 id_iter
146 // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
147 .map(|id| unsafe { archetypes[id.archetype_id].len() })
148 .max()
149 }
150 .map(|v| v as usize)
151 .unwrap_or(0)
152 };
153 self.batching_strategy
154 .calc_batch_size(max_items, thread_count) as u32
155 }
156}
157
158/// A parallel iterator over the unique query items generated from an [`Entity`] list.
159///
160/// This struct is created by the [`Query::par_iter_many`] method.
161///
162/// [`Entity`]: crate::entity::Entity
163/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
164pub struct QueryParManyIter<'w, 's, D: IterQueryData, F: QueryFilter, E: EntityEquivalent> {
165 pub(crate) world: UnsafeWorldCell<'w>,
166 pub(crate) state: &'s QueryState<D, F>,
167 pub(crate) entity_list: Vec<E>,
168 pub(crate) last_run: Tick,
169 pub(crate) this_run: Tick,
170 pub(crate) batching_strategy: BatchingStrategy,
171}
172
173impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
174 QueryParManyIter<'w, 's, D, F, E>
175{
176 /// Changes the batching strategy used when iterating.
177 ///
178 /// For more information on how this affects the resultant iteration, see
179 /// [`BatchingStrategy`].
180 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
181 self.batching_strategy = strategy;
182 self
183 }
184
185 /// Runs `func` on each query result in parallel.
186 ///
187 /// # Panics
188 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
189 /// initialized and run from the ECS scheduler, this should never panic.
190 ///
191 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
192 #[inline]
193 pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
194 self.for_each_init(|| {}, |_, item| func(item));
195 }
196
197 /// Runs `func` on each query result in parallel on a value returned by `init`.
198 ///
199 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
200 /// Callers should avoid using this function as if it were a parallel version
201 /// of [`Iterator::fold`].
202 ///
203 /// # Example
204 ///
205 /// ```
206 /// use bevy_utils::Parallel;
207 /// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
208 /// # use core::slice;
209 /// use bevy_platform::prelude::Vec;
210 /// # fn some_expensive_operation(_item: &T) -> usize {
211 /// # 0
212 /// # }
213 ///
214 /// #[derive(Component)]
215 /// struct T;
216 ///
217 /// #[derive(Resource)]
218 /// struct V(Vec<Entity>);
219 ///
220 /// impl<'a> IntoIterator for &'a V {
221 /// // ...
222 /// # type Item = &'a Entity;
223 /// # type IntoIter = slice::Iter<'a, Entity>;
224 /// #
225 /// # fn into_iter(self) -> Self::IntoIter {
226 /// # self.0.iter()
227 /// # }
228 /// }
229 ///
230 /// fn system(query: Query<&T>, entities: Res<V>){
231 /// let mut queue: Parallel<usize> = Parallel::default();
232 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
233 /// query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
234 /// **local_queue += some_expensive_operation(item);
235 /// });
236 ///
237 /// // collect value from every thread
238 /// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
239 /// }
240 /// ```
241 ///
242 /// # Panics
243 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
244 /// initialized and run from the ECS scheduler, this should never panic.
245 ///
246 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
247 #[inline]
248 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
249 where
250 FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
251 INIT: Fn() -> T + Sync + Send + Clone,
252 {
253 let func = |mut init, item| {
254 func(&mut init, item);
255 init
256 };
257 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
258 {
259 let init = init();
260 // SAFETY:
261 // This method can only be called once per instance of QueryParManyIter,
262 // which ensures that mutable queries cannot be executed multiple times at once.
263 // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
264 // Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
265 // at the same time.
266 unsafe {
267 self.state
268 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
269 .iter_many_inner(&self.entity_list)
270 .fold(init, func);
271 }
272 }
273 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
274 {
275 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
276 if thread_count <= 1 {
277 let init = init();
278 // SAFETY: See the safety comment above.
279 unsafe {
280 self.state
281 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
282 .iter_many_inner(&self.entity_list)
283 .fold(init, func);
284 }
285 } else {
286 // Need a batch size of at least 1.
287 let batch_size = self.get_batch_size(thread_count).max(1);
288 // SAFETY: See the safety comment above.
289 unsafe {
290 self.state.par_many_fold_init_unchecked_manual(
291 init,
292 self.world,
293 &self.entity_list,
294 batch_size,
295 func,
296 self.last_run,
297 self.this_run,
298 );
299 }
300 }
301 }
302 }
303
304 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
305 fn get_batch_size(&self, thread_count: usize) -> u32 {
306 self.batching_strategy
307 .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
308 }
309}
310
311/// A parallel iterator over the unique query items generated from an [`EntitySet`].
312///
313/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
314///
315/// [`EntitySet`]: crate::entity::EntitySet
316/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
317/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
318pub struct QueryParManyUniqueIter<
319 'w,
320 's,
321 D: IterQueryData,
322 F: QueryFilter,
323 E: EntityEquivalent + Sync,
324> {
325 pub(crate) world: UnsafeWorldCell<'w>,
326 pub(crate) state: &'s QueryState<D, F>,
327 pub(crate) entity_list: UniqueEntityEquivalentVec<E>,
328 pub(crate) last_run: Tick,
329 pub(crate) this_run: Tick,
330 pub(crate) batching_strategy: BatchingStrategy,
331}
332
333impl<'w, 's, D: IterQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
334 QueryParManyUniqueIter<'w, 's, D, F, E>
335{
336 /// Changes the batching strategy used when iterating.
337 ///
338 /// For more information on how this affects the resultant iteration, see
339 /// [`BatchingStrategy`].
340 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
341 self.batching_strategy = strategy;
342 self
343 }
344
345 /// Runs `func` on each query result in parallel.
346 ///
347 /// # Panics
348 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
349 /// initialized and run from the ECS scheduler, this should never panic.
350 ///
351 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
352 #[inline]
353 pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
354 self.for_each_init(|| {}, |_, item| func(item));
355 }
356
357 /// Runs `func` on each query result in parallel on a value returned by `init`.
358 ///
359 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
360 /// Callers should avoid using this function as if it were a parallel version
361 /// of [`Iterator::fold`].
362 ///
363 /// # Example
364 ///
365 /// ```
366 /// use bevy_utils::Parallel;
367 /// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
368 /// # use core::slice;
369 /// # use crate::bevy_ecs::entity::UniqueEntityIter;
370 /// # fn some_expensive_operation(_item: &T) -> usize {
371 /// # 0
372 /// # }
373 ///
374 /// #[derive(Component)]
375 /// struct T;
376 ///
377 /// #[derive(Resource)]
378 /// struct V(UniqueEntityVec);
379 ///
380 /// impl<'a> IntoIterator for &'a V {
381 /// // ...
382 /// # type Item = &'a Entity;
383 /// # type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
384 /// #
385 /// # fn into_iter(self) -> Self::IntoIter {
386 /// # self.0.iter()
387 /// # }
388 /// }
389 ///
390 /// fn system(query: Query<&T>, entities: Res<V>){
391 /// let mut queue: Parallel<usize> = Parallel::default();
392 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
393 /// query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
394 /// **local_queue += some_expensive_operation(item);
395 /// });
396 ///
397 /// // collect value from every thread
398 /// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
399 /// }
400 /// ```
401 ///
402 /// # Panics
403 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
404 /// initialized and run from the ECS scheduler, this should never panic.
405 ///
406 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
407 #[inline]
408 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
409 where
410 FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
411 INIT: Fn() -> T + Sync + Send + Clone,
412 {
413 let func = |mut init, item| {
414 func(&mut init, item);
415 init
416 };
417 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
418 {
419 let init = init();
420 // SAFETY:
421 // This method can only be called once per instance of QueryParManyUniqueIter,
422 // which ensures that mutable queries cannot be executed multiple times at once.
423 // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
424 // Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
425 // at the same time.
426 unsafe {
427 self.state
428 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
429 .iter_many_unique_inner(self.entity_list)
430 .fold(init, func);
431 }
432 }
433 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
434 {
435 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
436 if thread_count <= 1 {
437 let init = init();
438 // SAFETY: See the safety comment above.
439 unsafe {
440 self.state
441 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
442 .iter_many_unique_inner(self.entity_list)
443 .fold(init, func);
444 }
445 } else {
446 // Need a batch size of at least 1.
447 let batch_size = self.get_batch_size(thread_count).max(1);
448 // SAFETY: See the safety comment above.
449 unsafe {
450 self.state.par_many_unique_fold_init_unchecked_manual(
451 init,
452 self.world,
453 &self.entity_list,
454 batch_size,
455 func,
456 self.last_run,
457 self.this_run,
458 );
459 }
460 }
461 }
462 }
463
464 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
465 fn get_batch_size(&self, thread_count: usize) -> u32 {
466 self.batching_strategy
467 .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
468 }
469}