bevy_ecs/event/
mut_iterators.rs

1use crate as bevy_ecs;
2#[cfg(feature = "multi_threaded")]
3use bevy_ecs::batching::BatchingStrategy;
4use bevy_ecs::event::{Event, EventCursor, EventId, EventInstance, Events};
5use bevy_utils::detailed_trace;
6use core::{iter::Chain, slice::IterMut};
7
8/// An iterator that yields any unread events from an [`EventMutator`] or [`EventCursor`].
9///
10/// [`EventMutator`]: super::EventMutator
11#[derive(Debug)]
12pub struct EventMutIterator<'a, E: Event> {
13    iter: EventMutIteratorWithId<'a, E>,
14}
15
16impl<'a, E: Event> Iterator for EventMutIterator<'a, E> {
17    type Item = &'a mut E;
18    fn next(&mut self) -> Option<Self::Item> {
19        self.iter.next().map(|(event, _)| event)
20    }
21
22    fn size_hint(&self) -> (usize, Option<usize>) {
23        self.iter.size_hint()
24    }
25
26    fn count(self) -> usize {
27        self.iter.count()
28    }
29
30    fn last(self) -> Option<Self::Item>
31    where
32        Self: Sized,
33    {
34        self.iter.last().map(|(event, _)| event)
35    }
36
37    fn nth(&mut self, n: usize) -> Option<Self::Item> {
38        self.iter.nth(n).map(|(event, _)| event)
39    }
40}
41
42impl<'a, E: Event> ExactSizeIterator for EventMutIterator<'a, E> {
43    fn len(&self) -> usize {
44        self.iter.len()
45    }
46}
47
48/// An iterator that yields any unread events (and their IDs) from an [`EventMutator`] or [`EventCursor`].
49///
50/// [`EventMutator`]: super::EventMutator
51#[derive(Debug)]
52pub struct EventMutIteratorWithId<'a, E: Event> {
53    mutator: &'a mut EventCursor<E>,
54    chain: Chain<IterMut<'a, EventInstance<E>>, IterMut<'a, EventInstance<E>>>,
55    unread: usize,
56}
57
58impl<'a, E: Event> EventMutIteratorWithId<'a, E> {
59    /// Creates a new iterator that yields any `events` that have not yet been seen by `mutator`.
60    pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {
61        let a_index = mutator
62            .last_event_count
63            .saturating_sub(events.events_a.start_event_count);
64        let b_index = mutator
65            .last_event_count
66            .saturating_sub(events.events_b.start_event_count);
67        let a = events.events_a.get_mut(a_index..).unwrap_or_default();
68        let b = events.events_b.get_mut(b_index..).unwrap_or_default();
69
70        let unread_count = a.len() + b.len();
71
72        mutator.last_event_count = events.event_count - unread_count;
73        // Iterate the oldest first, then the newer events
74        let chain = a.iter_mut().chain(b.iter_mut());
75
76        Self {
77            mutator,
78            chain,
79            unread: unread_count,
80        }
81    }
82
83    /// Iterate over only the events.
84    pub fn without_id(self) -> EventMutIterator<'a, E> {
85        EventMutIterator { iter: self }
86    }
87}
88
89impl<'a, E: Event> Iterator for EventMutIteratorWithId<'a, E> {
90    type Item = (&'a mut E, EventId<E>);
91    fn next(&mut self) -> Option<Self::Item> {
92        match self
93            .chain
94            .next()
95            .map(|instance| (&mut instance.event, instance.event_id))
96        {
97            Some(item) => {
98                detailed_trace!("EventMutator::iter() -> {}", item.1);
99                self.mutator.last_event_count += 1;
100                self.unread -= 1;
101                Some(item)
102            }
103            None => None,
104        }
105    }
106
107    fn size_hint(&self) -> (usize, Option<usize>) {
108        self.chain.size_hint()
109    }
110
111    fn count(self) -> usize {
112        self.mutator.last_event_count += self.unread;
113        self.unread
114    }
115
116    fn last(self) -> Option<Self::Item>
117    where
118        Self: Sized,
119    {
120        let EventInstance { event_id, event } = self.chain.last()?;
121        self.mutator.last_event_count += self.unread;
122        Some((event, *event_id))
123    }
124
125    fn nth(&mut self, n: usize) -> Option<Self::Item> {
126        if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
127            self.mutator.last_event_count += n + 1;
128            self.unread -= n + 1;
129            Some((event, *event_id))
130        } else {
131            self.mutator.last_event_count += self.unread;
132            self.unread = 0;
133            None
134        }
135    }
136}
137
138impl<'a, E: Event> ExactSizeIterator for EventMutIteratorWithId<'a, E> {
139    fn len(&self) -> usize {
140        self.unread
141    }
142}
143
144/// A parallel iterator over `Event`s.
145#[derive(Debug)]
146#[cfg(feature = "multi_threaded")]
147pub struct EventMutParIter<'a, E: Event> {
148    mutator: &'a mut EventCursor<E>,
149    slices: [&'a mut [EventInstance<E>]; 2],
150    batching_strategy: BatchingStrategy,
151    unread: usize,
152}
153
154#[cfg(feature = "multi_threaded")]
155impl<'a, E: Event> EventMutParIter<'a, E> {
156    /// Creates a new parallel iterator over `events` that have not yet been seen by `mutator`.
157    pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {
158        let a_index = mutator
159            .last_event_count
160            .saturating_sub(events.events_a.start_event_count);
161        let b_index = mutator
162            .last_event_count
163            .saturating_sub(events.events_b.start_event_count);
164        let a = events.events_a.get_mut(a_index..).unwrap_or_default();
165        let b = events.events_b.get_mut(b_index..).unwrap_or_default();
166
167        let unread_count = a.len() + b.len();
168        mutator.last_event_count = events.event_count - unread_count;
169
170        Self {
171            mutator,
172            slices: [a, b],
173            batching_strategy: BatchingStrategy::default(),
174            unread: unread_count,
175        }
176    }
177
178    /// Changes the batching strategy used when iterating.
179    ///
180    /// For more information on how this affects the resultant iteration, see
181    /// [`BatchingStrategy`].
182    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
183        self.batching_strategy = strategy;
184        self
185    }
186
187    /// Runs the provided closure for each unread event in parallel.
188    ///
189    /// Unlike normal iteration, the event order is not guaranteed in any form.
190    ///
191    /// # Panics
192    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
193    /// initialized and run from the ECS scheduler, this should never panic.
194    ///
195    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
196    pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {
197        self.for_each_with_id(move |e, _| func(e));
198    }
199
200    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
201    /// but additionally provides the `EventId` to the closure.
202    ///
203    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
204    ///
205    /// # Panics
206    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
207    /// initialized and run from the ECS scheduler, this should never panic.
208    ///
209    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
210    pub fn for_each_with_id<FN: Fn(&'a mut E, EventId<E>) + Send + Sync + Clone>(
211        mut self,
212        func: FN,
213    ) {
214        #[cfg(target_arch = "wasm32")]
215        {
216            self.into_iter().for_each(|(e, i)| func(e, i));
217        }
218
219        #[cfg(not(target_arch = "wasm32"))]
220        {
221            let pool = bevy_tasks::ComputeTaskPool::get();
222            let thread_count = pool.thread_num();
223            if thread_count <= 1 {
224                return self.into_iter().for_each(|(e, i)| func(e, i));
225            }
226
227            let batch_size = self
228                .batching_strategy
229                .calc_batch_size(|| self.len(), thread_count);
230            let chunks = self.slices.map(|s| s.chunks_mut(batch_size));
231
232            pool.scope(|scope| {
233                for batch in chunks.into_iter().flatten() {
234                    let func = func.clone();
235                    scope.spawn(async move {
236                        for event in batch {
237                            func(&mut event.event, event.event_id);
238                        }
239                    });
240                }
241            });
242
243            // Events are guaranteed to be read at this point.
244            self.mutator.last_event_count += self.unread;
245            self.unread = 0;
246        }
247    }
248
249    /// Returns the number of [`Event`]s to be iterated.
250    pub fn len(&self) -> usize {
251        self.slices.iter().map(|s| s.len()).sum()
252    }
253
254    /// Returns [`true`] if there are no events remaining in this iterator.
255    pub fn is_empty(&self) -> bool {
256        self.slices.iter().all(|x| x.is_empty())
257    }
258}
259
260#[cfg(feature = "multi_threaded")]
261impl<'a, E: Event> IntoIterator for EventMutParIter<'a, E> {
262    type IntoIter = EventMutIteratorWithId<'a, E>;
263    type Item = <Self::IntoIter as Iterator>::Item;
264
265    fn into_iter(self) -> Self::IntoIter {
266        let EventMutParIter {
267            mutator: reader,
268            slices: [a, b],
269            ..
270        } = self;
271        let unread = a.len() + b.len();
272        let chain = a.iter_mut().chain(b);
273        EventMutIteratorWithId {
274            mutator: reader,
275            chain,
276            unread,
277        }
278    }
279}