bevy_ecs/message/
mut_iterators.rs

1#[cfg(feature = "multi_threaded")]
2use crate::batching::BatchingStrategy;
3use crate::message::{Message, MessageCursor, MessageId, MessageInstance, Messages};
4use core::{iter::Chain, slice::IterMut};
5
6/// An iterator that yields any unread messages from an [`MessageMutator`] or [`MessageCursor`].
7///
8/// [`MessageMutator`]: super::MessageMutator
9#[derive(Debug)]
10pub struct MessageMutIterator<'a, E: Message> {
11    iter: MessageMutIteratorWithId<'a, E>,
12}
13
14impl<'a, E: Message> Iterator for MessageMutIterator<'a, E> {
15    type Item = &'a mut E;
16    fn next(&mut self) -> Option<Self::Item> {
17        self.iter.next().map(|(message, _)| message)
18    }
19
20    fn size_hint(&self) -> (usize, Option<usize>) {
21        self.iter.size_hint()
22    }
23
24    fn count(self) -> usize {
25        self.iter.count()
26    }
27
28    fn last(self) -> Option<Self::Item>
29    where
30        Self: Sized,
31    {
32        self.iter.last().map(|(message, _)| message)
33    }
34
35    fn nth(&mut self, n: usize) -> Option<Self::Item> {
36        self.iter.nth(n).map(|(message, _)| message)
37    }
38}
39
40impl<'a, E: Message> ExactSizeIterator for MessageMutIterator<'a, E> {
41    fn len(&self) -> usize {
42        self.iter.len()
43    }
44}
45
46/// An iterator that yields any unread messages (and their IDs) from an [`MessageMutator`] or [`MessageCursor`].
47///
48/// [`MessageMutator`]: super::MessageMutator
49#[derive(Debug)]
50pub struct MessageMutIteratorWithId<'a, E: Message> {
51    mutator: &'a mut MessageCursor<E>,
52    chain: Chain<IterMut<'a, MessageInstance<E>>, IterMut<'a, MessageInstance<E>>>,
53    unread: usize,
54}
55
56impl<'a, E: Message> MessageMutIteratorWithId<'a, E> {
57    /// Creates a new iterator that yields any `messages` that have not yet been seen by `mutator`.
58    pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {
59        let a_index = mutator
60            .last_message_count
61            .saturating_sub(messages.messages_a.start_message_count);
62        let b_index = mutator
63            .last_message_count
64            .saturating_sub(messages.messages_b.start_message_count);
65        let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();
66        let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();
67
68        let unread_count = a.len() + b.len();
69
70        mutator.last_message_count = messages.message_count - unread_count;
71        // Iterate the oldest first, then the newer messages
72        let chain = a.iter_mut().chain(b.iter_mut());
73
74        Self {
75            mutator,
76            chain,
77            unread: unread_count,
78        }
79    }
80
81    /// Iterate over only the messages.
82    pub fn without_id(self) -> MessageMutIterator<'a, E> {
83        MessageMutIterator { iter: self }
84    }
85}
86
87impl<'a, E: Message> Iterator for MessageMutIteratorWithId<'a, E> {
88    type Item = (&'a mut E, MessageId<E>);
89    fn next(&mut self) -> Option<Self::Item> {
90        match self
91            .chain
92            .next()
93            .map(|instance| (&mut instance.message, instance.message_id))
94        {
95            Some(item) => {
96                #[cfg(feature = "detailed_trace")]
97                tracing::trace!("MessageMutator::iter() -> {}", item.1);
98                self.mutator.last_message_count += 1;
99                self.unread -= 1;
100                Some(item)
101            }
102            None => None,
103        }
104    }
105
106    fn size_hint(&self) -> (usize, Option<usize>) {
107        self.chain.size_hint()
108    }
109
110    fn count(self) -> usize {
111        self.mutator.last_message_count += self.unread;
112        self.unread
113    }
114
115    fn last(self) -> Option<Self::Item>
116    where
117        Self: Sized,
118    {
119        let MessageInstance {
120            message_id,
121            message,
122        } = self.chain.last()?;
123        self.mutator.last_message_count += self.unread;
124        Some((message, *message_id))
125    }
126
127    fn nth(&mut self, n: usize) -> Option<Self::Item> {
128        if let Some(MessageInstance {
129            message_id,
130            message,
131        }) = self.chain.nth(n)
132        {
133            self.mutator.last_message_count += n + 1;
134            self.unread -= n + 1;
135            Some((message, *message_id))
136        } else {
137            self.mutator.last_message_count += self.unread;
138            self.unread = 0;
139            None
140        }
141    }
142}
143
144impl<'a, E: Message> ExactSizeIterator for MessageMutIteratorWithId<'a, E> {
145    fn len(&self) -> usize {
146        self.unread
147    }
148}
149
/// A parallel iterator over `Message`s.
///
/// Holds the two slices of unread messages together with the cursor to advance once
/// they have been processed.
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct MessageMutParIter<'a, E>
where
    E: Message,
{
    // Cursor whose `last_message_count` is advanced once messages have been consumed.
    mutator: &'a mut MessageCursor<E>,
    // The unread part of each of the two message buffers.
    slices: [&'a mut [MessageInstance<E>]; 2],
    // Decides how many messages go into each parallel batch.
    batching_strategy: BatchingStrategy,
    // Total unread messages at construction; only needed by the parallel (non-wasm) path.
    #[cfg(not(target_arch = "wasm32"))]
    unread: usize,
}
160
#[cfg(feature = "multi_threaded")]
impl<'a, E> MessageMutParIter<'a, E>
where
    E: Message,
{
    /// Creates a new parallel iterator over `messages` that have not yet been seen by `mutator`.
    ///
    /// As part of construction, `mutator.last_message_count` is reset to
    /// `messages.message_count` minus the number of unread messages found.
    pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {
        let seen = mutator.last_message_count;

        // Convert the cursor's absolute position into an offset inside each buffer.
        // `saturating_sub` clamps the offset to 0 when the buffer starts after the cursor.
        let older_start = seen.saturating_sub(messages.messages_a.start_message_count);
        let newer_start = seen.saturating_sub(messages.messages_b.start_message_count);

        // An out-of-range offset produces `None`, which becomes an empty slice.
        let older = messages.messages_a.get_mut(older_start..).unwrap_or_default();
        let newer = messages.messages_b.get_mut(newer_start..).unwrap_or_default();

        let pending = older.len() + newer.len();
        mutator.last_message_count = messages.message_count - pending;

        Self {
            mutator,
            slices: [older, newer],
            batching_strategy: BatchingStrategy::default(),
            #[cfg(not(target_arch = "wasm32"))]
            unread: pending,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(self, strategy: BatchingStrategy) -> Self {
        Self {
            batching_strategy: strategy,
            ..self
        }
    }

    /// Runs the provided closure for each unread message in parallel.
    ///
    /// Unlike normal iteration, the message order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a message reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |message, _id| func(message));
    }

    /// Runs the provided closure for each unread message in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `MessageId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `MessageId`s are ordered by send order.
    ///
    /// On `wasm32`, or when the task pool reports at most one thread, the messages are
    /// processed sequentially on the calling thread instead.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a message reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[cfg_attr(
        target_arch = "wasm32",
        expect(unused_mut, reason = "not mutated on this target")
    )]
    pub fn for_each_with_id<FN: Fn(&'a mut E, MessageId<E>) + Send + Sync + Clone>(
        mut self,
        func: FN,
    ) {
        #[cfg(target_arch = "wasm32")]
        {
            for (message, id) in self {
                func(message, id);
            }
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let task_pool = bevy_tasks::ComputeTaskPool::get();
            let workers = task_pool.thread_num();
            if workers <= 1 {
                // Nothing to parallelize over: fall back to the sequential iterator,
                // which does its own cursor bookkeeping.
                for (message, id) in self {
                    func(message, id);
                }
                return;
            }

            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), workers);
            let batched = self.slices.map(|slice| slice.chunks_mut(batch_size));

            // One task per chunk; each task gets its own clone of the closure.
            task_pool.scope(|scope| {
                for batch in batched.into_iter().flatten() {
                    let func = func.clone();
                    scope.spawn(async move {
                        for instance in batch {
                            func(&mut instance.message, instance.message_id);
                        }
                    });
                }
            });

            // Messages are guaranteed to be read at this point.
            self.mutator.last_message_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`Message`]s to be iterated.
    pub fn len(&self) -> usize {
        let [older, newer] = &self.slices;
        older.len() + newer.len()
    }

    /// Returns [`true`] if there are no messages remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        let [older, newer] = &self.slices;
        older.is_empty() && newer.is_empty()
    }
}
271
// Converts the parallel iterator into its sequential counterpart over the same two
// slices, reusing the same cursor. The batching strategy is discarded.
#[cfg(feature = "multi_threaded")]
impl<'a, E> IntoIterator for MessageMutParIter<'a, E>
where
    E: Message,
{
    type Item = <Self::IntoIter as Iterator>::Item;
    type IntoIter = MessageMutIteratorWithId<'a, E>;

    fn into_iter(self) -> Self::IntoIter {
        let [older, newer] = self.slices;
        let mutator = self.mutator;

        // Recompute the remaining count from the slices themselves.
        let unread = older.len() + newer.len();
        let chain = older.iter_mut().chain(newer);

        MessageMutIteratorWithId {
            mutator,
            chain,
            unread,
        }
    }
}
291}