bevy_ecs/schedule/executor/
multi_threaded.rs

1use alloc::{boxed::Box, vec::Vec};
2use bevy_platform::cell::SyncUnsafeCell;
3use bevy_platform::sync::Arc;
4use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
5use concurrent_queue::ConcurrentQueue;
6use core::{any::Any, panic::AssertUnwindSafe};
7use fixedbitset::FixedBitSet;
8#[cfg(feature = "std")]
9use std::eprintln;
10use std::sync::{Mutex, MutexGuard};
11
12#[cfg(feature = "trace")]
13use tracing::{info_span, Span};
14
15use crate::{
16    error::{ErrorContext, ErrorHandler, Result},
17    prelude::Resource,
18    schedule::{
19        is_apply_deferred, ConditionWithAccess, ExecutorKind, SystemExecutor, SystemSchedule,
20        SystemWithAccess,
21    },
22    system::{RunSystemError, ScheduleSystem},
23    world::{unsafe_world_cell::UnsafeWorldCell, World},
24};
25#[cfg(feature = "hotpatching")]
26use crate::{prelude::DetectChanges, HotPatchChanges};
27
28use super::__rust_begin_short_backtrace;
29
30/// Borrowed data used by the [`MultiThreadedExecutor`].
31struct Environment<'env, 'sys> {
32    executor: &'env MultiThreadedExecutor,
33    systems: &'sys [SyncUnsafeCell<SystemWithAccess>],
34    conditions: SyncUnsafeCell<Conditions<'sys>>,
35    world_cell: UnsafeWorldCell<'env>,
36}
37
38struct Conditions<'a> {
39    system_conditions: &'a mut [Vec<ConditionWithAccess>],
40    set_conditions: &'a mut [Vec<ConditionWithAccess>],
41    sets_with_conditions_of_systems: &'a [FixedBitSet],
42    systems_in_sets_with_conditions: &'a [FixedBitSet],
43}
44
45impl<'env, 'sys> Environment<'env, 'sys> {
46    fn new(
47        executor: &'env MultiThreadedExecutor,
48        schedule: &'sys mut SystemSchedule,
49        world: &'env mut World,
50    ) -> Self {
51        Environment {
52            executor,
53            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
54            conditions: SyncUnsafeCell::new(Conditions {
55                system_conditions: &mut schedule.system_conditions,
56                set_conditions: &mut schedule.set_conditions,
57                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
58                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
59            }),
60            world_cell: world.as_unsafe_world_cell(),
61        }
62    }
63}
64
65/// Per-system data used by the [`MultiThreadedExecutor`].
66// Copied here because it can't be read from the system when it's running.
67struct SystemTaskMetadata {
68    /// The set of systems whose `component_access_set()` conflicts with this one.
69    conflicting_systems: FixedBitSet,
70    /// The set of systems whose `component_access_set()` conflicts with this system's conditions.
71    /// Note that this is separate from `conflicting_systems` to handle the case where
72    /// a system is skipped by an earlier system set condition or system stepping,
73    /// and needs access to run its conditions but not for itself.
74    condition_conflicting_systems: FixedBitSet,
75    /// Indices of the systems that directly depend on the system.
76    dependents: Vec<usize>,
77    /// Is `true` if the system does not access `!Send` data.
78    is_send: bool,
79    /// Is `true` if the system is exclusive.
80    is_exclusive: bool,
81}
82
83/// The result of running a system that is sent across a channel.
84struct SystemResult {
85    system_index: usize,
86}
87
88/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
89pub struct MultiThreadedExecutor {
90    /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
91    state: Mutex<ExecutorState>,
92    /// Queue of system completion events.
93    system_completion: ConcurrentQueue<SystemResult>,
94    /// Setting when true applies deferred system buffers after all systems have run
95    apply_final_deferred: bool,
96    /// When set, tells the executor that a thread has panicked.
97    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
98    starting_systems: FixedBitSet,
99    /// Cached tracing span
100    #[cfg(feature = "trace")]
101    executor_span: Span,
102}
103
104/// The state of the executor while running.
105pub struct ExecutorState {
106    /// Metadata for scheduling and running system tasks.
107    system_task_metadata: Vec<SystemTaskMetadata>,
108    /// The set of systems whose `component_access_set()` conflicts with this system set's conditions.
109    set_condition_conflicting_systems: Vec<FixedBitSet>,
110    /// Returns `true` if a system with non-`Send` access is running.
111    local_thread_running: bool,
112    /// Returns `true` if an exclusive system is running.
113    exclusive_running: bool,
114    /// The number of systems that are running.
115    num_running_systems: usize,
116    /// The number of dependencies each system has that have not completed.
117    num_dependencies_remaining: Vec<usize>,
118    /// System sets whose conditions have been evaluated.
119    evaluated_sets: FixedBitSet,
120    /// Systems that have no remaining dependencies and are waiting to run.
121    ready_systems: FixedBitSet,
122    /// copy of `ready_systems`
123    ready_systems_copy: FixedBitSet,
124    /// Systems that are running.
125    running_systems: FixedBitSet,
126    /// Systems that got skipped.
127    skipped_systems: FixedBitSet,
128    /// Systems whose conditions have been evaluated and were run or skipped.
129    completed_systems: FixedBitSet,
130    /// Systems that have run but have not had their buffers applied.
131    unapplied_systems: FixedBitSet,
132}
133
134/// References to data required by the executor.
135/// This is copied to each system task so that can invoke the executor when they complete.
136// These all need to outlive 'scope in order to be sent to new tasks,
137// and keeping them all in a struct means we can use lifetime elision.
138#[derive(Copy, Clone)]
139struct Context<'scope, 'env, 'sys> {
140    environment: &'env Environment<'env, 'sys>,
141    scope: &'scope Scope<'scope, 'env, ()>,
142    error_handler: ErrorHandler,
143}
144
145impl Default for MultiThreadedExecutor {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151impl SystemExecutor for MultiThreadedExecutor {
152    fn kind(&self) -> ExecutorKind {
153        ExecutorKind::MultiThreaded
154    }
155
156    fn init(&mut self, schedule: &SystemSchedule) {
157        let state = self.state.get_mut().unwrap();
158        // pre-allocate space
159        let sys_count = schedule.system_ids.len();
160        let set_count = schedule.set_ids.len();
161
162        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
163        self.starting_systems = FixedBitSet::with_capacity(sys_count);
164        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
165        state.ready_systems = FixedBitSet::with_capacity(sys_count);
166        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
167        state.running_systems = FixedBitSet::with_capacity(sys_count);
168        state.completed_systems = FixedBitSet::with_capacity(sys_count);
169        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
170        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
171
172        state.system_task_metadata = Vec::with_capacity(sys_count);
173        for index in 0..sys_count {
174            state.system_task_metadata.push(SystemTaskMetadata {
175                conflicting_systems: FixedBitSet::with_capacity(sys_count),
176                condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),
177                dependents: schedule.system_dependents[index].clone(),
178                is_send: schedule.systems[index].system.is_send(),
179                is_exclusive: schedule.systems[index].system.is_exclusive(),
180            });
181            if schedule.system_dependencies[index] == 0 {
182                self.starting_systems.insert(index);
183            }
184        }
185
186        {
187            #[cfg(feature = "trace")]
188            let _span = info_span!("calculate conflicting systems").entered();
189            for index1 in 0..sys_count {
190                let system1 = &schedule.systems[index1];
191                for index2 in 0..index1 {
192                    let system2 = &schedule.systems[index2];
193                    if !system2.access.is_compatible(&system1.access) {
194                        state.system_task_metadata[index1]
195                            .conflicting_systems
196                            .insert(index2);
197                        state.system_task_metadata[index2]
198                            .conflicting_systems
199                            .insert(index1);
200                    }
201                }
202
203                for index2 in 0..sys_count {
204                    let system2 = &schedule.systems[index2];
205                    if schedule.system_conditions[index1]
206                        .iter()
207                        .any(|condition| !system2.access.is_compatible(&condition.access))
208                    {
209                        state.system_task_metadata[index1]
210                            .condition_conflicting_systems
211                            .insert(index2);
212                    }
213                }
214            }
215
216            state.set_condition_conflicting_systems.clear();
217            state.set_condition_conflicting_systems.reserve(set_count);
218            for set_idx in 0..set_count {
219                let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);
220                for sys_index in 0..sys_count {
221                    let system = &schedule.systems[sys_index];
222                    if schedule.set_conditions[set_idx]
223                        .iter()
224                        .any(|condition| !system.access.is_compatible(&condition.access))
225                    {
226                        conflicting_systems.insert(sys_index);
227                    }
228                }
229                state
230                    .set_condition_conflicting_systems
231                    .push(conflicting_systems);
232            }
233        }
234
235        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
236    }
237
238    fn run(
239        &mut self,
240        schedule: &mut SystemSchedule,
241        world: &mut World,
242        _skip_systems: Option<&FixedBitSet>,
243        error_handler: ErrorHandler,
244    ) {
245        let state = self.state.get_mut().unwrap();
246        // reset counts
247        if schedule.systems.is_empty() {
248            return;
249        }
250        state.num_running_systems = 0;
251        state
252            .num_dependencies_remaining
253            .clone_from(&schedule.system_dependencies);
254        state.ready_systems.clone_from(&self.starting_systems);
255
256        // If stepping is enabled, make sure we skip those systems that should
257        // not be run.
258        #[cfg(feature = "bevy_debug_stepping")]
259        if let Some(skipped_systems) = _skip_systems {
260            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
261            // mark skipped systems as completed
262            state.completed_systems |= skipped_systems;
263
264            // signal the dependencies for each of the skipped systems, as
265            // though they had run
266            for system_index in skipped_systems.ones() {
267                state.signal_dependents(system_index);
268                state.ready_systems.remove(system_index);
269            }
270        }
271
272        let thread_executor = world
273            .get_resource::<MainThreadExecutor>()
274            .map(|e| e.0.clone());
275        let thread_executor = thread_executor.as_deref();
276
277        let environment = &Environment::new(self, schedule, world);
278
279        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
280            false,
281            thread_executor,
282            |scope| {
283                let context = Context {
284                    environment,
285                    scope,
286                    error_handler,
287                };
288
289                // The first tick won't need to process finished systems, but we still need to run the loop in
290                // tick_executor() in case a system completes while the first tick still holds the mutex.
291                context.tick_executor();
292            },
293        );
294
295        // End the borrows of self and world in environment by copying out the reference to systems.
296        let systems = environment.systems;
297
298        let state = self.state.get_mut().unwrap();
299        if self.apply_final_deferred {
300            // Do one final apply buffers after all systems have completed
301            // Commands should be applied while on the scope's thread, not the executor's thread
302            let res = apply_deferred(&state.unapplied_systems, systems, world);
303            if let Err(payload) = res {
304                let panic_payload = self.panic_payload.get_mut().unwrap();
305                *panic_payload = Some(payload);
306            }
307            state.unapplied_systems.clear();
308        }
309
310        // check to see if there was a panic
311        let payload = self.panic_payload.get_mut().unwrap();
312        if let Some(payload) = payload.take() {
313            std::panic::resume_unwind(payload);
314        }
315
316        debug_assert!(state.ready_systems.is_clear());
317        debug_assert!(state.running_systems.is_clear());
318        state.evaluated_sets.clear();
319        state.skipped_systems.clear();
320        state.completed_systems.clear();
321    }
322
323    fn set_apply_final_deferred(&mut self, value: bool) {
324        self.apply_final_deferred = value;
325    }
326}
327
328impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
329    fn system_completed(
330        &self,
331        system_index: usize,
332        res: Result<(), Box<dyn Any + Send>>,
333        system: &ScheduleSystem,
334    ) {
335        // tell the executor that the system finished
336        self.environment
337            .executor
338            .system_completion
339            .push(SystemResult { system_index })
340            .unwrap_or_else(|error| unreachable!("{}", error));
341        if let Err(payload) = res {
342            #[cfg(feature = "std")]
343            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
344            {
345                eprintln!("Encountered a panic in system `{}`!", system.name());
346            }
347            // set the payload to propagate the error
348            {
349                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
350                *panic_payload = Some(payload);
351            }
352        }
353        self.tick_executor();
354    }
355
356    #[expect(
357        clippy::mut_from_ref,
358        reason = "Field is only accessed here and is guarded by lock with a documented safety comment"
359    )]
360    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
361        let guard = self.environment.executor.state.try_lock().ok()?;
362        // SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
363        // is synchronized by the lock on the executor state.
364        let conditions = unsafe { &mut *self.environment.conditions.get() };
365        Some((conditions, guard))
366    }
367
368    fn tick_executor(&self) {
369        // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
370        // If this thread acquires the lock, the executor runs after the push() and they are processed.
371        // If this thread does not acquire the lock, then the is_empty() check on the other thread runs
372        // after the lock is released, which is after try_lock() failed, which is after the push()
373        // on this thread, so the is_empty() check will see the new events and loop.
374        loop {
375            let Some((conditions, mut guard)) = self.try_lock() else {
376                return;
377            };
378            guard.tick(self, conditions);
379            // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
380            drop(guard);
381            if self.environment.executor.system_completion.is_empty() {
382                return;
383            }
384        }
385    }
386}
387
388impl MultiThreadedExecutor {
389    /// Creates a new `multi_threaded` executor for use with a [`Schedule`].
390    ///
391    /// [`Schedule`]: crate::schedule::Schedule
392    pub fn new() -> Self {
393        Self {
394            state: Mutex::new(ExecutorState::new()),
395            system_completion: ConcurrentQueue::unbounded(),
396            starting_systems: FixedBitSet::new(),
397            apply_final_deferred: true,
398            panic_payload: Mutex::new(None),
399            #[cfg(feature = "trace")]
400            executor_span: info_span!("multithreaded executor"),
401        }
402    }
403}
404
405impl ExecutorState {
406    fn new() -> Self {
407        Self {
408            system_task_metadata: Vec::new(),
409            set_condition_conflicting_systems: Vec::new(),
410            num_running_systems: 0,
411            num_dependencies_remaining: Vec::new(),
412            local_thread_running: false,
413            exclusive_running: false,
414            evaluated_sets: FixedBitSet::new(),
415            ready_systems: FixedBitSet::new(),
416            ready_systems_copy: FixedBitSet::new(),
417            running_systems: FixedBitSet::new(),
418            skipped_systems: FixedBitSet::new(),
419            completed_systems: FixedBitSet::new(),
420            unapplied_systems: FixedBitSet::new(),
421        }
422    }
423
424    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
425        #[cfg(feature = "trace")]
426        let _span = context.environment.executor.executor_span.enter();
427
428        for result in context.environment.executor.system_completion.try_iter() {
429            self.finish_system_and_handle_dependents(result);
430        }
431
432        // SAFETY:
433        // - `finish_system_and_handle_dependents` has updated the currently running systems.
434        // - `rebuild_active_access` locks access for all currently running systems.
435        unsafe {
436            self.spawn_system_tasks(context, conditions);
437        }
438    }
439
440    /// # Safety
441    /// - Caller must ensure that `self.ready_systems` does not contain any systems that
442    ///   have been mutably borrowed (such as the systems currently running).
443    /// - `world_cell` must have permission to access all world data (not counting
444    ///   any world data that is claimed by systems currently running on this executor).
445    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
446        if self.exclusive_running {
447            return;
448        }
449
450        #[cfg(feature = "hotpatching")]
451        let hotpatch_tick = context
452            .environment
453            .world_cell
454            .get_resource_ref::<HotPatchChanges>()
455            .map(|r| r.last_changed())
456            .unwrap_or_default();
457
458        // can't borrow since loop mutably borrows `self`
459        let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);
460
461        // Skipping systems may cause their dependents to become ready immediately.
462        // If that happens, we need to run again immediately or we may fail to spawn those dependents.
463        let mut check_for_new_ready_systems = true;
464        while check_for_new_ready_systems {
465            check_for_new_ready_systems = false;
466
467            ready_systems.clone_from(&self.ready_systems);
468
469            for system_index in ready_systems.ones() {
470                debug_assert!(!self.running_systems.contains(system_index));
471                // SAFETY: Caller assured that these systems are not running.
472                // Therefore, no other reference to this system exists and there is no aliasing.
473                let system =
474                    &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
475
476                #[cfg(feature = "hotpatching")]
477                if hotpatch_tick.is_newer_than(
478                    system.get_last_run(),
479                    context.environment.world_cell.change_tick(),
480                ) {
481                    system.refresh_hotpatch();
482                }
483
484                if !self.can_run(system_index, conditions) {
485                    // NOTE: exclusive systems with ambiguities are susceptible to
486                    // being significantly displaced here (compared to single-threaded order)
487                    // if systems after them in topological order can run
488                    // if that becomes an issue, `break;` if exclusive system
489                    continue;
490                }
491
492                self.ready_systems.remove(system_index);
493
494                // SAFETY: `can_run` returned true, which means that:
495                // - There can be no systems running whose accesses would conflict with any conditions.
496                if unsafe {
497                    !self.should_run(
498                        system_index,
499                        system,
500                        conditions,
501                        context.environment.world_cell,
502                        context.error_handler,
503                    )
504                } {
505                    self.skip_system_and_signal_dependents(system_index);
506                    // signal_dependents may have set more systems to ready.
507                    check_for_new_ready_systems = true;
508                    continue;
509                }
510
511                self.running_systems.insert(system_index);
512                self.num_running_systems += 1;
513
514                if self.system_task_metadata[system_index].is_exclusive {
515                    // SAFETY: `can_run` returned true for this system,
516                    // which means no systems are currently borrowed.
517                    unsafe {
518                        self.spawn_exclusive_system_task(context, system_index);
519                    }
520                    check_for_new_ready_systems = false;
521                    break;
522                }
523
524                // SAFETY:
525                // - Caller ensured no other reference to this system exists.
526                // - `system_task_metadata[system_index].is_exclusive` is `false`,
527                //   so `System::is_exclusive` returned `false` when we called it.
528                // - `can_run` returned true, so no systems with conflicting world access are running.
529                unsafe {
530                    self.spawn_system_task(context, system_index);
531                }
532            }
533        }
534
535        // give back
536        self.ready_systems_copy = ready_systems;
537    }
538
539    fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {
540        let system_meta = &self.system_task_metadata[system_index];
541        if system_meta.is_exclusive && self.num_running_systems > 0 {
542            return false;
543        }
544
545        if !system_meta.is_send && self.local_thread_running {
546            return false;
547        }
548
549        // TODO: an earlier out if world's archetypes did not change
550        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
551            .difference(&self.evaluated_sets)
552        {
553            if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {
554                return false;
555            }
556        }
557
558        if !system_meta
559            .condition_conflicting_systems
560            .is_disjoint(&self.running_systems)
561        {
562            return false;
563        }
564
565        if !self.skipped_systems.contains(system_index)
566            && !system_meta
567                .conflicting_systems
568                .is_disjoint(&self.running_systems)
569        {
570            return false;
571        }
572
573        true
574    }
575
576    /// # Safety
577    /// * `world` must have permission to read any world data required by
578    ///   the system's conditions: this includes conditions for the system
579    ///   itself, and conditions for any of the system's sets.
580    unsafe fn should_run(
581        &mut self,
582        system_index: usize,
583        system: &mut ScheduleSystem,
584        conditions: &mut Conditions,
585        world: UnsafeWorldCell,
586        error_handler: ErrorHandler,
587    ) -> bool {
588        let mut should_run = !self.skipped_systems.contains(system_index);
589
590        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
591            if self.evaluated_sets.contains(set_idx) {
592                continue;
593            }
594
595            // Evaluate the system set's conditions.
596            // SAFETY:
597            // - The caller ensures that `world` has permission to read any data
598            //   required by the conditions.
599            let set_conditions_met = unsafe {
600                evaluate_and_fold_conditions(
601                    &mut conditions.set_conditions[set_idx],
602                    world,
603                    error_handler,
604                    system,
605                    true,
606                )
607            };
608
609            if !set_conditions_met {
610                self.skipped_systems
611                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
612            }
613
614            should_run &= set_conditions_met;
615            self.evaluated_sets.insert(set_idx);
616        }
617
618        // Evaluate the system's conditions.
619        // SAFETY:
620        // - The caller ensures that `world` has permission to read any data
621        //   required by the conditions.
622        let system_conditions_met = unsafe {
623            evaluate_and_fold_conditions(
624                &mut conditions.system_conditions[system_index],
625                world,
626                error_handler,
627                system,
628                false,
629            )
630        };
631
632        if !system_conditions_met {
633            self.skipped_systems.insert(system_index);
634        }
635
636        should_run &= system_conditions_met;
637
638        if should_run {
639            // SAFETY:
640            // - The caller ensures that `world` has permission to read any data
641            //   required by the system.
642            let valid_params = match unsafe { system.validate_param_unsafe(world) } {
643                Ok(()) => true,
644                Err(e) => {
645                    if !e.skipped {
646                        error_handler(
647                            e.into(),
648                            ErrorContext::System {
649                                name: system.name(),
650                                last_run: system.get_last_run(),
651                            },
652                        );
653                    }
654                    false
655                }
656            };
657            if !valid_params {
658                self.skipped_systems.insert(system_index);
659            }
660
661            should_run &= valid_params;
662        }
663
664        should_run
665    }
666
667    /// # Safety
668    /// - Caller must not alias systems that are running.
669    /// - `is_exclusive` must have returned `false` for the specified system.
670    /// - `world` must have permission to access the world data
671    ///   used by the specified system.
672    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
673        // SAFETY: this system is not running, no other reference exists
674        let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
675        // Move the full context object into the new future.
676        let context = *context;
677
678        let system_meta = &self.system_task_metadata[system_index];
679
680        let task = async move {
681            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
682                // SAFETY:
683                // - The caller ensures that we have permission to
684                // access the world data used by the system.
685                // - `is_exclusive` returned false
686                unsafe {
687                    if let Err(RunSystemError::Failed(err)) =
688                        __rust_begin_short_backtrace::run_unsafe(
689                            system,
690                            context.environment.world_cell,
691                        )
692                    {
693                        (context.error_handler)(
694                            err,
695                            ErrorContext::System {
696                                name: system.name(),
697                                last_run: system.get_last_run(),
698                            },
699                        );
700                    }
701                };
702            }));
703            context.system_completed(system_index, res, system);
704        };
705
706        if system_meta.is_send {
707            context.scope.spawn(task);
708        } else {
709            self.local_thread_running = true;
710            context.scope.spawn_on_external(task);
711        }
712    }
713
714    /// # Safety
715    /// Caller must ensure no systems are currently borrowed.
716    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
717        // SAFETY: this system is not running, no other reference exists
718        let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
719        // Move the full context object into the new future.
720        let context = *context;
721
722        if is_apply_deferred(&**system) {
723            // TODO: avoid allocation
724            let unapplied_systems = self.unapplied_systems.clone();
725            self.unapplied_systems.clear();
726            let task = async move {
727                // SAFETY: `can_run` returned true for this system, which means
728                // that no other systems currently have access to the world.
729                let world = unsafe { context.environment.world_cell.world_mut() };
730                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
731                context.system_completed(system_index, res, system);
732            };
733
734            context.scope.spawn_on_scope(task);
735        } else {
736            let task = async move {
737                // SAFETY: `can_run` returned true for this system, which means
738                // that no other systems currently have access to the world.
739                let world = unsafe { context.environment.world_cell.world_mut() };
740                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
741                    if let Err(RunSystemError::Failed(err)) =
742                        __rust_begin_short_backtrace::run(system, world)
743                    {
744                        (context.error_handler)(
745                            err,
746                            ErrorContext::System {
747                                name: system.name(),
748                                last_run: system.get_last_run(),
749                            },
750                        );
751                    }
752                }));
753                context.system_completed(system_index, res, system);
754            };
755
756            context.scope.spawn_on_scope(task);
757        }
758
759        self.exclusive_running = true;
760        self.local_thread_running = true;
761    }
762
763    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
764        let SystemResult { system_index, .. } = result;
765
766        if self.system_task_metadata[system_index].is_exclusive {
767            self.exclusive_running = false;
768        }
769
770        if !self.system_task_metadata[system_index].is_send {
771            self.local_thread_running = false;
772        }
773
774        debug_assert!(self.num_running_systems >= 1);
775        self.num_running_systems -= 1;
776        self.running_systems.remove(system_index);
777        self.completed_systems.insert(system_index);
778        self.unapplied_systems.insert(system_index);
779
780        self.signal_dependents(system_index);
781    }
782
783    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
784        self.completed_systems.insert(system_index);
785        self.signal_dependents(system_index);
786    }
787
788    fn signal_dependents(&mut self, system_index: usize) {
789        for &dep_idx in &self.system_task_metadata[system_index].dependents {
790            let remaining = &mut self.num_dependencies_remaining[dep_idx];
791            debug_assert!(*remaining >= 1);
792            *remaining -= 1;
793            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
794                self.ready_systems.insert(dep_idx);
795            }
796        }
797    }
798}
799
800fn apply_deferred(
801    unapplied_systems: &FixedBitSet,
802    systems: &[SyncUnsafeCell<SystemWithAccess>],
803    world: &mut World,
804) -> Result<(), Box<dyn Any + Send>> {
805    for system_index in unapplied_systems.ones() {
806        // SAFETY: none of these systems are running, no other references exist
807        let system = &mut unsafe { &mut *systems[system_index].get() }.system;
808        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
809            system.apply_deferred(world);
810        }));
811        if let Err(payload) = res {
812            #[cfg(feature = "std")]
813            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
814            {
815                eprintln!(
816                    "Encountered a panic when applying buffers for system `{}`!",
817                    system.name()
818                );
819            }
820            return Err(payload);
821        }
822    }
823    Ok(())
824}
825
826/// # Safety
827/// - `world` must have permission to read any world data
828///   required by `conditions`.
829unsafe fn evaluate_and_fold_conditions(
830    conditions: &mut [ConditionWithAccess],
831    world: UnsafeWorldCell,
832    error_handler: ErrorHandler,
833    for_system: &ScheduleSystem,
834    on_set: bool,
835) -> bool {
836    #[expect(
837        clippy::unnecessary_fold,
838        reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
839    )]
840    conditions
841        .iter_mut()
842        .map(|ConditionWithAccess { condition, .. }| {
843            // SAFETY:
844            // - The caller ensures that `world` has permission to read any data
845            //   required by the condition.
846            unsafe { condition.validate_param_unsafe(world) }
847                .map_err(From::from)
848                .and_then(|()| {
849                    // SAFETY:
850                    // - The caller ensures that `world` has permission to read any data
851                    //   required by the condition.
852                    // - `update_archetype_component_access` has been called for condition.
853                    unsafe {
854                        __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world)
855                    }
856                })
857                .unwrap_or_else(|err| {
858                    if let RunSystemError::Failed(err) = err {
859                        error_handler(
860                            err,
861                            ErrorContext::RunCondition {
862                                name: condition.name(),
863                                last_run: condition.get_last_run(),
864                                system: for_system.name(),
865                                on_set,
866                            },
867                        );
868                    };
869                    false
870                })
871        })
872        .fold(true, |acc, res| acc && res)
873}
874
875/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
876#[derive(Resource, Clone)]
877pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
878
879impl Default for MainThreadExecutor {
880    fn default() -> Self {
881        Self::new()
882    }
883}
884
885impl MainThreadExecutor {
886    /// Creates a new executor that can be used to run systems on the main thread.
887    pub fn new() -> Self {
888        MainThreadExecutor(TaskPool::get_thread_executor())
889    }
890}
891
892#[cfg(test)]
893mod tests {
894    use crate::{
895        prelude::Resource,
896        schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},
897        system::Commands,
898        world::World,
899    };
900
901    #[derive(Resource)]
902    struct R;
903
904    #[test]
905    fn skipped_systems_notify_dependents() {
906        let mut world = World::new();
907        let mut schedule = Schedule::default();
908        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
909        schedule.add_systems(
910            (
911                (|| {}).run_if(|| false),
912                // This system depends on a system that is always skipped.
913                |mut commands: Commands| {
914                    commands.insert_resource(R);
915                },
916            )
917                .chain(),
918        );
919        schedule.run(&mut world);
920        assert!(world.get_resource::<R>().is_some());
921    }
922
923    /// Regression test for a weird bug flagged by MIRI in
924    /// `spawn_exclusive_system_task`, related to a `&mut World` being captured
925    /// inside an `async` block and somehow remaining alive even after its last use.
926    #[test]
927    fn check_spawn_exclusive_system_task_miri() {
928        let mut world = World::new();
929        let mut schedule = Schedule::default();
930        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
931        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
932        schedule.run(&mut world);
933    }
934}