bevy_ecs/message/
messages.rs

1use crate::{
2    change_detection::MaybeLocation,
3    message::{Message, MessageCursor, MessageId, MessageInstance},
4    resource::Resource,
5};
6use alloc::vec::Vec;
7use core::{
8    marker::PhantomData,
9    ops::{Deref, DerefMut},
10};
11#[cfg(feature = "bevy_reflect")]
12use {
13    crate::reflect::ReflectResource,
14    bevy_reflect::{std_traits::ReflectDefault, Reflect},
15};
16
17/// A message collection that represents the messages that occurred within the last two
18/// [`Messages::update`] calls.
19/// Messages can be written to using a [`MessageWriter`]
20/// and are typically cheaply read using a [`MessageReader`].
21///
22/// Each message can be consumed by multiple systems, in parallel,
23/// with consumption tracked by the [`MessageReader`] on a per-system basis.
24///
25/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)
26/// is applied between writing and reading systems, there is a risk of a race condition.
27/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.
28///
29/// This collection is meant to be paired with a system that calls
30/// [`Messages::update`] exactly once per update/frame.
31///
32/// [`message_update_system`] is a system that does this, typically initialized automatically using
33/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
34/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.
35/// Messages will persist across a single frame boundary and so ordering of message producers and
36/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).
37/// If messages are not handled by the end of the frame after they are updated, they will be
38/// dropped silently.
39///
40/// # Example
41///
42/// ```
43/// use bevy_ecs::message::{Message, Messages};
44///
45/// #[derive(Message)]
46/// struct MyMessage {
47///     value: usize
48/// }
49///
50/// // setup
51/// let mut messages = Messages::<MyMessage>::default();
52/// let mut cursor = messages.get_cursor();
53///
54/// // run this once per update/frame
55/// messages.update();
56///
57/// // somewhere else: write a message
58/// messages.write(MyMessage { value: 1 });
59///
60/// // somewhere else: read the messages
61/// for message in cursor.read(&messages) {
62///     assert_eq!(message.value, 1)
63/// }
64///
65/// // messages are only processed once per reader
66/// assert_eq!(cursor.read(&messages).count(), 0);
67/// ```
68///
69/// # Details
70///
71/// [`Messages`] is implemented using a variation of a double buffer strategy.
72/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.
73/// - [`MessageReader`]s will read messages from both buffers.
74/// - [`MessageReader`]s that read at least once per update will never drop messages.
75/// - [`MessageReader`]s that read once within two updates might still receive some messages
76/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred
77///   before those updates.
78///
79/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.
80///
81/// An alternative call pattern would be to call [`update`](Messages::update)
82/// manually across frames to control when messages are cleared.
83/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,
84/// but can be done by adding your message as a resource instead of using
85/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
86///
87/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)
88/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)
89///
90/// [`MessageReader`]: super::MessageReader
91/// [`MessageWriter`]: super::MessageWriter
92/// [`message_update_system`]: super::message_update_system
93#[derive(Debug, Resource)]
94#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]
95pub struct Messages<E: Message> {
96    /// Holds the oldest still active messages.
97    /// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.
98    pub(crate) messages_a: MessageSequence<E>,
99    /// Holds the newer messages.
100    pub(crate) messages_b: MessageSequence<E>,
101    pub(crate) message_count: usize,
102}
103
104// Derived Default impl would incorrectly require E: Default
105impl<E: Message> Default for Messages<E> {
106    fn default() -> Self {
107        Self {
108            messages_a: Default::default(),
109            messages_b: Default::default(),
110            message_count: Default::default(),
111        }
112    }
113}
114
115impl<M: Message> Messages<M> {
116    /// Returns the index of the oldest message stored in the message buffer.
117    pub fn oldest_message_count(&self) -> usize {
118        self.messages_a.start_message_count
119    }
120
121    /// Writes an `message` to the current message buffer.
122    /// [`MessageReader`](super::MessageReader)s can then read the message.
123    /// This method returns the [ID](`MessageId`) of the written `message`.
124    #[track_caller]
125    pub fn write(&mut self, message: M) -> MessageId<M> {
126        self.write_with_caller(message, MaybeLocation::caller())
127    }
128
129    pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {
130        let message_id = MessageId {
131            id: self.message_count,
132            caller,
133            _marker: PhantomData,
134        };
135        #[cfg(feature = "detailed_trace")]
136        tracing::trace!("Messages::write() -> id: {}", message_id);
137
138        let message_instance = MessageInstance {
139            message_id,
140            message,
141        };
142
143        self.messages_b.push(message_instance);
144        self.message_count += 1;
145
146        message_id
147    }
148
149    /// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
150    /// This is more efficient than writing each message individually.
151    /// This method returns the [IDs](`MessageId`) of the written `messages`.
152    #[track_caller]
153    pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
154        let last_count = self.message_count;
155
156        self.extend(messages);
157
158        WriteBatchIds {
159            last_count,
160            message_count: self.message_count,
161            _marker: PhantomData,
162        }
163    }
164
165    /// Writes the default value of the message. Useful when the message is an empty struct.
166    /// This method returns the [ID](`MessageId`) of the written `message`.
167    #[track_caller]
168    pub fn write_default(&mut self) -> MessageId<M>
169    where
170        M: Default,
171    {
172        self.write(Default::default())
173    }
174
175    /// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.
176    pub fn get_cursor(&self) -> MessageCursor<M> {
177        MessageCursor::default()
178    }
179
180    /// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.
181    /// It will read all future messages.
182    pub fn get_cursor_current(&self) -> MessageCursor<M> {
183        MessageCursor {
184            last_message_count: self.message_count,
185            ..Default::default()
186        }
187    }
188
189    /// Swaps the message buffers and clears the oldest message buffer. In general, this should be
190    /// called once per frame/update.
191    ///
192    /// If you need access to the messages that were removed, consider using [`Messages::update_drain`].
193    pub fn update(&mut self) {
194        core::mem::swap(&mut self.messages_a, &mut self.messages_b);
195        self.messages_b.clear();
196        self.messages_b.start_message_count = self.message_count;
197        debug_assert_eq!(
198            self.messages_a.start_message_count + self.messages_a.len(),
199            self.messages_b.start_message_count
200        );
201    }
202
203    /// Swaps the message buffers and drains the oldest message buffer, returning an iterator
204    /// of all messages that were removed. In general, this should be called once per frame/update.
205    ///
206    /// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.
207    #[must_use = "If you do not need the returned messages, call .update() instead."]
208    pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {
209        core::mem::swap(&mut self.messages_a, &mut self.messages_b);
210        let iter = self.messages_b.messages.drain(..);
211        self.messages_b.start_message_count = self.message_count;
212        debug_assert_eq!(
213            self.messages_a.start_message_count + self.messages_a.len(),
214            self.messages_b.start_message_count
215        );
216
217        iter.map(|e| e.message)
218    }
219
220    #[inline]
221    fn reset_start_message_count(&mut self) {
222        self.messages_a.start_message_count = self.message_count;
223        self.messages_b.start_message_count = self.message_count;
224    }
225
226    /// Removes all messages.
227    #[inline]
228    pub fn clear(&mut self) {
229        self.reset_start_message_count();
230        self.messages_a.clear();
231        self.messages_b.clear();
232    }
233
234    /// Returns the number of messages currently stored in the message buffer.
235    #[inline]
236    pub fn len(&self) -> usize {
237        self.messages_a.len() + self.messages_b.len()
238    }
239
240    /// Returns true if there are no messages currently stored in the message buffer.
241    #[inline]
242    pub fn is_empty(&self) -> bool {
243        self.len() == 0
244    }
245
246    /// Creates a draining iterator that removes all messages.
247    pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {
248        self.reset_start_message_count();
249
250        // Drain the oldest messages first, then the newest
251        self.messages_a
252            .drain(..)
253            .chain(self.messages_b.drain(..))
254            .map(|i| i.message)
255    }
256
257    /// Iterates over messages that happened since the last "update" call.
258    /// WARNING: You probably don't want to use this call. In most cases you should use an
259    /// [`MessageReader`]. You should only use this if you know you only need to consume messages
260    /// between the last `update()` call and your call to `iter_current_update_messages`.
261    /// If messages happen outside that window, they will not be handled. For example, any messages that
262    /// happen after this call and before the next `update()` call will be dropped.
263    ///
264    /// [`MessageReader`]: super::MessageReader
265    pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {
266        self.messages_b.iter().map(|i| &i.message)
267    }
268
269    /// Get a specific message by id if it still exists in the messages buffer.
270    pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {
271        if id < self.oldest_message_count() {
272            return None;
273        }
274
275        let sequence = self.sequence(id);
276        let index = id.saturating_sub(sequence.start_message_count);
277
278        sequence
279            .get(index)
280            .map(|instance| (&instance.message, instance.message_id))
281    }
282
283    /// Which message buffer is this message id a part of.
284    fn sequence(&self, id: usize) -> &MessageSequence<M> {
285        if id < self.messages_b.start_message_count {
286            &self.messages_a
287        } else {
288            &self.messages_b
289        }
290    }
291}
292
293impl<E: Message> Extend<E> for Messages<E> {
294    #[track_caller]
295    fn extend<I>(&mut self, iter: I)
296    where
297        I: IntoIterator<Item = E>,
298    {
299        let old_count = self.message_count;
300        let mut message_count = self.message_count;
301        let messages = iter.into_iter().map(|message| {
302            let message_id = MessageId {
303                id: message_count,
304                caller: MaybeLocation::caller(),
305                _marker: PhantomData,
306            };
307            message_count += 1;
308            MessageInstance {
309                message_id,
310                message,
311            }
312        });
313
314        self.messages_b.extend(messages);
315
316        if old_count != message_count {
317            #[cfg(feature = "detailed_trace")]
318            tracing::trace!(
319                "Messages::extend() -> ids: ({}..{})",
320                self.message_count,
321                message_count
322            );
323        }
324
325        self.message_count = message_count;
326    }
327}
328
329#[derive(Debug)]
330#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]
331pub(crate) struct MessageSequence<E: Message> {
332    pub(crate) messages: Vec<MessageInstance<E>>,
333    pub(crate) start_message_count: usize,
334}
335
336// Derived Default impl would incorrectly require E: Default
337impl<E: Message> Default for MessageSequence<E> {
338    fn default() -> Self {
339        Self {
340            messages: Default::default(),
341            start_message_count: Default::default(),
342        }
343    }
344}
345
346impl<E: Message> Deref for MessageSequence<E> {
347    type Target = Vec<MessageInstance<E>>;
348
349    fn deref(&self) -> &Self::Target {
350        &self.messages
351    }
352}
353
354impl<E: Message> DerefMut for MessageSequence<E> {
355    fn deref_mut(&mut self) -> &mut Self::Target {
356        &mut self.messages
357    }
358}
359
360/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.
361pub struct WriteBatchIds<E> {
362    last_count: usize,
363    message_count: usize,
364    _marker: PhantomData<E>,
365}
366
367impl<E: Message> Iterator for WriteBatchIds<E> {
368    type Item = MessageId<E>;
369
370    fn next(&mut self) -> Option<Self::Item> {
371        if self.last_count >= self.message_count {
372            return None;
373        }
374
375        let result = Some(MessageId {
376            id: self.last_count,
377            caller: MaybeLocation::caller(),
378            _marker: PhantomData,
379        });
380
381        self.last_count += 1;
382
383        result
384    }
385
386    fn size_hint(&self) -> (usize, Option<usize>) {
387        let len = <Self as ExactSizeIterator>::len(self);
388        (len, Some(len))
389    }
390}
391
392impl<E: Message> ExactSizeIterator for WriteBatchIds<E> {
393    fn len(&self) -> usize {
394        self.message_count.saturating_sub(self.last_count)
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use crate::message::{Message, Messages};
401
402    #[test]
403    fn iter_current_update_messages_iterates_over_current_messages() {
404        #[derive(Message, Clone)]
405        struct TestMessage;
406
407        let mut test_messages = Messages::<TestMessage>::default();
408
409        // Starting empty
410        assert_eq!(test_messages.len(), 0);
411        assert_eq!(test_messages.iter_current_update_messages().count(), 0);
412        test_messages.update();
413
414        // Writing one message
415        test_messages.write(TestMessage);
416
417        assert_eq!(test_messages.len(), 1);
418        assert_eq!(test_messages.iter_current_update_messages().count(), 1);
419        test_messages.update();
420
421        // Writing two messages on the next frame
422        test_messages.write(TestMessage);
423        test_messages.write(TestMessage);
424
425        assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3
426        assert_eq!(test_messages.iter_current_update_messages().count(), 2);
427        test_messages.update();
428
429        // Writing zero messages
430        assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2
431        assert_eq!(test_messages.iter_current_update_messages().count(), 0);
432    }
433
434    #[test]
435    fn write_batch_iter_size_hint() {
436        #[derive(Message, Clone, Copy)]
437        struct TestMessage;
438
439        let mut test_messages = Messages::<TestMessage>::default();
440        let write_batch_ids = test_messages.write_batch([TestMessage; 4]);
441        let expected_len = 4;
442        assert_eq!(write_batch_ids.len(), expected_len);
443        assert_eq!(
444            write_batch_ids.size_hint(),
445            (expected_len, Some(expected_len))
446        );
447    }
448}