Skip to main content

bevy_ecs/query/
par_iter.rs

1use crate::{
2    batching::BatchingStrategy,
3    change_detection::Tick,
4    entity::{EntityEquivalent, UniqueEntityEquivalentVec},
5    world::unsafe_world_cell::UnsafeWorldCell,
6};
7
8use super::{IterQueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};
9
10use alloc::vec::Vec;
11
12/// A parallel iterator over query results of a [`Query`](crate::system::Query).
13///
14/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
15/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
16pub struct QueryParIter<'w, 's, D: IterQueryData, F: QueryFilter> {
17    pub(crate) world: UnsafeWorldCell<'w>,
18    pub(crate) state: &'s QueryState<D, F>,
19    pub(crate) last_run: Tick,
20    pub(crate) this_run: Tick,
21    pub(crate) batching_strategy: BatchingStrategy,
22}
23
24impl<'w, 's, D: IterQueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
25    /// Changes the batching strategy used when iterating.
26    ///
27    /// For more information on how this affects the resultant iteration, see
28    /// [`BatchingStrategy`].
29    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
30        self.batching_strategy = strategy;
31        self
32    }
33
34    /// Runs `func` on each query result in parallel.
35    ///
36    /// # Panics
37    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
38    /// initialized and run from the ECS scheduler, this should never panic.
39    ///
40    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
41    #[inline]
42    pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
43        self.for_each_init(|| {}, |_, item| func(item));
44    }
45
46    /// Runs `func` on each query result in parallel on a value returned by `init`.
47    ///
48    /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
49    /// Callers should avoid using this function as if it were a parallel version
50    /// of [`Iterator::fold`].
51    ///
52    /// # Example
53    ///
54    /// ```
55    /// use bevy_utils::Parallel;
56    /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
57    /// #[derive(Component)]
58    /// struct T;
59    /// fn system(query: Query<&T>){
60    ///     let mut queue: Parallel<usize> = Parallel::default();
61    ///     // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
62    ///     query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
63    ///         **local_queue += 1;
64    ///      });
65    ///
66    ///     // collect value from every thread
67    ///     let entity_count: usize = queue.iter_mut().map(|v| *v).sum();
68    /// }
69    /// ```
70    ///
71    /// # Panics
72    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
73    /// initialized and run from the ECS scheduler, this should never panic.
74    ///
75    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
76    #[inline]
77    pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
78    where
79        FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
80        INIT: Fn() -> T + Sync + Send + Clone,
81    {
82        let func = |mut init, item| {
83            func(&mut init, item);
84            init
85        };
86        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
87        {
88            let init = init();
89            // SAFETY:
90            // This method can only be called once per instance of QueryParIter,
91            // which ensures that mutable queries cannot be executed multiple times at once.
92            // Mutable instances of QueryParIter can only be created via an exclusive borrow of a
93            // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
94            // at the same time.
95            unsafe {
96                self.state
97                    .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
98                    .into_iter()
99                    .fold(init, func);
100            }
101        }
102        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
103        {
104            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
105            if thread_count <= 1 {
106                let init = init();
107                // SAFETY: See the safety comment above.
108                unsafe {
109                    self.state
110                        .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
111                        .into_iter()
112                        .fold(init, func);
113                }
114            } else {
115                // Need a batch size of at least 1.
116                let batch_size = self.get_batch_size(thread_count).max(1);
117                // SAFETY: See the safety comment above.
118                unsafe {
119                    self.state.par_fold_init_unchecked_manual(
120                        init,
121                        self.world,
122                        batch_size,
123                        func,
124                        self.last_run,
125                        self.this_run,
126                    );
127                }
128            }
129        }
130    }
131
132    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
133    fn get_batch_size(&self, thread_count: usize) -> u32 {
134        let max_items = || {
135            let id_iter = self.state.matched_storage_ids.iter();
136            if self.state.is_dense {
137                // SAFETY: We only access table metadata.
138                let tables = unsafe { &self.world.world_metadata().storages().tables };
139                id_iter
140                    // SAFETY: The if check ensures that matched_storage_ids stores TableIds
141                    .map(|id| unsafe { tables[id.table_id].entity_count() })
142                    .max()
143            } else {
144                let archetypes = &self.world.archetypes();
145                id_iter
146                    // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
147                    .map(|id| unsafe { archetypes[id.archetype_id].len() })
148                    .max()
149            }
150            .map(|v| v as usize)
151            .unwrap_or(0)
152        };
153        self.batching_strategy
154            .calc_batch_size(max_items, thread_count) as u32
155    }
156}
157
158/// A parallel iterator over the unique query items generated from an [`Entity`] list.
159///
160/// This struct is created by the [`Query::par_iter_many`] method.
161///
162/// [`Entity`]: crate::entity::Entity
163/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
164pub struct QueryParManyIter<'w, 's, D: IterQueryData, F: QueryFilter, E: EntityEquivalent> {
165    pub(crate) world: UnsafeWorldCell<'w>,
166    pub(crate) state: &'s QueryState<D, F>,
167    pub(crate) entity_list: Vec<E>,
168    pub(crate) last_run: Tick,
169    pub(crate) this_run: Tick,
170    pub(crate) batching_strategy: BatchingStrategy,
171}
172
173impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
174    QueryParManyIter<'w, 's, D, F, E>
175{
176    /// Changes the batching strategy used when iterating.
177    ///
178    /// For more information on how this affects the resultant iteration, see
179    /// [`BatchingStrategy`].
180    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
181        self.batching_strategy = strategy;
182        self
183    }
184
185    /// Runs `func` on each query result in parallel.
186    ///
187    /// # Panics
188    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
189    /// initialized and run from the ECS scheduler, this should never panic.
190    ///
191    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
192    #[inline]
193    pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
194        self.for_each_init(|| {}, |_, item| func(item));
195    }
196
197    /// Runs `func` on each query result in parallel on a value returned by `init`.
198    ///
199    /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
200    /// Callers should avoid using this function as if it were a parallel version
201    /// of [`Iterator::fold`].
202    ///
203    /// # Example
204    ///
205    /// ```
206    /// use bevy_utils::Parallel;
207    /// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
208    /// # use core::slice;
209    /// use bevy_platform::prelude::Vec;
210    /// # fn some_expensive_operation(_item: &T) -> usize {
211    /// #     0
212    /// # }
213    ///
214    /// #[derive(Component)]
215    /// struct T;
216    ///
217    /// #[derive(Resource)]
218    /// struct V(Vec<Entity>);
219    ///
220    /// impl<'a> IntoIterator for &'a V {
221    /// // ...
222    /// #   type Item = &'a Entity;
223    /// #   type IntoIter = slice::Iter<'a, Entity>;
224    /// #
225    /// #    fn into_iter(self) -> Self::IntoIter {
226    /// #        self.0.iter()
227    /// #    }
228    /// }
229    ///
230    /// fn system(query: Query<&T>, entities: Res<V>){
231    ///     let mut queue: Parallel<usize> = Parallel::default();
232    ///     // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
233    ///     query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
234    ///         **local_queue += some_expensive_operation(item);
235    ///     });
236    ///
237    ///     // collect value from every thread
238    ///     let final_value: usize = queue.iter_mut().map(|v| *v).sum();
239    /// }
240    /// ```
241    ///
242    /// # Panics
243    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
244    /// initialized and run from the ECS scheduler, this should never panic.
245    ///
246    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
247    #[inline]
248    pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
249    where
250        FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
251        INIT: Fn() -> T + Sync + Send + Clone,
252    {
253        let func = |mut init, item| {
254            func(&mut init, item);
255            init
256        };
257        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
258        {
259            let init = init();
260            // SAFETY:
261            // This method can only be called once per instance of QueryParManyIter,
262            // which ensures that mutable queries cannot be executed multiple times at once.
263            // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
264            // Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
265            // at the same time.
266            unsafe {
267                self.state
268                    .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
269                    .iter_many_inner(&self.entity_list)
270                    .fold(init, func);
271            }
272        }
273        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
274        {
275            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
276            if thread_count <= 1 {
277                let init = init();
278                // SAFETY: See the safety comment above.
279                unsafe {
280                    self.state
281                        .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
282                        .iter_many_inner(&self.entity_list)
283                        .fold(init, func);
284                }
285            } else {
286                // Need a batch size of at least 1.
287                let batch_size = self.get_batch_size(thread_count).max(1);
288                // SAFETY: See the safety comment above.
289                unsafe {
290                    self.state.par_many_fold_init_unchecked_manual(
291                        init,
292                        self.world,
293                        &self.entity_list,
294                        batch_size,
295                        func,
296                        self.last_run,
297                        self.this_run,
298                    );
299                }
300            }
301        }
302    }
303
304    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
305    fn get_batch_size(&self, thread_count: usize) -> u32 {
306        self.batching_strategy
307            .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
308    }
309}
310
311/// A parallel iterator over the unique query items generated from an [`EntitySet`].
312///
313/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
314///
315/// [`EntitySet`]: crate::entity::EntitySet
316/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
317/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
318pub struct QueryParManyUniqueIter<
319    'w,
320    's,
321    D: IterQueryData,
322    F: QueryFilter,
323    E: EntityEquivalent + Sync,
324> {
325    pub(crate) world: UnsafeWorldCell<'w>,
326    pub(crate) state: &'s QueryState<D, F>,
327    pub(crate) entity_list: UniqueEntityEquivalentVec<E>,
328    pub(crate) last_run: Tick,
329    pub(crate) this_run: Tick,
330    pub(crate) batching_strategy: BatchingStrategy,
331}
332
333impl<'w, 's, D: IterQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
334    QueryParManyUniqueIter<'w, 's, D, F, E>
335{
336    /// Changes the batching strategy used when iterating.
337    ///
338    /// For more information on how this affects the resultant iteration, see
339    /// [`BatchingStrategy`].
340    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
341        self.batching_strategy = strategy;
342        self
343    }
344
345    /// Runs `func` on each query result in parallel.
346    ///
347    /// # Panics
348    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
349    /// initialized and run from the ECS scheduler, this should never panic.
350    ///
351    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
352    #[inline]
353    pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
354        self.for_each_init(|| {}, |_, item| func(item));
355    }
356
357    /// Runs `func` on each query result in parallel on a value returned by `init`.
358    ///
359    /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
360    /// Callers should avoid using this function as if it were a parallel version
361    /// of [`Iterator::fold`].
362    ///
363    /// # Example
364    ///
365    /// ```
366    /// use bevy_utils::Parallel;
367    /// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
368    /// # use core::slice;
369    /// # use crate::bevy_ecs::entity::UniqueEntityIter;
370    /// # fn some_expensive_operation(_item: &T) -> usize {
371    /// #     0
372    /// # }
373    ///
374    /// #[derive(Component)]
375    /// struct T;
376    ///
377    /// #[derive(Resource)]
378    /// struct V(UniqueEntityVec);
379    ///
380    /// impl<'a> IntoIterator for &'a V {
381    /// // ...
382    /// #   type Item = &'a Entity;
383    /// #   type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
384    /// #
385    /// #    fn into_iter(self) -> Self::IntoIter {
386    /// #        self.0.iter()
387    /// #    }
388    /// }
389    ///
390    /// fn system(query: Query<&T>, entities: Res<V>){
391    ///     let mut queue: Parallel<usize> = Parallel::default();
392    ///     // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
393    ///     query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
394    ///         **local_queue += some_expensive_operation(item);
395    ///     });
396    ///
397    ///     // collect value from every thread
398    ///     let final_value: usize = queue.iter_mut().map(|v| *v).sum();
399    /// }
400    /// ```
401    ///
402    /// # Panics
403    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
404    /// initialized and run from the ECS scheduler, this should never panic.
405    ///
406    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
407    #[inline]
408    pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
409    where
410        FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
411        INIT: Fn() -> T + Sync + Send + Clone,
412    {
413        let func = |mut init, item| {
414            func(&mut init, item);
415            init
416        };
417        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
418        {
419            let init = init();
420            // SAFETY:
421            // This method can only be called once per instance of QueryParManyUniqueIter,
422            // which ensures that mutable queries cannot be executed multiple times at once.
423            // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
424            // Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
425            // at the same time.
426            unsafe {
427                self.state
428                    .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
429                    .iter_many_unique_inner(self.entity_list)
430                    .fold(init, func);
431            }
432        }
433        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
434        {
435            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
436            if thread_count <= 1 {
437                let init = init();
438                // SAFETY: See the safety comment above.
439                unsafe {
440                    self.state
441                        .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
442                        .iter_many_unique_inner(self.entity_list)
443                        .fold(init, func);
444                }
445            } else {
446                // Need a batch size of at least 1.
447                let batch_size = self.get_batch_size(thread_count).max(1);
448                // SAFETY: See the safety comment above.
449                unsafe {
450                    self.state.par_many_unique_fold_init_unchecked_manual(
451                        init,
452                        self.world,
453                        &self.entity_list,
454                        batch_size,
455                        func,
456                        self.last_run,
457                        self.this_run,
458                    );
459                }
460            }
461        }
462    }
463
464    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
465    fn get_batch_size(&self, thread_count: usize) -> u32 {
466        self.batching_strategy
467            .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
468    }
469}