bevy_ecs/schedule/executor/
multi_threaded.rs

1use alloc::sync::Arc;
2use core::any::Any;
3use std::sync::{Mutex, MutexGuard};
4
5use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
6#[cfg(feature = "trace")]
7use bevy_utils::tracing::info_span;
8#[cfg(feature = "trace")]
9use bevy_utils::tracing::Span;
10use bevy_utils::{default, syncunsafecell::SyncUnsafeCell};
11use core::panic::AssertUnwindSafe;
12
13use concurrent_queue::ConcurrentQueue;
14use fixedbitset::FixedBitSet;
15
16use crate::{
17    archetype::ArchetypeComponentId,
18    prelude::Resource,
19    query::Access,
20    schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
21    system::BoxedSystem,
22    world::{unsafe_world_cell::UnsafeWorldCell, World},
23};
24
25use crate as bevy_ecs;
26
27use super::__rust_begin_short_backtrace;
28
29/// Borrowed data used by the [`MultiThreadedExecutor`].
30struct Environment<'env, 'sys> {
31    executor: &'env MultiThreadedExecutor,
32    systems: &'sys [SyncUnsafeCell<BoxedSystem>],
33    conditions: SyncUnsafeCell<Conditions<'sys>>,
34    world_cell: UnsafeWorldCell<'env>,
35}
36
37struct Conditions<'a> {
38    system_conditions: &'a mut [Vec<BoxedCondition>],
39    set_conditions: &'a mut [Vec<BoxedCondition>],
40    sets_with_conditions_of_systems: &'a [FixedBitSet],
41    systems_in_sets_with_conditions: &'a [FixedBitSet],
42}
43
44impl<'env, 'sys> Environment<'env, 'sys> {
45    fn new(
46        executor: &'env MultiThreadedExecutor,
47        schedule: &'sys mut SystemSchedule,
48        world: &'env mut World,
49    ) -> Self {
50        Environment {
51            executor,
52            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
53            conditions: SyncUnsafeCell::new(Conditions {
54                system_conditions: &mut schedule.system_conditions,
55                set_conditions: &mut schedule.set_conditions,
56                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
57                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
58            }),
59            world_cell: world.as_unsafe_world_cell(),
60        }
61    }
62}
63
64/// Per-system data used by the [`MultiThreadedExecutor`].
65// Copied here because it can't be read from the system when it's running.
66struct SystemTaskMetadata {
67    /// The [`ArchetypeComponentId`] access of the system.
68    archetype_component_access: Access<ArchetypeComponentId>,
69    /// Indices of the systems that directly depend on the system.
70    dependents: Vec<usize>,
71    /// Is `true` if the system does not access `!Send` data.
72    is_send: bool,
73    /// Is `true` if the system is exclusive.
74    is_exclusive: bool,
75}
76
/// The result of running a system that is sent across a channel.
struct SystemResult {
    /// Index of the system that finished running.
    system_index: usize,
}
81
82/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
83pub struct MultiThreadedExecutor {
84    /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
85    state: Mutex<ExecutorState>,
86    /// Queue of system completion events.
87    system_completion: ConcurrentQueue<SystemResult>,
88    /// Setting when true applies deferred system buffers after all systems have run
89    apply_final_deferred: bool,
90    /// When set, tells the executor that a thread has panicked.
91    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
92    starting_systems: FixedBitSet,
93    /// Cached tracing span
94    #[cfg(feature = "trace")]
95    executor_span: Span,
96}
97
98/// The state of the executor while running.
99pub struct ExecutorState {
100    /// Metadata for scheduling and running system tasks.
101    system_task_metadata: Vec<SystemTaskMetadata>,
102    /// Union of the accesses of all currently running systems.
103    active_access: Access<ArchetypeComponentId>,
104    /// Returns `true` if a system with non-`Send` access is running.
105    local_thread_running: bool,
106    /// Returns `true` if an exclusive system is running.
107    exclusive_running: bool,
108    /// The number of systems that are running.
109    num_running_systems: usize,
110    /// The number of dependencies each system has that have not completed.
111    num_dependencies_remaining: Vec<usize>,
112    /// System sets whose conditions have been evaluated.
113    evaluated_sets: FixedBitSet,
114    /// Systems that have no remaining dependencies and are waiting to run.
115    ready_systems: FixedBitSet,
116    /// copy of `ready_systems`
117    ready_systems_copy: FixedBitSet,
118    /// Systems that are running.
119    running_systems: FixedBitSet,
120    /// Systems that got skipped.
121    skipped_systems: FixedBitSet,
122    /// Systems whose conditions have been evaluated and were run or skipped.
123    completed_systems: FixedBitSet,
124    /// Systems that have run but have not had their buffers applied.
125    unapplied_systems: FixedBitSet,
126}
127
128/// References to data required by the executor.
129/// This is copied to each system task so that can invoke the executor when they complete.
130// These all need to outlive 'scope in order to be sent to new tasks,
131// and keeping them all in a struct means we can use lifetime elision.
132#[derive(Copy, Clone)]
133struct Context<'scope, 'env, 'sys> {
134    environment: &'env Environment<'env, 'sys>,
135    scope: &'scope Scope<'scope, 'env, ()>,
136}
137
138impl Default for MultiThreadedExecutor {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
144impl SystemExecutor for MultiThreadedExecutor {
145    fn kind(&self) -> ExecutorKind {
146        ExecutorKind::MultiThreaded
147    }
148
149    fn init(&mut self, schedule: &SystemSchedule) {
150        let state = self.state.get_mut().unwrap();
151        // pre-allocate space
152        let sys_count = schedule.system_ids.len();
153        let set_count = schedule.set_ids.len();
154
155        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
156        self.starting_systems = FixedBitSet::with_capacity(sys_count);
157        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
158        state.ready_systems = FixedBitSet::with_capacity(sys_count);
159        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
160        state.running_systems = FixedBitSet::with_capacity(sys_count);
161        state.completed_systems = FixedBitSet::with_capacity(sys_count);
162        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
163        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
164
165        state.system_task_metadata = Vec::with_capacity(sys_count);
166        for index in 0..sys_count {
167            state.system_task_metadata.push(SystemTaskMetadata {
168                archetype_component_access: default(),
169                dependents: schedule.system_dependents[index].clone(),
170                is_send: schedule.systems[index].is_send(),
171                is_exclusive: schedule.systems[index].is_exclusive(),
172            });
173            if schedule.system_dependencies[index] == 0 {
174                self.starting_systems.insert(index);
175            }
176        }
177
178        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
179    }
180
181    fn run(
182        &mut self,
183        schedule: &mut SystemSchedule,
184        world: &mut World,
185        _skip_systems: Option<&FixedBitSet>,
186    ) {
187        let state = self.state.get_mut().unwrap();
188        // reset counts
189        if schedule.systems.is_empty() {
190            return;
191        }
192        state.num_running_systems = 0;
193        state
194            .num_dependencies_remaining
195            .clone_from(&schedule.system_dependencies);
196        state.ready_systems.clone_from(&self.starting_systems);
197
198        // If stepping is enabled, make sure we skip those systems that should
199        // not be run.
200        #[cfg(feature = "bevy_debug_stepping")]
201        if let Some(skipped_systems) = _skip_systems {
202            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
203            // mark skipped systems as completed
204            state.completed_systems |= skipped_systems;
205
206            // signal the dependencies for each of the skipped systems, as
207            // though they had run
208            for system_index in skipped_systems.ones() {
209                state.signal_dependents(system_index);
210                state.ready_systems.remove(system_index);
211            }
212        }
213
214        let thread_executor = world
215            .get_resource::<MainThreadExecutor>()
216            .map(|e| e.0.clone());
217        let thread_executor = thread_executor.as_deref();
218
219        let environment = &Environment::new(self, schedule, world);
220
221        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
222            false,
223            thread_executor,
224            |scope| {
225                let context = Context { environment, scope };
226
227                // The first tick won't need to process finished systems, but we still need to run the loop in
228                // tick_executor() in case a system completes while the first tick still holds the mutex.
229                context.tick_executor();
230            },
231        );
232
233        // End the borrows of self and world in environment by copying out the reference to systems.
234        let systems = environment.systems;
235
236        let state = self.state.get_mut().unwrap();
237        if self.apply_final_deferred {
238            // Do one final apply buffers after all systems have completed
239            // Commands should be applied while on the scope's thread, not the executor's thread
240            let res = apply_deferred(&state.unapplied_systems, systems, world);
241            if let Err(payload) = res {
242                let panic_payload = self.panic_payload.get_mut().unwrap();
243                *panic_payload = Some(payload);
244            }
245            state.unapplied_systems.clear();
246        }
247
248        // check to see if there was a panic
249        let payload = self.panic_payload.get_mut().unwrap();
250        if let Some(payload) = payload.take() {
251            std::panic::resume_unwind(payload);
252        }
253
254        debug_assert!(state.ready_systems.is_clear());
255        debug_assert!(state.running_systems.is_clear());
256        state.active_access.clear();
257        state.evaluated_sets.clear();
258        state.skipped_systems.clear();
259        state.completed_systems.clear();
260    }
261
262    fn set_apply_final_deferred(&mut self, value: bool) {
263        self.apply_final_deferred = value;
264    }
265}
266
267impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
268    fn system_completed(
269        &self,
270        system_index: usize,
271        res: Result<(), Box<dyn Any + Send>>,
272        system: &BoxedSystem,
273    ) {
274        // tell the executor that the system finished
275        self.environment
276            .executor
277            .system_completion
278            .push(SystemResult { system_index })
279            .unwrap_or_else(|error| unreachable!("{}", error));
280        if let Err(payload) = res {
281            eprintln!("Encountered a panic in system `{}`!", &*system.name());
282            // set the payload to propagate the error
283            {
284                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
285                *panic_payload = Some(payload);
286            }
287        }
288        self.tick_executor();
289    }
290
291    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
292        let guard = self.environment.executor.state.try_lock().ok()?;
293        // SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
294        // is synchronized by the lock on the executor state.
295        let conditions = unsafe { &mut *self.environment.conditions.get() };
296        Some((conditions, guard))
297    }
298
299    fn tick_executor(&self) {
300        // Ensure that the executor handles any events pushed to the system_completion queue by this thread.
301        // If this thread acquires the lock, the exector runs after the push() and they are processed.
302        // If this thread does not acquire the lock, then the is_empty() check on the other thread runs
303        // after the lock is released, which is after try_lock() failed, which is after the push()
304        // on this thread, so the is_empty() check will see the new events and loop.
305        loop {
306            let Some((conditions, mut guard)) = self.try_lock() else {
307                return;
308            };
309            guard.tick(self, conditions);
310            // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
311            drop(guard);
312            if self.environment.executor.system_completion.is_empty() {
313                return;
314            }
315        }
316    }
317}
318
319impl MultiThreadedExecutor {
320    /// Creates a new `multi_threaded` executor for use with a [`Schedule`].
321    ///
322    /// [`Schedule`]: crate::schedule::Schedule
323    pub fn new() -> Self {
324        Self {
325            state: Mutex::new(ExecutorState::new()),
326            system_completion: ConcurrentQueue::unbounded(),
327            starting_systems: FixedBitSet::new(),
328            apply_final_deferred: true,
329            panic_payload: Mutex::new(None),
330            #[cfg(feature = "trace")]
331            executor_span: info_span!("multithreaded executor"),
332        }
333    }
334}
335
336impl ExecutorState {
337    fn new() -> Self {
338        Self {
339            system_task_metadata: Vec::new(),
340            num_running_systems: 0,
341            num_dependencies_remaining: Vec::new(),
342            active_access: default(),
343            local_thread_running: false,
344            exclusive_running: false,
345            evaluated_sets: FixedBitSet::new(),
346            ready_systems: FixedBitSet::new(),
347            ready_systems_copy: FixedBitSet::new(),
348            running_systems: FixedBitSet::new(),
349            skipped_systems: FixedBitSet::new(),
350            completed_systems: FixedBitSet::new(),
351            unapplied_systems: FixedBitSet::new(),
352        }
353    }
354
355    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
356        #[cfg(feature = "trace")]
357        let _span = context.environment.executor.executor_span.enter();
358
359        for result in context.environment.executor.system_completion.try_iter() {
360            self.finish_system_and_handle_dependents(result);
361        }
362
363        self.rebuild_active_access();
364
365        // SAFETY:
366        // - `finish_system_and_handle_dependents` has updated the currently running systems.
367        // - `rebuild_active_access` locks access for all currently running systems.
368        unsafe {
369            self.spawn_system_tasks(context, conditions);
370        }
371    }
372
373    /// # Safety
374    /// - Caller must ensure that `self.ready_systems` does not contain any systems that
375    ///   have been mutably borrowed (such as the systems currently running).
376    /// - `world_cell` must have permission to access all world data (not counting
377    ///   any world data that is claimed by systems currently running on this executor).
378    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
379        if self.exclusive_running {
380            return;
381        }
382
383        // can't borrow since loop mutably borrows `self`
384        let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);
385
386        // Skipping systems may cause their dependents to become ready immediately.
387        // If that happens, we need to run again immediately or we may fail to spawn those dependents.
388        let mut check_for_new_ready_systems = true;
389        while check_for_new_ready_systems {
390            check_for_new_ready_systems = false;
391
392            ready_systems.clone_from(&self.ready_systems);
393
394            for system_index in ready_systems.ones() {
395                debug_assert!(!self.running_systems.contains(system_index));
396                // SAFETY: Caller assured that these systems are not running.
397                // Therefore, no other reference to this system exists and there is no aliasing.
398                let system = unsafe { &mut *context.environment.systems[system_index].get() };
399
400                if !self.can_run(
401                    system_index,
402                    system,
403                    conditions,
404                    context.environment.world_cell,
405                ) {
406                    // NOTE: exclusive systems with ambiguities are susceptible to
407                    // being significantly displaced here (compared to single-threaded order)
408                    // if systems after them in topological order can run
409                    // if that becomes an issue, `break;` if exclusive system
410                    continue;
411                }
412
413                self.ready_systems.remove(system_index);
414
415                // SAFETY: `can_run` returned true, which means that:
416                // - It must have called `update_archetype_component_access` for each run condition.
417                // - There can be no systems running whose accesses would conflict with any conditions.
418                if unsafe {
419                    !self.should_run(
420                        system_index,
421                        system,
422                        conditions,
423                        context.environment.world_cell,
424                    )
425                } {
426                    self.skip_system_and_signal_dependents(system_index);
427                    // signal_dependents may have set more systems to ready.
428                    check_for_new_ready_systems = true;
429                    continue;
430                }
431
432                self.running_systems.insert(system_index);
433                self.num_running_systems += 1;
434
435                if self.system_task_metadata[system_index].is_exclusive {
436                    // SAFETY: `can_run` returned true for this system,
437                    // which means no systems are currently borrowed.
438                    unsafe {
439                        self.spawn_exclusive_system_task(context, system_index);
440                    }
441                    check_for_new_ready_systems = false;
442                    break;
443                }
444
445                // SAFETY:
446                // - Caller ensured no other reference to this system exists.
447                // - `can_run` has been called, which calls `update_archetype_component_access` with this system.
448                // - `can_run` returned true, so no systems with conflicting world access are running.
449                unsafe {
450                    self.spawn_system_task(context, system_index);
451                }
452            }
453        }
454
455        // give back
456        self.ready_systems_copy = ready_systems;
457    }
458
459    fn can_run(
460        &mut self,
461        system_index: usize,
462        system: &mut BoxedSystem,
463        conditions: &mut Conditions,
464        world: UnsafeWorldCell,
465    ) -> bool {
466        let system_meta = &self.system_task_metadata[system_index];
467        if system_meta.is_exclusive && self.num_running_systems > 0 {
468            return false;
469        }
470
471        if !system_meta.is_send && self.local_thread_running {
472            return false;
473        }
474
475        // TODO: an earlier out if world's archetypes did not change
476        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
477            .difference(&self.evaluated_sets)
478        {
479            for condition in &mut conditions.set_conditions[set_idx] {
480                condition.update_archetype_component_access(world);
481                if !condition
482                    .archetype_component_access()
483                    .is_compatible(&self.active_access)
484                {
485                    return false;
486                }
487            }
488        }
489
490        for condition in &mut conditions.system_conditions[system_index] {
491            condition.update_archetype_component_access(world);
492            if !condition
493                .archetype_component_access()
494                .is_compatible(&self.active_access)
495            {
496                return false;
497            }
498        }
499
500        if !self.skipped_systems.contains(system_index) {
501            system.update_archetype_component_access(world);
502            if !system
503                .archetype_component_access()
504                .is_compatible(&self.active_access)
505            {
506                return false;
507            }
508
509            self.system_task_metadata[system_index]
510                .archetype_component_access
511                .clone_from(system.archetype_component_access());
512        }
513
514        true
515    }
516
517    /// # Safety
518    /// * `world` must have permission to read any world data required by
519    ///   the system's conditions: this includes conditions for the system
520    ///   itself, and conditions for any of the system's sets.
521    /// * `update_archetype_component` must have been called with `world`
522    ///   for the system as well as system and system set's run conditions.
523    unsafe fn should_run(
524        &mut self,
525        system_index: usize,
526        system: &mut BoxedSystem,
527        conditions: &mut Conditions,
528        world: UnsafeWorldCell,
529    ) -> bool {
530        let mut should_run = !self.skipped_systems.contains(system_index);
531
532        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
533            if self.evaluated_sets.contains(set_idx) {
534                continue;
535            }
536
537            // Evaluate the system set's conditions.
538            // SAFETY:
539            // - The caller ensures that `world` has permission to read any data
540            //   required by the conditions.
541            // - `update_archetype_component_access` has been called for each run condition.
542            let set_conditions_met = unsafe {
543                evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world)
544            };
545
546            if !set_conditions_met {
547                self.skipped_systems
548                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
549            }
550
551            should_run &= set_conditions_met;
552            self.evaluated_sets.insert(set_idx);
553        }
554
555        // Evaluate the system's conditions.
556        // SAFETY:
557        // - The caller ensures that `world` has permission to read any data
558        //   required by the conditions.
559        // - `update_archetype_component_access` has been called for each run condition.
560        let system_conditions_met = unsafe {
561            evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world)
562        };
563
564        if !system_conditions_met {
565            self.skipped_systems.insert(system_index);
566        }
567
568        should_run &= system_conditions_met;
569
570        if should_run {
571            // SAFETY:
572            // - The caller ensures that `world` has permission to read any data
573            //   required by the system.
574            // - `update_archetype_component_access` has been called for system.
575            let valid_params = unsafe { system.validate_param_unsafe(world) };
576            if !valid_params {
577                self.skipped_systems.insert(system_index);
578            }
579            should_run &= valid_params;
580        }
581
582        should_run
583    }
584
585    /// # Safety
586    /// - Caller must not alias systems that are running.
587    /// - `world` must have permission to access the world data
588    ///   used by the specified system.
589    /// - `update_archetype_component_access` must have been called with `world`
590    ///   on the system associated with `system_index`.
591    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
592        // SAFETY: this system is not running, no other reference exists
593        let system = unsafe { &mut *context.environment.systems[system_index].get() };
594        // Move the full context object into the new future.
595        let context = *context;
596
597        let system_meta = &self.system_task_metadata[system_index];
598
599        let task = async move {
600            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
601                // SAFETY:
602                // - The caller ensures that we have permission to
603                // access the world data used by the system.
604                // - `update_archetype_component_access` has been called.
605                unsafe {
606                    __rust_begin_short_backtrace::run_unsafe(
607                        &mut **system,
608                        context.environment.world_cell,
609                    );
610                };
611            }));
612            context.system_completed(system_index, res, system);
613        };
614
615        self.active_access
616            .extend(&system_meta.archetype_component_access);
617
618        if system_meta.is_send {
619            context.scope.spawn(task);
620        } else {
621            self.local_thread_running = true;
622            context.scope.spawn_on_external(task);
623        }
624    }
625
626    /// # Safety
627    /// Caller must ensure no systems are currently borrowed.
628    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
629        // SAFETY: this system is not running, no other reference exists
630        let system = unsafe { &mut *context.environment.systems[system_index].get() };
631        // Move the full context object into the new future.
632        let context = *context;
633
634        if is_apply_deferred(system) {
635            // TODO: avoid allocation
636            let unapplied_systems = self.unapplied_systems.clone();
637            self.unapplied_systems.clear();
638            let task = async move {
639                // SAFETY: `can_run` returned true for this system, which means
640                // that no other systems currently have access to the world.
641                let world = unsafe { context.environment.world_cell.world_mut() };
642                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
643                context.system_completed(system_index, res, system);
644            };
645
646            context.scope.spawn_on_scope(task);
647        } else {
648            let task = async move {
649                // SAFETY: `can_run` returned true for this system, which means
650                // that no other systems currently have access to the world.
651                let world = unsafe { context.environment.world_cell.world_mut() };
652                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
653                    __rust_begin_short_backtrace::run(&mut **system, world);
654                }));
655                context.system_completed(system_index, res, system);
656            };
657
658            context.scope.spawn_on_scope(task);
659        }
660
661        self.exclusive_running = true;
662        self.local_thread_running = true;
663    }
664
665    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
666        let SystemResult { system_index, .. } = result;
667
668        if self.system_task_metadata[system_index].is_exclusive {
669            self.exclusive_running = false;
670        }
671
672        if !self.system_task_metadata[system_index].is_send {
673            self.local_thread_running = false;
674        }
675
676        debug_assert!(self.num_running_systems >= 1);
677        self.num_running_systems -= 1;
678        self.running_systems.remove(system_index);
679        self.completed_systems.insert(system_index);
680        self.unapplied_systems.insert(system_index);
681
682        self.signal_dependents(system_index);
683    }
684
685    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
686        self.completed_systems.insert(system_index);
687        self.signal_dependents(system_index);
688    }
689
690    fn signal_dependents(&mut self, system_index: usize) {
691        for &dep_idx in &self.system_task_metadata[system_index].dependents {
692            let remaining = &mut self.num_dependencies_remaining[dep_idx];
693            debug_assert!(*remaining >= 1);
694            *remaining -= 1;
695            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
696                self.ready_systems.insert(dep_idx);
697            }
698        }
699    }
700
701    fn rebuild_active_access(&mut self) {
702        self.active_access.clear();
703        for index in self.running_systems.ones() {
704            let system_meta = &self.system_task_metadata[index];
705            self.active_access
706                .extend(&system_meta.archetype_component_access);
707        }
708    }
709}
710
711fn apply_deferred(
712    unapplied_systems: &FixedBitSet,
713    systems: &[SyncUnsafeCell<BoxedSystem>],
714    world: &mut World,
715) -> Result<(), Box<dyn Any + Send>> {
716    for system_index in unapplied_systems.ones() {
717        // SAFETY: none of these systems are running, no other references exist
718        let system = unsafe { &mut *systems[system_index].get() };
719        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
720            system.apply_deferred(world);
721        }));
722        if let Err(payload) = res {
723            eprintln!(
724                "Encountered a panic when applying buffers for system `{}`!",
725                &*system.name()
726            );
727            return Err(payload);
728        }
729    }
730    Ok(())
731}
732
733/// # Safety
734/// - `world` must have permission to read any world data
735///   required by `conditions`.
736/// - `update_archetype_component_access` must have been called
737///   with `world` for each condition in `conditions`.
738unsafe fn evaluate_and_fold_conditions(
739    conditions: &mut [BoxedCondition],
740    world: UnsafeWorldCell,
741) -> bool {
742    // not short-circuiting is intentional
743    #[allow(clippy::unnecessary_fold)]
744    conditions
745        .iter_mut()
746        .map(|condition| {
747            // SAFETY:
748            // - The caller ensures that `world` has permission to read any data
749            //   required by the condition.
750            // - `update_archetype_component_access` has been called for condition.
751            if !unsafe { condition.validate_param_unsafe(world) } {
752                return false;
753            }
754            // SAFETY:
755            // - The caller ensures that `world` has permission to read any data
756            //   required by the condition.
757            // - `update_archetype_component_access` has been called for condition.
758            unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
759        })
760        .fold(true, |acc, res| acc && res)
761}
762
763/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
764#[derive(Resource, Clone)]
765pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
766
767impl Default for MainThreadExecutor {
768    fn default() -> Self {
769        Self::new()
770    }
771}
772
773impl MainThreadExecutor {
774    /// Creates a new executor that can be used to run systems on the main thread.
775    pub fn new() -> Self {
776        MainThreadExecutor(TaskPool::get_thread_executor())
777    }
778}
779
#[cfg(test)]
mod tests {
    use crate::{
        self as bevy_ecs,
        prelude::Resource,
        schedule::{ExecutorKind, IntoSystemConfigs, Schedule},
        system::Commands,
        world::World,
    };

    /// Marker resource inserted by the dependent system in
    /// `skipped_systems_notify_dependents`.
    #[derive(Resource)]
    struct R;

    /// A system that is chained after an always-skipped system must still run.
    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                (|| {}).run_if(|| false),
                // This system depends on a system that is always skipped.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }

    /// Regression test for a weird bug flagged by MIRI in
    /// `spawn_exclusive_system_task`, related to a `&mut World` being captured
    /// inside an `async` block and somehow remaining alive even after its last use.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
        schedule.run(&mut world);
    }
}