bevy_ecs/message/messages.rs
1use crate::{
2 change_detection::MaybeLocation,
3 message::{Message, MessageCursor, MessageId, MessageInstance},
4 resource::Resource,
5};
6use alloc::vec::Vec;
7use core::{
8 marker::PhantomData,
9 ops::{Deref, DerefMut},
10};
11#[cfg(feature = "bevy_reflect")]
12use {
13 crate::reflect::ReflectResource,
14 bevy_reflect::{std_traits::ReflectDefault, Reflect},
15};
16
17/// A message collection that represents the messages that occurred within the last two
18/// [`Messages::update`] calls.
19/// Messages can be written to using a [`MessageWriter`]
20/// and are typically cheaply read using a [`MessageReader`].
21///
22/// Each message can be consumed by multiple systems, in parallel,
23/// with consumption tracked by the [`MessageReader`] on a per-system basis.
24///
25/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)
26/// is applied between writing and reading systems, there is a risk of a race condition.
27/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.
28///
29/// This collection is meant to be paired with a system that calls
30/// [`Messages::update`] exactly once per update/frame.
31///
32/// [`message_update_system`] is a system that does this, typically initialized automatically using
33/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
34/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.
35/// Messages will persist across a single frame boundary and so ordering of message producers and
36/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).
/// If messages are not read by the end of the frame after the one in which they were written,
/// they will be dropped silently.
39///
40/// # Example
41///
42/// ```
43/// use bevy_ecs::message::{Message, Messages};
44///
45/// #[derive(Message)]
46/// struct MyMessage {
47/// value: usize
48/// }
49///
50/// // setup
51/// let mut messages = Messages::<MyMessage>::default();
52/// let mut cursor = messages.get_cursor();
53///
54/// // run this once per update/frame
55/// messages.update();
56///
57/// // somewhere else: write a message
58/// messages.write(MyMessage { value: 1 });
59///
60/// // somewhere else: read the messages
61/// for message in cursor.read(&messages) {
62/// assert_eq!(message.value, 1)
63/// }
64///
65/// // messages are only processed once per reader
66/// assert_eq!(cursor.read(&messages).count(), 0);
67/// ```
68///
69/// # Details
70///
71/// [`Messages`] is implemented using a variation of a double buffer strategy.
72/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.
73/// - [`MessageReader`]s will read messages from both buffers.
74/// - [`MessageReader`]s that read at least once per update will never drop messages.
/// - [`MessageReader`]s that read once within two updates might still receive some messages.
/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred
///   before those updates.
78///
79/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.
80///
81/// An alternative call pattern would be to call [`update`](Messages::update)
82/// manually across frames to control when messages are cleared.
83/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,
84/// but can be done by adding your message as a resource instead of using
85/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
86///
87/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)
88/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)
89///
90/// [`MessageReader`]: super::MessageReader
91/// [`MessageWriter`]: super::MessageWriter
92/// [`message_update_system`]: super::message_update_system
#[derive(Debug, Resource)]
#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]
pub struct Messages<E: Message> {
    /// Holds the oldest still active messages.
    /// Note that `messages_a.start_message_count + messages_a.len()` should always be equal to
    /// `messages_b.start_message_count`.
    pub(crate) messages_a: MessageSequence<E>,
    /// Holds the newer messages. New messages are always appended to this buffer.
    pub(crate) messages_b: MessageSequence<E>,
    /// Running count of messages written so far. Its current value is used as the id of the
    /// next written message, and it is not reset when the buffers are cleared.
    pub(crate) message_count: usize,
}
103
104// Derived Default impl would incorrectly require E: Default
105impl<E: Message> Default for Messages<E> {
106 fn default() -> Self {
107 Self {
108 messages_a: Default::default(),
109 messages_b: Default::default(),
110 message_count: Default::default(),
111 }
112 }
113}
114
115impl<M: Message> Messages<M> {
116 /// Returns the index of the oldest message stored in the message buffer.
117 pub fn oldest_message_count(&self) -> usize {
118 self.messages_a.start_message_count
119 }
120
121 /// Writes an `message` to the current message buffer.
122 /// [`MessageReader`](super::MessageReader)s can then read the message.
123 /// This method returns the [ID](`MessageId`) of the written `message`.
124 #[track_caller]
125 pub fn write(&mut self, message: M) -> MessageId<M> {
126 self.write_with_caller(message, MaybeLocation::caller())
127 }
128
129 pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {
130 let message_id = MessageId {
131 id: self.message_count,
132 caller,
133 _marker: PhantomData,
134 };
135 #[cfg(feature = "detailed_trace")]
136 tracing::trace!("Messages::write() -> id: {}", message_id);
137
138 let message_instance = MessageInstance {
139 message_id,
140 message,
141 };
142
143 self.messages_b.push(message_instance);
144 self.message_count += 1;
145
146 message_id
147 }
148
149 /// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
150 /// This is more efficient than writing each message individually.
151 /// This method returns the [IDs](`MessageId`) of the written `messages`.
152 #[track_caller]
153 pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
154 let last_count = self.message_count;
155
156 self.extend(messages);
157
158 WriteBatchIds {
159 last_count,
160 message_count: self.message_count,
161 _marker: PhantomData,
162 }
163 }
164
165 /// Writes the default value of the message. Useful when the message is an empty struct.
166 /// This method returns the [ID](`MessageId`) of the written `message`.
167 #[track_caller]
168 pub fn write_default(&mut self) -> MessageId<M>
169 where
170 M: Default,
171 {
172 self.write(Default::default())
173 }
174
175 /// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.
176 pub fn get_cursor(&self) -> MessageCursor<M> {
177 MessageCursor::default()
178 }
179
180 /// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.
181 /// It will read all future messages.
182 pub fn get_cursor_current(&self) -> MessageCursor<M> {
183 MessageCursor {
184 last_message_count: self.message_count,
185 ..Default::default()
186 }
187 }
188
189 /// Swaps the message buffers and clears the oldest message buffer. In general, this should be
190 /// called once per frame/update.
191 ///
192 /// If you need access to the messages that were removed, consider using [`Messages::update_drain`].
193 pub fn update(&mut self) {
194 core::mem::swap(&mut self.messages_a, &mut self.messages_b);
195 self.messages_b.clear();
196 self.messages_b.start_message_count = self.message_count;
197 debug_assert_eq!(
198 self.messages_a.start_message_count + self.messages_a.len(),
199 self.messages_b.start_message_count
200 );
201 }
202
203 /// Swaps the message buffers and drains the oldest message buffer, returning an iterator
204 /// of all messages that were removed. In general, this should be called once per frame/update.
205 ///
206 /// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.
207 #[must_use = "If you do not need the returned messages, call .update() instead."]
208 pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {
209 core::mem::swap(&mut self.messages_a, &mut self.messages_b);
210 let iter = self.messages_b.messages.drain(..);
211 self.messages_b.start_message_count = self.message_count;
212 debug_assert_eq!(
213 self.messages_a.start_message_count + self.messages_a.len(),
214 self.messages_b.start_message_count
215 );
216
217 iter.map(|e| e.message)
218 }
219
220 #[inline]
221 fn reset_start_message_count(&mut self) {
222 self.messages_a.start_message_count = self.message_count;
223 self.messages_b.start_message_count = self.message_count;
224 }
225
226 /// Removes all messages.
227 #[inline]
228 pub fn clear(&mut self) {
229 self.reset_start_message_count();
230 self.messages_a.clear();
231 self.messages_b.clear();
232 }
233
234 /// Returns the number of messages currently stored in the message buffer.
235 #[inline]
236 pub fn len(&self) -> usize {
237 self.messages_a.len() + self.messages_b.len()
238 }
239
240 /// Returns true if there are no messages currently stored in the message buffer.
241 #[inline]
242 pub fn is_empty(&self) -> bool {
243 self.len() == 0
244 }
245
246 /// Creates a draining iterator that removes all messages.
247 pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {
248 self.reset_start_message_count();
249
250 // Drain the oldest messages first, then the newest
251 self.messages_a
252 .drain(..)
253 .chain(self.messages_b.drain(..))
254 .map(|i| i.message)
255 }
256
257 /// Iterates over messages that happened since the last "update" call.
258 /// WARNING: You probably don't want to use this call. In most cases you should use an
259 /// [`MessageReader`]. You should only use this if you know you only need to consume messages
260 /// between the last `update()` call and your call to `iter_current_update_messages`.
261 /// If messages happen outside that window, they will not be handled. For example, any messages that
262 /// happen after this call and before the next `update()` call will be dropped.
263 ///
264 /// [`MessageReader`]: super::MessageReader
265 pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {
266 self.messages_b.iter().map(|i| &i.message)
267 }
268
269 /// Get a specific message by id if it still exists in the messages buffer.
270 pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {
271 if id < self.oldest_message_count() {
272 return None;
273 }
274
275 let sequence = self.sequence(id);
276 let index = id.saturating_sub(sequence.start_message_count);
277
278 sequence
279 .get(index)
280 .map(|instance| (&instance.message, instance.message_id))
281 }
282
283 /// Which message buffer is this message id a part of.
284 fn sequence(&self, id: usize) -> &MessageSequence<M> {
285 if id < self.messages_b.start_message_count {
286 &self.messages_a
287 } else {
288 &self.messages_b
289 }
290 }
291}
292
293impl<E: Message> Extend<E> for Messages<E> {
294 #[track_caller]
295 fn extend<I>(&mut self, iter: I)
296 where
297 I: IntoIterator<Item = E>,
298 {
299 let old_count = self.message_count;
300 let mut message_count = self.message_count;
301 let messages = iter.into_iter().map(|message| {
302 let message_id = MessageId {
303 id: message_count,
304 caller: MaybeLocation::caller(),
305 _marker: PhantomData,
306 };
307 message_count += 1;
308 MessageInstance {
309 message_id,
310 message,
311 }
312 });
313
314 self.messages_b.extend(messages);
315
316 if old_count != message_count {
317 #[cfg(feature = "detailed_trace")]
318 tracing::trace!(
319 "Messages::extend() -> ids: ({}..{})",
320 self.message_count,
321 message_count
322 );
323 }
324
325 self.message_count = message_count;
326 }
327}
328
/// A single buffer of [`MessageInstance`]s together with the message count at which it starts.
#[derive(Debug)]
#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]
pub(crate) struct MessageSequence<E: Message> {
    /// The buffered messages, in the order in which they were appended.
    pub(crate) messages: Vec<MessageInstance<E>>,
    /// The owning collection's message count at the time this buffer was last reset.
    /// A message with id `id` held by this buffer is found at index `id - start_message_count`.
    pub(crate) start_message_count: usize,
}
335
336// Derived Default impl would incorrectly require E: Default
337impl<E: Message> Default for MessageSequence<E> {
338 fn default() -> Self {
339 Self {
340 messages: Default::default(),
341 start_message_count: Default::default(),
342 }
343 }
344}
345
346impl<E: Message> Deref for MessageSequence<E> {
347 type Target = Vec<MessageInstance<E>>;
348
349 fn deref(&self) -> &Self::Target {
350 &self.messages
351 }
352}
353
354impl<E: Message> DerefMut for MessageSequence<E> {
355 fn deref_mut(&mut self) -> &mut Self::Target {
356 &mut self.messages
357 }
358}
359
/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.
///
/// Yields one id for every value in the half-open range `last_count..message_count`.
pub struct WriteBatchIds<E> {
    /// Id that the next call to `next` yields; advanced by one for every id handed out.
    last_count: usize,
    /// Exclusive upper bound of the ids in the batch.
    message_count: usize,
    /// Ties the yielded ids to the message type without storing a message.
    _marker: PhantomData<E>,
}
366
367impl<E: Message> Iterator for WriteBatchIds<E> {
368 type Item = MessageId<E>;
369
370 fn next(&mut self) -> Option<Self::Item> {
371 if self.last_count >= self.message_count {
372 return None;
373 }
374
375 let result = Some(MessageId {
376 id: self.last_count,
377 caller: MaybeLocation::caller(),
378 _marker: PhantomData,
379 });
380
381 self.last_count += 1;
382
383 result
384 }
385
386 fn size_hint(&self) -> (usize, Option<usize>) {
387 let len = <Self as ExactSizeIterator>::len(self);
388 (len, Some(len))
389 }
390}
391
392impl<E: Message> ExactSizeIterator for WriteBatchIds<E> {
393 fn len(&self) -> usize {
394 self.message_count.saturating_sub(self.last_count)
395 }
396}
397
#[cfg(test)]
mod tests {
    use crate::message::{Message, Messages};

    /// `len` counts both buffers, while `iter_current_update_messages` only sees what was
    /// written since the most recent `update`.
    #[test]
    fn iter_current_update_messages_iterates_over_current_messages() {
        #[derive(Message, Clone)]
        struct TestMessage;

        let mut messages = Messages::<TestMessage>::default();

        // One entry per frame:
        // (messages written this frame, expected `len`, expected current-update count).
        // Messages are double-buffered, so `len` also includes the previous frame's messages.
        let frames = [(0, 0, 0), (1, 1, 1), (2, 3, 2), (0, 2, 0)];

        for (frame, &(written, expected_len, expected_current)) in frames.iter().enumerate() {
            // `update` runs between frames, not before the first one or after the last one.
            if frame > 0 {
                messages.update();
            }

            for _ in 0..written {
                messages.write(TestMessage);
            }

            assert_eq!(messages.len(), expected_len);
            assert_eq!(
                messages.iter_current_update_messages().count(),
                expected_current
            );
        }
    }

    /// The id iterator returned by `write_batch` reports its exact length up front.
    #[test]
    fn write_batch_iter_size_hint() {
        #[derive(Message, Clone, Copy)]
        struct TestMessage;

        const BATCH_SIZE: usize = 4;

        let mut messages = Messages::<TestMessage>::default();
        let ids = messages.write_batch([TestMessage; BATCH_SIZE]);

        assert_eq!(ids.len(), BATCH_SIZE);
        assert_eq!(ids.size_hint(), (BATCH_SIZE, Some(BATCH_SIZE)));
    }
}
448}