bevy_ecs/schedule/executor/
mod.rs

1#[cfg(feature = "std")]
2mod multi_threaded;
3mod single_threaded;
4
5use alloc::{vec, vec::Vec};
6use bevy_utils::prelude::DebugName;
7use core::any::TypeId;
8
9pub use self::single_threaded::SingleThreadedExecutor;
10
11#[cfg(feature = "std")]
12pub use self::multi_threaded::{MainThreadExecutor, MultiThreadedExecutor};
13
14use fixedbitset::FixedBitSet;
15
16use crate::{
17    change_detection::{CheckChangeTicks, Tick},
18    error::{BevyError, ErrorContext, Result},
19    prelude::{IntoSystemSet, SystemSet},
20    query::FilteredAccessSet,
21    schedule::{
22        ConditionWithAccess, InternedSystemSet, SystemKey, SystemSetKey, SystemTypeSet,
23        SystemWithAccess,
24    },
25    system::{RunSystemError, System, SystemIn, SystemParamValidationError, SystemStateFlags},
26    world::{unsafe_world_cell::UnsafeWorldCell, DeferredWorld, World},
27};
28
29/// Types that can run a [`SystemSchedule`] on a [`World`].
30pub(super) trait SystemExecutor: Send + Sync {
31    fn kind(&self) -> ExecutorKind;
32    fn init(&mut self, schedule: &SystemSchedule);
33    fn run(
34        &mut self,
35        schedule: &mut SystemSchedule,
36        world: &mut World,
37        skip_systems: Option<&FixedBitSet>,
38        error_handler: fn(BevyError, ErrorContext),
39    );
40    fn set_apply_final_deferred(&mut self, value: bool);
41}
42
43/// Specifies how a [`Schedule`](super::Schedule) will be run.
44///
45/// The default depends on the target platform:
46///  - [`SingleThreaded`](ExecutorKind::SingleThreaded) on Wasm.
47///  - [`MultiThreaded`](ExecutorKind::MultiThreaded) everywhere else.
48#[derive(PartialEq, Eq, Default, Debug, Copy, Clone)]
49pub enum ExecutorKind {
50    /// Runs the schedule using a single thread.
51    ///
52    /// Useful if you're dealing with a single-threaded environment, saving your threads for
53    /// other things, or just trying minimize overhead.
54    #[cfg_attr(
55        any(
56            target_arch = "wasm32",
57            not(feature = "std"),
58            not(feature = "multi_threaded")
59        ),
60        default
61    )]
62    SingleThreaded,
63    /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
64    #[cfg(feature = "std")]
65    #[cfg_attr(all(not(target_arch = "wasm32"), feature = "multi_threaded"), default)]
66    MultiThreaded,
67}
68
69/// Holds systems and conditions of a [`Schedule`](super::Schedule) sorted in topological order
70/// (along with dependency information for `multi_threaded` execution).
71///
72/// Since the arrays are sorted in the same order, elements are referenced by their index.
73/// [`FixedBitSet`] is used as a smaller, more efficient substitute of `HashSet<usize>`.
74#[derive(Default)]
75pub struct SystemSchedule {
76    /// List of system node ids.
77    pub(super) system_ids: Vec<SystemKey>,
78    /// Indexed by system node id.
79    pub(super) systems: Vec<SystemWithAccess>,
80    /// Indexed by system node id.
81    pub(super) system_conditions: Vec<Vec<ConditionWithAccess>>,
82    /// Indexed by system node id.
83    /// Number of systems that the system immediately depends on.
84    #[cfg_attr(
85        not(feature = "std"),
86        expect(dead_code, reason = "currently only used with the std feature")
87    )]
88    pub(super) system_dependencies: Vec<usize>,
89    /// Indexed by system node id.
90    /// List of systems that immediately depend on the system.
91    #[cfg_attr(
92        not(feature = "std"),
93        expect(dead_code, reason = "currently only used with the std feature")
94    )]
95    pub(super) system_dependents: Vec<Vec<usize>>,
96    /// Indexed by system node id.
97    /// List of sets containing the system that have conditions
98    pub(super) sets_with_conditions_of_systems: Vec<FixedBitSet>,
99    /// List of system set node ids.
100    pub(super) set_ids: Vec<SystemSetKey>,
101    /// Indexed by system set node id.
102    pub(super) set_conditions: Vec<Vec<ConditionWithAccess>>,
103    /// Indexed by system set node id.
104    /// List of systems that are in sets that have conditions.
105    ///
106    /// If a set doesn't run because of its conditions, this is used to skip all systems in it.
107    pub(super) systems_in_sets_with_conditions: Vec<FixedBitSet>,
108}
109
110impl SystemSchedule {
111    /// Creates an empty [`SystemSchedule`].
112    pub const fn new() -> Self {
113        Self {
114            systems: Vec::new(),
115            system_conditions: Vec::new(),
116            set_conditions: Vec::new(),
117            system_ids: Vec::new(),
118            set_ids: Vec::new(),
119            system_dependencies: Vec::new(),
120            system_dependents: Vec::new(),
121            sets_with_conditions_of_systems: Vec::new(),
122            systems_in_sets_with_conditions: Vec::new(),
123        }
124    }
125}
126
127/// A special [`System`] that instructs the executor to call
128/// [`System::apply_deferred`] on the systems that have run but not applied
129/// their [`Deferred`] system parameters (like [`Commands`]) or other system buffers.
130///
131/// ## Scheduling
132///
133/// `ApplyDeferred` systems are scheduled *by default*
134/// - later in the same schedule run (for example, if a system with `Commands` param
135///   is scheduled in `Update`, all the changes will be visible in `PostUpdate`)
136/// - between systems with dependencies if the dependency [has deferred buffers]
137///   (if system `bar` directly or indirectly depends on `foo`, and `foo` uses
138///   `Commands` param, changes to the world in `foo` will be visible in `bar`)
139///
140/// ## Notes
141/// - This system (currently) does nothing if it's called manually or wrapped
142///   inside a [`PipeSystem`].
143/// - Modifying a [`Schedule`] may change the order buffers are applied.
144///
145/// [`System::apply_deferred`]: crate::system::System::apply_deferred
146/// [`Deferred`]: crate::system::Deferred
147/// [`Commands`]: crate::prelude::Commands
148/// [has deferred buffers]: crate::system::System::has_deferred
149/// [`PipeSystem`]: crate::system::PipeSystem
150/// [`Schedule`]: super::Schedule
151#[doc(alias = "apply_system_buffers")]
152pub struct ApplyDeferred;
153
154/// Returns `true` if the [`System`] is an instance of [`ApplyDeferred`].
155pub(super) fn is_apply_deferred(system: &dyn System<In = (), Out = ()>) -> bool {
156    system.type_id() == TypeId::of::<ApplyDeferred>()
157}
158
159impl System for ApplyDeferred {
160    type In = ();
161    type Out = ();
162
163    fn name(&self) -> DebugName {
164        DebugName::borrowed("bevy_ecs::apply_deferred")
165    }
166
167    fn flags(&self) -> SystemStateFlags {
168        // non-send , exclusive , no deferred
169        SystemStateFlags::NON_SEND | SystemStateFlags::EXCLUSIVE
170    }
171
172    unsafe fn run_unsafe(
173        &mut self,
174        _input: SystemIn<'_, Self>,
175        _world: UnsafeWorldCell,
176    ) -> Result<Self::Out, RunSystemError> {
177        // This system does nothing on its own. The executor will apply deferred
178        // commands from other systems instead of running this system.
179        Ok(())
180    }
181
182    #[cfg(feature = "hotpatching")]
183    #[inline]
184    fn refresh_hotpatch(&mut self) {}
185
186    fn run(
187        &mut self,
188        _input: SystemIn<'_, Self>,
189        _world: &mut World,
190    ) -> Result<Self::Out, RunSystemError> {
191        // This system does nothing on its own. The executor will apply deferred
192        // commands from other systems instead of running this system.
193        Ok(())
194    }
195
196    fn apply_deferred(&mut self, _world: &mut World) {}
197
198    fn queue_deferred(&mut self, _world: DeferredWorld) {}
199
200    unsafe fn validate_param_unsafe(
201        &mut self,
202        _world: UnsafeWorldCell,
203    ) -> Result<(), SystemParamValidationError> {
204        // This system is always valid to run because it doesn't do anything,
205        // and only used as a marker for the executor.
206        Ok(())
207    }
208
209    fn initialize(&mut self, _world: &mut World) -> FilteredAccessSet {
210        FilteredAccessSet::new()
211    }
212
213    fn check_change_tick(&mut self, _check: CheckChangeTicks) {}
214
215    fn default_system_sets(&self) -> Vec<InternedSystemSet> {
216        vec![SystemTypeSet::<Self>::new().intern()]
217    }
218
219    fn get_last_run(&self) -> Tick {
220        // This system is never run, so it has no last run tick.
221        Tick::MAX
222    }
223
224    fn set_last_run(&mut self, _last_run: Tick) {}
225}
226
227impl IntoSystemSet<()> for ApplyDeferred {
228    type Set = SystemTypeSet<Self>;
229
230    fn into_system_set(self) -> Self::Set {
231        SystemTypeSet::<Self>::new()
232    }
233}
234
235/// These functions hide the bottom of the callstack from `RUST_BACKTRACE=1` (assuming the default panic handler is used).
236///
237/// The full callstack will still be visible with `RUST_BACKTRACE=full`.
238/// They are specialized for `System::run` & co instead of being generic over closures because this avoids an
239/// extra frame in the backtrace.
240///
241/// This is reliant on undocumented behavior in Rust's default panic handler, which checks the call stack for symbols
242/// containing the string `__rust_begin_short_backtrace` in their mangled name.
243mod __rust_begin_short_backtrace {
244    use core::hint::black_box;
245
246    #[cfg(feature = "std")]
247    use crate::world::unsafe_world_cell::UnsafeWorldCell;
248    use crate::{
249        error::Result,
250        system::{ReadOnlySystem, RunSystemError, ScheduleSystem},
251        world::World,
252    };
253
254    /// # Safety
255    /// See `System::run_unsafe`.
256    // This is only used by `MultiThreadedExecutor`, and would be dead code without `std`.
257    #[cfg(feature = "std")]
258    #[inline(never)]
259    pub(super) unsafe fn run_unsafe(
260        system: &mut ScheduleSystem,
261        world: UnsafeWorldCell,
262    ) -> Result<(), RunSystemError> {
263        // SAFETY: Upheld by caller
264        let result = unsafe { system.run_unsafe((), world) };
265        // Call `black_box` to prevent this frame from being tail-call optimized away
266        black_box(());
267        result
268    }
269
270    /// # Safety
271    /// See `ReadOnlySystem::run_unsafe`.
272    // This is only used by `MultiThreadedExecutor`, and would be dead code without `std`.
273    #[cfg(feature = "std")]
274    #[inline(never)]
275    pub(super) unsafe fn readonly_run_unsafe<O: 'static>(
276        system: &mut dyn ReadOnlySystem<In = (), Out = O>,
277        world: UnsafeWorldCell,
278    ) -> Result<O, RunSystemError> {
279        // Call `black_box` to prevent this frame from being tail-call optimized away
280        // SAFETY: Upheld by caller
281        black_box(unsafe { system.run_unsafe((), world) })
282    }
283
284    #[cfg(feature = "std")]
285    #[inline(never)]
286    pub(super) fn run(
287        system: &mut ScheduleSystem,
288        world: &mut World,
289    ) -> Result<(), RunSystemError> {
290        let result = system.run((), world);
291        // Call `black_box` to prevent this frame from being tail-call optimized away
292        black_box(());
293        result
294    }
295
296    #[inline(never)]
297    pub(super) fn run_without_applying_deferred(
298        system: &mut ScheduleSystem,
299        world: &mut World,
300    ) -> Result<(), RunSystemError> {
301        let result = system.run_without_applying_deferred((), world);
302        // Call `black_box` to prevent this frame from being tail-call optimized away
303        black_box(());
304        result
305    }
306
307    #[inline(never)]
308    pub(super) fn readonly_run<O: 'static>(
309        system: &mut dyn ReadOnlySystem<In = (), Out = O>,
310        world: &mut World,
311    ) -> Result<O, RunSystemError> {
312        // Call `black_box` to prevent this frame from being tail-call optimized away
313        black_box(system.run((), world))
314    }
315}
316
317#[cfg(test)]
318mod tests {
319    use crate::{
320        prelude::{Component, In, IntoSystem, Resource, Schedule},
321        schedule::ExecutorKind,
322        system::{Populated, Res, ResMut, Single},
323        world::World,
324    };
325
326    #[derive(Component)]
327    struct TestComponent;
328
329    const EXECUTORS: [ExecutorKind; 2] =
330        [ExecutorKind::SingleThreaded, ExecutorKind::MultiThreaded];
331
332    #[derive(Resource, Default)]
333    struct TestState {
334        populated_ran: bool,
335        single_ran: bool,
336    }
337
338    #[derive(Resource, Default)]
339    struct Counter(u8);
340
341    fn set_single_state(mut _single: Single<&TestComponent>, mut state: ResMut<TestState>) {
342        state.single_ran = true;
343    }
344
345    fn set_populated_state(
346        mut _populated: Populated<&TestComponent>,
347        mut state: ResMut<TestState>,
348    ) {
349        state.populated_ran = true;
350    }
351
352    #[test]
353    #[expect(clippy::print_stdout, reason = "std and println are allowed in tests")]
354    fn single_and_populated_skipped_and_run() {
355        for executor in EXECUTORS {
356            std::println!("Testing executor: {executor:?}");
357
358            let mut world = World::new();
359            world.init_resource::<TestState>();
360
361            let mut schedule = Schedule::default();
362            schedule.set_executor_kind(executor);
363            schedule.add_systems((set_single_state, set_populated_state));
364            schedule.run(&mut world);
365
366            let state = world.get_resource::<TestState>().unwrap();
367            assert!(!state.single_ran);
368            assert!(!state.populated_ran);
369
370            world.spawn(TestComponent);
371
372            schedule.run(&mut world);
373            let state = world.get_resource::<TestState>().unwrap();
374            assert!(state.single_ran);
375            assert!(state.populated_ran);
376        }
377    }
378
379    fn look_for_missing_resource(_res: Res<TestState>) {}
380
381    #[test]
382    #[should_panic]
383    fn missing_resource_panics_single_threaded() {
384        let mut world = World::new();
385        let mut schedule = Schedule::default();
386
387        schedule.set_executor_kind(ExecutorKind::SingleThreaded);
388        schedule.add_systems(look_for_missing_resource);
389        schedule.run(&mut world);
390    }
391
392    #[test]
393    #[should_panic]
394    fn missing_resource_panics_multi_threaded() {
395        let mut world = World::new();
396        let mut schedule = Schedule::default();
397
398        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
399        schedule.add_systems(look_for_missing_resource);
400        schedule.run(&mut world);
401    }
402
403    #[test]
404    fn piped_systems_first_system_skipped() {
405        // This system should be skipped when run due to no matching entity
406        fn pipe_out(_single: Single<&TestComponent>) -> u8 {
407            42
408        }
409
410        fn pipe_in(_input: In<u8>, mut counter: ResMut<Counter>) {
411            counter.0 += 1;
412        }
413
414        let mut world = World::new();
415        world.init_resource::<Counter>();
416        let mut schedule = Schedule::default();
417
418        schedule.add_systems(pipe_out.pipe(pipe_in));
419        schedule.run(&mut world);
420
421        let counter = world.resource::<Counter>();
422        assert_eq!(counter.0, 0);
423    }
424
425    #[test]
426    fn piped_system_second_system_skipped() {
427        // This system will be run before the second system is validated
428        fn pipe_out(mut counter: ResMut<Counter>) -> u8 {
429            counter.0 += 1;
430            42
431        }
432
433        // This system should be skipped when run due to no matching entity
434        fn pipe_in(_input: In<u8>, _single: Single<&TestComponent>, mut counter: ResMut<Counter>) {
435            counter.0 += 1;
436        }
437
438        let mut world = World::new();
439        world.init_resource::<Counter>();
440        let mut schedule = Schedule::default();
441
442        schedule.add_systems(pipe_out.pipe(pipe_in));
443        schedule.run(&mut world);
444        let counter = world.resource::<Counter>();
445        assert_eq!(counter.0, 1);
446    }
447
448    #[test]
449    #[should_panic]
450    fn piped_system_first_system_panics() {
451        // This system should panic when run because the resource is missing
452        fn pipe_out(_res: Res<TestState>) -> u8 {
453            42
454        }
455
456        fn pipe_in(_input: In<u8>) {}
457
458        let mut world = World::new();
459        let mut schedule = Schedule::default();
460
461        schedule.add_systems(pipe_out.pipe(pipe_in));
462        schedule.run(&mut world);
463    }
464
465    #[test]
466    #[should_panic]
467    fn piped_system_second_system_panics() {
468        fn pipe_out() -> u8 {
469            42
470        }
471
472        // This system should panic when run because the resource is missing
473        fn pipe_in(_input: In<u8>, _res: Res<TestState>) {}
474
475        let mut world = World::new();
476        let mut schedule = Schedule::default();
477
478        schedule.add_systems(pipe_out.pipe(pipe_in));
479        schedule.run(&mut world);
480    }
481
482    // This test runs without panicking because we've
483    // decided to use early-out behavior for piped systems
484    #[test]
485    fn piped_system_skip_and_panic() {
486        // This system should be skipped when run due to no matching entity
487        fn pipe_out(_single: Single<&TestComponent>) -> u8 {
488            42
489        }
490
491        // This system should panic when run because the resource is missing
492        fn pipe_in(_input: In<u8>, _res: Res<TestState>) {}
493
494        let mut world = World::new();
495        let mut schedule = Schedule::default();
496
497        schedule.add_systems(pipe_out.pipe(pipe_in));
498        schedule.run(&mut world);
499    }
500
501    #[test]
502    #[should_panic]
503    fn piped_system_panic_and_skip() {
504        // This system should panic when run because the resource is missing
505
506        fn pipe_out(_res: Res<TestState>) -> u8 {
507            42
508        }
509
510        // This system should be skipped when run due to no matching entity
511        fn pipe_in(_input: In<u8>, _single: Single<&TestComponent>) {}
512
513        let mut world = World::new();
514        let mut schedule = Schedule::default();
515
516        schedule.add_systems(pipe_out.pipe(pipe_in));
517        schedule.run(&mut world);
518    }
519
520    #[test]
521    #[should_panic]
522    fn piped_system_panic_and_panic() {
523        // This system should panic when run because the resource is missing
524
525        fn pipe_out(_res: Res<TestState>) -> u8 {
526            42
527        }
528
529        // This system should panic when run because the resource is missing
530        fn pipe_in(_input: In<u8>, _res: Res<TestState>) {}
531
532        let mut world = World::new();
533        let mut schedule = Schedule::default();
534
535        schedule.add_systems(pipe_out.pipe(pipe_in));
536        schedule.run(&mut world);
537    }
538
539    #[test]
540    fn piped_system_skip_and_skip() {
541        // This system should be skipped when run due to no matching entity
542
543        fn pipe_out(_single: Single<&TestComponent>, mut counter: ResMut<Counter>) -> u8 {
544            counter.0 += 1;
545            42
546        }
547
548        // This system should be skipped when run due to no matching entity
549        fn pipe_in(_input: In<u8>, _single: Single<&TestComponent>, mut counter: ResMut<Counter>) {
550            counter.0 += 1;
551        }
552
553        let mut world = World::new();
554        world.init_resource::<Counter>();
555        let mut schedule = Schedule::default();
556
557        schedule.add_systems(pipe_out.pipe(pipe_in));
558        schedule.run(&mut world);
559
560        let counter = world.resource::<Counter>();
561        assert_eq!(counter.0, 0);
562    }
563}