bevy_ecs/schedule/executor/
multi_threaded.rs

1use alloc::{boxed::Box, vec::Vec};
2use bevy_platform::sync::Arc;
3use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
4use bevy_utils::{default, syncunsafecell::SyncUnsafeCell};
5use concurrent_queue::ConcurrentQueue;
6use core::{any::Any, panic::AssertUnwindSafe};
7use fixedbitset::FixedBitSet;
8#[cfg(feature = "std")]
9use std::eprintln;
10use std::sync::{Mutex, MutexGuard};
11
12#[cfg(feature = "trace")]
13use tracing::{info_span, Span};
14
15use crate::{
16    archetype::ArchetypeComponentId,
17    error::{default_error_handler, BevyError, ErrorContext, Result},
18    prelude::Resource,
19    query::Access,
20    schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
21    system::ScheduleSystem,
22    world::{unsafe_world_cell::UnsafeWorldCell, World},
23};
24
25use super::__rust_begin_short_backtrace;
26
/// Borrowed data used by the [`MultiThreadedExecutor`].
///
/// One `Environment` is built per call to `run` and is shared by reference with every
/// spawned task through `Context`.
struct Environment<'env, 'sys> {
    /// The executor whose shared state, completion queue and panic slot the tasks use.
    executor: &'env MultiThreadedExecutor,
    /// The schedule's systems, one cell per system so that each task can take a
    /// mutable reference to the system it runs.
    systems: &'sys [SyncUnsafeCell<ScheduleSystem>],
    /// The schedule's run conditions. Only dereferenced in `Context::try_lock`, after
    /// the executor state lock has been acquired.
    conditions: SyncUnsafeCell<Conditions<'sys>>,
    /// Unsafe handle to the world the schedule is running on.
    world_cell: UnsafeWorldCell<'env>,
}
34
/// Run conditions borrowed from the schedule, plus the lookup tables that relate
/// systems to the system sets that have conditions.
struct Conditions<'a> {
    /// Conditions attached directly to each system, indexed by system index.
    system_conditions: &'a mut [Vec<BoxedCondition>],
    /// Conditions attached to each system set, indexed by set index.
    set_conditions: &'a mut [Vec<BoxedCondition>],
    /// Indexed by system index: the set indices whose conditions apply to that system.
    sets_with_conditions_of_systems: &'a [FixedBitSet],
    /// Indexed by set index: the system indices that belong to that set.
    systems_in_sets_with_conditions: &'a [FixedBitSet],
}
41
42impl<'env, 'sys> Environment<'env, 'sys> {
43    fn new(
44        executor: &'env MultiThreadedExecutor,
45        schedule: &'sys mut SystemSchedule,
46        world: &'env mut World,
47    ) -> Self {
48        Environment {
49            executor,
50            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
51            conditions: SyncUnsafeCell::new(Conditions {
52                system_conditions: &mut schedule.system_conditions,
53                set_conditions: &mut schedule.set_conditions,
54                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
55                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
56            }),
57            world_cell: world.as_unsafe_world_cell(),
58        }
59    }
60}
61
/// Per-system data used by the [`MultiThreadedExecutor`].
// Copied here because it can't be read from the system when it's running.
struct SystemTaskMetadata {
    /// The [`ArchetypeComponentId`] access of the system.
    ///
    /// Starts out as the default (empty) access and is overwritten by `can_run` with
    /// the system's current access each time the system passes the access check.
    archetype_component_access: Access<ArchetypeComponentId>,
    /// Indices of the systems that directly depend on the system.
    dependents: Vec<usize>,
    /// Is `true` if the system does not access `!Send` data.
    is_send: bool,
    /// Is `true` if the system is exclusive.
    is_exclusive: bool,
}
74
/// The completion event of a system task.
///
/// A task pushes one of these onto the executor's `system_completion` queue when its
/// system has finished, so that the next executor tick can update its bookkeeping.
struct SystemResult {
    /// Index of the system that finished.
    system_index: usize,
}
79
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
pub struct MultiThreadedExecutor {
    /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
    state: Mutex<ExecutorState>,
    /// Queue of system completion events.
    system_completion: ConcurrentQueue<SystemResult>,
    /// When `true`, deferred system buffers are applied one final time after all
    /// systems have run.
    apply_final_deferred: bool,
    /// When set, tells the executor that a thread has panicked.
    /// The payload is re-raised at the end of `run`.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
    /// Systems that have no dependencies. Computed in `init` and copied into the set
    /// of ready systems at the start of every run.
    starting_systems: FixedBitSet,
    /// Cached tracing span
    #[cfg(feature = "trace")]
    executor_span: Span,
}
95
/// The state of the executor while running.
pub struct ExecutorState {
    /// Metadata for scheduling and running system tasks.
    system_task_metadata: Vec<SystemTaskMetadata>,
    /// Union of the accesses of all currently running systems.
    active_access: Access<ArchetypeComponentId>,
    /// Is `true` while a system with non-`Send` access is running.
    /// Also set while an exclusive system is running.
    local_thread_running: bool,
    /// Is `true` while an exclusive system is running.
    exclusive_running: bool,
    /// The number of systems that are running.
    num_running_systems: usize,
    /// The number of dependencies each system has that have not completed.
    num_dependencies_remaining: Vec<usize>,
    /// System sets whose conditions have been evaluated.
    evaluated_sets: FixedBitSet,
    /// Systems that have no remaining dependencies and are waiting to run.
    ready_systems: FixedBitSet,
    /// Scratch copy of `ready_systems`, kept around so that `spawn_system_tasks` can
    /// iterate over a snapshot while it mutates `ready_systems`.
    ready_systems_copy: FixedBitSet,
    /// Systems that are running.
    running_systems: FixedBitSet,
    /// Systems that got skipped.
    skipped_systems: FixedBitSet,
    /// Systems whose conditions have been evaluated and were run or skipped.
    completed_systems: FixedBitSet,
    /// Systems that have run but have not had their buffers applied.
    unapplied_systems: FixedBitSet,
}
125
/// References to data required by the executor.
/// This is copied to each system task so that the task can invoke the executor when it completes.
// These all need to outlive 'scope in order to be sent to new tasks,
// and keeping them all in a struct means we can use lifetime elision.
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
    /// The borrowed executor, systems, conditions and world for the current run.
    environment: &'env Environment<'env, 'sys>,
    /// The task scope that system tasks are spawned on.
    scope: &'scope Scope<'scope, 'env, ()>,
    /// Called with the error and its context when a system returns an error.
    error_handler: fn(BevyError, ErrorContext),
}
136
137impl Default for MultiThreadedExecutor {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
143impl SystemExecutor for MultiThreadedExecutor {
    /// Identifies this executor as [`ExecutorKind::MultiThreaded`].
    fn kind(&self) -> ExecutorKind {
        ExecutorKind::MultiThreaded
    }
147
148    fn init(&mut self, schedule: &SystemSchedule) {
149        let state = self.state.get_mut().unwrap();
150        // pre-allocate space
151        let sys_count = schedule.system_ids.len();
152        let set_count = schedule.set_ids.len();
153
154        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
155        self.starting_systems = FixedBitSet::with_capacity(sys_count);
156        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
157        state.ready_systems = FixedBitSet::with_capacity(sys_count);
158        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
159        state.running_systems = FixedBitSet::with_capacity(sys_count);
160        state.completed_systems = FixedBitSet::with_capacity(sys_count);
161        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
162        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
163
164        state.system_task_metadata = Vec::with_capacity(sys_count);
165        for index in 0..sys_count {
166            state.system_task_metadata.push(SystemTaskMetadata {
167                archetype_component_access: default(),
168                dependents: schedule.system_dependents[index].clone(),
169                is_send: schedule.systems[index].is_send(),
170                is_exclusive: schedule.systems[index].is_exclusive(),
171            });
172            if schedule.system_dependencies[index] == 0 {
173                self.starting_systems.insert(index);
174            }
175        }
176
177        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
178    }
179
    /// Runs every system of `schedule` on `world`.
    ///
    /// Resets the per-run bookkeeping, drives the executor inside a task-pool scope until
    /// that scope returns, optionally applies the remaining deferred buffers, and finally
    /// re-raises any panic payload that was recorded while running.
    ///
    /// `_skip_systems` is only consulted when the `bevy_debug_stepping` feature is enabled.
    /// `error_handler` is handed to the system tasks through `Context`.
    ///
    /// # Panics
    /// Resumes the recorded panic if a system (or the final buffer application) panicked.
    fn run(
        &mut self,
        schedule: &mut SystemSchedule,
        world: &mut World,
        _skip_systems: Option<&FixedBitSet>,
        error_handler: fn(BevyError, ErrorContext),
    ) {
        let state = self.state.get_mut().unwrap();
        // reset counts
        if schedule.systems.is_empty() {
            return;
        }
        state.num_running_systems = 0;
        state
            .num_dependencies_remaining
            .clone_from(&schedule.system_dependencies);
        state.ready_systems.clone_from(&self.starting_systems);

        // If stepping is enabled, make sure we skip those systems that should
        // not be run.
        #[cfg(feature = "bevy_debug_stepping")]
        if let Some(skipped_systems) = _skip_systems {
            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
            // mark skipped systems as completed
            state.completed_systems |= skipped_systems;

            // signal the dependencies for each of the skipped systems, as
            // though they had run
            for system_index in skipped_systems.ones() {
                state.signal_dependents(system_index);
                state.ready_systems.remove(system_index);
            }
        }

        // Clone the executor handle out of the resource (if present) so that the world
        // is no longer borrowed by it when the environment is built.
        let thread_executor = world
            .get_resource::<MainThreadExecutor>()
            .map(|e| e.0.clone());
        let thread_executor = thread_executor.as_deref();

        let environment = &Environment::new(self, schedule, world);

        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
            false,
            thread_executor,
            |scope| {
                let context = Context {
                    environment,
                    scope,
                    error_handler,
                };

                // The first tick won't need to process finished systems, but we still need to run the loop in
                // tick_executor() in case a system completes while the first tick still holds the mutex.
                context.tick_executor();
            },
        );

        // End the borrows of self and world in environment by copying out the reference to systems.
        let systems = environment.systems;

        let state = self.state.get_mut().unwrap();
        if self.apply_final_deferred {
            // Do one final apply buffers after all systems have completed
            // Commands should be applied while on the scope's thread, not the executor's thread
            let res = apply_deferred(&state.unapplied_systems, systems, world);
            if let Err(payload) = res {
                // Overwrites any payload recorded earlier by a system task.
                let panic_payload = self.panic_payload.get_mut().unwrap();
                *panic_payload = Some(payload);
            }
            state.unapplied_systems.clear();
        }

        // check to see if there was a panic
        // NOTE(review): when a payload is resumed here, the resets below are not reached.
        let payload = self.panic_payload.get_mut().unwrap();
        if let Some(payload) = payload.take() {
            std::panic::resume_unwind(payload);
        }

        // Everything that became ready must have been run or skipped by now.
        debug_assert!(state.ready_systems.is_clear());
        debug_assert!(state.running_systems.is_clear());
        state.active_access.clear();
        state.evaluated_sets.clear();
        state.skipped_systems.clear();
        state.completed_systems.clear();
    }
265
    /// Sets whether `run` applies the remaining deferred system buffers after all
    /// systems have completed.
    fn set_apply_final_deferred(&mut self, value: bool) {
        self.apply_final_deferred = value;
    }
269}
270
271impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
    /// Called by a system task once its system has finished, whether it returned or panicked.
    ///
    /// Queues a completion event for `system_index`, records the panic payload when `res`
    /// is an error, and then ticks the executor so that newly unblocked systems can be
    /// spawned.
    fn system_completed(
        &self,
        system_index: usize,
        res: Result<(), Box<dyn Any + Send>>,
        system: &ScheduleSystem,
    ) {
        // tell the executor that the system finished
        // A failed push is treated as unreachable.
        self.environment
            .executor
            .system_completion
            .push(SystemResult { system_index })
            .unwrap_or_else(|error| unreachable!("{}", error));
        if let Err(payload) = res {
            #[cfg(feature = "std")]
            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
            {
                eprintln!("Encountered a panic in system `{}`!", &*system.name());
            }
            // set the payload to propagate the error
            // (a previously stored payload is replaced)
            {
                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
                *panic_payload = Some(payload);
            }
        }
        // Must come after the push above; see the ordering argument in `tick_executor`.
        self.tick_executor();
    }
298
299    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
300        let guard = self.environment.executor.state.try_lock().ok()?;
301        // SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
302        // is synchronized by the lock on the executor state.
303        let conditions = unsafe { &mut *self.environment.conditions.get() };
304        Some((conditions, guard))
305    }
306
    /// Runs executor ticks for as long as this thread can take the state lock and there
    /// are completion events left to process.
    ///
    /// Returns immediately when the lock cannot be taken; in that case the thread that
    /// holds it is responsible for picking up the pending events (see below).
    fn tick_executor(&self) {
        // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
        // If this thread acquires the lock, the executor runs after the push() and they are processed.
        // If this thread does not acquire the lock, then the is_empty() check on the other thread runs
        // after the lock is released, which is after try_lock() failed, which is after the push()
        // on this thread, so the is_empty() check will see the new events and loop.
        loop {
            let Some((conditions, mut guard)) = self.try_lock() else {
                return;
            };
            guard.tick(self, conditions);
            // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
            drop(guard);
            if self.environment.executor.system_completion.is_empty() {
                return;
            }
        }
    }
325}
326
327impl MultiThreadedExecutor {
328    /// Creates a new `multi_threaded` executor for use with a [`Schedule`].
329    ///
330    /// [`Schedule`]: crate::schedule::Schedule
331    pub fn new() -> Self {
332        Self {
333            state: Mutex::new(ExecutorState::new()),
334            system_completion: ConcurrentQueue::unbounded(),
335            starting_systems: FixedBitSet::new(),
336            apply_final_deferred: true,
337            panic_payload: Mutex::new(None),
338            #[cfg(feature = "trace")]
339            executor_span: info_span!("multithreaded executor"),
340        }
341    }
342}
343
344impl ExecutorState {
345    fn new() -> Self {
346        Self {
347            system_task_metadata: Vec::new(),
348            num_running_systems: 0,
349            num_dependencies_remaining: Vec::new(),
350            active_access: default(),
351            local_thread_running: false,
352            exclusive_running: false,
353            evaluated_sets: FixedBitSet::new(),
354            ready_systems: FixedBitSet::new(),
355            ready_systems_copy: FixedBitSet::new(),
356            running_systems: FixedBitSet::new(),
357            skipped_systems: FixedBitSet::new(),
358            completed_systems: FixedBitSet::new(),
359            unapplied_systems: FixedBitSet::new(),
360        }
361    }
362
363    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
364        #[cfg(feature = "trace")]
365        let _span = context.environment.executor.executor_span.enter();
366
367        for result in context.environment.executor.system_completion.try_iter() {
368            self.finish_system_and_handle_dependents(result);
369        }
370
371        self.rebuild_active_access();
372
373        // SAFETY:
374        // - `finish_system_and_handle_dependents` has updated the currently running systems.
375        // - `rebuild_active_access` locks access for all currently running systems.
376        unsafe {
377            self.spawn_system_tasks(context, conditions);
378        }
379    }
380
    /// Spawns a task for every ready system that can run right now, and skips (marking
    /// as completed) every ready system whose conditions or parameter validation fail.
    ///
    /// Does nothing while an exclusive system is running. Stops spawning as soon as an
    /// exclusive system has been spawned.
    ///
    /// # Safety
    /// - Caller must ensure that `self.ready_systems` does not contain any systems that
    ///   have been mutably borrowed (such as the systems currently running).
    /// - `world_cell` must have permission to access all world data (not counting
    ///   any world data that is claimed by systems currently running on this executor).
    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
        if self.exclusive_running {
            return;
        }

        // can't borrow since loop mutably borrows `self`
        // (the scratch bitset is taken out here and handed back at the end)
        let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);

        // Skipping systems may cause their dependents to become ready immediately.
        // If that happens, we need to run again immediately or we may fail to spawn those dependents.
        let mut check_for_new_ready_systems = true;
        while check_for_new_ready_systems {
            check_for_new_ready_systems = false;

            // Iterate over a snapshot; `self.ready_systems` is modified inside the loop.
            ready_systems.clone_from(&self.ready_systems);

            for system_index in ready_systems.ones() {
                debug_assert!(!self.running_systems.contains(system_index));
                // SAFETY: Caller assured that these systems are not running.
                // Therefore, no other reference to this system exists and there is no aliasing.
                let system = unsafe { &mut *context.environment.systems[system_index].get() };

                if !self.can_run(
                    system_index,
                    system,
                    conditions,
                    context.environment.world_cell,
                ) {
                    // NOTE: exclusive systems with ambiguities are susceptible to
                    // being significantly displaced here (compared to single-threaded order)
                    // if systems after them in topological order can run
                    // if that becomes an issue, `break;` if exclusive system
                    continue;
                }

                // From here on the system is either skipped or spawned; either way it
                // is no longer waiting.
                self.ready_systems.remove(system_index);

                // SAFETY: `can_run` returned true, which means that:
                // - It must have called `update_archetype_component_access` for each run condition.
                // - There can be no systems running whose accesses would conflict with any conditions.
                if unsafe {
                    !self.should_run(
                        system_index,
                        system,
                        conditions,
                        context.environment.world_cell,
                    )
                } {
                    self.skip_system_and_signal_dependents(system_index);
                    // signal_dependents may have set more systems to ready.
                    check_for_new_ready_systems = true;
                    continue;
                }

                self.running_systems.insert(system_index);
                self.num_running_systems += 1;

                if self.system_task_metadata[system_index].is_exclusive {
                    // SAFETY: `can_run` returned true for this system,
                    // which means no systems are currently borrowed.
                    unsafe {
                        self.spawn_exclusive_system_task(context, system_index);
                    }
                    // Nothing else may start while the exclusive system runs, so leave
                    // both loops even if a skip requested another pass.
                    check_for_new_ready_systems = false;
                    break;
                }

                // SAFETY:
                // - Caller ensured no other reference to this system exists.
                // - `system_task_metadata[system_index].is_exclusive` is `false`,
                //   so `System::is_exclusive` returned `false` when we called it.
                // - `can_run` has been called, which calls `update_archetype_component_access` with this system.
                // - `can_run` returned true, so no systems with conflicting world access are running.
                unsafe {
                    self.spawn_system_task(context, system_index);
                }
            }
        }

        // give back
        self.ready_systems_copy = ready_systems;
    }
468
    /// Decides whether the system at `system_index` may be started right now.
    ///
    /// Returns `false` when:
    /// - the system is exclusive and any system is running,
    /// - the system is not `Send` and the local thread is already in use,
    /// - the access of one of its not-yet-evaluated set conditions, or of one of its own
    ///   conditions, is incompatible with `self.active_access`, or
    /// - the access of the system itself is incompatible with `self.active_access`.
    ///
    /// As a side effect, `update_archetype_component_access` is called on each condition
    /// (and on the system) that is checked, and on success the system's access is copied
    /// into its `SystemTaskMetadata`. The system's own access is not checked when the
    /// system has already been marked as skipped.
    fn can_run(
        &mut self,
        system_index: usize,
        system: &mut ScheduleSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let system_meta = &self.system_task_metadata[system_index];
        if system_meta.is_exclusive && self.num_running_systems > 0 {
            return false;
        }

        if !system_meta.is_send && self.local_thread_running {
            return false;
        }

        // TODO: an earlier out if world's archetypes did not change
        // Only sets whose conditions have not been evaluated yet need to be checked.
        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
            .difference(&self.evaluated_sets)
        {
            for condition in &mut conditions.set_conditions[set_idx] {
                condition.update_archetype_component_access(world);
                if !condition
                    .archetype_component_access()
                    .is_compatible(&self.active_access)
                {
                    return false;
                }
            }
        }

        for condition in &mut conditions.system_conditions[system_index] {
            condition.update_archetype_component_access(world);
            if !condition
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }
        }

        // A system that is already marked as skipped will not run, so only its
        // conditions (checked above) need compatible access.
        if !self.skipped_systems.contains(system_index) {
            system.update_archetype_component_access(world);
            if !system
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }

            // Cache the access so it can be read while the system is running.
            self.system_task_metadata[system_index]
                .archetype_component_access
                .clone_from(system.archetype_component_access());
        }

        true
    }
526
    /// Evaluates the run conditions of the system at `system_index` and validates its
    /// parameters, returning `true` only if the system should actually be run.
    ///
    /// Set conditions are evaluated at most once per run: every set touched here is
    /// recorded in `self.evaluated_sets`, and when a set's conditions fail all systems
    /// of that set are marked as skipped. Parameter validation errors are reported to
    /// the default error handler unless the error is flagged as `skipped`.
    ///
    /// # Safety
    /// * `world` must have permission to read any world data required by
    ///   the system's conditions: this includes conditions for the system
    ///   itself, and conditions for any of the system's sets.
    /// * `update_archetype_component_access` must have been called with `world`
    ///   for the system as well as system and system set's run conditions.
    unsafe fn should_run(
        &mut self,
        system_index: usize,
        system: &mut ScheduleSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        // A system may already have been marked as skipped by a failed set condition.
        let mut should_run = !self.skipped_systems.contains(system_index);
        let error_handler = default_error_handler();

        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
            if self.evaluated_sets.contains(set_idx) {
                continue;
            }

            // Evaluate the system set's conditions.
            // SAFETY:
            // - The caller ensures that `world` has permission to read any data
            //   required by the conditions.
            // - `update_archetype_component_access` has been called for each run condition.
            let set_conditions_met = unsafe {
                evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world)
            };

            if !set_conditions_met {
                // Every system of the set is skipped, not just the current one.
                self.skipped_systems
                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
            }

            should_run &= set_conditions_met;
            self.evaluated_sets.insert(set_idx);
        }

        // Evaluate the system's conditions.
        // (These are evaluated even if a set condition has already failed.)
        // SAFETY:
        // - The caller ensures that `world` has permission to read any data
        //   required by the conditions.
        // - `update_archetype_component_access` has been called for each run condition.
        let system_conditions_met = unsafe {
            evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world)
        };

        if !system_conditions_met {
            self.skipped_systems.insert(system_index);
        }

        should_run &= system_conditions_met;

        if should_run {
            // SAFETY:
            // - The caller ensures that `world` has permission to read any data
            //   required by the system.
            // - `update_archetype_component_access` has been called for system.
            let valid_params = match unsafe { system.validate_param_unsafe(world) } {
                Ok(()) => true,
                Err(e) => {
                    // Errors flagged as `skipped` are not reported.
                    if !e.skipped {
                        error_handler(
                            e.into(),
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                    false
                }
            };
            if !valid_params {
                self.skipped_systems.insert(system_index);
            }

            should_run &= valid_params;
        }

        should_run
    }
610
    /// Spawns the task that runs the non-exclusive system at `system_index`.
    ///
    /// The task runs the system under `catch_unwind`, forwards a returned error to the
    /// context's error handler, and then reports completion through
    /// `Context::system_completed`. The system's access is added to `self.active_access`
    /// before the task is spawned.
    ///
    /// # Safety
    /// - Caller must not alias systems that are running.
    /// - `is_exclusive` must have returned `false` for the specified system.
    /// - `world` must have permission to access the world data
    ///   used by the specified system.
    /// - `update_archetype_component_access` must have been called with `world`
    ///   on the system associated with `system_index`.
    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: this system is not running, no other reference exists
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Move the full context object into the new future.
        let context = *context;

        let system_meta = &self.system_task_metadata[system_index];

        let task = async move {
            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                // SAFETY:
                // - The caller ensures that we have permission to
                // access the world data used by the system.
                // - `is_exclusive` returned false
                // - `update_archetype_component_access` has been called.
                unsafe {
                    if let Err(err) = __rust_begin_short_backtrace::run_unsafe(
                        system,
                        context.environment.world_cell,
                    ) {
                        (context.error_handler)(
                            err,
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                };
            }));
            // Reported whether the system returned normally or panicked.
            context.system_completed(system_index, res, system);
        };

        // Claim the system's access so that conflicting systems are held back.
        self.active_access
            .extend(&system_meta.archetype_component_access);

        if system_meta.is_send {
            context.scope.spawn(task);
        } else {
            // Only one non-`Send` system may run at a time.
            self.local_thread_running = true;
            context.scope.spawn_on_external(task);
        }
    }
661
    /// Spawns the task for the exclusive system at `system_index`.
    ///
    /// If the system is an apply-deferred system, the task applies the buffers of all
    /// systems currently recorded in `self.unapplied_systems` (which is cleared here).
    /// Otherwise the task runs the system with mutable access to the whole world.
    /// Either way the task is spawned with `spawn_on_scope`, and both
    /// `exclusive_running` and `local_thread_running` are set.
    ///
    /// # Safety
    /// Caller must ensure no systems are currently borrowed.
    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: this system is not running, no other reference exists
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Move the full context object into the new future.
        let context = *context;

        if is_apply_deferred(system) {
            // TODO: avoid allocation
            // The task gets its own copy of the set; the executor's set starts over empty.
            let unapplied_systems = self.unapplied_systems.clone();
            self.unapplied_systems.clear();
            let task = async move {
                // SAFETY: `can_run` returned true for this system, which means
                // that no other systems currently have access to the world.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        } else {
            let task = async move {
                // SAFETY: `can_run` returned true for this system, which means
                // that no other systems currently have access to the world.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    if let Err(err) = __rust_begin_short_backtrace::run(system, world) {
                        (context.error_handler)(
                            err,
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                }));
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        }

        // Block every other system (including non-`Send` ones) until this one completes.
        self.exclusive_running = true;
        self.local_thread_running = true;
    }
708
709    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
710        let SystemResult { system_index, .. } = result;
711
712        if self.system_task_metadata[system_index].is_exclusive {
713            self.exclusive_running = false;
714        }
715
716        if !self.system_task_metadata[system_index].is_send {
717            self.local_thread_running = false;
718        }
719
720        debug_assert!(self.num_running_systems >= 1);
721        self.num_running_systems -= 1;
722        self.running_systems.remove(system_index);
723        self.completed_systems.insert(system_index);
724        self.unapplied_systems.insert(system_index);
725
726        self.signal_dependents(system_index);
727    }
728
    /// Marks a system that will not be run as completed and signals its dependents,
    /// exactly as if it had run.
    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
        // Mark as completed first: `signal_dependents` consults `completed_systems`.
        self.completed_systems.insert(system_index);
        self.signal_dependents(system_index);
    }
733
734    fn signal_dependents(&mut self, system_index: usize) {
735        for &dep_idx in &self.system_task_metadata[system_index].dependents {
736            let remaining = &mut self.num_dependencies_remaining[dep_idx];
737            debug_assert!(*remaining >= 1);
738            *remaining -= 1;
739            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
740                self.ready_systems.insert(dep_idx);
741            }
742        }
743    }
744
745    fn rebuild_active_access(&mut self) {
746        self.active_access.clear();
747        for index in self.running_systems.ones() {
748            let system_meta = &self.system_task_metadata[index];
749            self.active_access
750                .extend(&system_meta.archetype_component_access);
751        }
752    }
753}
754
755fn apply_deferred(
756    unapplied_systems: &FixedBitSet,
757    systems: &[SyncUnsafeCell<ScheduleSystem>],
758    world: &mut World,
759) -> Result<(), Box<dyn Any + Send>> {
760    for system_index in unapplied_systems.ones() {
761        // SAFETY: none of these systems are running, no other references exist
762        let system = unsafe { &mut *systems[system_index].get() };
763        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
764            system.apply_deferred(world);
765        }));
766        if let Err(payload) = res {
767            #[cfg(feature = "std")]
768            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
769            {
770                eprintln!(
771                    "Encountered a panic when applying buffers for system `{}`!",
772                    &*system.name()
773                );
774            }
775            return Err(payload);
776        }
777    }
778    Ok(())
779}
780
781/// # Safety
782/// - `world` must have permission to read any world data
783///   required by `conditions`.
784/// - `update_archetype_component_access` must have been called
785///   with `world` for each condition in `conditions`.
786unsafe fn evaluate_and_fold_conditions(
787    conditions: &mut [BoxedCondition],
788    world: UnsafeWorldCell,
789) -> bool {
790    let error_handler = default_error_handler();
791
792    #[expect(
793        clippy::unnecessary_fold,
794        reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
795    )]
796    conditions
797        .iter_mut()
798        .map(|condition| {
799            // SAFETY:
800            // - The caller ensures that `world` has permission to read any data
801            //   required by the condition.
802            // - `update_archetype_component_access` has been called for condition.
803            match unsafe { condition.validate_param_unsafe(world) } {
804                Ok(()) => (),
805                Err(e) => {
806                    if !e.skipped {
807                        error_handler(
808                            e.into(),
809                            ErrorContext::System {
810                                name: condition.name(),
811                                last_run: condition.get_last_run(),
812                            },
813                        );
814                    }
815                    return false;
816                }
817            }
818            // SAFETY:
819            // - The caller ensures that `world` has permission to read any data
820            //   required by the condition.
821            // - `update_archetype_component_access` has been called for condition.
822            unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
823        })
824        .fold(true, |acc, res| acc && res)
825}
826
827/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
828#[derive(Resource, Clone)]
829pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
830
831impl Default for MainThreadExecutor {
832    fn default() -> Self {
833        Self::new()
834    }
835}
836
837impl MainThreadExecutor {
838    /// Creates a new executor that can be used to run systems on the main thread.
839    pub fn new() -> Self {
840        MainThreadExecutor(TaskPool::get_thread_executor())
841    }
842}
843
#[cfg(test)]
mod tests {
    use crate::{
        prelude::Resource,
        schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},
        system::Commands,
        world::World,
    };

    /// Marker resource inserted by a system to prove that the system ran.
    #[derive(Resource)]
    struct R;

    /// Builds an empty schedule that uses the multi-threaded executor.
    fn multi_threaded_schedule() -> Schedule {
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule
    }

    /// A system chained after a system that is always skipped must still run.
    #[test]
    fn skipped_systems_notify_dependents() {
        let mut schedule = multi_threaded_schedule();
        schedule.add_systems(
            (
                // Never runs: its run condition is always `false`.
                (|| {}).run_if(|| false),
                // This system depends on a system that is always skipped.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );

        let mut world = World::new();
        schedule.run(&mut world);

        assert!(world.get_resource::<R>().is_some());
    }

    /// Regression test for a weird bug flagged by MIRI in
    /// `spawn_exclusive_system_task`, related to a `&mut World` being captured
    /// inside an `async` block and somehow remaining alive even after its last use.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut schedule = multi_threaded_schedule();
        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());

        let mut world = World::new();
        schedule.run(&mut world);
    }
}