bevy_ecs/world/
command_queue.rs

1use crate::{
2    change_detection::MaybeLocation,
3    system::{Command, SystemBuffer, SystemMeta},
4    world::{DeferredWorld, World},
5};
6
7use alloc::{boxed::Box, vec::Vec};
8use bevy_ptr::{OwningPtr, Unaligned};
9use core::{
10    fmt::Debug,
11    mem::{size_of, MaybeUninit},
12    panic::AssertUnwindSafe,
13    ptr::{addr_of_mut, NonNull},
14};
15use log::warn;
16
17struct CommandMeta {
18    /// SAFETY: The `value` must point to a value of type `T: Command`,
19    /// where `T` is some specific type that was used to produce this metadata.
20    ///
21    /// `world` is optional to allow this one function pointer to perform double-duty as a drop.
22    ///
23    /// Advances `cursor` by the size of `T` in bytes.
24    consume_command_and_get_size:
25        unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
26}
27
28/// Densely and efficiently stores a queue of heterogenous types implementing [`Command`].
29// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
30// as an optimization. Since commands are used frequently in systems as a way to spawn
31// entities/components/resources, and it's not currently possible to parallelize these
32// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
33// preferred to simplicity of implementation.
34pub struct CommandQueue {
35    // This buffer densely stores all queued commands.
36    //
37    // For each command, one `CommandMeta` is stored, followed by zero or more bytes
38    // to store the command itself. To interpret these bytes, a pointer must
39    // be passed to the corresponding `CommandMeta.apply_command_and_get_size` fn pointer.
40    pub(crate) bytes: Vec<MaybeUninit<u8>>,
41    pub(crate) cursor: usize,
42    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
43    pub(crate) caller: MaybeLocation,
44}
45
46impl Default for CommandQueue {
47    #[track_caller]
48    fn default() -> Self {
49        Self {
50            bytes: Default::default(),
51            cursor: Default::default(),
52            panic_recovery: Default::default(),
53            caller: MaybeLocation::caller(),
54        }
55    }
56}
57
58/// Wraps pointers to a [`CommandQueue`], used internally to avoid stacked borrow rules when
59/// partially applying the world's command queue recursively
60#[derive(Clone)]
61pub(crate) struct RawCommandQueue {
62    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
63    pub(crate) cursor: NonNull<usize>,
64    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
65}
66
67// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
68// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
69// which gets extremely verbose very quickly, while also providing no useful information.
70// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863)
71// So instead, the manual impl just prints the length of vec.
72impl Debug for CommandQueue {
73    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
74        f.debug_struct("CommandQueue")
75            .field("len_bytes", &self.bytes.len())
76            .field("caller", &self.caller)
77            .finish_non_exhaustive()
78    }
79}
80
81// SAFETY: All commands [`Command`] implement [`Send`]
82unsafe impl Send for CommandQueue {}
83
84// SAFETY: `&CommandQueue` never gives access to the inner commands.
85unsafe impl Sync for CommandQueue {}
86
87impl CommandQueue {
88    /// Push a [`Command`] onto the queue.
89    #[inline]
90    pub fn push(&mut self, command: impl Command) {
91        // SAFETY: self is guaranteed to live for the lifetime of this method
92        unsafe {
93            self.get_raw().push(command);
94        }
95    }
96
97    /// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
98    /// This clears the queue.
99    #[inline]
100    pub fn apply(&mut self, world: &mut World) {
101        // flush the world's internal queue
102        world.flush_commands();
103
104        // SAFETY: A reference is always a valid pointer
105        unsafe {
106            self.get_raw().apply_or_drop_queued(Some(world.into()));
107        }
108    }
109
110    /// Take all commands from `other` and append them to `self`, leaving `other` empty
111    pub fn append(&mut self, other: &mut CommandQueue) {
112        self.bytes.append(&mut other.bytes);
113    }
114
115    /// Returns false if there are any commands in the queue
116    #[inline]
117    pub fn is_empty(&self) -> bool {
118        self.cursor >= self.bytes.len()
119    }
120
121    /// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
122    pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
123        // SAFETY: self is always valid memory
124        unsafe {
125            RawCommandQueue {
126                bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
127                cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
128                panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
129            }
130        }
131    }
132}
133
134impl RawCommandQueue {
135    /// Returns a new `RawCommandQueue` instance, this must be manually dropped.
136    pub(crate) fn new() -> Self {
137        // SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non null
138        unsafe {
139            Self {
140                bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
141                cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
142                panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
143            }
144        }
145    }
146
147    /// Returns true if the queue is empty.
148    ///
149    /// # Safety
150    ///
151    /// * Caller ensures that `bytes` and `cursor` point to valid memory
152    pub unsafe fn is_empty(&self) -> bool {
153        // SAFETY: Pointers are guaranteed to be valid by requirements on `.clone_unsafe`
154        (unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
155    }
156
157    /// Push a [`Command`] onto the queue.
158    ///
159    /// # Safety
160    ///
161    /// * Caller ensures that `self` has not outlived the underlying queue
162    #[inline]
163    pub unsafe fn push<C: Command>(&mut self, command: C) {
164        // Stores a command alongside its metadata.
165        // `repr(C)` prevents the compiler from reordering the fields,
166        // while `repr(packed)` prevents the compiler from inserting padding bytes.
167        #[repr(C, packed)]
168        struct Packed<C: Command> {
169            meta: CommandMeta,
170            command: C,
171        }
172
173        let meta = CommandMeta {
174            consume_command_and_get_size: |command, world, cursor| {
175                *cursor += size_of::<C>();
176
177                // SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
178                // `command` must point to a value of type `C`.
179                let command: C = unsafe { command.read_unaligned() };
180                match world {
181                    // Apply command to the provided world...
182                    Some(mut world) => {
183                        // SAFETY: Caller ensures pointer is not null
184                        let world = unsafe { world.as_mut() };
185                        command.apply(world);
186                        // The command may have queued up world commands, which we flush here to ensure they are also picked up.
187                        // If the current command queue already the World Command queue, this will still behave appropriately because the global cursor
188                        // is still at the current `stop`, ensuring only the newly queued Commands will be applied.
189                        world.flush();
190                    }
191                    // ...or discard it.
192                    None => drop(command),
193                }
194            },
195        };
196
197        // SAFETY: There are no outstanding references to self.bytes
198        let bytes = unsafe { self.bytes.as_mut() };
199
200        let old_len = bytes.len();
201
202        // Reserve enough bytes for both the metadata and the command itself.
203        bytes.reserve(size_of::<Packed<C>>());
204
205        // Pointer to the bytes at the end of the buffer.
206        // SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
207        let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
208
209        // Write the metadata into the buffer, followed by the command.
210        // We are using a packed struct to write them both as one operation.
211        // SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
212        // The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
213        // and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
214        unsafe {
215            ptr.cast::<Packed<C>>()
216                .write_unaligned(Packed { meta, command });
217        }
218
219        // Extend the length of the buffer to include the data we just wrote.
220        // SAFETY: The new length is guaranteed to fit in the vector's capacity,
221        // due to the call to `.reserve()` above.
222        unsafe {
223            bytes.set_len(old_len + size_of::<Packed<C>>());
224        }
225    }
226
227    /// If `world` is [`Some`], this will apply the queued [commands](`Command`).
228    /// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
229    /// This clears the queue.
230    ///
231    /// # Safety
232    ///
233    /// * Caller ensures that `self` has not outlived the underlying queue
234    #[inline]
235    pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
236        // SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference
237        // If this is not the command queue on world we have exclusive ownership and self will not be mutated
238        let start = *self.cursor.as_ref();
239        let stop = self.bytes.as_ref().len();
240        let mut local_cursor = start;
241        // SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
242        // the remaining commands currently in this list. This is safe.
243        *self.cursor.as_mut() = stop;
244
245        while local_cursor < stop {
246            // SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
247            // Since we know that the cursor is in bounds, it must point to the start of a new command.
248            let meta = unsafe {
249                self.bytes
250                    .as_mut()
251                    .as_mut_ptr()
252                    .add(local_cursor)
253                    .cast::<CommandMeta>()
254                    .read_unaligned()
255            };
256
257            // Advance to the bytes just after `meta`, which represent a type-erased command.
258            local_cursor += size_of::<CommandMeta>();
259            // Construct an owned pointer to the command.
260            // SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
261            // guarantees that nothing stored in the buffer will get observed after this function ends.
262            // `cmd` points to a valid address of a stored command, so it must be non-null.
263            let cmd = unsafe {
264                OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
265                    self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
266                ))
267            };
268            let f = AssertUnwindSafe(|| {
269                // SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
270                // since they were stored next to each other by `.push()`.
271                // For ZSTs, the type doesn't matter as long as the pointer is non-null.
272                // This also advances the cursor past the command. For ZSTs, the cursor will not move.
273                // At this point, it will either point to the next `CommandMeta`,
274                // or the cursor will be out of bounds and the loop will end.
275                unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
276            });
277
278            #[cfg(feature = "std")]
279            {
280                let result = std::panic::catch_unwind(f);
281
282                if let Err(payload) = result {
283                    // local_cursor now points to the location _after_ the panicked command.
284                    // Add the remaining commands that _would have_ been applied to the
285                    // panic_recovery queue.
286                    //
287                    // This uses `current_stop` instead of `stop` to account for any commands
288                    // that were queued _during_ this panic.
289                    //
290                    // This is implemented in such a way that if apply_or_drop_queued() are nested recursively in,
291                    // an applied Command, the correct command order will be retained.
292                    let panic_recovery = self.panic_recovery.as_mut();
293                    let bytes = self.bytes.as_mut();
294                    let current_stop = bytes.len();
295                    panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
296                    bytes.set_len(start);
297                    *self.cursor.as_mut() = start;
298
299                    // This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
300                    // when we call`resume_unwind" the caller "closer to the top" will catch the unwind and do this check,
301                    // until we reach the top.
302                    if start == 0 {
303                        bytes.append(panic_recovery);
304                    }
305                    std::panic::resume_unwind(payload);
306                }
307            }
308
309            #[cfg(not(feature = "std"))]
310            (f)();
311        }
312
313        // Reset the buffer: all commands past the original `start` cursor have been applied.
314        // SAFETY: we are setting the length of bytes to the original length, minus the length of the original
315        // list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
316        unsafe {
317            self.bytes.as_mut().set_len(start);
318            *self.cursor.as_mut() = start;
319        };
320    }
321}
322
323impl Drop for CommandQueue {
324    fn drop(&mut self) {
325        if !self.bytes.is_empty() {
326            if let Some(caller) = self.caller.into_option() {
327                warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply? caller:{caller:?}");
328            } else {
329                warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
330            }
331        }
332        // SAFETY: A reference is always a valid pointer
333        unsafe { self.get_raw().apply_or_drop_queued(None) };
334    }
335}
336
337impl SystemBuffer for CommandQueue {
338    #[inline]
339    fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
340        #[cfg(feature = "trace")]
341        let _span_guard = _system_meta.commands_span.enter();
342        self.apply(world);
343    }
344
345    #[inline]
346    fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
347        world.commands().append(self);
348    }
349}
350
351#[cfg(test)]
352mod test {
353    use super::*;
354    use crate::{component::Component, resource::Resource};
355    use alloc::{borrow::ToOwned, string::String, sync::Arc};
356    use core::{
357        panic::AssertUnwindSafe,
358        sync::atomic::{AtomicU32, Ordering},
359    };
360
361    #[cfg(miri)]
362    use alloc::format;
363
364    struct DropCheck(Arc<AtomicU32>);
365
366    impl DropCheck {
367        fn new() -> (Self, Arc<AtomicU32>) {
368            let drops = Arc::new(AtomicU32::new(0));
369            (Self(drops.clone()), drops)
370        }
371    }
372
373    impl Drop for DropCheck {
374        fn drop(&mut self) {
375            self.0.fetch_add(1, Ordering::Relaxed);
376        }
377    }
378
379    impl Command for DropCheck {
380        fn apply(self, _: &mut World) {}
381    }
382
383    #[test]
384    fn test_command_queue_inner_drop() {
385        let mut queue = CommandQueue::default();
386
387        let (dropcheck_a, drops_a) = DropCheck::new();
388        let (dropcheck_b, drops_b) = DropCheck::new();
389
390        queue.push(dropcheck_a);
391        queue.push(dropcheck_b);
392
393        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
394        assert_eq!(drops_b.load(Ordering::Relaxed), 0);
395
396        let mut world = World::new();
397        queue.apply(&mut world);
398
399        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
400        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
401    }
402
403    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
404    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
405    #[test]
406    fn test_command_queue_inner_drop_early() {
407        let mut queue = CommandQueue::default();
408
409        let (dropcheck_a, drops_a) = DropCheck::new();
410        let (dropcheck_b, drops_b) = DropCheck::new();
411
412        queue.push(dropcheck_a);
413        queue.push(dropcheck_b);
414
415        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
416        assert_eq!(drops_b.load(Ordering::Relaxed), 0);
417
418        drop(queue);
419
420        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
421        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
422    }
423
424    #[derive(Component)]
425    struct A;
426
427    struct SpawnCommand;
428
429    impl Command for SpawnCommand {
430        fn apply(self, world: &mut World) {
431            world.spawn(A);
432        }
433    }
434
435    #[test]
436    fn test_command_queue_inner() {
437        let mut queue = CommandQueue::default();
438
439        queue.push(SpawnCommand);
440        queue.push(SpawnCommand);
441
442        let mut world = World::new();
443        queue.apply(&mut world);
444
445        assert_eq!(world.query::<&A>().query(&world).count(), 2);
446
447        // The previous call to `apply` cleared the queue.
448        // This call should do nothing.
449        queue.apply(&mut world);
450        assert_eq!(world.query::<&A>().query(&world).count(), 2);
451    }
452
453    #[expect(
454        dead_code,
455        reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
456    )]
457    struct PanicCommand(String);
458    impl Command for PanicCommand {
459        fn apply(self, _: &mut World) {
460            panic!("command is panicking");
461        }
462    }
463
464    #[test]
465    fn test_command_queue_inner_panic_safe() {
466        std::panic::set_hook(Box::new(|_| {}));
467
468        let mut queue = CommandQueue::default();
469
470        queue.push(PanicCommand("I panic!".to_owned()));
471        queue.push(SpawnCommand);
472
473        let mut world = World::new();
474
475        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
476            queue.apply(&mut world);
477        }));
478
479        // Even though the first command panicked, it's still ok to push
480        // more commands.
481        queue.push(SpawnCommand);
482        queue.push(SpawnCommand);
483        queue.apply(&mut world);
484        assert_eq!(world.query::<&A>().query(&world).count(), 3);
485    }
486
487    #[test]
488    fn test_command_queue_inner_nested_panic_safe() {
489        std::panic::set_hook(Box::new(|_| {}));
490
491        #[derive(Resource, Default)]
492        struct Order(Vec<usize>);
493
494        let mut world = World::new();
495        world.init_resource::<Order>();
496
497        fn add_index(index: usize) -> impl Command {
498            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
499        }
500        world.commands().queue(add_index(1));
501        world.commands().queue(|world: &mut World| {
502            world.commands().queue(add_index(2));
503            world.commands().queue(PanicCommand("I panic!".to_owned()));
504            world.commands().queue(add_index(3));
505            world.flush_commands();
506        });
507        world.commands().queue(add_index(4));
508
509        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
510            world.flush_commands();
511        }));
512
513        world.commands().queue(add_index(5));
514        world.flush_commands();
515        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
516    }
517
518    // NOTE: `CommandQueue` is `Send` because `Command` is send.
519    // If the `Command` trait gets reworked to be non-send, `CommandQueue`
520    // should be reworked.
521    // This test asserts that Command types are send.
522    fn assert_is_send_impl(_: impl Send) {}
523    fn assert_is_send(command: impl Command) {
524        assert_is_send_impl(command);
525    }
526
527    #[test]
528    fn test_command_is_send() {
529        assert_is_send(SpawnCommand);
530    }
531
532    #[expect(
533        dead_code,
534        reason = "This struct is used to test how the CommandQueue reacts to padding added by rust's compiler."
535    )]
536    struct CommandWithPadding(u8, u16);
537    impl Command for CommandWithPadding {
538        fn apply(self, _: &mut World) {}
539    }
540
541    #[cfg(miri)]
542    #[test]
543    fn test_uninit_bytes() {
544        let mut queue = CommandQueue::default();
545        queue.push(CommandWithPadding(0, 0));
546        let _ = format!("{:?}", queue.bytes);
547    }
548}