bevy_ecs/event/
iterators.rs1use crate as bevy_ecs;
2#[cfg(feature = "multi_threaded")]
3use bevy_ecs::batching::BatchingStrategy;
4use bevy_ecs::event::{Event, EventCursor, EventId, EventInstance, Events};
5use bevy_utils::detailed_trace;
6use core::{iter::Chain, slice::Iter};
7
8#[derive(Debug)]
10pub struct EventIterator<'a, E: Event> {
11 iter: EventIteratorWithId<'a, E>,
12}
13
14impl<'a, E: Event> Iterator for EventIterator<'a, E> {
15 type Item = &'a E;
16 fn next(&mut self) -> Option<Self::Item> {
17 self.iter.next().map(|(event, _)| event)
18 }
19
20 fn size_hint(&self) -> (usize, Option<usize>) {
21 self.iter.size_hint()
22 }
23
24 fn count(self) -> usize {
25 self.iter.count()
26 }
27
28 fn last(self) -> Option<Self::Item>
29 where
30 Self: Sized,
31 {
32 self.iter.last().map(|(event, _)| event)
33 }
34
35 fn nth(&mut self, n: usize) -> Option<Self::Item> {
36 self.iter.nth(n).map(|(event, _)| event)
37 }
38}
39
40impl<'a, E: Event> ExactSizeIterator for EventIterator<'a, E> {
41 fn len(&self) -> usize {
42 self.iter.len()
43 }
44}
45
46#[derive(Debug)]
48pub struct EventIteratorWithId<'a, E: Event> {
49 reader: &'a mut EventCursor<E>,
50 chain: Chain<Iter<'a, EventInstance<E>>, Iter<'a, EventInstance<E>>>,
51 unread: usize,
52}
53
54impl<'a, E: Event> EventIteratorWithId<'a, E> {
55 pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
57 let a_index = reader
58 .last_event_count
59 .saturating_sub(events.events_a.start_event_count);
60 let b_index = reader
61 .last_event_count
62 .saturating_sub(events.events_b.start_event_count);
63 let a = events.events_a.get(a_index..).unwrap_or_default();
64 let b = events.events_b.get(b_index..).unwrap_or_default();
65
66 let unread_count = a.len() + b.len();
67 debug_assert_eq!(unread_count, reader.len(events));
69 reader.last_event_count = events.event_count - unread_count;
70 let chain = a.iter().chain(b.iter());
72
73 Self {
74 reader,
75 chain,
76 unread: unread_count,
77 }
78 }
79
80 pub fn without_id(self) -> EventIterator<'a, E> {
82 EventIterator { iter: self }
83 }
84}
85
86impl<'a, E: Event> Iterator for EventIteratorWithId<'a, E> {
87 type Item = (&'a E, EventId<E>);
88 fn next(&mut self) -> Option<Self::Item> {
89 match self
90 .chain
91 .next()
92 .map(|instance| (&instance.event, instance.event_id))
93 {
94 Some(item) => {
95 detailed_trace!("EventReader::iter() -> {}", item.1);
96 self.reader.last_event_count += 1;
97 self.unread -= 1;
98 Some(item)
99 }
100 None => None,
101 }
102 }
103
104 fn size_hint(&self) -> (usize, Option<usize>) {
105 self.chain.size_hint()
106 }
107
108 fn count(self) -> usize {
109 self.reader.last_event_count += self.unread;
110 self.unread
111 }
112
113 fn last(self) -> Option<Self::Item>
114 where
115 Self: Sized,
116 {
117 let EventInstance { event_id, event } = self.chain.last()?;
118 self.reader.last_event_count += self.unread;
119 Some((event, *event_id))
120 }
121
122 fn nth(&mut self, n: usize) -> Option<Self::Item> {
123 if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
124 self.reader.last_event_count += n + 1;
125 self.unread -= n + 1;
126 Some((event, *event_id))
127 } else {
128 self.reader.last_event_count += self.unread;
129 self.unread = 0;
130 None
131 }
132 }
133}
134
135impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
136 fn len(&self) -> usize {
137 self.unread
138 }
139}
140
/// An iterator over unread events that can process them in batches on the
/// compute task pool.
///
/// Only available with the `multi_threaded` feature.
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct EventParIter<'a, E>
where
    E: Event,
{
    /// Cursor whose `last_event_count` is advanced once the events are consumed.
    reader: &'a mut EventCursor<E>,
    /// The two slices of unread event instances.
    slices: [&'a [EventInstance<E>]; 2],
    /// Strategy used to compute the batch size for parallel iteration.
    batching_strategy: BatchingStrategy,
    /// Number of unread events captured at construction time.
    unread: usize,
}
150
#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> EventParIter<'a, E> {
    /// Creates a parallel iterator over the events in `events` that `reader` has
    /// not read yet, using the default [`BatchingStrategy`].
    pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
        let a_index = reader
            .last_event_count
            .saturating_sub(events.events_a.start_event_count);
        let b_index = reader
            .last_event_count
            .saturating_sub(events.events_b.start_event_count);
        // An out-of-range start index means nothing is unread in that sequence.
        let a = events.events_a.get(a_index..).unwrap_or_default();
        let b = events.events_b.get(b_index..).unwrap_or_default();

        let unread_count = a.len() + b.len();
        // Debug-only cross-check against the cursor's own notion of "unread".
        debug_assert_eq!(unread_count, reader.len(events));
        // Re-sync the cursor so that `last_event_count + unread_count == event_count`.
        reader.last_event_count = events.event_count - unread_count;

        Self {
            reader,
            slices: [a, b],
            batching_strategy: BatchingStrategy::default(),
            unread: unread_count,
        }
    }

    /// Replaces the batching strategy used by the parallel `for_each` methods and
    /// returns the iterator for further chaining.
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on every unread event, ignoring event ids.
    ///
    /// This is a thin wrapper around [`Self::for_each_with_id`]; see that method
    /// for how the work is distributed.
    pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |e, _| func(e));
    }

    /// Runs `func` on every unread event together with its [`EventId`].
    ///
    /// On `wasm32`, or when the compute task pool reports at most one thread, the
    /// events are processed sequentially on the calling thread. Otherwise both
    /// slices are split into batches and each batch is spawned as its own task,
    /// so no ordering between batches should be assumed.
    ///
    /// The cursor is advanced past all events once processing has finished.
    pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
        #[cfg(target_arch = "wasm32")]
        {
            self.into_iter().for_each(|(e, i)| func(e, i));
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                // Sequential fallback: the regular iterator keeps the cursor in sync.
                return self.into_iter().for_each(|(e, i)| func(e, i));
            }

            // `chunks_exact` panics when given a chunk size of zero, so clamp the
            // strategy's result to at least one event per batch.
            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count)
                .max(1);
            let chunks = self.slices.map(|s| s.chunks_exact(batch_size));
            // Events that do not fill a whole batch are handled as two extra batches.
            let remainders = chunks.each_ref().map(core::slice::ChunksExact::remainder);

            pool.scope(|scope| {
                for batch in chunks.into_iter().flatten().chain(remainders) {
                    let func = func.clone();
                    scope.spawn(async move {
                        for event in batch {
                            func(&event.event, event.event_id);
                        }
                    });
                }
            });

            // Every batch has been handed to the scope above; mark all events as read.
            self.reader.last_event_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the total number of events held in the two slices.
    pub fn len(&self) -> usize {
        self.slices.iter().map(|s| s.len()).sum()
    }

    /// Returns `true` when both slices are empty.
    pub fn is_empty(&self) -> bool {
        self.slices.iter().all(|x| x.is_empty())
    }
}
256
#[cfg(feature = "multi_threaded")]
impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
    type IntoIter = EventIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    /// Converts into a sequential [`EventIteratorWithId`] over the same two slices.
    ///
    /// The batching strategy is dropped; the cursor is handed over unchanged.
    fn into_iter(self) -> Self::IntoIter {
        let [first, second] = self.slices;
        EventIteratorWithId {
            reader: self.reader,
            chain: first.iter().chain(second),
            unread: first.len() + second.len(),
        }
    }
}