bevy_ecs/event/
iterators.rs1#[cfg(feature = "multi_threaded")]
2use bevy_ecs::batching::BatchingStrategy;
3use bevy_ecs::event::{Event, EventCursor, EventId, EventInstance, Events};
4use core::{iter::Chain, slice::Iter};
5
6#[derive(Debug)]
8pub struct EventIterator<'a, E: Event> {
9 iter: EventIteratorWithId<'a, E>,
10}
11
12impl<'a, E: Event> Iterator for EventIterator<'a, E> {
13 type Item = &'a E;
14 fn next(&mut self) -> Option<Self::Item> {
15 self.iter.next().map(|(event, _)| event)
16 }
17
18 fn size_hint(&self) -> (usize, Option<usize>) {
19 self.iter.size_hint()
20 }
21
22 fn count(self) -> usize {
23 self.iter.count()
24 }
25
26 fn last(self) -> Option<Self::Item>
27 where
28 Self: Sized,
29 {
30 self.iter.last().map(|(event, _)| event)
31 }
32
33 fn nth(&mut self, n: usize) -> Option<Self::Item> {
34 self.iter.nth(n).map(|(event, _)| event)
35 }
36}
37
38impl<'a, E: Event> ExactSizeIterator for EventIterator<'a, E> {
39 fn len(&self) -> usize {
40 self.iter.len()
41 }
42}
43
44#[derive(Debug)]
46pub struct EventIteratorWithId<'a, E: Event> {
47 reader: &'a mut EventCursor<E>,
48 chain: Chain<Iter<'a, EventInstance<E>>, Iter<'a, EventInstance<E>>>,
49 unread: usize,
50}
51
52impl<'a, E: Event> EventIteratorWithId<'a, E> {
53 pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
55 let a_index = reader
56 .last_event_count
57 .saturating_sub(events.events_a.start_event_count);
58 let b_index = reader
59 .last_event_count
60 .saturating_sub(events.events_b.start_event_count);
61 let a = events.events_a.get(a_index..).unwrap_or_default();
62 let b = events.events_b.get(b_index..).unwrap_or_default();
63
64 let unread_count = a.len() + b.len();
65 debug_assert_eq!(unread_count, reader.len(events));
67 reader.last_event_count = events.event_count - unread_count;
68 let chain = a.iter().chain(b.iter());
70
71 Self {
72 reader,
73 chain,
74 unread: unread_count,
75 }
76 }
77
78 pub fn without_id(self) -> EventIterator<'a, E> {
80 EventIterator { iter: self }
81 }
82}
83
84impl<'a, E: Event> Iterator for EventIteratorWithId<'a, E> {
85 type Item = (&'a E, EventId<E>);
86 fn next(&mut self) -> Option<Self::Item> {
87 match self
88 .chain
89 .next()
90 .map(|instance| (&instance.event, instance.event_id))
91 {
92 Some(item) => {
93 #[cfg(feature = "detailed_trace")]
94 tracing::trace!("EventReader::iter() -> {}", item.1);
95 self.reader.last_event_count += 1;
96 self.unread -= 1;
97 Some(item)
98 }
99 None => None,
100 }
101 }
102
103 fn size_hint(&self) -> (usize, Option<usize>) {
104 self.chain.size_hint()
105 }
106
107 fn count(self) -> usize {
108 self.reader.last_event_count += self.unread;
109 self.unread
110 }
111
112 fn last(self) -> Option<Self::Item>
113 where
114 Self: Sized,
115 {
116 let EventInstance { event_id, event } = self.chain.last()?;
117 self.reader.last_event_count += self.unread;
118 Some((event, *event_id))
119 }
120
121 fn nth(&mut self, n: usize) -> Option<Self::Item> {
122 if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
123 self.reader.last_event_count += n + 1;
124 self.unread -= n + 1;
125 Some((event, *event_id))
126 } else {
127 self.reader.last_event_count += self.unread;
128 self.unread = 0;
129 None
130 }
131 }
132}
133
134impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
135 fn len(&self) -> usize {
136 self.unread
137 }
138}
139
140#[cfg(feature = "multi_threaded")]
142#[derive(Debug)]
143pub struct EventParIter<'a, E: Event> {
144 reader: &'a mut EventCursor<E>,
145 slices: [&'a [EventInstance<E>]; 2],
146 batching_strategy: BatchingStrategy,
147 #[cfg(not(target_arch = "wasm32"))]
148 unread: usize,
149}
150
151#[cfg(feature = "multi_threaded")]
152impl<'a, E: Event> EventParIter<'a, E> {
153 pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
155 let a_index = reader
156 .last_event_count
157 .saturating_sub(events.events_a.start_event_count);
158 let b_index = reader
159 .last_event_count
160 .saturating_sub(events.events_b.start_event_count);
161 let a = events.events_a.get(a_index..).unwrap_or_default();
162 let b = events.events_b.get(b_index..).unwrap_or_default();
163
164 let unread_count = a.len() + b.len();
165 debug_assert_eq!(unread_count, reader.len(events));
167 reader.last_event_count = events.event_count - unread_count;
168
169 Self {
170 reader,
171 slices: [a, b],
172 batching_strategy: BatchingStrategy::default(),
173 #[cfg(not(target_arch = "wasm32"))]
174 unread: unread_count,
175 }
176 }
177
178 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
183 self.batching_strategy = strategy;
184 self
185 }
186
187 pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
197 self.for_each_with_id(move |e, _| func(e));
198 }
199
200 #[cfg_attr(
211 target_arch = "wasm32",
212 expect(unused_mut, reason = "not mutated on this target")
213 )]
214 pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
215 #[cfg(target_arch = "wasm32")]
216 {
217 self.into_iter().for_each(|(e, i)| func(e, i));
218 }
219
220 #[cfg(not(target_arch = "wasm32"))]
221 {
222 let pool = bevy_tasks::ComputeTaskPool::get();
223 let thread_count = pool.thread_num();
224 if thread_count <= 1 {
225 return self.into_iter().for_each(|(e, i)| func(e, i));
226 }
227
228 let batch_size = self
229 .batching_strategy
230 .calc_batch_size(|| self.len(), thread_count);
231 let chunks = self.slices.map(|s| s.chunks_exact(batch_size));
232 let remainders = chunks.each_ref().map(core::slice::ChunksExact::remainder);
233
234 pool.scope(|scope| {
235 for batch in chunks.into_iter().flatten().chain(remainders) {
236 let func = func.clone();
237 scope.spawn(async move {
238 for event in batch {
239 func(&event.event, event.event_id);
240 }
241 });
242 }
243 });
244
245 self.reader.last_event_count += self.unread;
247 self.unread = 0;
248 }
249 }
250
251 pub fn len(&self) -> usize {
253 self.slices.iter().map(|s| s.len()).sum()
254 }
255
256 pub fn is_empty(&self) -> bool {
258 self.slices.iter().all(|x| x.is_empty())
259 }
260}
261
262#[cfg(feature = "multi_threaded")]
263impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
264 type IntoIter = EventIteratorWithId<'a, E>;
265 type Item = <Self::IntoIter as Iterator>::Item;
266
267 fn into_iter(self) -> Self::IntoIter {
268 let EventParIter {
269 reader,
270 slices: [a, b],
271 ..
272 } = self;
273 let unread = a.len() + b.len();
274 let chain = a.iter().chain(b);
275 EventIteratorWithId {
276 reader,
277 chain,
278 unread,
279 }
280 }
281}