bevy_ecs/query/par_iter.rs
1use crate::{
2 batching::BatchingStrategy,
3 component::Tick,
4 entity::{EntityEquivalent, UniqueEntityEquivalentVec},
5 world::unsafe_world_cell::UnsafeWorldCell,
6};
7
8use super::{QueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};
9
10use alloc::vec::Vec;
11
12/// A parallel iterator over query results of a [`Query`](crate::system::Query).
13///
14/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
15/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
16pub struct QueryParIter<'w, 's, D: QueryData, F: QueryFilter> {
17 pub(crate) world: UnsafeWorldCell<'w>,
18 pub(crate) state: &'s QueryState<D, F>,
19 pub(crate) last_run: Tick,
20 pub(crate) this_run: Tick,
21 pub(crate) batching_strategy: BatchingStrategy,
22}
23
24impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
25 /// Changes the batching strategy used when iterating.
26 ///
27 /// For more information on how this affects the resultant iteration, see
28 /// [`BatchingStrategy`].
29 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
30 self.batching_strategy = strategy;
31 self
32 }
33
34 /// Runs `func` on each query result in parallel.
35 ///
36 /// # Panics
37 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
38 /// initialized and run from the ECS scheduler, this should never panic.
39 ///
40 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
41 #[inline]
42 pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
43 self.for_each_init(|| {}, |_, item| func(item));
44 }
45
46 /// Runs `func` on each query result in parallel on a value returned by `init`.
47 ///
48 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
49 /// Callers should avoid using this function as if it were a parallel version
50 /// of [`Iterator::fold`].
51 ///
52 /// # Example
53 ///
54 /// ```
55 /// use bevy_utils::Parallel;
56 /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
57 /// #[derive(Component)]
58 /// struct T;
59 /// fn system(query: Query<&T>){
60 /// let mut queue: Parallel<usize> = Parallel::default();
61 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
62 /// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
63 /// **local_queue += 1;
64 /// });
65 ///
66 /// // collect value from every thread
67 /// let entity_count: usize = queue.iter_mut().map(|v| *v).sum();
68 /// }
69 /// ```
70 ///
71 /// # Panics
72 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
73 /// initialized and run from the ECS scheduler, this should never panic.
74 ///
75 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
76 #[inline]
77 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
78 where
79 FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
80 INIT: Fn() -> T + Sync + Send + Clone,
81 {
82 let func = |mut init, item| {
83 func(&mut init, item);
84 init
85 };
86 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
87 {
88 let init = init();
89 // SAFETY:
90 // This method can only be called once per instance of QueryParIter,
91 // which ensures that mutable queries cannot be executed multiple times at once.
92 // Mutable instances of QueryParIter can only be created via an exclusive borrow of a
93 // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
94 // at the same time.
95 unsafe {
96 self.state
97 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
98 .into_iter()
99 .fold(init, func);
100 }
101 }
102 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
103 {
104 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
105 if thread_count <= 1 {
106 let init = init();
107 // SAFETY: See the safety comment above.
108 unsafe {
109 self.state
110 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
111 .into_iter()
112 .fold(init, func);
113 }
114 } else {
115 // Need a batch size of at least 1.
116 let batch_size = self.get_batch_size(thread_count).max(1);
117 // SAFETY: See the safety comment above.
118 unsafe {
119 self.state.par_fold_init_unchecked_manual(
120 init,
121 self.world,
122 batch_size,
123 func,
124 self.last_run,
125 self.this_run,
126 );
127 }
128 }
129 }
130 }
131
132 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
133 fn get_batch_size(&self, thread_count: usize) -> usize {
134 let max_items = || {
135 let id_iter = self.state.matched_storage_ids.iter();
136 if self.state.is_dense {
137 // SAFETY: We only access table metadata.
138 let tables = unsafe { &self.world.world_metadata().storages().tables };
139 id_iter
140 // SAFETY: The if check ensures that matched_storage_ids stores TableIds
141 .map(|id| unsafe { tables[id.table_id].entity_count() })
142 .max()
143 } else {
144 let archetypes = &self.world.archetypes();
145 id_iter
146 // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
147 .map(|id| unsafe { archetypes[id.archetype_id].len() })
148 .max()
149 }
150 .unwrap_or(0)
151 };
152 self.batching_strategy
153 .calc_batch_size(max_items, thread_count)
154 }
155}
156
157/// A parallel iterator over the unique query items generated from an [`Entity`] list.
158///
159/// This struct is created by the [`Query::par_iter_many`] method.
160///
161/// [`Entity`]: crate::entity::Entity
162/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
163pub struct QueryParManyIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent> {
164 pub(crate) world: UnsafeWorldCell<'w>,
165 pub(crate) state: &'s QueryState<D, F>,
166 pub(crate) entity_list: Vec<E>,
167 pub(crate) last_run: Tick,
168 pub(crate) this_run: Tick,
169 pub(crate) batching_strategy: BatchingStrategy,
170}
171
172impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
173 QueryParManyIter<'w, 's, D, F, E>
174{
175 /// Changes the batching strategy used when iterating.
176 ///
177 /// For more information on how this affects the resultant iteration, see
178 /// [`BatchingStrategy`].
179 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
180 self.batching_strategy = strategy;
181 self
182 }
183
184 /// Runs `func` on each query result in parallel.
185 ///
186 /// # Panics
187 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
188 /// initialized and run from the ECS scheduler, this should never panic.
189 ///
190 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
191 #[inline]
192 pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
193 self.for_each_init(|| {}, |_, item| func(item));
194 }
195
196 /// Runs `func` on each query result in parallel on a value returned by `init`.
197 ///
198 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
199 /// Callers should avoid using this function as if it were a parallel version
200 /// of [`Iterator::fold`].
201 ///
202 /// # Example
203 ///
204 /// ```
205 /// use bevy_utils::Parallel;
206 /// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
207 /// # use core::slice;
208 /// use bevy_platform::prelude::Vec;
209 /// # fn some_expensive_operation(_item: &T) -> usize {
210 /// # 0
211 /// # }
212 ///
213 /// #[derive(Component)]
214 /// struct T;
215 ///
216 /// #[derive(Resource)]
217 /// struct V(Vec<Entity>);
218 ///
219 /// impl<'a> IntoIterator for &'a V {
220 /// // ...
221 /// # type Item = &'a Entity;
222 /// # type IntoIter = slice::Iter<'a, Entity>;
223 /// #
224 /// # fn into_iter(self) -> Self::IntoIter {
225 /// # self.0.iter()
226 /// # }
227 /// }
228 ///
229 /// fn system(query: Query<&T>, entities: Res<V>){
230 /// let mut queue: Parallel<usize> = Parallel::default();
231 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
232 /// query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
233 /// **local_queue += some_expensive_operation(item);
234 /// });
235 ///
236 /// // collect value from every thread
237 /// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
238 /// }
239 /// ```
240 ///
241 /// # Panics
242 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
243 /// initialized and run from the ECS scheduler, this should never panic.
244 ///
245 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
246 #[inline]
247 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
248 where
249 FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
250 INIT: Fn() -> T + Sync + Send + Clone,
251 {
252 let func = |mut init, item| {
253 func(&mut init, item);
254 init
255 };
256 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
257 {
258 let init = init();
259 // SAFETY:
260 // This method can only be called once per instance of QueryParManyIter,
261 // which ensures that mutable queries cannot be executed multiple times at once.
262 // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
263 // Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
264 // at the same time.
265 unsafe {
266 self.state
267 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
268 .iter_many_inner(&self.entity_list)
269 .fold(init, func);
270 }
271 }
272 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
273 {
274 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
275 if thread_count <= 1 {
276 let init = init();
277 // SAFETY: See the safety comment above.
278 unsafe {
279 self.state
280 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
281 .iter_many_inner(&self.entity_list)
282 .fold(init, func);
283 }
284 } else {
285 // Need a batch size of at least 1.
286 let batch_size = self.get_batch_size(thread_count).max(1);
287 // SAFETY: See the safety comment above.
288 unsafe {
289 self.state.par_many_fold_init_unchecked_manual(
290 init,
291 self.world,
292 &self.entity_list,
293 batch_size,
294 func,
295 self.last_run,
296 self.this_run,
297 );
298 }
299 }
300 }
301 }
302
303 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
304 fn get_batch_size(&self, thread_count: usize) -> usize {
305 self.batching_strategy
306 .calc_batch_size(|| self.entity_list.len(), thread_count)
307 }
308}
309
310/// A parallel iterator over the unique query items generated from an [`EntitySet`].
311///
312/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
313///
314/// [`EntitySet`]: crate::entity::EntitySet
315/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
316/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
317pub struct QueryParManyUniqueIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>
318{
319 pub(crate) world: UnsafeWorldCell<'w>,
320 pub(crate) state: &'s QueryState<D, F>,
321 pub(crate) entity_list: UniqueEntityEquivalentVec<E>,
322 pub(crate) last_run: Tick,
323 pub(crate) this_run: Tick,
324 pub(crate) batching_strategy: BatchingStrategy,
325}
326
327impl<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>
328 QueryParManyUniqueIter<'w, 's, D, F, E>
329{
330 /// Changes the batching strategy used when iterating.
331 ///
332 /// For more information on how this affects the resultant iteration, see
333 /// [`BatchingStrategy`].
334 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
335 self.batching_strategy = strategy;
336 self
337 }
338
339 /// Runs `func` on each query result in parallel.
340 ///
341 /// # Panics
342 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
343 /// initialized and run from the ECS scheduler, this should never panic.
344 ///
345 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
346 #[inline]
347 pub fn for_each<FN: Fn(QueryItem<'w, D>) + Send + Sync + Clone>(self, func: FN) {
348 self.for_each_init(|| {}, |_, item| func(item));
349 }
350
351 /// Runs `func` on each query result in parallel on a value returned by `init`.
352 ///
353 /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
354 /// Callers should avoid using this function as if it were a parallel version
355 /// of [`Iterator::fold`].
356 ///
357 /// # Example
358 ///
359 /// ```
360 /// use bevy_utils::Parallel;
361 /// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
362 /// # use core::slice;
363 /// # use crate::bevy_ecs::entity::UniqueEntityIter;
364 /// # fn some_expensive_operation(_item: &T) -> usize {
365 /// # 0
366 /// # }
367 ///
368 /// #[derive(Component)]
369 /// struct T;
370 ///
371 /// #[derive(Resource)]
372 /// struct V(UniqueEntityVec);
373 ///
374 /// impl<'a> IntoIterator for &'a V {
375 /// // ...
376 /// # type Item = &'a Entity;
377 /// # type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
378 /// #
379 /// # fn into_iter(self) -> Self::IntoIter {
380 /// # self.0.iter()
381 /// # }
382 /// }
383 ///
384 /// fn system(query: Query<&T>, entities: Res<V>){
385 /// let mut queue: Parallel<usize> = Parallel::default();
386 /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;
387 /// query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {
388 /// **local_queue += some_expensive_operation(item);
389 /// });
390 ///
391 /// // collect value from every thread
392 /// let final_value: usize = queue.iter_mut().map(|v| *v).sum();
393 /// }
394 /// ```
395 ///
396 /// # Panics
397 /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
398 /// initialized and run from the ECS scheduler, this should never panic.
399 ///
400 /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
401 #[inline]
402 pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
403 where
404 FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone,
405 INIT: Fn() -> T + Sync + Send + Clone,
406 {
407 let func = |mut init, item| {
408 func(&mut init, item);
409 init
410 };
411 #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
412 {
413 let init = init();
414 // SAFETY:
415 // This method can only be called once per instance of QueryParManyUniqueIter,
416 // which ensures that mutable queries cannot be executed multiple times at once.
417 // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
418 // Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
419 // at the same time.
420 unsafe {
421 self.state
422 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
423 .iter_many_unique_inner(self.entity_list)
424 .fold(init, func);
425 }
426 }
427 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
428 {
429 let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
430 if thread_count <= 1 {
431 let init = init();
432 // SAFETY: See the safety comment above.
433 unsafe {
434 self.state
435 .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
436 .iter_many_unique_inner(self.entity_list)
437 .fold(init, func);
438 }
439 } else {
440 // Need a batch size of at least 1.
441 let batch_size = self.get_batch_size(thread_count).max(1);
442 // SAFETY: See the safety comment above.
443 unsafe {
444 self.state.par_many_unique_fold_init_unchecked_manual(
445 init,
446 self.world,
447 &self.entity_list,
448 batch_size,
449 func,
450 self.last_run,
451 self.this_run,
452 );
453 }
454 }
455 }
456 }
457
458 #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
459 fn get_batch_size(&self, thread_count: usize) -> usize {
460 self.batching_strategy
461 .calc_batch_size(|| self.entity_list.len(), thread_count)
462 }
463}