bevy_ecs/world/
command_queue.rs

1#[cfg(feature = "track_location")]
2use crate::change_detection::MaybeLocation;
3use crate::{
4    system::{Command, SystemBuffer, SystemMeta},
5    world::{DeferredWorld, World},
6};
7
8use alloc::{boxed::Box, vec::Vec};
9use bevy_ptr::{OwningPtr, Unaligned};
10use core::{
11    fmt::Debug,
12    mem::{size_of, MaybeUninit},
13    panic::AssertUnwindSafe,
14    ptr::{addr_of_mut, NonNull},
15};
16use log::warn;
17
18struct CommandMeta {
19    /// SAFETY: The `value` must point to a value of type `T: Command`,
20    /// where `T` is some specific type that was used to produce this metadata.
21    ///
22    /// `world` is optional to allow this one function pointer to perform double-duty as a drop.
23    ///
24    /// Advances `cursor` by the size of `T` in bytes.
25    consume_command_and_get_size:
26        unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
27}
28
/// A dense, efficient queue of heterogeneous values implementing [`Command`].
// NOTE: [`CommandQueue`] is backed by a `Vec<MaybeUninit<u8>>` rather than a `Vec<Box<dyn Command>>`
// as an optimization. Systems use commands all the time to spawn entities/components/resources,
// and applying them cannot currently be parallelized because it needs mutable [`World`] access,
// so raw performance of [`CommandQueue`] is favored over a simpler implementation.
pub struct CommandQueue {
    // Densely packed storage for every queued command.
    //
    // Each entry consists of one `CommandMeta` followed by zero or more bytes holding
    // the command itself. Those bytes can only be interpreted by handing a pointer to
    // them to the matching `CommandMeta.consume_command_and_get_size` fn pointer.
    pub(crate) bytes: Vec<MaybeUninit<u8>>,
    // Byte offset into `bytes`; the queue counts as empty once this reaches `bytes.len()`.
    pub(crate) cursor: usize,
    // Receives the bytes of commands that were still pending when a command panicked
    // while the queue was being applied.
    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
    // Location recorded when the queue was created; reported when un-applied commands are dropped.
    #[cfg(feature = "track_location")]
    pub(crate) caller: MaybeLocation,
}
47
48impl Default for CommandQueue {
49    #[track_caller]
50    fn default() -> Self {
51        Self {
52            bytes: Default::default(),
53            cursor: Default::default(),
54            panic_recovery: Default::default(),
55            #[cfg(feature = "track_location")]
56            caller: MaybeLocation::caller(),
57        }
58    }
59}
60
/// Pointer-based view of a [`CommandQueue`].
///
/// Used internally so that the world's command queue can be partially applied
/// recursively without running into stacked borrow rules.
#[derive(Clone)]
pub(crate) struct RawCommandQueue {
    /// Points at the queue's byte buffer.
    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
    /// Points at the queue's cursor.
    pub(crate) cursor: NonNull<usize>,
    /// Points at the queue's panic recovery buffer.
    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
}
69
70// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
71// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
72// which gets extremely verbose very quickly, while also providing no useful information.
73// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863)
74// So instead, the manual impl just prints the length of vec.
75impl Debug for CommandQueue {
76    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
77        let mut binding = f.debug_struct("CommandQueue");
78        binding.field("len_bytes", &self.bytes.len());
79
80        #[cfg(feature = "track_location")]
81        binding.field("caller", &self.caller.into_option());
82
83        binding.finish_non_exhaustive()
84    }
85}
86
87// SAFETY: All commands [`Command`] implement [`Send`]
88unsafe impl Send for CommandQueue {}
89
90// SAFETY: `&CommandQueue` never gives access to the inner commands.
91unsafe impl Sync for CommandQueue {}
92
93impl CommandQueue {
94    /// Push a [`Command`] onto the queue.
95    #[inline]
96    pub fn push(&mut self, command: impl Command) {
97        // SAFETY: self is guaranteed to live for the lifetime of this method
98        unsafe {
99            self.get_raw().push(command);
100        }
101    }
102
103    /// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
104    /// This clears the queue.
105    #[inline]
106    pub fn apply(&mut self, world: &mut World) {
107        // flush the previously queued entities
108        world.flush_entities();
109
110        // flush the world's internal queue
111        world.flush_commands();
112
113        // SAFETY: A reference is always a valid pointer
114        unsafe {
115            self.get_raw().apply_or_drop_queued(Some(world.into()));
116        }
117    }
118
119    /// Take all commands from `other` and append them to `self`, leaving `other` empty
120    pub fn append(&mut self, other: &mut CommandQueue) {
121        self.bytes.append(&mut other.bytes);
122    }
123
124    /// Returns false if there are any commands in the queue
125    #[inline]
126    pub fn is_empty(&self) -> bool {
127        self.cursor >= self.bytes.len()
128    }
129
130    /// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
131    pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
132        // SAFETY: self is always valid memory
133        unsafe {
134            RawCommandQueue {
135                bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
136                cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
137                panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
138            }
139        }
140    }
141}
142
143impl RawCommandQueue {
144    /// Returns a new `RawCommandQueue` instance, this must be manually dropped.
145    pub(crate) fn new() -> Self {
146        // SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non null
147        unsafe {
148            Self {
149                bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
150                cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
151                panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
152            }
153        }
154    }
155
156    /// Returns true if the queue is empty.
157    ///
158    /// # Safety
159    ///
160    /// * Caller ensures that `bytes` and `cursor` point to valid memory
161    pub unsafe fn is_empty(&self) -> bool {
162        // SAFETY: Pointers are guaranteed to be valid by requirements on `.clone_unsafe`
163        (unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
164    }
165
166    /// Push a [`Command`] onto the queue.
167    ///
168    /// # Safety
169    ///
170    /// * Caller ensures that `self` has not outlived the underlying queue
171    #[inline]
172    pub unsafe fn push<C: Command>(&mut self, command: C) {
173        // Stores a command alongside its metadata.
174        // `repr(C)` prevents the compiler from reordering the fields,
175        // while `repr(packed)` prevents the compiler from inserting padding bytes.
176        #[repr(C, packed)]
177        struct Packed<C: Command> {
178            meta: CommandMeta,
179            command: C,
180        }
181
182        let meta = CommandMeta {
183            consume_command_and_get_size: |command, world, cursor| {
184                *cursor += size_of::<C>();
185
186                // SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
187                // `command` must point to a value of type `C`.
188                let command: C = unsafe { command.read_unaligned() };
189                match world {
190                    // Apply command to the provided world...
191                    Some(mut world) => {
192                        // SAFETY: Caller ensures pointer is not null
193                        let world = unsafe { world.as_mut() };
194                        command.apply(world);
195                        // The command may have queued up world commands, which we flush here to ensure they are also picked up.
196                        // If the current command queue already the World Command queue, this will still behave appropriately because the global cursor
197                        // is still at the current `stop`, ensuring only the newly queued Commands will be applied.
198                        world.flush();
199                    }
200                    // ...or discard it.
201                    None => drop(command),
202                }
203            },
204        };
205
206        // SAFETY: There are no outstanding references to self.bytes
207        let bytes = unsafe { self.bytes.as_mut() };
208
209        let old_len = bytes.len();
210
211        // Reserve enough bytes for both the metadata and the command itself.
212        bytes.reserve(size_of::<Packed<C>>());
213
214        // Pointer to the bytes at the end of the buffer.
215        // SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
216        let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
217
218        // Write the metadata into the buffer, followed by the command.
219        // We are using a packed struct to write them both as one operation.
220        // SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
221        // The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
222        // and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
223        unsafe {
224            ptr.cast::<Packed<C>>()
225                .write_unaligned(Packed { meta, command });
226        }
227
228        // Extend the length of the buffer to include the data we just wrote.
229        // SAFETY: The new length is guaranteed to fit in the vector's capacity,
230        // due to the call to `.reserve()` above.
231        unsafe {
232            bytes.set_len(old_len + size_of::<Packed<C>>());
233        }
234    }
235
236    /// If `world` is [`Some`], this will apply the queued [commands](`Command`).
237    /// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
238    /// This clears the queue.
239    ///
240    /// # Safety
241    ///
242    /// * Caller ensures that `self` has not outlived the underlying queue
243    #[inline]
244    pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
245        // SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference
246        // If this is not the command queue on world we have exclusive ownership and self will not be mutated
247        let start = *self.cursor.as_ref();
248        let stop = self.bytes.as_ref().len();
249        let mut local_cursor = start;
250        // SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
251        // the remaining commands currently in this list. This is safe.
252        *self.cursor.as_mut() = stop;
253
254        while local_cursor < stop {
255            // SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
256            // Since we know that the cursor is in bounds, it must point to the start of a new command.
257            let meta = unsafe {
258                self.bytes
259                    .as_mut()
260                    .as_mut_ptr()
261                    .add(local_cursor)
262                    .cast::<CommandMeta>()
263                    .read_unaligned()
264            };
265
266            // Advance to the bytes just after `meta`, which represent a type-erased command.
267            local_cursor += size_of::<CommandMeta>();
268            // Construct an owned pointer to the command.
269            // SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
270            // guarantees that nothing stored in the buffer will get observed after this function ends.
271            // `cmd` points to a valid address of a stored command, so it must be non-null.
272            let cmd = unsafe {
273                OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
274                    self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
275                ))
276            };
277            let f = AssertUnwindSafe(|| {
278                // SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
279                // since they were stored next to each other by `.push()`.
280                // For ZSTs, the type doesn't matter as long as the pointer is non-null.
281                // This also advances the cursor past the command. For ZSTs, the cursor will not move.
282                // At this point, it will either point to the next `CommandMeta`,
283                // or the cursor will be out of bounds and the loop will end.
284                unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
285            });
286
287            #[cfg(feature = "std")]
288            {
289                let result = std::panic::catch_unwind(f);
290
291                if let Err(payload) = result {
292                    // local_cursor now points to the location _after_ the panicked command.
293                    // Add the remaining commands that _would have_ been applied to the
294                    // panic_recovery queue.
295                    //
296                    // This uses `current_stop` instead of `stop` to account for any commands
297                    // that were queued _during_ this panic.
298                    //
299                    // This is implemented in such a way that if apply_or_drop_queued() are nested recursively in,
300                    // an applied Command, the correct command order will be retained.
301                    let panic_recovery = self.panic_recovery.as_mut();
302                    let bytes = self.bytes.as_mut();
303                    let current_stop = bytes.len();
304                    panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
305                    bytes.set_len(start);
306                    *self.cursor.as_mut() = start;
307
308                    // This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
309                    // when we call`resume_unwind" the caller "closer to the top" will catch the unwind and do this check,
310                    // until we reach the top.
311                    if start == 0 {
312                        bytes.append(panic_recovery);
313                    }
314                    std::panic::resume_unwind(payload);
315                }
316            }
317
318            #[cfg(not(feature = "std"))]
319            (f)();
320        }
321
322        // Reset the buffer: all commands past the original `start` cursor have been applied.
323        // SAFETY: we are setting the length of bytes to the original length, minus the length of the original
324        // list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
325        unsafe {
326            self.bytes.as_mut().set_len(start);
327            *self.cursor.as_mut() = start;
328        };
329    }
330}
331
332impl Drop for CommandQueue {
333    fn drop(&mut self) {
334        if !self.bytes.is_empty() {
335            #[cfg(feature = "track_location")]
336            warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply? caller:{:?}",self.caller.into_option());
337            #[cfg(not(feature = "track_location"))]
338            warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
339        }
340        // SAFETY: A reference is always a valid pointer
341        unsafe { self.get_raw().apply_or_drop_queued(None) };
342    }
343}
344
345impl SystemBuffer for CommandQueue {
346    #[inline]
347    fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
348        #[cfg(feature = "trace")]
349        let _span_guard = _system_meta.commands_span.enter();
350        self.apply(world);
351    }
352
353    #[inline]
354    fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
355        world.commands().append(self);
356    }
357}
358
#[cfg(test)]
mod test {
    use super::*;
    use crate::{component::Component, resource::Resource};
    use alloc::{borrow::ToOwned, string::String, sync::Arc};
    use core::{
        panic::AssertUnwindSafe,
        sync::atomic::{AtomicU32, Ordering},
    };

    #[cfg(miri)]
    use alloc::format;

    /// Command that bumps a shared counter each time it is dropped.
    struct DropCheck(Arc<AtomicU32>);

    impl DropCheck {
        /// Creates a checker together with a handle to its drop counter.
        fn new() -> (Self, Arc<AtomicU32>) {
            let counter = Arc::new(AtomicU32::new(0));
            let checker = Self(Arc::clone(&counter));
            (checker, counter)
        }
    }

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl Command for DropCheck {
        fn apply(self, _: &mut World) {}
    }

    /// Applying the queue must drop every command exactly once.
    #[test]
    fn test_command_queue_inner_drop() {
        let (first, first_drops) = DropCheck::new();
        let (second, second_drops) = DropCheck::new();

        let mut commands = CommandQueue::default();
        commands.push(first);
        commands.push(second);

        // Merely queueing a command must not drop it.
        assert_eq!(first_drops.load(Ordering::Relaxed), 0);
        assert_eq!(second_drops.load(Ordering::Relaxed), 0);

        let mut world = World::new();
        commands.apply(&mut world);

        assert_eq!(first_drops.load(Ordering::Relaxed), 1);
        assert_eq!(second_drops.load(Ordering::Relaxed), 1);
    }

    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
    #[test]
    fn test_command_queue_inner_drop_early() {
        let (first, first_drops) = DropCheck::new();
        let (second, second_drops) = DropCheck::new();

        let mut commands = CommandQueue::default();
        commands.push(first);
        commands.push(second);

        assert_eq!(first_drops.load(Ordering::Relaxed), 0);
        assert_eq!(second_drops.load(Ordering::Relaxed), 0);

        // The queue goes away without ever being applied.
        drop(commands);

        assert_eq!(first_drops.load(Ordering::Relaxed), 1);
        assert_eq!(second_drops.load(Ordering::Relaxed), 1);
    }

    #[derive(Component)]
    struct A;

    /// Command that spawns a single entity carrying an [`A`] component.
    struct SpawnCommand;

    impl Command for SpawnCommand {
        fn apply(self, world: &mut World) {
            world.spawn(A);
        }
    }

    #[test]
    fn test_command_queue_inner() {
        let mut world = World::new();
        let mut commands = CommandQueue::default();

        commands.push(SpawnCommand);
        commands.push(SpawnCommand);
        commands.apply(&mut world);

        assert_eq!(world.query::<&A>().query(&world).count(), 2);

        // The first `apply` emptied the queue,
        // so applying again must not spawn anything.
        commands.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 2);
    }

    #[expect(
        dead_code,
        reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
    )]
    struct PanicCommand(String);
    impl Command for PanicCommand {
        fn apply(self, _: &mut World) {
            panic!("command is panicking");
        }
    }

    #[test]
    fn test_command_queue_inner_panic_safe() {
        // Keep the expected panic from printing.
        std::panic::set_hook(Box::new(|_| {}));

        let mut world = World::new();
        let mut commands = CommandQueue::default();

        commands.push(PanicCommand("I panic!".to_owned()));
        commands.push(SpawnCommand);

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            commands.apply(&mut world);
        }));

        // A panicking command must not prevent further commands
        // from being pushed and applied.
        commands.push(SpawnCommand);
        commands.push(SpawnCommand);
        commands.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 3);
    }

    #[test]
    fn test_command_queue_inner_nested_panic_safe() {
        // Keep the expected panic from printing.
        std::panic::set_hook(Box::new(|_| {}));

        #[derive(Resource, Default)]
        struct Order(Vec<usize>);

        /// Builds a command that records `index` in the [`Order`] resource.
        fn add_index(index: usize) -> impl Command {
            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
        }

        let mut world = World::new();
        world.init_resource::<Order>();

        world.commands().queue(add_index(1));
        world.commands().queue(|world: &mut World| {
            // Queue more commands from inside a command, one of which panics, then flush recursively.
            world.commands().queue(add_index(2));
            world.commands().queue(PanicCommand("I panic!".to_owned()));
            world.commands().queue(add_index(3));
            world.flush_commands();
        });
        world.commands().queue(add_index(4));

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            world.flush_commands();
        }));

        world.commands().queue(add_index(5));
        world.flush_commands();
        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
    }

    // NOTE: `CommandQueue` is `Send` because `Command` is `Send`.
    // Should the `Command` trait ever be reworked to be non-send,
    // `CommandQueue` has to be reworked as well.
    // The test below asserts that Command types are `Send`.
    fn assert_is_send_impl(_: impl Send) {}
    fn assert_is_send(command: impl Command) {
        assert_is_send_impl(command);
    }

    #[test]
    fn test_command_is_send() {
        assert_is_send(SpawnCommand);
    }

    #[expect(
        dead_code,
        reason = "This struct is used to test how the CommandQueue reacts to padding added by rust's compiler."
    )]
    struct CommandWithPadding(u8, u16);
    impl Command for CommandWithPadding {
        fn apply(self, _: &mut World) {}
    }

    /// Only built under miri: formats the raw byte buffer after pushing a command that contains padding.
    #[cfg(miri)]
    #[test]
    fn test_uninit_bytes() {
        let mut commands = CommandQueue::default();
        commands.push(CommandWithPadding(0, 0));
        let _ = format!("{:?}", commands.bytes);
    }
}