bevy_ecs/query/par_iter.rs
1use crate::{
2 batching::BatchingStrategy,
3 component::Tick,
4 entity::{EntityEquivalent, UniqueEntityEquivalentVec},
5 world::unsafe_world_cell::UnsafeWorldCell,
6};
7
8use super::{QueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};
9
10use alloc::vec::Vec;
11
12/// A parallel iterator over query results of a [`Query`](crate::system::Query).
13///
14/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
15/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
16pub struct QueryParIter<'w, 's, D: QueryData, F: QueryFilter> {
17 pub(crate) world: UnsafeWorldCell<'w>,
18 pub(crate) state: &'s QueryState<D, F>,
19 pub(crate) last_run: Tick,
20 pub(crate) this_run: Tick,
21 pub(crate) batching_strategy: BatchingStrategy,
22}
23
24impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
25 /// Changes the batching strategy used when iterating.
26 ///
27 /// For more information on how this affects the resultant iteration, see
28 /// [`BatchingStrategy`].
29 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
30 self.batching_strategy = strategy;
31 self
32 }
33
34 /// Runs `func` on each query result in parallel.
35 ///
36 /// # Panics
37 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
38 /// initialized and run from the ECS scheduler, this should never panic.
39 ///
40 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
41 #[inline]
42 pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
43 self.for_each_init(|| {}, |_, item| func(item));
44 }
45
46 /// Runs `func` on each query result in parallel on a value returned by `init`.
47 ///
48 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
49 /// Callers should avoid using this function as if it were a parallel version
50 /// of [`Iterator::fold`].
51 ///
52 /// # Example
53 ///
54 /// ```
55 /// use bevy_utils::Parallel;
56 /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
57 /// #[derive(Component)]
58 /// struct T;
59 /// fn system(query: Query<&T>){
60 /// let mut queue: Parallel<usize> = Parallel::default();
61 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
62 /// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
63 /// **local_queue += 1;
64 /// });
65 ///
66 /// // collect value from every thread
67 /// let entity_count: usize = queue.iter_mut().map(|v| *v).sum();
68 /// }
69 /// ```
70 ///
71 /// # Panics
72 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
73 /// initialized and run from the ECS scheduler, this should never panic.
74 ///
75 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
76 #[inline]
77 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
78 where
79 FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
80 INIT: Fn() -> T + Sync + Send + Clone,
81 {
82 let func = |mut init, item| {
83 func(&mut init, item);
84 init
85 };
86 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
87 {
88 let init = init();
89 // SAFETY:
90 // This method can only be called once per instance of QueryParIter,
91 // which ensures that mutable queries cannot be executed multiple times at once.
92 // Mutable instances of QueryParIter can only be created via an exclusive borrow of a
93 // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
94 // at the same time.
95 unsafe {
96 self.state
97 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
98 .into_iter()
99 .fold(init, func);
100 }
101 }
102 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
103 {
104 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
105 if thread_count <= 1 {
106 let init = init();
107 // SAFETY: See the safety comment above.
108 unsafe {
109 self.state
110 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
111 .into_iter()
112 .fold(init, func);
113 }
114 } else {
115 // Need a batch size of at least 1.
116 let batch_size = self.get_batch_size(thread_count).max(1);
117 // SAFETY: See the safety comment above.
118 unsafe {
119 self.state.par_fold_init_unchecked_manual(
120 init,
121 self.world,
122 batch_size,
123 func,
124 self.last_run,
125 self.this_run,
126 );
127 }
128 }
129 }
130 }
131
132 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
133 fn get_batch_size(&self, thread_count: usize) -> u32 {
134 let max_items = || {
135 let id_iter = self.state.matched_storage_ids.iter();
136 if self.state.is_dense {
137 // SAFETY: We only access table metadata.
138 let tables = unsafe { &self.world.world_metadata().storages().tables };
139 id_iter
140 // SAFETY: The if check ensures that matched_storage_ids stores TableIds
141 .map(|id| unsafe { tables[id.table_id].entity_count() })
142 .max()
143 } else {
144 let archetypes = &self.world.archetypes();
145 id_iter
146 // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
147 .map(|id| unsafe { archetypes[id.archetype_id].len() })
148 .max()
149 }
150 .map(|v| v as usize)
151 .unwrap_or(0)
152 };
153 self.batching_strategy
154 .calc_batch_size(max_items, thread_count) as u32
155 }
156}
157
158/// A parallel iterator over the unique query items generated from an [`Entity`] list.
159///
160/// This struct is created by the [`Query::par_iter_many`] method.
161///
162/// [`Entity`]: crate::entity::Entity
163/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
164pub struct QueryParManyIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent> {
165 pub(crate) world: UnsafeWorldCell<'w>,
166 pub(crate) state: &'s QueryState<D, F>,
167 pub(crate) entity_list: Vec<E>,
168 pub(crate) last_run: Tick,
169 pub(crate) this_run: Tick,
170 pub(crate) batching_strategy: BatchingStrategy,
171}
172
173impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
174 QueryParManyIter<'w, 's, D, F, E>
175{
176 /// Changes the batching strategy used when iterating.
177 ///
178 /// For more information on how this affects the resultant iteration, see
179 /// [`BatchingStrategy`].
180 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
181 self.batching_strategy = strategy;
182 self
183 }
184
185 /// Runs `func` on each query result in parallel.
186 ///
187 /// # Panics
188 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
189 /// initialized and run from the ECS scheduler, this should never panic.
190 ///
191 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
192 #[inline]
193 pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
194 self.for_each_init(|| {}, |_, item| func(item));
195 }
196
197 /// Runs `func` on each query result in parallel on a value returned by `init`.
198 ///
199 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
200 /// Callers should avoid using this function as if it were a parallel version
201 /// of [`Iterator::fold`].
202 ///
203 /// # Example
204 ///
205 /// ```
206 /// use bevy_utils::Parallel;
207 /// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
208 /// # use core::slice;
209 /// use bevy_platform::prelude::Vec;
210 /// # fn some_expensive_operation(_item: &T) -> usize {
211 /// # 0
212 /// # }
213 ///
214 /// #[derive(Component)]
215 /// struct T;
216 ///
217 /// #[derive(Resource)]
218 /// struct V(Vec<Entity>);
219 ///
220 /// impl<'a> IntoIterator for &'a V {
221 /// // ...
222 /// # type Item = &'a Entity;
223 /// # type IntoIter = slice::Iter<'a, Entity>;
224 /// #
225 /// # fn into_iter(self) -> Self::IntoIter {
226 /// # self.0.iter()
227 /// # }
228 /// }
229 ///
230 /// fn system(query: Query<&T>, entities: Res<V>){
231 /// let mut queue: Parallel<usize> = Parallel::default();
232 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
233 /// query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
234 /// **local_queue += some_expensive_operation(item);
235 /// });
236 ///
237 /// // collect value from every thread
238 /// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
239 /// }
240 /// ```
241 ///
242 /// # Panics
243 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
244 /// initialized and run from the ECS scheduler, this should never panic.
245 ///
246 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
247 #[inline]
248 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
249 where
250 FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
251 INIT: Fn() -> T + Sync + Send + Clone,
252 {
253 let func = |mut init, item| {
254 func(&mut init, item);
255 init
256 };
257 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
258 {
259 let init = init();
260 // SAFETY:
261 // This method can only be called once per instance of QueryParManyIter,
262 // which ensures that mutable queries cannot be executed multiple times at once.
263 // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
264 // Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
265 // at the same time.
266 unsafe {
267 self.state
268 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
269 .iter_many_inner(&self.entity_list)
270 .fold(init, func);
271 }
272 }
273 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
274 {
275 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
276 if thread_count <= 1 {
277 let init = init();
278 // SAFETY: See the safety comment above.
279 unsafe {
280 self.state
281 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
282 .iter_many_inner(&self.entity_list)
283 .fold(init, func);
284 }
285 } else {
286 // Need a batch size of at least 1.
287 let batch_size = self.get_batch_size(thread_count).max(1);
288 // SAFETY: See the safety comment above.
289 unsafe {
290 self.state.par_many_fold_init_unchecked_manual(
291 init,
292 self.world,
293 &self.entity_list,
294 batch_size,
295 func,
296 self.last_run,
297 self.this_run,
298 );
299 }
300 }
301 }
302 }
303
304 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
305 fn get_batch_size(&self, thread_count: usize) -> u32 {
306 self.batching_strategy
307 .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
308 }
309}
310
311/// A parallel iterator over the unique query items generated from an [`EntitySet`].
312///
313/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
314///
315/// [`EntitySet`]: crate::entity::EntitySet
316/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
317/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
318pub struct QueryParManyUniqueIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>
319{
320 pub(crate) world: UnsafeWorldCell<'w>,
321 pub(crate) state: &'s QueryState<D, F>,
322 pub(crate) entity_list: UniqueEntityEquivalentVec<E>,
323 pub(crate) last_run: Tick,
324 pub(crate) this_run: Tick,
325 pub(crate) batching_strategy: BatchingStrategy,
326}
327
328impl<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>
329 QueryParManyUniqueIter<'w, 's, D, F, E>
330{
331 /// Changes the batching strategy used when iterating.
332 ///
333 /// For more information on how this affects the resultant iteration, see
334 /// [`BatchingStrategy`].
335 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
336 self.batching_strategy = strategy;
337 self
338 }
339
340 /// Runs `func` on each query result in parallel.
341 ///
342 /// # Panics
343 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
344 /// initialized and run from the ECS scheduler, this should never panic.
345 ///
346 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
347 #[inline]
348 pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
349 self.for_each_init(|| {}, |_, item| func(item));
350 }
351
352 /// Runs `func` on each query result in parallel on a value returned by `init`.
353 ///
354 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
355 /// Callers should avoid using this function as if it were a parallel version
356 /// of [`Iterator::fold`].
357 ///
358 /// # Example
359 ///
360 /// ```
361 /// use bevy_utils::Parallel;
362 /// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
363 /// # use core::slice;
364 /// # use crate::bevy_ecs::entity::UniqueEntityIter;
365 /// # fn some_expensive_operation(_item: &T) -> usize {
366 /// # 0
367 /// # }
368 ///
369 /// #[derive(Component)]
370 /// struct T;
371 ///
372 /// #[derive(Resource)]
373 /// struct V(UniqueEntityVec);
374 ///
375 /// impl<'a> IntoIterator for &'a V {
376 /// // ...
377 /// # type Item = &'a Entity;
378 /// # type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
379 /// #
380 /// # fn into_iter(self) -> Self::IntoIter {
381 /// # self.0.iter()
382 /// # }
383 /// }
384 ///
385 /// fn system(query: Query<&T>, entities: Res<V>){
386 /// let mut queue: Parallel<usize> = Parallel::default();
387 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
388 /// query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
389 /// **local_queue += some_expensive_operation(item);
390 /// });
391 ///
392 /// // collect value from every thread
393 /// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
394 /// }
395 /// ```
396 ///
397 /// # Panics
398 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
399 /// initialized and run from the ECS scheduler, this should never panic.
400 ///
401 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
402 #[inline]
403 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
404 where
405 FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
406 INIT: Fn() -> T + Sync + Send + Clone,
407 {
408 let func = |mut init, item| {
409 func(&mut init, item);
410 init
411 };
412 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
413 {
414 let init = init();
415 // SAFETY:
416 // This method can only be called once per instance of QueryParManyUniqueIter,
417 // which ensures that mutable queries cannot be executed multiple times at once.
418 // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
419 // Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
420 // at the same time.
421 unsafe {
422 self.state
423 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
424 .iter_many_unique_inner(self.entity_list)
425 .fold(init, func);
426 }
427 }
428 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
429 {
430 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
431 if thread_count <= 1 {
432 let init = init();
433 // SAFETY: See the safety comment above.
434 unsafe {
435 self.state
436 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
437 .iter_many_unique_inner(self.entity_list)
438 .fold(init, func);
439 }
440 } else {
441 // Need a batch size of at least 1.
442 let batch_size = self.get_batch_size(thread_count).max(1);
443 // SAFETY: See the safety comment above.
444 unsafe {
445 self.state.par_many_unique_fold_init_unchecked_manual(
446 init,
447 self.world,
448 &self.entity_list,
449 batch_size,
450 func,
451 self.last_run,
452 self.this_run,
453 );
454 }
455 }
456 }
457 }
458
459 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
460 fn get_batch_size(&self, thread_count: usize) -> u32 {
461 self.batching_strategy
462 .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
463 }
464}