bevy_ecs/world/
command_queue.rs

1use crate::system::{SystemBuffer, SystemMeta};
2
3use core::{
4    fmt::Debug,
5    mem::{size_of, MaybeUninit},
6    panic::AssertUnwindSafe,
7    ptr::{addr_of_mut, NonNull},
8};
9
10use bevy_ptr::{OwningPtr, Unaligned};
11use bevy_utils::tracing::warn;
12
13use crate::world::{Command, World};
14
15use super::DeferredWorld;
16
17struct CommandMeta {
18    /// SAFETY: The `value` must point to a value of type `T: Command`,
19    /// where `T` is some specific type that was used to produce this metadata.
20    ///
21    /// `world` is optional to allow this one function pointer to perform double-duty as a drop.
22    ///
23    /// Advances `cursor` by the size of `T` in bytes.
24    consume_command_and_get_size:
25        unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
26}
27
28/// Densely and efficiently stores a queue of heterogenous types implementing [`Command`].
29// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
30// as an optimization. Since commands are used frequently in systems as a way to spawn
31// entities/components/resources, and it's not currently possible to parallelize these
32// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
33// preferred to simplicity of implementation.
34#[derive(Default)]
35pub struct CommandQueue {
36    // This buffer densely stores all queued commands.
37    //
38    // For each command, one `CommandMeta` is stored, followed by zero or more bytes
39    // to store the command itself. To interpret these bytes, a pointer must
40    // be passed to the corresponding `CommandMeta.apply_command_and_get_size` fn pointer.
41    pub(crate) bytes: Vec<MaybeUninit<u8>>,
42    pub(crate) cursor: usize,
43    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
44}
45
46/// Wraps pointers to a [`CommandQueue`], used internally to avoid stacked borrow rules when
47/// partially applying the world's command queue recursively
48#[derive(Clone)]
49pub(crate) struct RawCommandQueue {
50    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
51    pub(crate) cursor: NonNull<usize>,
52    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
53}
54
55// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
56// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
57// which gets extremely verbose very quickly, while also providing no useful information.
58// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863)
59// So instead, the manual impl just prints the length of vec.
60impl Debug for CommandQueue {
61    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
62        f.debug_struct("CommandQueue")
63            .field("len_bytes", &self.bytes.len())
64            .finish_non_exhaustive()
65    }
66}
67
68// SAFETY: All commands [`Command`] implement [`Send`]
69unsafe impl Send for CommandQueue {}
70
71// SAFETY: `&CommandQueue` never gives access to the inner commands.
72unsafe impl Sync for CommandQueue {}
73
74impl CommandQueue {
75    /// Push a [`Command`] onto the queue.
76    #[inline]
77    pub fn push<C>(&mut self, command: C)
78    where
79        C: Command,
80    {
81        // SAFETY: self is guaranteed to live for the lifetime of this method
82        unsafe {
83            self.get_raw().push(command);
84        }
85    }
86
87    /// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
88    /// This clears the queue.
89    #[inline]
90    pub fn apply(&mut self, world: &mut World) {
91        // flush the previously queued entities
92        world.flush_entities();
93
94        // flush the world's internal queue
95        world.flush_commands();
96
97        // SAFETY: A reference is always a valid pointer
98        unsafe {
99            self.get_raw().apply_or_drop_queued(Some(world.into()));
100        }
101    }
102
103    /// Take all commands from `other` and append them to `self`, leaving `other` empty
104    pub fn append(&mut self, other: &mut CommandQueue) {
105        self.bytes.append(&mut other.bytes);
106    }
107
108    /// Returns false if there are any commands in the queue
109    #[inline]
110    pub fn is_empty(&self) -> bool {
111        self.cursor >= self.bytes.len()
112    }
113
114    /// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
115    pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
116        // SAFETY: self is always valid memory
117        unsafe {
118            RawCommandQueue {
119                bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
120                cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
121                panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
122            }
123        }
124    }
125}
126
127impl RawCommandQueue {
128    /// Returns a new `RawCommandQueue` instance, this must be manually dropped.
129    pub(crate) fn new() -> Self {
130        // SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non null
131        unsafe {
132            Self {
133                bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
134                cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
135                panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
136            }
137        }
138    }
139
140    /// Returns true if the queue is empty.
141    ///
142    /// # Safety
143    ///
144    /// * Caller ensures that `bytes` and `cursor` point to valid memory
145    pub unsafe fn is_empty(&self) -> bool {
146        // SAFETY: Pointers are guaranteed to be valid by requirements on `.clone_unsafe`
147        (unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
148    }
149
150    /// Push a [`Command`] onto the queue.
151    ///
152    /// # Safety
153    ///
154    /// * Caller ensures that `self` has not outlived the underlying queue
155    #[inline]
156    pub unsafe fn push<C>(&mut self, command: C)
157    where
158        C: Command,
159    {
160        // Stores a command alongside its metadata.
161        // `repr(C)` prevents the compiler from reordering the fields,
162        // while `repr(packed)` prevents the compiler from inserting padding bytes.
163        #[repr(C, packed)]
164        struct Packed<T: Command> {
165            meta: CommandMeta,
166            command: T,
167        }
168
169        let meta = CommandMeta {
170            consume_command_and_get_size: |command, world, cursor| {
171                *cursor += size_of::<C>();
172
173                // SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
174                // `command` must point to a value of type `C`.
175                let command: C = unsafe { command.read_unaligned() };
176                match world {
177                    // Apply command to the provided world...
178                    Some(mut world) => {
179                        // SAFETY: Caller ensures pointer is not null
180                        let world = unsafe { world.as_mut() };
181                        command.apply(world);
182                        // The command may have queued up world commands, which we flush here to ensure they are also picked up.
183                        // If the current command queue already the World Command queue, this will still behave appropriately because the global cursor
184                        // is still at the current `stop`, ensuring only the newly queued Commands will be applied.
185                        world.flush();
186                    }
187                    // ...or discard it.
188                    None => drop(command),
189                }
190            },
191        };
192
193        // SAFETY: There are no outstanding references to self.bytes
194        let bytes = unsafe { self.bytes.as_mut() };
195
196        let old_len = bytes.len();
197
198        // Reserve enough bytes for both the metadata and the command itself.
199        bytes.reserve(size_of::<Packed<C>>());
200
201        // Pointer to the bytes at the end of the buffer.
202        // SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
203        let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
204
205        // Write the metadata into the buffer, followed by the command.
206        // We are using a packed struct to write them both as one operation.
207        // SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
208        // The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
209        // and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
210        unsafe {
211            ptr.cast::<Packed<C>>()
212                .write_unaligned(Packed { meta, command });
213        }
214
215        // Extend the length of the buffer to include the data we just wrote.
216        // SAFETY: The new length is guaranteed to fit in the vector's capacity,
217        // due to the call to `.reserve()` above.
218        unsafe {
219            bytes.set_len(old_len + size_of::<Packed<C>>());
220        }
221    }
222
223    /// If `world` is [`Some`], this will apply the queued [commands](`Command`).
224    /// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
225    /// This clears the queue.
226    ///
227    /// # Safety
228    ///
229    /// * Caller ensures that `self` has not outlived the underlying queue
230    #[inline]
231    pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
232        // SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference
233        // If this is not the command queue on world we have exclusive ownership and self will not be mutated
234        let start = *self.cursor.as_ref();
235        let stop = self.bytes.as_ref().len();
236        let mut local_cursor = start;
237        // SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
238        // the remaining commands currently in this list. This is safe.
239        *self.cursor.as_mut() = stop;
240
241        while local_cursor < stop {
242            // SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
243            // Since we know that the cursor is in bounds, it must point to the start of a new command.
244            let meta = unsafe {
245                self.bytes
246                    .as_mut()
247                    .as_mut_ptr()
248                    .add(local_cursor)
249                    .cast::<CommandMeta>()
250                    .read_unaligned()
251            };
252
253            // Advance to the bytes just after `meta`, which represent a type-erased command.
254            local_cursor += size_of::<CommandMeta>();
255            // Construct an owned pointer to the command.
256            // SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
257            // guarantees that nothing stored in the buffer will get observed after this function ends.
258            // `cmd` points to a valid address of a stored command, so it must be non-null.
259            let cmd = unsafe {
260                OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
261                    self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
262                ))
263            };
264            let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
265                // SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
266                // since they were stored next to each other by `.push()`.
267                // For ZSTs, the type doesn't matter as long as the pointer is non-null.
268                // This also advances the cursor past the command. For ZSTs, the cursor will not move.
269                // At this point, it will either point to the next `CommandMeta`,
270                // or the cursor will be out of bounds and the loop will end.
271                unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
272            }));
273
274            if let Err(payload) = result {
275                // local_cursor now points to the location _after_ the panicked command.
276                // Add the remaining commands that _would have_ been applied to the
277                // panic_recovery queue.
278                //
279                // This uses `current_stop` instead of `stop` to account for any commands
280                // that were queued _during_ this panic.
281                //
282                // This is implemented in such a way that if apply_or_drop_queued() are nested recursively in,
283                // an applied Command, the correct command order will be retained.
284                let panic_recovery = self.panic_recovery.as_mut();
285                let bytes = self.bytes.as_mut();
286                let current_stop = bytes.len();
287                panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
288                bytes.set_len(start);
289                *self.cursor.as_mut() = start;
290
291                // This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
292                // when we call`resume_unwind" the caller "closer to the top" will catch the unwind and do this check,
293                // until we reach the top.
294                if start == 0 {
295                    bytes.append(panic_recovery);
296                }
297                std::panic::resume_unwind(payload);
298            }
299        }
300
301        // Reset the buffer: all commands past the original `start` cursor have been applied.
302        // SAFETY: we are setting the length of bytes to the original length, minus the length of the original
303        // list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
304        unsafe {
305            self.bytes.as_mut().set_len(start);
306            *self.cursor.as_mut() = start;
307        };
308    }
309}
310
311impl Drop for CommandQueue {
312    fn drop(&mut self) {
313        if !self.bytes.is_empty() {
314            warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
315        }
316        // SAFETY: A reference is always a valid pointer
317        unsafe { self.get_raw().apply_or_drop_queued(None) };
318    }
319}
320
321impl SystemBuffer for CommandQueue {
322    #[inline]
323    fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
324        #[cfg(feature = "trace")]
325        let _span_guard = _system_meta.commands_span.enter();
326        self.apply(world);
327    }
328
329    #[inline]
330    fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
331        world.commands().append(self);
332    }
333}
334
335#[cfg(test)]
336mod test {
337    use super::*;
338    use crate as bevy_ecs;
339    use crate::system::Resource;
340    use alloc::sync::Arc;
341    use core::{
342        panic::AssertUnwindSafe,
343        sync::atomic::{AtomicU32, Ordering},
344    };
345
346    struct DropCheck(Arc<AtomicU32>);
347
348    impl DropCheck {
349        fn new() -> (Self, Arc<AtomicU32>) {
350            let drops = Arc::new(AtomicU32::new(0));
351            (Self(drops.clone()), drops)
352        }
353    }
354
355    impl Drop for DropCheck {
356        fn drop(&mut self) {
357            self.0.fetch_add(1, Ordering::Relaxed);
358        }
359    }
360
361    impl Command for DropCheck {
362        fn apply(self, _: &mut World) {}
363    }
364
365    #[test]
366    fn test_command_queue_inner_drop() {
367        let mut queue = CommandQueue::default();
368
369        let (dropcheck_a, drops_a) = DropCheck::new();
370        let (dropcheck_b, drops_b) = DropCheck::new();
371
372        queue.push(dropcheck_a);
373        queue.push(dropcheck_b);
374
375        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
376        assert_eq!(drops_b.load(Ordering::Relaxed), 0);
377
378        let mut world = World::new();
379        queue.apply(&mut world);
380
381        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
382        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
383    }
384
385    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
386    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
387    #[test]
388    fn test_command_queue_inner_drop_early() {
389        let mut queue = CommandQueue::default();
390
391        let (dropcheck_a, drops_a) = DropCheck::new();
392        let (dropcheck_b, drops_b) = DropCheck::new();
393
394        queue.push(dropcheck_a);
395        queue.push(dropcheck_b);
396
397        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
398        assert_eq!(drops_b.load(Ordering::Relaxed), 0);
399
400        drop(queue);
401
402        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
403        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
404    }
405
406    struct SpawnCommand;
407
408    impl Command for SpawnCommand {
409        fn apply(self, world: &mut World) {
410            world.spawn_empty();
411        }
412    }
413
414    #[test]
415    fn test_command_queue_inner() {
416        let mut queue = CommandQueue::default();
417
418        queue.push(SpawnCommand);
419        queue.push(SpawnCommand);
420
421        let mut world = World::new();
422        queue.apply(&mut world);
423
424        assert_eq!(world.entities().len(), 2);
425
426        // The previous call to `apply` cleared the queue.
427        // This call should do nothing.
428        queue.apply(&mut world);
429        assert_eq!(world.entities().len(), 2);
430    }
431
432    // This has an arbitrary value `String` stored to ensure
433    // when then command gets pushed, the `bytes` vector gets
434    // some data added to it.
435    #[allow(dead_code)]
436    struct PanicCommand(String);
437    impl Command for PanicCommand {
438        fn apply(self, _: &mut World) {
439            panic!("command is panicking");
440        }
441    }
442
443    #[test]
444    fn test_command_queue_inner_panic_safe() {
445        std::panic::set_hook(Box::new(|_| {}));
446
447        let mut queue = CommandQueue::default();
448
449        queue.push(PanicCommand("I panic!".to_owned()));
450        queue.push(SpawnCommand);
451
452        let mut world = World::new();
453
454        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
455            queue.apply(&mut world);
456        }));
457
458        // Even though the first command panicked, it's still ok to push
459        // more commands.
460        queue.push(SpawnCommand);
461        queue.push(SpawnCommand);
462        queue.apply(&mut world);
463        assert_eq!(world.entities().len(), 3);
464    }
465
466    #[test]
467    fn test_command_queue_inner_nested_panic_safe() {
468        std::panic::set_hook(Box::new(|_| {}));
469
470        #[derive(Resource, Default)]
471        struct Order(Vec<usize>);
472
473        let mut world = World::new();
474        world.init_resource::<Order>();
475
476        fn add_index(index: usize) -> impl Command {
477            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
478        }
479        world.commands().queue(add_index(1));
480        world.commands().queue(|world: &mut World| {
481            world.commands().queue(add_index(2));
482            world.commands().queue(PanicCommand("I panic!".to_owned()));
483            world.commands().queue(add_index(3));
484            world.flush_commands();
485        });
486        world.commands().queue(add_index(4));
487
488        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
489            world.flush_commands();
490        }));
491
492        world.commands().queue(add_index(5));
493        world.flush_commands();
494        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
495    }
496
497    // NOTE: `CommandQueue` is `Send` because `Command` is send.
498    // If the `Command` trait gets reworked to be non-send, `CommandQueue`
499    // should be reworked.
500    // This test asserts that Command types are send.
501    fn assert_is_send_impl(_: impl Send) {}
502    fn assert_is_send(command: impl Command) {
503        assert_is_send_impl(command);
504    }
505
506    #[test]
507    fn test_command_is_send() {
508        assert_is_send(SpawnCommand);
509    }
510
511    #[allow(dead_code)]
512    struct CommandWithPadding(u8, u16);
513    impl Command for CommandWithPadding {
514        fn apply(self, _: &mut World) {}
515    }
516
517    #[cfg(miri)]
518    #[test]
519    fn test_uninit_bytes() {
520        let mut queue = CommandQueue::default();
521        queue.push(CommandWithPadding(0, 0));
522        let _ = format!("{:?}", queue.bytes);
523    }
524}