bevy_ecs/world/command_queue.rs
use crate::{
    change_detection::MaybeLocation,
    system::{Command, SystemBuffer, SystemMeta},
    world::{DeferredWorld, World},
};

use alloc::{boxed::Box, vec::Vec};
use bevy_ptr::{OwningPtr, Unaligned};
use core::{
    fmt::Debug,
    mem::{size_of, MaybeUninit},
    panic::AssertUnwindSafe,
    ptr::{addr_of_mut, NonNull},
};
use log::warn;

struct CommandMeta {
    /// SAFETY: The `value` must point to a value of type `T: Command`,
    /// where `T` is some specific type that was used to produce this metadata.
    ///
    /// `world` is optional to allow this one function pointer to perform double-duty as a drop.
    ///
    /// Advances `cursor` by the size of `T` in bytes.
    consume_command_and_get_size:
        unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
}

/// Densely and efficiently stores a queue of heterogeneous types implementing [`Command`].
// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
// as an optimization. Since commands are used frequently in systems as a way to spawn
// entities/components/resources, and it's not currently possible to parallelize these
// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
// preferred to simplicity of implementation.
pub struct CommandQueue {
    // This buffer densely stores all queued commands.
    //
    // For each command, one `CommandMeta` is stored, followed by zero or more bytes
    // to store the command itself. To interpret these bytes, a pointer must
    // be passed to the corresponding `CommandMeta.consume_command_and_get_size` fn pointer.
    pub(crate) bytes: Vec<MaybeUninit<u8>>,
    pub(crate) cursor: usize,
    pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
    pub(crate) caller: MaybeLocation,
}

impl Default for CommandQueue {
    #[track_caller]
    fn default() -> Self {
        Self {
            bytes: Default::default(),
            cursor: Default::default(),
            panic_recovery: Default::default(),
            caller: MaybeLocation::caller(),
        }
    }
}

/// Wraps pointers to a [`CommandQueue`], used internally to avoid violating stacked-borrows
/// rules when partially applying the world's command queue recursively.
#[derive(Clone)]
pub(crate) struct RawCommandQueue {
    pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
    pub(crate) cursor: NonNull<usize>,
    pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
}

// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
// which gets extremely verbose very quickly, while also providing no useful information.
// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863).
// So instead, the manual impl just prints the length of the vec.
impl Debug for CommandQueue {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("CommandQueue")
            .field("len_bytes", &self.bytes.len())
            .field("caller", &self.caller)
            .finish_non_exhaustive()
    }
}

// SAFETY: All commands [`Command`] implement [`Send`]
unsafe impl Send for CommandQueue {}

// SAFETY: `&CommandQueue` never gives access to the inner commands.
unsafe impl Sync for CommandQueue {}

impl CommandQueue {
    /// Push a [`Command`] onto the queue.
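    ///
    /// A minimal usage sketch (illustrative; it relies on the blanket [`Command`]
    /// impl for `FnOnce(&mut World)` closures, which the tests in this module
    /// also use):
    ///
    /// ```
    /// # use bevy_ecs::world::{CommandQueue, World};
    /// let mut queue = CommandQueue::default();
    /// queue.push(|world: &mut World| {
    ///     world.spawn_empty();
    /// });
    /// ```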
    #[inline]
    pub fn push(&mut self, command: impl Command) {
        // SAFETY: self is guaranteed to live for the lifetime of this method
        unsafe {
            self.get_raw().push(command);
        }
    }

    /// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
    /// This clears the queue.
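    ///
    /// A minimal round-trip sketch (illustrative):
    ///
    /// ```
    /// # use bevy_ecs::world::{CommandQueue, World};
    /// let mut world = World::new();
    /// let mut queue = CommandQueue::default();
    /// queue.push(|world: &mut World| {
    ///     world.spawn_empty();
    /// });
    /// queue.apply(&mut world);
    /// assert!(queue.is_empty());
    /// ```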
    #[inline]
    pub fn apply(&mut self, world: &mut World) {
        // flush the world's internal queue
        world.flush_commands();

        // SAFETY: A reference is always a valid pointer
        unsafe {
            self.get_raw().apply_or_drop_queued(Some(world.into()));
        }
    }

    /// Take all commands from `other` and append them to `self`, leaving `other` empty.
    pub fn append(&mut self, other: &mut CommandQueue) {
        self.bytes.append(&mut other.bytes);
    }

    /// Returns `true` if there are no commands left to apply in the queue.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.cursor >= self.bytes.len()
    }

    /// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
    pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
        // SAFETY: self is always valid memory
        unsafe {
            RawCommandQueue {
                bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
                cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
                panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
            }
        }
    }
}

impl RawCommandQueue {
    /// Returns a new `RawCommandQueue` instance; it must be manually dropped.
    pub(crate) fn new() -> Self {
        // SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non-null
        unsafe {
            Self {
                bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
                cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
                panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
            }
        }
    }

    /// Returns true if the queue is empty.
    ///
    /// # Safety
    ///
    /// * Caller ensures that `bytes` and `cursor` point to valid memory
    pub unsafe fn is_empty(&self) -> bool {
        // SAFETY: Pointers are guaranteed to be valid by this function's safety requirements
        (unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
    }

    /// Push a [`Command`] onto the queue.
    ///
    /// # Safety
    ///
    /// * Caller ensures that `self` has not outlived the underlying queue
    #[inline]
    pub unsafe fn push<C: Command>(&mut self, command: C) {
        // Stores a command alongside its metadata.
        // `repr(C)` prevents the compiler from reordering the fields,
        // while `repr(packed)` prevents the compiler from inserting padding bytes.
        #[repr(C, packed)]
        struct Packed<C: Command> {
            meta: CommandMeta,
            command: C,
        }
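
        // Illustrative layout of `bytes` after two commands `A` and `B` have been
        // pushed (there is no padding between `meta` and `command`, per the
        // `repr(C, packed)` above):
        //
        //   [ CommandMeta_A | bytes of A ][ CommandMeta_B | bytes of B ]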

        let meta = CommandMeta {
            consume_command_and_get_size: |command, world, cursor| {
                *cursor += size_of::<C>();

                // SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
                // `command` must point to a value of type `C`.
                let command: C = unsafe { command.read_unaligned() };
                match world {
                    // Apply command to the provided world...
                    Some(mut world) => {
                        // SAFETY: Caller ensures pointer is not null
                        let world = unsafe { world.as_mut() };
                        command.apply(world);
                        // The command may have queued up world commands, which we flush here to ensure they are also picked up.
                        // If the current command queue is already the World's command queue, this will still behave appropriately
                        // because the global cursor is still at the current `stop`, ensuring only the newly queued commands will be applied.
                        world.flush();
                    }
                    // ...or discard it.
                    None => drop(command),
                }
            },
        };

        // SAFETY: There are no outstanding references to self.bytes
        let bytes = unsafe { self.bytes.as_mut() };

        let old_len = bytes.len();

        // Reserve enough bytes for both the metadata and the command itself.
        bytes.reserve(size_of::<Packed<C>>());

        // Pointer to the bytes at the end of the buffer.
        // SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
        let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };

        // Write the metadata into the buffer, followed by the command.
        // We are using a packed struct to write them both as one operation.
        // SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
        // The call to `reserve()` ensures that the buffer has enough space to fit a value of type `Packed<C>`,
        // and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
        unsafe {
            ptr.cast::<Packed<C>>()
                .write_unaligned(Packed { meta, command });
        }

        // Extend the length of the buffer to include the data we just wrote.
        // SAFETY: The new length is guaranteed to fit in the vector's capacity,
        // due to the call to `.reserve()` above.
        unsafe {
            bytes.set_len(old_len + size_of::<Packed<C>>());
        }
    }

    /// If `world` is [`Some`], this will apply the queued [commands](`Command`).
    /// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
    /// This clears the queue.
    ///
    /// # Safety
    ///
    /// * Caller ensures that `self` has not outlived the underlying queue
    #[inline]
    pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
        // SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference.
        // If this is not the command queue on world, we have exclusive ownership and self will not be mutated.
        let start = *self.cursor.as_ref();
        let stop = self.bytes.as_ref().len();
        let mut local_cursor = start;
        // SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
        // the remaining commands currently in this list. This is safe.
        *self.cursor.as_mut() = stop;

        while local_cursor < stop {
            // SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
            // Since we know that the cursor is in bounds, it must point to the start of a new command.
            let meta = unsafe {
                self.bytes
                    .as_mut()
                    .as_mut_ptr()
                    .add(local_cursor)
                    .cast::<CommandMeta>()
                    .read_unaligned()
            };

            // Advance to the bytes just after `meta`, which represent a type-erased command.
            local_cursor += size_of::<CommandMeta>();
            // Construct an owned pointer to the command.
            // SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
            // guarantees that nothing stored in the buffer will get observed after this function ends.
            // `cmd` points to a valid address of a stored command, so it must be non-null.
            let cmd = unsafe {
                OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
                    self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
                ))
            };
            let f = AssertUnwindSafe(|| {
                // SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
                // since they were stored next to each other by `.push()`.
                // For ZSTs, the type doesn't matter as long as the pointer is non-null.
                // This also advances the cursor past the command. For ZSTs, the cursor will not move.
                // At this point, it will either point to the next `CommandMeta`,
                // or the cursor will be out of bounds and the loop will end.
                unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
            });

            #[cfg(feature = "std")]
            {
                let result = std::panic::catch_unwind(f);

                if let Err(payload) = result {
                    // local_cursor now points to the location _after_ the panicked command.
                    // Add the remaining commands that _would have_ been applied to the
                    // panic_recovery queue.
                    //
                    // This uses `current_stop` instead of `stop` to account for any commands
                    // that were queued _during_ this panic.
                    //
                    // This is implemented in such a way that if `apply_or_drop_queued()` calls are nested
                    // recursively inside an applied Command, the correct command order will be retained.
                    let panic_recovery = self.panic_recovery.as_mut();
                    let bytes = self.bytes.as_mut();
                    let current_stop = bytes.len();
                    panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
                    bytes.set_len(start);
                    *self.cursor.as_mut() = start;

                    // This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
                    // when we call `resume_unwind` the caller "closer to the top" will catch the unwind
                    // and repeat this check, until we reach the top.
                    if start == 0 {
                        bytes.append(panic_recovery);
                    }
                    std::panic::resume_unwind(payload);
                }
            }

            #[cfg(not(feature = "std"))]
            (f)();
        }

        // Reset the buffer: all commands past the original `start` cursor have been applied.
        // SAFETY: we are setting the length of bytes to the original length, minus the length of the original
        // list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
        unsafe {
            self.bytes.as_mut().set_len(start);
            *self.cursor.as_mut() = start;
        };
    }
}

impl Drop for CommandQueue {
    fn drop(&mut self) {
        if !self.bytes.is_empty() {
            if let Some(caller) = self.caller.into_option() {
                warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply? caller:{caller:?}");
            } else {
                warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
            }
        }
        // SAFETY: A reference is always a valid pointer
        unsafe { self.get_raw().apply_or_drop_queued(None) };
    }
}

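// As a [`SystemBuffer`], a `CommandQueue` is flushed into the [`World`] by `apply`
// at sync points, while `queue` instead moves its commands into the world's
// internal queue via `DeferredWorld::commands`.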
impl SystemBuffer for CommandQueue {
    #[inline]
    fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
        #[cfg(feature = "trace")]
        let _span_guard = _system_meta.commands_span.enter();
        self.apply(world);
    }

    #[inline]
    fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
        world.commands().append(self);
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{component::Component, resource::Resource};
    use alloc::{borrow::ToOwned, string::String, sync::Arc};
    use core::{
        panic::AssertUnwindSafe,
        sync::atomic::{AtomicU32, Ordering},
    };

    #[cfg(miri)]
    use alloc::format;

    struct DropCheck(Arc<AtomicU32>);

    impl DropCheck {
        fn new() -> (Self, Arc<AtomicU32>) {
            let drops = Arc::new(AtomicU32::new(0));
            (Self(drops.clone()), drops)
        }
    }

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }
    }

    impl Command for DropCheck {
        fn apply(self, _: &mut World) {}
    }

    #[test]
    fn test_command_queue_inner_drop() {
        let mut queue = CommandQueue::default();

        let (dropcheck_a, drops_a) = DropCheck::new();
        let (dropcheck_b, drops_b) = DropCheck::new();

        queue.push(dropcheck_a);
        queue.push(dropcheck_b);

        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
        assert_eq!(drops_b.load(Ordering::Relaxed), 0);

        let mut world = World::new();
        queue.apply(&mut world);

        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
    }

    /// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
    /// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
    #[test]
    fn test_command_queue_inner_drop_early() {
        let mut queue = CommandQueue::default();

        let (dropcheck_a, drops_a) = DropCheck::new();
        let (dropcheck_b, drops_b) = DropCheck::new();

        queue.push(dropcheck_a);
        queue.push(dropcheck_b);

        assert_eq!(drops_a.load(Ordering::Relaxed), 0);
        assert_eq!(drops_b.load(Ordering::Relaxed), 0);

        drop(queue);

        assert_eq!(drops_a.load(Ordering::Relaxed), 1);
        assert_eq!(drops_b.load(Ordering::Relaxed), 1);
    }

    #[derive(Component)]
    struct A;

    struct SpawnCommand;

    impl Command for SpawnCommand {
        fn apply(self, world: &mut World) {
            world.spawn(A);
        }
    }

    #[test]
    fn test_command_queue_inner() {
        let mut queue = CommandQueue::default();

        queue.push(SpawnCommand);
        queue.push(SpawnCommand);

        let mut world = World::new();
        queue.apply(&mut world);

        assert_eq!(world.query::<&A>().query(&world).count(), 2);

        // The previous call to `apply` cleared the queue.
        // This call should do nothing.
        queue.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 2);
    }

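    /// A minimal sketch exercising [`CommandQueue::append`]: all commands are
    /// moved from `other` into `self`, leaving `other` empty.
    #[test]
    fn test_command_queue_append() {
        let mut world = World::new();
        let mut source = CommandQueue::default();
        let mut destination = CommandQueue::default();

        source.push(SpawnCommand);
        destination.append(&mut source);
        assert!(source.is_empty());

        destination.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 1);
    }
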
    #[expect(
        dead_code,
        reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
    )]
    struct PanicCommand(String);
    impl Command for PanicCommand {
        fn apply(self, _: &mut World) {
            panic!("command is panicking");
        }
    }

    #[test]
    fn test_command_queue_inner_panic_safe() {
        std::panic::set_hook(Box::new(|_| {}));

        let mut queue = CommandQueue::default();

        queue.push(PanicCommand("I panic!".to_owned()));
        queue.push(SpawnCommand);

        let mut world = World::new();

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            queue.apply(&mut world);
        }));

        // Even though the first command panicked, it's still ok to push
        // more commands.
        queue.push(SpawnCommand);
        queue.push(SpawnCommand);
        queue.apply(&mut world);
        assert_eq!(world.query::<&A>().query(&world).count(), 3);
    }

    #[test]
    fn test_command_queue_inner_nested_panic_safe() {
        std::panic::set_hook(Box::new(|_| {}));

        #[derive(Resource, Default)]
        struct Order(Vec<usize>);

        let mut world = World::new();
        world.init_resource::<Order>();

        fn add_index(index: usize) -> impl Command {
            move |world: &mut World| world.resource_mut::<Order>().0.push(index)
        }
        world.commands().queue(add_index(1));
        world.commands().queue(|world: &mut World| {
            world.commands().queue(add_index(2));
            world.commands().queue(PanicCommand("I panic!".to_owned()));
            world.commands().queue(add_index(3));
            world.flush_commands();
        });
        world.commands().queue(add_index(4));

        let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
            world.flush_commands();
        }));

        world.commands().queue(add_index(5));
        world.flush_commands();
        assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
    }

    // NOTE: `CommandQueue` is `Send` because `Command` is `Send`.
    // If the `Command` trait gets reworked to be non-send, `CommandQueue`
    // should be reworked.
    // This test asserts that Command types are `Send`.
    fn assert_is_send_impl(_: impl Send) {}
    fn assert_is_send(command: impl Command) {
        assert_is_send_impl(command);
    }

    #[test]
    fn test_command_is_send() {
        assert_is_send(SpawnCommand);
    }

    #[expect(
        dead_code,
        reason = "This struct is used to test how the CommandQueue reacts to padding added by Rust's compiler."
    )]
    struct CommandWithPadding(u8, u16);
    impl Command for CommandWithPadding {
        fn apply(self, _: &mut World) {}
    }

    #[cfg(miri)]
    #[test]
    fn test_uninit_bytes() {
        let mut queue = CommandQueue::default();
        queue.push(CommandWithPadding(0, 0));
        let _ = format!("{:?}", queue.bytes);
    }
}
548}