bevy_ecs/event/iterators.rs

use crate as bevy_ecs;
#[cfg(feature = "multi_threaded")]
use bevy_ecs::batching::BatchingStrategy;
use bevy_ecs::event::{Event, EventCursor, EventId, EventInstance, Events};
use bevy_utils::detailed_trace;
use core::{iter::Chain, slice::Iter};

/// An iterator that yields any unread events from an [`EventReader`](super::EventReader) or [`EventCursor`].
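///
/// # Example
///
/// A minimal sketch of how this iterator is typically obtained; the `MyEvent` type
/// below is illustrative and not part of this module:
///
/// ```
/// use bevy_ecs::event::{EventCursor, Events};
/// use bevy_ecs::prelude::*;
///
/// #[derive(Event)]
/// struct MyEvent(u32);
///
/// let mut events = Events::<MyEvent>::default();
/// let mut cursor = EventCursor::<MyEvent>::default();
/// events.send(MyEvent(1));
/// events.send(MyEvent(2));
///
/// // `EventCursor::read` returns an `EventIterator` over the events
/// // this cursor has not yet seen.
/// let values: Vec<u32> = cursor.read(&events).map(|event| event.0).collect();
/// assert_eq!(values, vec![1, 2]);
/// ```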
#[derive(Debug)]
pub struct EventIterator<'a, E: Event> {
    iter: EventIteratorWithId<'a, E>,
}

impl<'a, E: Event> Iterator for EventIterator<'a, E> {
    type Item = &'a E;
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next().map(|(event, _)| event)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }

    fn count(self) -> usize {
        self.iter.count()
    }

    fn last(self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        self.iter.last().map(|(event, _)| event)
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        self.iter.nth(n).map(|(event, _)| event)
    }
}

impl<'a, E: Event> ExactSizeIterator for EventIterator<'a, E> {
    fn len(&self) -> usize {
        self.iter.len()
    }
}

/// An iterator that yields any unread events (and their IDs) from an [`EventReader`](super::EventReader) or [`EventCursor`].
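///
/// # Example
///
/// A minimal sketch using [`EventIteratorWithId::new`] directly; the `MyEvent` type
/// below is illustrative and not part of this module:
///
/// ```
/// use bevy_ecs::event::{EventCursor, EventIteratorWithId, Events};
/// use bevy_ecs::prelude::*;
///
/// #[derive(Event)]
/// struct MyEvent;
///
/// let mut events = Events::<MyEvent>::default();
/// let mut cursor = EventCursor::<MyEvent>::default();
/// let id = events.send(MyEvent);
///
/// // Each item pairs a reference to the event with the `EventId` it was sent with.
/// let mut iter = EventIteratorWithId::new(&mut cursor, &events);
/// assert_eq!(iter.next().map(|(_, event_id)| event_id), Some(id));
/// assert!(iter.next().is_none());
/// ```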
#[derive(Debug)]
pub struct EventIteratorWithId<'a, E: Event> {
    reader: &'a mut EventCursor<E>,
    chain: Chain<Iter<'a, EventInstance<E>>, Iter<'a, EventInstance<E>>>,
    unread: usize,
}

impl<'a, E: Event> EventIteratorWithId<'a, E> {
    /// Creates a new iterator that yields any `events` that have not yet been seen by `reader`.
    pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
        let a_index = reader
            .last_event_count
            .saturating_sub(events.events_a.start_event_count);
        let b_index = reader
            .last_event_count
            .saturating_sub(events.events_b.start_event_count);
        let a = events.events_a.get(a_index..).unwrap_or_default();
        let b = events.events_b.get(b_index..).unwrap_or_default();

        let unread_count = a.len() + b.len();
        // Ensure `len` is implemented correctly
        debug_assert_eq!(unread_count, reader.len(events));
        reader.last_event_count = events.event_count - unread_count;
        // Iterate the oldest first, then the newer events
        let chain = a.iter().chain(b.iter());

        Self {
            reader,
            chain,
            unread: unread_count,
        }
    }

    /// Iterate over only the events.
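    ///
    /// # Example
    ///
    /// A minimal sketch; the `MyEvent` type below is illustrative only:
    ///
    /// ```
    /// use bevy_ecs::event::{EventCursor, EventIteratorWithId, Events};
    /// use bevy_ecs::prelude::*;
    ///
    /// #[derive(Event)]
    /// struct MyEvent;
    ///
    /// let mut events = Events::<MyEvent>::default();
    /// let mut cursor = EventCursor::<MyEvent>::default();
    /// events.send(MyEvent);
    ///
    /// // `without_id` discards the `EventId` half of each `(event, id)` pair.
    /// let unread = EventIteratorWithId::new(&mut cursor, &events)
    ///     .without_id()
    ///     .count();
    /// assert_eq!(unread, 1);
    /// ```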
    pub fn without_id(self) -> EventIterator<'a, E> {
        EventIterator { iter: self }
    }
}

impl<'a, E: Event> Iterator for EventIteratorWithId<'a, E> {
    type Item = (&'a E, EventId<E>);
    fn next(&mut self) -> Option<Self::Item> {
        match self
            .chain
            .next()
            .map(|instance| (&instance.event, instance.event_id))
        {
            Some(item) => {
                detailed_trace!("EventReader::iter() -> {}", item.1);
                self.reader.last_event_count += 1;
                self.unread -= 1;
                Some(item)
            }
            None => None,
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.chain.size_hint()
    }

    fn count(self) -> usize {
        self.reader.last_event_count += self.unread;
        self.unread
    }

    fn last(self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        let EventInstance { event_id, event } = self.chain.last()?;
        self.reader.last_event_count += self.unread;
        Some((event, *event_id))
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
            self.reader.last_event_count += n + 1;
            self.unread -= n + 1;
            Some((event, *event_id))
        } else {
            self.reader.last_event_count += self.unread;
            self.unread = 0;
            None
        }
    }
}

impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
    fn len(&self) -> usize {
        self.unread
    }
}

/// A parallel iterator over `Event`s.
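///
/// # Example
///
/// A minimal sketch of parallel iteration from a system, via
/// [`EventReader::par_read`](super::EventReader::par_read); `MyEvent` and `my_system`
/// are illustrative only:
///
/// ```
/// use bevy_ecs::prelude::*;
///
/// #[derive(Event)]
/// struct MyEvent(u32);
///
/// fn my_system(mut reader: EventReader<MyEvent>) {
///     // The closure may run on multiple threads; event order is not guaranteed.
///     reader.par_read().for_each(|event| {
///         println!("{}", event.0);
///     });
/// }
/// # bevy_ecs::system::assert_is_system(my_system);
/// ```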
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct EventParIter<'a, E: Event> {
    reader: &'a mut EventCursor<E>,
    slices: [&'a [EventInstance<E>]; 2],
    batching_strategy: BatchingStrategy,
    unread: usize,
}

#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> EventParIter<'a, E> {
    /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
    pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
        let a_index = reader
            .last_event_count
            .saturating_sub(events.events_a.start_event_count);
        let b_index = reader
            .last_event_count
            .saturating_sub(events.events_b.start_event_count);
        let a = events.events_a.get(a_index..).unwrap_or_default();
        let b = events.events_b.get(b_index..).unwrap_or_default();

        let unread_count = a.len() + b.len();
        // Ensure `len` is implemented correctly
        debug_assert_eq!(unread_count, reader.len(events));
        reader.last_event_count = events.event_count - unread_count;

        Self {
            reader,
            slices: [a, b],
            batching_strategy: BatchingStrategy::default(),
            unread: unread_count,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
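    ///
    /// # Example
    ///
    /// A minimal sketch; `MyEvent` and `my_system` are illustrative, and the fixed
    /// batch size of 32 is an arbitrary choice:
    ///
    /// ```
    /// use bevy_ecs::batching::BatchingStrategy;
    /// use bevy_ecs::prelude::*;
    ///
    /// #[derive(Event)]
    /// struct MyEvent;
    ///
    /// fn my_system(mut reader: EventReader<MyEvent>) {
    ///     reader
    ///         .par_read()
    ///         .batching_strategy(BatchingStrategy::fixed(32))
    ///         .for_each(|_event| {
    ///             // ...
    ///         });
    /// }
    /// # bevy_ecs::system::assert_is_system(my_system);
    /// ```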
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs the provided closure for each unread event in parallel.
    ///
    /// Unlike normal iteration, the event order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |e, _| func(e));
    }

    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `EventId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
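    ///
    /// # Example
    ///
    /// A minimal sketch from a system, via
    /// [`EventReader::par_read`](super::EventReader::par_read); `MyEvent` and
    /// `my_system` are illustrative only:
    ///
    /// ```
    /// use bevy_ecs::prelude::*;
    ///
    /// #[derive(Event)]
    /// struct MyEvent(u32);
    ///
    /// fn my_system(mut reader: EventReader<MyEvent>) {
    ///     reader.par_read().for_each_with_id(|event, id| {
    ///         // `id` reflects send order even though iteration order is not guaranteed.
    ///         println!("{id:?} -> {}", event.0);
    ///     });
    /// }
    /// # bevy_ecs::system::assert_is_system(my_system);
    /// ```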
    pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
        #[cfg(target_arch = "wasm32")]
        {
            self.into_iter().for_each(|(e, i)| func(e, i));
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                return self.into_iter().for_each(|(e, i)| func(e, i));
            }

            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count);
            let chunks = self.slices.map(|s| s.chunks_exact(batch_size));
            let remainders = chunks.each_ref().map(core::slice::ChunksExact::remainder);

            pool.scope(|scope| {
                for batch in chunks.into_iter().flatten().chain(remainders) {
                    let func = func.clone();
                    scope.spawn(async move {
                        for event in batch {
                            func(&event.event, event.event_id);
                        }
                    });
                }
            });

            // Events are guaranteed to be read at this point.
            self.reader.last_event_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`Event`]s to be iterated.
    pub fn len(&self) -> usize {
        self.slices.iter().map(|s| s.len()).sum()
    }

    /// Returns [`true`] if there are no events remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        self.slices.iter().all(|x| x.is_empty())
    }
}

#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
    type IntoIter = EventIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    fn into_iter(self) -> Self::IntoIter {
        let EventParIter {
            reader,
            slices: [a, b],
            ..
        } = self;
        let unread = a.len() + b.len();
        let chain = a.iter().chain(b);
        EventIteratorWithId {
            reader,
            chain,
            unread,
        }
    }
}