bevy_ecs/event/
mut_iterators.rs

1#[cfg(feature = "multi_threaded")]
2use bevy_ecs::batching::BatchingStrategy;
3use bevy_ecs::event::{Event, EventCursor, EventId, EventInstance, Events};
4use core::{iter::Chain, slice::IterMut};
5
6/// An iterator that yields any unread events from an [`EventMutator`] or [`EventCursor`].
7///
8/// [`EventMutator`]: super::EventMutator
9#[derive(Debug)]
10pub struct EventMutIterator<'a, E: Event> {
11    iter: EventMutIteratorWithId<'a, E>,
12}
13
14impl<'a, E: Event> Iterator for EventMutIterator<'a, E> {
15    type Item = &'a mut E;
16    fn next(&mut self) -> Option<Self::Item> {
17        self.iter.next().map(|(event, _)| event)
18    }
19
20    fn size_hint(&self) -> (usize, Option<usize>) {
21        self.iter.size_hint()
22    }
23
24    fn count(self) -> usize {
25        self.iter.count()
26    }
27
28    fn last(self) -> Option<Self::Item>
29    where
30        Self: Sized,
31    {
32        self.iter.last().map(|(event, _)| event)
33    }
34
35    fn nth(&mut self, n: usize) -> Option<Self::Item> {
36        self.iter.nth(n).map(|(event, _)| event)
37    }
38}
39
40impl<'a, E: Event> ExactSizeIterator for EventMutIterator<'a, E> {
41    fn len(&self) -> usize {
42        self.iter.len()
43    }
44}
45
46/// An iterator that yields any unread events (and their IDs) from an [`EventMutator`] or [`EventCursor`].
47///
48/// [`EventMutator`]: super::EventMutator
49#[derive(Debug)]
50pub struct EventMutIteratorWithId<'a, E: Event> {
51    mutator: &'a mut EventCursor<E>,
52    chain: Chain<IterMut<'a, EventInstance<E>>, IterMut<'a, EventInstance<E>>>,
53    unread: usize,
54}
55
56impl<'a, E: Event> EventMutIteratorWithId<'a, E> {
57    /// Creates a new iterator that yields any `events` that have not yet been seen by `mutator`.
58    pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {
59        let a_index = mutator
60            .last_event_count
61            .saturating_sub(events.events_a.start_event_count);
62        let b_index = mutator
63            .last_event_count
64            .saturating_sub(events.events_b.start_event_count);
65        let a = events.events_a.get_mut(a_index..).unwrap_or_default();
66        let b = events.events_b.get_mut(b_index..).unwrap_or_default();
67
68        let unread_count = a.len() + b.len();
69
70        mutator.last_event_count = events.event_count - unread_count;
71        // Iterate the oldest first, then the newer events
72        let chain = a.iter_mut().chain(b.iter_mut());
73
74        Self {
75            mutator,
76            chain,
77            unread: unread_count,
78        }
79    }
80
81    /// Iterate over only the events.
82    pub fn without_id(self) -> EventMutIterator<'a, E> {
83        EventMutIterator { iter: self }
84    }
85}
86
87impl<'a, E: Event> Iterator for EventMutIteratorWithId<'a, E> {
88    type Item = (&'a mut E, EventId<E>);
89    fn next(&mut self) -> Option<Self::Item> {
90        match self
91            .chain
92            .next()
93            .map(|instance| (&mut instance.event, instance.event_id))
94        {
95            Some(item) => {
96                #[cfg(feature = "detailed_trace")]
97                tracing::trace!("EventMutator::iter() -> {}", item.1);
98                self.mutator.last_event_count += 1;
99                self.unread -= 1;
100                Some(item)
101            }
102            None => None,
103        }
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        self.chain.size_hint()
108    }
109
110    fn count(self) -> usize {
111        self.mutator.last_event_count += self.unread;
112        self.unread
113    }
114
115    fn last(self) -> Option<Self::Item>
116    where
117        Self: Sized,
118    {
119        let EventInstance { event_id, event } = self.chain.last()?;
120        self.mutator.last_event_count += self.unread;
121        Some((event, *event_id))
122    }
123
124    fn nth(&mut self, n: usize) -> Option<Self::Item> {
125        if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
126            self.mutator.last_event_count += n + 1;
127            self.unread -= n + 1;
128            Some((event, *event_id))
129        } else {
130            self.mutator.last_event_count += self.unread;
131            self.unread = 0;
132            None
133        }
134    }
135}
136
137impl<'a, E: Event> ExactSizeIterator for EventMutIteratorWithId<'a, E> {
138    fn len(&self) -> usize {
139        self.unread
140    }
141}
142
/// A parallel iterator over the unread [`Event`]s of an [`EventCursor`], giving
/// mutable access to each event.
///
/// It can also be turned into a sequential [`EventMutIteratorWithId`] through
/// [`IntoIterator`].
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct EventMutParIter<'a, E>
where
    E: Event,
{
    /// Cursor that is advanced once the events have been processed.
    mutator: &'a mut EventCursor<E>,
    /// Unread tails of the two event buffers, in iteration order.
    slices: [&'a mut [EventInstance<E>]; 2],
    /// Strategy used to decide how many events each spawned task handles.
    batching_strategy: BatchingStrategy,
    /// Number of unread events at construction time; only needed on targets
    /// where the parallel code path exists.
    #[cfg(not(target_arch = "wasm32"))]
    unread: usize,
}
153
#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> EventMutParIter<'a, E> {
    /// Creates a new parallel iterator over `events` that have not yet been seen by `mutator`.
    ///
    /// The cursor is repositioned so that it sits exactly as many events behind
    /// `events.event_count` as there are unread events in the two buffers.
    pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {
        // Offset of the first unseen event within each buffer; saturates to zero
        // when the cursor is older than the buffer's first event.
        let a_index = mutator
            .last_event_count
            .saturating_sub(events.events_a.start_event_count);
        let b_index = mutator
            .last_event_count
            .saturating_sub(events.events_b.start_event_count);
        // An out-of-range offset means nothing is unread in that buffer.
        let a = events.events_a.get_mut(a_index..).unwrap_or_default();
        let b = events.events_b.get_mut(b_index..).unwrap_or_default();

        let unread_count = a.len() + b.len();
        mutator.last_event_count = events.event_count - unread_count;

        Self {
            mutator,
            slices: [a, b],
            batching_strategy: BatchingStrategy::default(),
            #[cfg(not(target_arch = "wasm32"))]
            unread: unread_count,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs the provided closure for each unread event in parallel.
    ///
    /// Unlike normal iteration, the event order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event mutator that is
    /// being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |e, _| func(e));
    }

    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `EventId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
    ///
    /// On `wasm32` targets, or when the task pool has at most one thread, the events are
    /// processed sequentially on the calling thread instead.
    ///
    /// A batching strategy that yields a batch size of zero is treated as a batch size of one.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event mutator that is
    /// being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[cfg_attr(
        target_arch = "wasm32",
        expect(unused_mut, reason = "not mutated on this target")
    )]
    pub fn for_each_with_id<FN: Fn(&'a mut E, EventId<E>) + Send + Sync + Clone>(
        mut self,
        func: FN,
    ) {
        #[cfg(target_arch = "wasm32")]
        {
            self.into_iter().for_each(|(e, i)| func(e, i));
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                // Nothing to gain from spawning tasks; the sequential iterator
                // also takes care of advancing the cursor.
                return self.into_iter().for_each(|(e, i)| func(e, i));
            }

            // `chunks_mut` panics when given a chunk size of zero, so never let a
            // degenerate batching strategy push the batch size below one.
            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count)
                .max(1);
            let chunks = self.slices.map(|s| s.chunks_mut(batch_size));

            pool.scope(|scope| {
                for batch in chunks.into_iter().flatten() {
                    let func = func.clone();
                    scope.spawn(async move {
                        for event in batch {
                            func(&mut event.event, event.event_id);
                        }
                    });
                }
            });

            // Events are guaranteed to be read at this point.
            self.mutator.last_event_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`Event`]s to be iterated.
    pub fn len(&self) -> usize {
        self.slices.iter().map(|s| s.len()).sum()
    }

    /// Returns [`true`] if there are no events remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        self.slices.iter().all(|x| x.is_empty())
    }
}
264
#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> IntoIterator for EventMutParIter<'a, E> {
    type IntoIter = EventMutIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    /// Converts the parallel iterator into a sequential one over the same unread
    /// events, reusing the same cursor. The batching strategy is discarded.
    fn into_iter(self) -> Self::IntoIter {
        let [older, newer] = self.slices;
        EventMutIteratorWithId {
            unread: older.len() + newer.len(),
            chain: older.iter_mut().chain(newer),
            mutator: self.mutator,
        }
    }
}