bevy_ecs/event/
iterators.rs

1#[cfg(feature = "multi_threaded")]
2use bevy_ecs::batching::BatchingStrategy;
3use bevy_ecs::event::{Event, EventCursor, EventId, EventInstance, Events};
4use core::{iter::Chain, slice::Iter};
5
6/// An iterator that yields any unread events from an [`EventReader`](super::EventReader) or [`EventCursor`].
7#[derive(Debug)]
8pub struct EventIterator<'a, E: Event> {
9    iter: EventIteratorWithId<'a, E>,
10}
11
12impl<'a, E: Event> Iterator for EventIterator<'a, E> {
13    type Item = &'a E;
14    fn next(&mut self) -> Option<Self::Item> {
15        self.iter.next().map(|(event, _)| event)
16    }
17
18    fn size_hint(&self) -> (usize, Option<usize>) {
19        self.iter.size_hint()
20    }
21
22    fn count(self) -> usize {
23        self.iter.count()
24    }
25
26    fn last(self) -> Option<Self::Item>
27    where
28        Self: Sized,
29    {
30        self.iter.last().map(|(event, _)| event)
31    }
32
33    fn nth(&mut self, n: usize) -> Option<Self::Item> {
34        self.iter.nth(n).map(|(event, _)| event)
35    }
36}
37
38impl<'a, E: Event> ExactSizeIterator for EventIterator<'a, E> {
39    fn len(&self) -> usize {
40        self.iter.len()
41    }
42}
43
44/// An iterator that yields any unread events (and their IDs) from an [`EventReader`](super::EventReader) or [`EventCursor`].
45#[derive(Debug)]
46pub struct EventIteratorWithId<'a, E: Event> {
47    reader: &'a mut EventCursor<E>,
48    chain: Chain<Iter<'a, EventInstance<E>>, Iter<'a, EventInstance<E>>>,
49    unread: usize,
50}
51
52impl<'a, E: Event> EventIteratorWithId<'a, E> {
53    /// Creates a new iterator that yields any `events` that have not yet been seen by `reader`.
54    pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
55        let a_index = reader
56            .last_event_count
57            .saturating_sub(events.events_a.start_event_count);
58        let b_index = reader
59            .last_event_count
60            .saturating_sub(events.events_b.start_event_count);
61        let a = events.events_a.get(a_index..).unwrap_or_default();
62        let b = events.events_b.get(b_index..).unwrap_or_default();
63
64        let unread_count = a.len() + b.len();
65        // Ensure `len` is implemented correctly
66        debug_assert_eq!(unread_count, reader.len(events));
67        reader.last_event_count = events.event_count - unread_count;
68        // Iterate the oldest first, then the newer events
69        let chain = a.iter().chain(b.iter());
70
71        Self {
72            reader,
73            chain,
74            unread: unread_count,
75        }
76    }
77
78    /// Iterate over only the events.
79    pub fn without_id(self) -> EventIterator<'a, E> {
80        EventIterator { iter: self }
81    }
82}
83
84impl<'a, E: Event> Iterator for EventIteratorWithId<'a, E> {
85    type Item = (&'a E, EventId<E>);
86    fn next(&mut self) -> Option<Self::Item> {
87        match self
88            .chain
89            .next()
90            .map(|instance| (&instance.event, instance.event_id))
91        {
92            Some(item) => {
93                #[cfg(feature = "detailed_trace")]
94                tracing::trace!("EventReader::iter() -> {}", item.1);
95                self.reader.last_event_count += 1;
96                self.unread -= 1;
97                Some(item)
98            }
99            None => None,
100        }
101    }
102
103    fn size_hint(&self) -> (usize, Option<usize>) {
104        self.chain.size_hint()
105    }
106
107    fn count(self) -> usize {
108        self.reader.last_event_count += self.unread;
109        self.unread
110    }
111
112    fn last(self) -> Option<Self::Item>
113    where
114        Self: Sized,
115    {
116        let EventInstance { event_id, event } = self.chain.last()?;
117        self.reader.last_event_count += self.unread;
118        Some((event, *event_id))
119    }
120
121    fn nth(&mut self, n: usize) -> Option<Self::Item> {
122        if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
123            self.reader.last_event_count += n + 1;
124            self.unread -= n + 1;
125            Some((event, *event_id))
126        } else {
127            self.reader.last_event_count += self.unread;
128            self.unread = 0;
129            None
130        }
131    }
132}
133
134impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
135    fn len(&self) -> usize {
136        self.unread
137    }
138}
139
/// A parallel iterator over `Event`s.
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct EventParIter<'a, E: Event> {
    /// Cursor whose `last_event_count` is advanced once the events have been processed.
    reader: &'a mut EventCursor<E>,
    /// The unread parts of `events_a` and `events_b`, in that order.
    slices: [&'a [EventInstance<E>]; 2],
    /// Strategy used to compute the batch size for parallel iteration.
    batching_strategy: BatchingStrategy,
    /// Number of unread events captured at construction; this field does not exist on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    unread: usize,
}

#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> EventParIter<'a, E> {
    /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
    ///
    /// The cursor's `last_event_count` is reset to `events.event_count` minus the number of
    /// unread events found.
    pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
        let seen = reader.last_event_count;

        // Offsets are relative to where each buffer starts; an out-of-range offset
        // simply means that buffer holds nothing unread.
        let older = events
            .events_a
            .get(seen.saturating_sub(events.events_a.start_event_count)..)
            .unwrap_or_default();
        let newer = events
            .events_b
            .get(seen.saturating_sub(events.events_b.start_event_count)..)
            .unwrap_or_default();

        let unread = older.len() + newer.len();
        // Ensure `len` is implemented correctly
        debug_assert_eq!(unread, reader.len(events));
        reader.last_event_count = events.event_count - unread;

        Self {
            reader,
            slices: [older, newer],
            batching_strategy: BatchingStrategy::default(),
            #[cfg(not(target_arch = "wasm32"))]
            unread,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(self, strategy: BatchingStrategy) -> Self {
        Self {
            batching_strategy: strategy,
            ..self
        }
    }

    /// Runs the provided closure for each unread event in parallel.
    ///
    /// Unlike normal iteration, the event order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |event, _id| func(event));
    }

    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `EventId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
    ///
    /// On `wasm32`, or when the task pool reports at most one thread, the events are instead
    /// visited sequentially on the calling thread.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[cfg_attr(
        target_arch = "wasm32",
        expect(unused_mut, reason = "not mutated on this target")
    )]
    pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
        #[cfg(target_arch = "wasm32")]
        {
            for (event, id) in self {
                func(event, id);
            }
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                // Nothing to parallelize over: fall back to the sequential iterator,
                // which advances the cursor as it goes.
                for (event, id) in self {
                    func(event, id);
                }
                return;
            }

            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count);
            // Split each slice into full batches; the leftovers of each slice are
            // scheduled as their own (shorter) batches.
            let full_batches = self.slices.map(|slice| slice.chunks_exact(batch_size));
            let leftovers = full_batches
                .each_ref()
                .map(core::slice::ChunksExact::remainder);

            pool.scope(|scope| {
                for batch in full_batches.into_iter().flatten().chain(leftovers) {
                    let func = func.clone();
                    scope.spawn(async move {
                        for instance in batch {
                            func(&instance.event, instance.event_id);
                        }
                    });
                }
            });

            // Events are guaranteed to be read at this point.
            self.reader.last_event_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`Event`]s to be iterated.
    pub fn len(&self) -> usize {
        let [older, newer] = self.slices;
        older.len() + newer.len()
    }

    /// Returns [`true`] if there are no events remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        let [older, newer] = self.slices;
        older.is_empty() && newer.is_empty()
    }
}
261
#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
    type IntoIter = EventIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    /// Converts this parallel iterator into a sequential [`EventIteratorWithId`] over the
    /// same slices, reusing the same cursor. The batching strategy is dropped.
    fn into_iter(self) -> Self::IntoIter {
        let [older, newer] = self.slices;
        EventIteratorWithId {
            reader: self.reader,
            chain: older.iter().chain(newer),
            unread: older.len() + newer.len(),
        }
    }
}
281}