bevy_ecs/message/
mut_iterators.rs1#[cfg(feature = "multi_threaded")]
2use crate::batching::BatchingStrategy;
3use crate::message::{Message, MessageCursor, MessageId, MessageInstance, Messages};
4use core::{iter::Chain, slice::IterMut};
5
6#[derive(Debug)]
10pub struct MessageMutIterator<'a, E: Message> {
11 iter: MessageMutIteratorWithId<'a, E>,
12}
13
14impl<'a, E: Message> Iterator for MessageMutIterator<'a, E> {
15 type Item = &'a mut E;
16 fn next(&mut self) -> Option<Self::Item> {
17 self.iter.next().map(|(message, _)| message)
18 }
19
20 fn size_hint(&self) -> (usize, Option<usize>) {
21 self.iter.size_hint()
22 }
23
24 fn count(self) -> usize {
25 self.iter.count()
26 }
27
28 fn last(self) -> Option<Self::Item>
29 where
30 Self: Sized,
31 {
32 self.iter.last().map(|(message, _)| message)
33 }
34
35 fn nth(&mut self, n: usize) -> Option<Self::Item> {
36 self.iter.nth(n).map(|(message, _)| message)
37 }
38}
39
40impl<'a, E: Message> ExactSizeIterator for MessageMutIterator<'a, E> {
41 fn len(&self) -> usize {
42 self.iter.len()
43 }
44}
45
46#[derive(Debug)]
50pub struct MessageMutIteratorWithId<'a, E: Message> {
51 mutator: &'a mut MessageCursor<E>,
52 chain: Chain<IterMut<'a, MessageInstance<E>>, IterMut<'a, MessageInstance<E>>>,
53 unread: usize,
54}
55
56impl<'a, E: Message> MessageMutIteratorWithId<'a, E> {
57 pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {
59 let a_index = mutator
60 .last_message_count
61 .saturating_sub(messages.messages_a.start_message_count);
62 let b_index = mutator
63 .last_message_count
64 .saturating_sub(messages.messages_b.start_message_count);
65 let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();
66 let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();
67
68 let unread_count = a.len() + b.len();
69
70 mutator.last_message_count = messages.message_count - unread_count;
71 let chain = a.iter_mut().chain(b.iter_mut());
73
74 Self {
75 mutator,
76 chain,
77 unread: unread_count,
78 }
79 }
80
81 pub fn without_id(self) -> MessageMutIterator<'a, E> {
83 MessageMutIterator { iter: self }
84 }
85}
86
87impl<'a, E: Message> Iterator for MessageMutIteratorWithId<'a, E> {
88 type Item = (&'a mut E, MessageId<E>);
89 fn next(&mut self) -> Option<Self::Item> {
90 match self
91 .chain
92 .next()
93 .map(|instance| (&mut instance.message, instance.message_id))
94 {
95 Some(item) => {
96 #[cfg(feature = "detailed_trace")]
97 tracing::trace!("MessageMutator::iter() -> {}", item.1);
98 self.mutator.last_message_count += 1;
99 self.unread -= 1;
100 Some(item)
101 }
102 None => None,
103 }
104 }
105
106 fn size_hint(&self) -> (usize, Option<usize>) {
107 self.chain.size_hint()
108 }
109
110 fn count(self) -> usize {
111 self.mutator.last_message_count += self.unread;
112 self.unread
113 }
114
115 fn last(self) -> Option<Self::Item>
116 where
117 Self: Sized,
118 {
119 let MessageInstance {
120 message_id,
121 message,
122 } = self.chain.last()?;
123 self.mutator.last_message_count += self.unread;
124 Some((message, *message_id))
125 }
126
127 fn nth(&mut self, n: usize) -> Option<Self::Item> {
128 if let Some(MessageInstance {
129 message_id,
130 message,
131 }) = self.chain.nth(n)
132 {
133 self.mutator.last_message_count += n + 1;
134 self.unread -= n + 1;
135 Some((message, *message_id))
136 } else {
137 self.mutator.last_message_count += self.unread;
138 self.unread = 0;
139 None
140 }
141 }
142}
143
144impl<'a, E: Message> ExactSizeIterator for MessageMutIteratorWithId<'a, E> {
145 fn len(&self) -> usize {
146 self.unread
147 }
148}
149
150#[derive(Debug)]
152#[cfg(feature = "multi_threaded")]
153pub struct MessageMutParIter<'a, E: Message> {
154 mutator: &'a mut MessageCursor<E>,
155 slices: [&'a mut [MessageInstance<E>]; 2],
156 batching_strategy: BatchingStrategy,
157 #[cfg(not(target_arch = "wasm32"))]
158 unread: usize,
159}
160
161#[cfg(feature = "multi_threaded")]
162impl<'a, E: Message> MessageMutParIter<'a, E> {
163 pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {
165 let a_index = mutator
166 .last_message_count
167 .saturating_sub(messages.messages_a.start_message_count);
168 let b_index = mutator
169 .last_message_count
170 .saturating_sub(messages.messages_b.start_message_count);
171 let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();
172 let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();
173
174 let unread_count = a.len() + b.len();
175 mutator.last_message_count = messages.message_count - unread_count;
176
177 Self {
178 mutator,
179 slices: [a, b],
180 batching_strategy: BatchingStrategy::default(),
181 #[cfg(not(target_arch = "wasm32"))]
182 unread: unread_count,
183 }
184 }
185
186 pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
191 self.batching_strategy = strategy;
192 self
193 }
194
195 pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {
205 self.for_each_with_id(move |e, _| func(e));
206 }
207
208 #[cfg_attr(
219 target_arch = "wasm32",
220 expect(unused_mut, reason = "not mutated on this target")
221 )]
222 pub fn for_each_with_id<FN: Fn(&'a mut E, MessageId<E>) + Send + Sync + Clone>(
223 mut self,
224 func: FN,
225 ) {
226 #[cfg(target_arch = "wasm32")]
227 {
228 self.into_iter().for_each(|(e, i)| func(e, i));
229 }
230
231 #[cfg(not(target_arch = "wasm32"))]
232 {
233 let pool = bevy_tasks::ComputeTaskPool::get();
234 let thread_count = pool.thread_num();
235 if thread_count <= 1 {
236 return self.into_iter().for_each(|(e, i)| func(e, i));
237 }
238
239 let batch_size = self
240 .batching_strategy
241 .calc_batch_size(|| self.len(), thread_count);
242 let chunks = self.slices.map(|s| s.chunks_mut(batch_size));
243
244 pool.scope(|scope| {
245 for batch in chunks.into_iter().flatten() {
246 let func = func.clone();
247 scope.spawn(async move {
248 for message_instance in batch {
249 func(&mut message_instance.message, message_instance.message_id);
250 }
251 });
252 }
253 });
254
255 self.mutator.last_message_count += self.unread;
257 self.unread = 0;
258 }
259 }
260
261 pub fn len(&self) -> usize {
263 self.slices.iter().map(|s| s.len()).sum()
264 }
265
266 pub fn is_empty(&self) -> bool {
268 self.slices.iter().all(|x| x.is_empty())
269 }
270}
271
272#[cfg(feature = "multi_threaded")]
273impl<'a, E: Message> IntoIterator for MessageMutParIter<'a, E> {
274 type IntoIter = MessageMutIteratorWithId<'a, E>;
275 type Item = <Self::IntoIter as Iterator>::Item;
276
277 fn into_iter(self) -> Self::IntoIter {
278 let MessageMutParIter {
279 mutator: reader,
280 slices: [a, b],
281 ..
282 } = self;
283 let unread = a.len() + b.len();
284 let chain = a.iter_mut().chain(b);
285 MessageMutIteratorWithId {
286 mutator: reader,
287 chain,
288 unread,
289 }
290 }
291}