bevy_ecs/world/command_queue.rs

use crate::{
    system::{Command, SystemBuffer, SystemMeta},
    world::{DeferredWorld, World},
};
use alloc::{boxed::Box, vec::Vec};
use bevy_ptr::{OwningPtr, Unaligned};
use core::{
    fmt::Debug,
    mem::{size_of, MaybeUninit},
    panic::AssertUnwindSafe,
    ptr::{addr_of_mut, NonNull},
};
use log::warn;

struct CommandMeta {
    /// SAFETY: The `value` must point to a value of type `T: Command`,
    /// where `T` is some specific type that was used to produce this metadata.
    ///
    /// `world` is optional to allow this one function pointer to perform double-duty as a drop.
    ///
    /// Advances `cursor` by the size of `T` in bytes.
    consume_command_and_get_size:
        unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
}
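
// When `world` is `None`, the function pointer above acts as a drop: the erased
// command is read out and dropped without being applied. See the
// `None => drop(command)` arm of the closure built in `RawCommandQueue::push`.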

/// Densely and efficiently stores a queue of heterogeneous types implementing [`Command`].
// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
// as an optimization. Since commands are used frequently in systems as a way to spawn
// entities/components/resources, and it's not currently possible to parallelize these
// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
// preferred to simplicity of implementation.
#[derive(Default)]
pub struct CommandQueue {
    // This buffer densely stores all queued commands.
    //
    // For each command, one `CommandMeta` is stored, followed by zero or more bytes
    // to store the command itself. To interpret these bytes, a pointer must
    // be passed to the corresponding `CommandMeta.consume_command_and_get_size` fn pointer.
    pub(crate) bytes: Vec<MaybeUninit<u8>>,
    pub(crate) cursor: usize,
    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
}
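
// Layout sketch (illustrative): after pushing commands `A` and then `B`, `bytes` holds
//
//     [CommandMeta for A][bytes of A][CommandMeta for B][bytes of B]
//
// back to back, with no alignment padding. Metas and commands are therefore written and
// read with unaligned pointer operations, and a zero-sized command contributes only its
// `CommandMeta`.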

/// Wraps pointers to a [`CommandQueue`], used internally to avoid stacked borrow rules when
/// partially applying the world's command queue recursively
#[derive(Clone)]
pub(crate) struct RawCommandQueue {
    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
    pub(crate) cursor: NonNull<usize>,
    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
}

// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
// which gets extremely verbose very quickly, while also providing no useful information.
// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863).
// So instead, the manual impl just prints the length of the vec.
impl Debug for CommandQueue {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("CommandQueue")
            .field("len_bytes", &self.bytes.len())
            .finish_non_exhaustive()
    }
}

// SAFETY: All types implementing [`Command`] are required to be [`Send`]
unsafe impl Send for CommandQueue {}

// SAFETY: `&CommandQueue` never gives access to the inner commands.
unsafe impl Sync for CommandQueue {}

impl CommandQueue {
    /// Push a [`Command`] onto the queue.
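    ///
    /// # Example
    ///
    /// A minimal sketch; the import paths below are assumed from this module's own
    /// `use` declarations, and `SpawnCommand` mirrors the command defined in this
    /// module's tests:
    ///
    /// ```
    /// use bevy_ecs::system::Command;
    /// use bevy_ecs::world::{CommandQueue, World};
    ///
    /// struct SpawnCommand;
    ///
    /// impl Command for SpawnCommand {
    ///     fn apply(self, world: &mut World) {
    ///         world.spawn_empty();
    ///     }
    /// }
    ///
    /// let mut queue = CommandQueue::default();
    /// queue.push(SpawnCommand);
    /// assert!(!queue.is_empty());
    /// ```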
    #[inline]
    pub fn push(&mut self, command: impl Command) {
        // SAFETY: self is guaranteed to live for the lifetime of this method
        unsafe {
            self.get_raw().push(command);
        }
    }

    /// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
    /// This clears the queue.
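    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the `SpawnCommand` sketch from [`CommandQueue::push`]:
    ///
    /// ```
    /// # use bevy_ecs::system::Command;
    /// # use bevy_ecs::world::{CommandQueue, World};
    /// # struct SpawnCommand;
    /// # impl Command for SpawnCommand {
    /// #     fn apply(self, world: &mut World) { world.spawn_empty(); }
    /// # }
    /// let mut queue = CommandQueue::default();
    /// queue.push(SpawnCommand);
    ///
    /// let mut world = World::new();
    /// queue.apply(&mut world);
    /// assert_eq!(world.entities().len(), 1);
    ///
    /// // `apply` cleared the queue, so a second call is a no-op.
    /// queue.apply(&mut world);
    /// assert_eq!(world.entities().len(), 1);
    /// ```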
    #[inline]
    pub fn apply(&mut self, world: &mut World) {
        // flush the previously queued entities
        world.flush_entities();

        // flush the world's internal queue
        world.flush_commands();

        // SAFETY: A reference is always a valid pointer
        unsafe {
            self.get_raw().apply_or_drop_queued(Some(world.into()));
        }
    }

    /// Take all commands from `other` and append them to `self`, leaving `other` empty
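    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the `SpawnCommand` sketch from [`CommandQueue::push`]:
    ///
    /// ```
    /// # use bevy_ecs::system::Command;
    /// # use bevy_ecs::world::{CommandQueue, World};
    /// # struct SpawnCommand;
    /// # impl Command for SpawnCommand {
    /// #     fn apply(self, world: &mut World) { world.spawn_empty(); }
    /// # }
    /// let mut a = CommandQueue::default();
    /// let mut b = CommandQueue::default();
    /// b.push(SpawnCommand);
    ///
    /// a.append(&mut b);
    /// assert!(b.is_empty());
    ///
    /// let mut world = World::new();
    /// a.apply(&mut world);
    /// assert_eq!(world.entities().len(), 1);
    /// ```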
    pub fn append(&mut self, other: &mut CommandQueue) {
        self.bytes.append(&mut other.bytes);
    }

    /// Returns `true` if there are no commands in the queue.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.cursor >= self.bytes.len()
    }

    /// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
    pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
        // SAFETY: self is always valid memory
        unsafe {
            RawCommandQueue {
                bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
                cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
                panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
            }
        }
    }
}

impl RawCommandQueue {
    /// Returns a new `RawCommandQueue` instance. It must be manually dropped.
    pub(crate) fn new() -> Self {
        // SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non-null
        unsafe {
            Self {
                bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
                cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
                panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
            }
        }
    }

    /// Returns true if the queue is empty.
    ///
    /// # Safety
    ///
    /// * Caller ensures that `bytes` and `cursor` point to valid memory
    pub unsafe fn is_empty(&self) -> bool {
        // SAFETY: The pointers are guaranteed to be valid by this function's safety requirements
        (unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
    }

    /// Push a [`Command`] onto the queue.
    ///
    /// # Safety
    ///
    /// * Caller ensures that `self` has not outlived the underlying queue
    #[inline]
    pub unsafe fn push<C: Command>(&mut self, command: C) {
        // Stores a command alongside its metadata.
        // `repr(C)` prevents the compiler from reordering the fields,
        // while `repr(packed)` prevents the compiler from inserting padding bytes.
        #[repr(C, packed)]
        struct Packed<C: Command> {
            meta: CommandMeta,
            command: C,
        }
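        // Because of `repr(C, packed)`, `Packed<C>` contains no padding, so
        // `size_of::<Packed<C>>() == size_of::<CommandMeta>() + size_of::<C>()`.
        // The cursor arithmetic in `apply_or_drop_queued` relies on this identity.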

        let meta = CommandMeta {
            consume_command_and_get_size: |command, world, cursor| {
                *cursor += size_of::<C>();

                // SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
                // `command` must point to a value of type `C`.
                let command: C = unsafe { command.read_unaligned() };
                match world {
                    // Apply command to the provided world...
                    Some(mut world) => {
                        // SAFETY: Caller ensures pointer is not null
                        let world = unsafe { world.as_mut() };
                        command.apply(world);
                        // The command may have queued up world commands, which we flush here to ensure they are also picked up.
                        // If the current command queue is already the World's command queue, this will still behave appropriately
                        // because the global cursor is still at the current `stop`, ensuring only the newly queued commands will be applied.
                        world.flush();
                    }
                    // ...or discard it.
                    None => drop(command),
                }
            },
        };

        // SAFETY: There are no outstanding references to self.bytes
        let bytes = unsafe { self.bytes.as_mut() };

        let old_len = bytes.len();

        // Reserve enough bytes for both the metadata and the command itself.
        bytes.reserve(size_of::<Packed<C>>());

        // Pointer to the bytes at the end of the buffer.
        // SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
        let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };

        // Write the metadata into the buffer, followed by the command.
        // We are using a packed struct to write them both as one operation.
        // SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
        // The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
        // and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
        unsafe {
            ptr.cast::<Packed<C>>()
                .write_unaligned(Packed { meta, command });
        }

        // Extend the length of the buffer to include the data we just wrote.
        // SAFETY: The new length is guaranteed to fit in the vector's capacity,
        // due to the call to `.reserve()` above.
        unsafe {
            bytes.set_len(old_len + size_of::<Packed<C>>());
        }
    }

    /// If `world` is [`Some`], this will apply the queued [commands](`Command`).
    /// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
    /// This clears the queue.
    ///
    /// # Safety
    ///
    /// * Caller ensures that `self` has not outlived the underlying queue
    #[inline]
    pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
        // SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference.
        // If this is not the command queue on world, we have exclusive ownership and self will not be mutated.
        let start = *self.cursor.as_ref();
        let stop = self.bytes.as_ref().len();
        let mut local_cursor = start;
        // SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
        // the remaining commands currently in this list. This is safe.
        *self.cursor.as_mut() = stop;
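
        // Illustrative trace (a sketch of one possible execution): suppose `start == 0` and
        // commands `A` and `B` are queued. On entry, the global cursor is set to `stop`.
        // While `A` runs it may queue a new command `C` onto this same buffer; the
        // `world.flush()` inside `consume_command_and_get_size` then re-enters this function
        // with `start == stop`, so only `C` is applied and `A`/`B` are never re-entered.
        // Afterwards the outer loop resumes at `B`, and the buffer is finally truncated to `start`.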

        while local_cursor < stop {
            // SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
            // Since we know that the cursor is in bounds, it must point to the start of a new command.
            let meta = unsafe {
                self.bytes
                    .as_mut()
                    .as_mut_ptr()
                    .add(local_cursor)
                    .cast::<CommandMeta>()
                    .read_unaligned()
            };

            // Advance to the bytes just after `meta`, which represent a type-erased command.
            local_cursor += size_of::<CommandMeta>();
            // Construct an owned pointer to the command.
            // SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
            // guarantees that nothing stored in the buffer will get observed after this function ends.
            // `cmd` points to a valid address of a stored command, so it must be non-null.
            let cmd = unsafe {
                OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
                    self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
                ))
            };
            let f = AssertUnwindSafe(|| {
                // SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
                // since they were stored next to each other by `.push()`.
                // For ZSTs, the type doesn't matter as long as the pointer is non-null.
                // This also advances the cursor past the command. For ZSTs, the cursor will not move.
                // At this point, it will either point to the next `CommandMeta`,
                // or the cursor will be out of bounds and the loop will end.
                unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
            });

            #[cfg(feature = "std")]
            {
                let result = std::panic::catch_unwind(f);

                if let Err(payload) = result {
                    // local_cursor now points to the location _after_ the panicked command.
                    // Add the remaining commands that _would have_ been applied to the
                    // panic_recovery queue.
                    //
                    // This uses `current_stop` instead of `stop` to account for any commands
                    // that were queued _during_ this panic.
                    //
                    // This is implemented in such a way that if `apply_or_drop_queued()` calls are nested
                    // recursively inside an applied Command, the correct command order will be retained
                    // (see `test_command_queue_inner_nested_panic_safe` below).
                    let panic_recovery = self.panic_recovery.as_mut();
                    let bytes = self.bytes.as_mut();
                    let current_stop = bytes.len();
                    panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
                    bytes.set_len(start);
                    *self.cursor.as_mut() = start;

                    // If `start == 0`, this was the "top of the apply stack". If we are _not_ at the top,
                    // then when we call `resume_unwind` below, the caller "closer to the top" will catch
                    // the unwind and do this same check, until we reach the top.
                    if start == 0 {
                        bytes.append(panic_recovery);
                    }
                    std::panic::resume_unwind(payload);
                }
            }

            #[cfg(not(feature = "std"))]
            (f)();
        }

        // Reset the buffer: all commands past the original `start` cursor have been applied.
        // SAFETY: we are setting the length of bytes to the original length, minus the length of the original
        // list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
        unsafe {
            self.bytes.as_mut().set_len(start);
            *self.cursor.as_mut() = start;
        };
    }
}

impl Drop for CommandQueue {
    fn drop(&mut self) {
        if !self.bytes.is_empty() {
            warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
        }
        // SAFETY: A reference is always a valid pointer
        unsafe { self.get_raw().apply_or_drop_queued(None) };
    }
}

impl SystemBuffer for CommandQueue {
    #[inline]
    fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
        #[cfg(feature = "trace")]
        let _span_guard = _system_meta.commands_span.enter();
        self.apply(world);
    }

    #[inline]
    fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
        world.commands().append(self);
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::resource::Resource;
    use alloc::{borrow::ToOwned, string::String, sync::Arc};
    use core::{
        panic::AssertUnwindSafe,
        sync::atomic::{AtomicU32, Ordering},
    };

    #[cfg(miri)]
    use alloc::format;

    struct DropCheck(Arc<AtomicU32>);

    impl DropCheck {
        fn new() -> (Self, Arc<AtomicU32>) {
            let drops = Arc::new(AtomicU32::new(0));
            (Self(drops.clone()), drops)
        }
    }

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl Command for DropCheck {
        fn apply(self, _: &mut World) {}
    }

    #[test]
    fn test_command_queue_inner_drop() {
        let mut queue = CommandQueue::default();

        let (dropcheck_a, drops_a) = DropCheck::new();
        let (dropcheck_b, drops_b) = DropCheck::new();

        queue.push(dropcheck_a);
        queue.push(dropcheck_b);

        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
        assert_eq!(drops_b.load(Ordering::Relaxed), 0);

        let mut world = World::new();
        queue.apply(&mut world);

        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
    }

    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
    #[test]
    fn test_command_queue_inner_drop_early() {
        let mut queue = CommandQueue::default();

        let (dropcheck_a, drops_a) = DropCheck::new();
        let (dropcheck_b, drops_b) = DropCheck::new();

        queue.push(dropcheck_a);
        queue.push(dropcheck_b);

        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
        assert_eq!(drops_b.load(Ordering::Relaxed), 0);

        drop(queue);

        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
    }

    struct SpawnCommand;

    impl Command for SpawnCommand {
        fn apply(self, world: &mut World) {
            world.spawn_empty();
        }
    }

    #[test]
    fn test_command_queue_inner() {
        let mut queue = CommandQueue::default();

        queue.push(SpawnCommand);
        queue.push(SpawnCommand);

        let mut world = World::new();
        queue.apply(&mut world);

        assert_eq!(world.entities().len(), 2);

        // The previous call to `apply` cleared the queue.
        // This call should do nothing.
        queue.apply(&mut world);
        assert_eq!(world.entities().len(), 2);
    }

    #[expect(
        dead_code,
        reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
    )]
    struct PanicCommand(String);
    impl Command for PanicCommand {
        fn apply(self, _: &mut World) {
            panic!("command is panicking");
        }
    }

    #[test]
    fn test_command_queue_inner_panic_safe() {
        std::panic::set_hook(Box::new(|_| {}));

        let mut queue = CommandQueue::default();

        queue.push(PanicCommand("I panic!".to_owned()));
        queue.push(SpawnCommand);

        let mut world = World::new();

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            queue.apply(&mut world);
        }));

        // Even though the first command panicked, it's still ok to push
        // more commands.
        queue.push(SpawnCommand);
        queue.push(SpawnCommand);
        queue.apply(&mut world);
        assert_eq!(world.entities().len(), 3);
    }

    #[test]
    fn test_command_queue_inner_nested_panic_safe() {
        std::panic::set_hook(Box::new(|_| {}));

        #[derive(Resource, Default)]
        struct Order(Vec<usize>);

        let mut world = World::new();
        world.init_resource::<Order>();

        fn add_index(index: usize) -> impl Command {
            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
        }
        world.commands().queue(add_index(1));
        world.commands().queue(|world: &mut World| {
            world.commands().queue(add_index(2));
            world.commands().queue(PanicCommand("I panic!".to_owned()));
            world.commands().queue(add_index(3));
            world.flush_commands();
        });
        world.commands().queue(add_index(4));

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            world.flush_commands();
        }));

        world.commands().queue(add_index(5));
        world.flush_commands();
        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
    }

    // NOTE: `CommandQueue` is `Send` because `Command` is `Send`.
    // If the `Command` trait gets reworked to be non-send, `CommandQueue`
    // should be reworked.
    // This test asserts that `Command` types are `Send`.
    fn assert_is_send_impl(_: impl Send) {}
    fn assert_is_send(command: impl Command) {
        assert_is_send_impl(command);
    }

    #[test]
    fn test_command_is_send() {
        assert_is_send(SpawnCommand);
    }

    #[expect(
        dead_code,
        reason = "This struct is used to test how the CommandQueue reacts to padding added by Rust's compiler."
    )]
    struct CommandWithPadding(u8, u16);
    impl Command for CommandWithPadding {
        fn apply(self, _: &mut World) {}
    }

    #[cfg(miri)]
    #[test]
    fn test_uninit_bytes() {
        let mut queue = CommandQueue::default();
        queue.push(CommandWithPadding(0, 0));
        let _ = format!("{:?}", queue.bytes);
    }
}