1use alloc::{boxed::Box, vec::Vec};
2use bevy_platform::sync::Arc;
3use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
4use bevy_utils::{default, syncunsafecell::SyncUnsafeCell};
5use concurrent_queue::ConcurrentQueue;
6use core::{any::Any, panic::AssertUnwindSafe};
7use fixedbitset::FixedBitSet;
8#[cfg(feature = "std")]
9use std::eprintln;
10use std::sync::{Mutex, MutexGuard};
11
12#[cfg(feature = "trace")]
13use tracing::{info_span, Span};
14
15use crate::{
16 archetype::ArchetypeComponentId,
17 error::{default_error_handler, BevyError, ErrorContext, Result},
18 prelude::Resource,
19 query::Access,
20 schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
21 system::ScheduleSystem,
22 world::{unsafe_world_cell::UnsafeWorldCell, World},
23};
24
25use super::__rust_begin_short_backtrace;
26
/// Borrowed data shared by every task spawned during a single
/// [`MultiThreadedExecutor::run`] call.
struct Environment<'env, 'sys> {
    /// The executor driving the current run.
    executor: &'env MultiThreadedExecutor,
    /// The schedule's systems, each wrapped in a cell so that a task can obtain
    /// mutable access to the one system it runs.
    systems: &'sys [SyncUnsafeCell<ScheduleSystem>],
    /// Run conditions and their lookup tables. Mutable access is handed out by
    /// `Context::try_lock` after the executor state lock has been acquired.
    conditions: SyncUnsafeCell<Conditions<'sys>>,
    /// Unsafe view of the world that systems and conditions are run against.
    world_cell: UnsafeWorldCell<'env>,
}
34
/// Mutable views of a schedule's run conditions plus the lookup tables that
/// relate systems to the conditioned sets they belong to.
struct Conditions<'a> {
    /// Run conditions attached directly to each system, indexed by system index.
    system_conditions: &'a mut [Vec<BoxedCondition>],
    /// Run conditions attached to each system set, indexed by set index.
    set_conditions: &'a mut [Vec<BoxedCondition>],
    /// For each system (by system index), the set indices whose conditions apply to it.
    sets_with_conditions_of_systems: &'a [FixedBitSet],
    /// For each conditioned set (by set index), the system indices it contains.
    systems_in_sets_with_conditions: &'a [FixedBitSet],
}
41
42impl<'env, 'sys> Environment<'env, 'sys> {
43 fn new(
44 executor: &'env MultiThreadedExecutor,
45 schedule: &'sys mut SystemSchedule,
46 world: &'env mut World,
47 ) -> Self {
48 Environment {
49 executor,
50 systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
51 conditions: SyncUnsafeCell::new(Conditions {
52 system_conditions: &mut schedule.system_conditions,
53 set_conditions: &mut schedule.set_conditions,
54 sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
55 systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
56 }),
57 world_cell: world.as_unsafe_world_cell(),
58 }
59 }
60}
61
/// Per-system information cached by the executor so it can make scheduling
/// decisions without touching the system itself.
struct SystemTaskMetadata {
    /// The system's archetype-component access, copied from the system in
    /// `ExecutorState::can_run` and merged into `ExecutorState::active_access`
    /// while the system is running.
    archetype_component_access: Access<ArchetypeComponentId>,
    /// Indices of the systems that are signalled when this system completes or is skipped.
    dependents: Vec<usize>,
    /// Value of `is_send()` for the system, captured in `init`.
    is_send: bool,
    /// Value of `is_exclusive()` for the system, captured in `init`.
    is_exclusive: bool,
}
74
/// Message pushed onto the completion queue by a task once its system has finished.
struct SystemResult {
    /// Index of the system that finished.
    system_index: usize,
}
79
/// Schedule executor that spawns systems as tasks on the [`ComputeTaskPool`],
/// starting a system only when its access is compatible with the systems that
/// are already running.
pub struct MultiThreadedExecutor {
    /// Scheduling state; locked by whichever task is currently ticking the executor.
    state: Mutex<ExecutorState>,
    /// Queue that finished tasks push their [`SystemResult`] onto.
    system_completion: ConcurrentQueue<SystemResult>,
    /// When `true`, `run` applies the remaining deferred buffers after all systems finish.
    apply_final_deferred: bool,
    /// Payload of a caught panic, re-raised at the end of `run`.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
    /// Systems with no dependencies; these are ready at the start of every run.
    starting_systems: FixedBitSet,
    /// Span entered for the duration of each executor tick.
    #[cfg(feature = "trace")]
    executor_span: Span,
}
95
/// The mutable scheduling state of a [`MultiThreadedExecutor`], guarded by its mutex.
pub struct ExecutorState {
    /// Metadata for each system, indexed by system index.
    system_task_metadata: Vec<SystemTaskMetadata>,
    /// Combined access of the running systems; rebuilt on every tick and extended
    /// whenever a non-exclusive system task is spawned.
    active_access: Access<ArchetypeComponentId>,
    /// Set while a non-`Send` or exclusive system task is in flight.
    local_thread_running: bool,
    /// Set while an exclusive system task is in flight.
    exclusive_running: bool,
    /// Number of systems that have been started and have not yet been finished.
    num_running_systems: usize,
    /// For each system, how many of its dependencies have not completed yet.
    num_dependencies_remaining: Vec<usize>,
    /// System sets whose conditions have already been evaluated during this run.
    evaluated_sets: FixedBitSet,
    /// Systems whose dependencies are all complete and which have not started yet.
    ready_systems: FixedBitSet,
    /// Scratch copy of `ready_systems`, reused across ticks so iteration does not
    /// borrow `self`.
    ready_systems_copy: FixedBitSet,
    /// Systems that are currently running.
    running_systems: FixedBitSet,
    /// Systems marked as skipped because a condition or parameter validation failed.
    skipped_systems: FixedBitSet,
    /// Systems that have finished running or were skipped.
    completed_systems: FixedBitSet,
    /// Systems that finished running but whose deferred buffers have not been applied.
    unapplied_systems: FixedBitSet,
}
125
/// Everything a spawned task needs to run a system and report back to the executor.
///
/// It only holds references and a function pointer, so it is `Copy` and each
/// task receives its own copy.
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
    /// Shared borrowed data for the current run.
    environment: &'env Environment<'env, 'sys>,
    /// Task scope that new system tasks are spawned on.
    scope: &'scope Scope<'scope, 'env, ()>,
    /// Handler invoked with errors returned by systems.
    error_handler: fn(BevyError, ErrorContext),
}
136
137impl Default for MultiThreadedExecutor {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
impl SystemExecutor for MultiThreadedExecutor {
    /// Identifies this executor as [`ExecutorKind::MultiThreaded`].
    fn kind(&self) -> ExecutorKind {
        ExecutorKind::MultiThreaded
    }

    /// Sizes all per-system and per-set bookkeeping for `schedule` and caches
    /// the metadata of every system.
    fn init(&mut self, schedule: &SystemSchedule) {
        let state = self.state.get_mut().unwrap();
        // Pre-allocate everything based on the number of systems and sets.
        let sys_count = schedule.system_ids.len();
        let set_count = schedule.set_ids.len();

        // Each system pushes at most one completion per run, so `sys_count` slots suffice.
        // NOTE(review): `.max(1)` presumably avoids a zero-capacity bounded queue — confirm.
        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
        self.starting_systems = FixedBitSet::with_capacity(sys_count);
        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
        state.ready_systems = FixedBitSet::with_capacity(sys_count);
        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
        state.running_systems = FixedBitSet::with_capacity(sys_count);
        state.completed_systems = FixedBitSet::with_capacity(sys_count);
        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);

        state.system_task_metadata = Vec::with_capacity(sys_count);
        for index in 0..sys_count {
            state.system_task_metadata.push(SystemTaskMetadata {
                archetype_component_access: default(),
                dependents: schedule.system_dependents[index].clone(),
                is_send: schedule.systems[index].is_send(),
                is_exclusive: schedule.systems[index].is_exclusive(),
            });
            // Systems without dependencies are ready as soon as a run starts.
            if schedule.system_dependencies[index] == 0 {
                self.starting_systems.insert(index);
            }
        }

        // Filled from `schedule.system_dependencies` at the start of every `run`.
        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
    }

    /// Runs every system in `schedule` against `world`, then optionally applies
    /// the remaining deferred buffers.
    ///
    /// `_skip_systems` is only honoured when the `bevy_debug_stepping` feature is
    /// enabled. `error_handler` receives errors returned by systems.
    ///
    /// # Panics
    ///
    /// Re-raises the payload of a panic caught while running a system or while
    /// applying deferred buffers.
    fn run(
        &mut self,
        schedule: &mut SystemSchedule,
        world: &mut World,
        _skip_systems: Option<&FixedBitSet>,
        error_handler: fn(BevyError, ErrorContext),
    ) {
        let state = self.state.get_mut().unwrap();
        // Nothing to do for an empty schedule.
        if schedule.systems.is_empty() {
            return;
        }
        // Reset the counters and the ready set for this run.
        state.num_running_systems = 0;
        state
            .num_dependencies_remaining
            .clone_from(&schedule.system_dependencies);
        state.ready_systems.clone_from(&self.starting_systems);

        // When stepping, treat the systems to skip as though they had already run.
        #[cfg(feature = "bevy_debug_stepping")]
        if let Some(skipped_systems) = _skip_systems {
            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
            // Mark the skipped systems as completed.
            state.completed_systems |= skipped_systems;

            // Signal the dependents of each skipped system and make sure the
            // skipped system itself is not left in the ready set.
            for system_index in skipped_systems.ones() {
                state.signal_dependents(system_index);
                state.ready_systems.remove(system_index);
            }
        }

        // Clone the handle out of the resource so the borrow of `world` ends
        // before `Environment::new` takes `world` mutably.
        let thread_executor = world
            .get_resource::<MainThreadExecutor>()
            .map(|e| e.0.clone());
        let thread_executor = thread_executor.as_deref();

        let environment = &Environment::new(self, schedule, world);

        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
            false,
            thread_executor,
            |scope| {
                let context = Context {
                    environment,
                    scope,
                    error_handler,
                };

                // Kick off the run; later ticks are triggered by tasks as their
                // systems complete (see `Context::system_completed`).
                context.tick_executor();
            },
        );

        // Copy the systems slice out of `environment` so that its borrows of
        // `self` and `world` can end before they are used again below.
        let systems = environment.systems;

        let state = self.state.get_mut().unwrap();
        if self.apply_final_deferred {
            // Apply the buffers of every system that has not been applied yet.
            let res = apply_deferred(&state.unapplied_systems, systems, world);
            // Store a panic from buffer application so it is re-raised below.
            if let Err(payload) = res {
                let panic_payload = self.panic_payload.get_mut().unwrap();
                *panic_payload = Some(payload);
            }
            state.unapplied_systems.clear();
        }

        // Propagate any panic that was caught during the run.
        let payload = self.panic_payload.get_mut().unwrap();
        if let Some(payload) = payload.take() {
            std::panic::resume_unwind(payload);
        }

        // Leave the per-run state clean for the next run.
        debug_assert!(state.ready_systems.is_clear());
        debug_assert!(state.running_systems.is_clear());
        state.active_access.clear();
        state.evaluated_sets.clear();
        state.skipped_systems.clear();
        state.completed_systems.clear();
    }

    /// Sets whether `run` applies the remaining deferred buffers after the last system.
    fn set_apply_final_deferred(&mut self, value: bool) {
        self.apply_final_deferred = value;
    }
}
270
impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
    /// Called by a task once its system has finished: reports the completion,
    /// records a panic payload if there is one, and ticks the executor.
    ///
    /// `res` is `Err` when running the system (or applying buffers) panicked.
    fn system_completed(
        &self,
        system_index: usize,
        res: Result<(), Box<dyn Any + Send>>,
        system: &ScheduleSystem,
    ) {
        // Tell the executor that the system finished. This must happen before
        // `tick_executor` below so that the tick can observe the completion.
        self.environment
            .executor
            .system_completion
            .push(SystemResult { system_index })
            .unwrap_or_else(|error| unreachable!("{}", error));
        if let Err(payload) = res {
            #[cfg(feature = "std")]
            #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
            {
                eprintln!("Encountered a panic in system `{}`!", &*system.name());
            }
            // Store the payload so that `run` can re-raise it; the lock is held
            // only for the assignment.
            {
                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
                *panic_payload = Some(payload);
            }
        }
        self.tick_executor();
    }

    /// Tries to acquire the executor state without blocking.
    ///
    /// Returns `None` when the state lock cannot be acquired; otherwise returns
    /// the state guard together with mutable access to the run conditions.
    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
        let guard = self.environment.executor.state.try_lock().ok()?;
        // SAFETY: in the visible code this is the only place the conditions cell
        // is dereferenced, and it is reached only after the state lock has been
        // acquired, so the lock also serializes access to the conditions.
        let conditions = unsafe { &mut *self.environment.conditions.get() };
        Some((conditions, guard))
    }

    /// Ticks the executor for as long as this thread can take the state lock
    /// and completions remain in the queue.
    fn tick_executor(&self) {
        // Completions pushed by this thread must not be lost:
        // - if this thread gets the lock, the tick below drains the queue;
        // - if it does not, the thread holding the lock checks `is_empty()` only
        //   after releasing it, sees the pushed completion, and loops again.
        loop {
            let Some((conditions, mut guard)) = self.try_lock() else {
                return;
            };
            guard.tick(self, conditions);
            // Release the lock BEFORE checking the queue, otherwise a completion
            // pushed in between could go unprocessed.
            drop(guard);
            if self.environment.executor.system_completion.is_empty() {
                return;
            }
        }
    }
}
326
327impl MultiThreadedExecutor {
328 pub fn new() -> Self {
332 Self {
333 state: Mutex::new(ExecutorState::new()),
334 system_completion: ConcurrentQueue::unbounded(),
335 starting_systems: FixedBitSet::new(),
336 apply_final_deferred: true,
337 panic_payload: Mutex::new(None),
338 #[cfg(feature = "trace")]
339 executor_span: info_span!("multithreaded executor"),
340 }
341 }
342}
343
impl ExecutorState {
    /// Creates an empty state. The collections are sized for a schedule by
    /// `MultiThreadedExecutor::init`.
    fn new() -> Self {
        Self {
            system_task_metadata: Vec::new(),
            num_running_systems: 0,
            num_dependencies_remaining: Vec::new(),
            active_access: default(),
            local_thread_running: false,
            exclusive_running: false,
            evaluated_sets: FixedBitSet::new(),
            ready_systems: FixedBitSet::new(),
            ready_systems_copy: FixedBitSet::new(),
            running_systems: FixedBitSet::new(),
            skipped_systems: FixedBitSet::new(),
            completed_systems: FixedBitSet::new(),
            unapplied_systems: FixedBitSet::new(),
        }
    }

    /// Performs one scheduling step: processes queued completions, recomputes
    /// the active access, and spawns every ready system that can run now.
    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
        #[cfg(feature = "trace")]
        let _span = context.environment.executor.executor_span.enter();

        // Drain the completions reported by finished tasks.
        for result in context.environment.executor.system_completion.try_iter() {
            self.finish_system_and_handle_dependents(result);
        }

        self.rebuild_active_access();

        // SAFETY:
        // - `finish_system_and_handle_dependents` has updated the set of running systems.
        // - `rebuild_active_access` has recomputed `active_access` from the running systems.
        unsafe {
            self.spawn_system_tasks(context, conditions);
        }
    }

    /// Spawns a task for every ready system that is currently able to run.
    ///
    /// # Safety
    ///
    /// This creates a mutable reference to each system in `self.ready_systems`,
    /// so the caller must ensure none of those systems is referenced elsewhere
    /// (for example by a task that is still running it).
    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
        // Nothing may start while an exclusive system is running.
        if self.exclusive_running {
            return;
        }

        // Take the scratch bitset out of `self`: the loop below needs `&mut self`
        // while iterating over the ready systems.
        let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);

        // Skipping a system can make its dependents ready immediately, so keep
        // rescanning until a pass skips nothing.
        let mut check_for_new_ready_systems = true;
        while check_for_new_ready_systems {
            check_for_new_ready_systems = false;

            ready_systems.clone_from(&self.ready_systems);

            for system_index in ready_systems.ones() {
                debug_assert!(!self.running_systems.contains(system_index));
                // SAFETY: the caller guarantees that ready systems are not referenced
                // elsewhere, so this mutable reference does not alias.
                let system = unsafe { &mut *context.environment.systems[system_index].get() };

                if !self.can_run(
                    system_index,
                    system,
                    conditions,
                    context.environment.world_cell,
                ) {
                    // Leave the system in the ready set and try again on a later tick.
                    continue;
                }

                self.ready_systems.remove(system_index);

                // SAFETY: `can_run` returned true, so it has updated the access of
                // every condition it visited and of the system itself, and found
                // them compatible with `active_access`.
                if unsafe {
                    !self.should_run(
                        system_index,
                        system,
                        conditions,
                        context.environment.world_cell,
                    )
                } {
                    self.skip_system_and_signal_dependents(system_index);
                    // Signalling dependents may have made more systems ready.
                    check_for_new_ready_systems = true;
                    continue;
                }

                self.running_systems.insert(system_index);
                self.num_running_systems += 1;

                if self.system_task_metadata[system_index].is_exclusive {
                    // SAFETY: `can_run` only returns true for an exclusive system
                    // when `num_running_systems` was 0, i.e. nothing else is running.
                    unsafe {
                        self.spawn_exclusive_system_task(context, system_index);
                    }
                    // Nothing else can start while the exclusive system runs.
                    check_for_new_ready_systems = false;
                    break;
                }

                // SAFETY:
                // - The caller ensured no other reference to this system exists.
                // - The cached `is_exclusive` flag for this system is false.
                // - `can_run` updated the system's access and found it compatible
                //   with the access of the running systems.
                unsafe {
                    self.spawn_system_task(context, system_index);
                }
            }
        }

        // Hand the scratch bitset back so its allocation is reused next tick.
        self.ready_systems_copy = ready_systems;
    }

    /// Returns whether the system can start right now.
    ///
    /// Checks the exclusive and non-`Send` restrictions, then updates the access
    /// of the not-yet-evaluated set conditions, the system's own conditions and
    /// (unless it is already marked skipped) the system itself, requiring each to
    /// be compatible with `active_access`. On success for a non-skipped system,
    /// its access is cached in `system_task_metadata`.
    fn can_run(
        &mut self,
        system_index: usize,
        system: &mut ScheduleSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let system_meta = &self.system_task_metadata[system_index];
        // An exclusive system has to wait until nothing else is running.
        if system_meta.is_exclusive && self.num_running_systems > 0 {
            return false;
        }

        // Only one non-`Send` (or exclusive) system may be in flight at a time.
        if !system_meta.is_send && self.local_thread_running {
            return false;
        }

        // Only sets whose conditions have not been evaluated yet need checking.
        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
            .difference(&self.evaluated_sets)
        {
            for condition in &mut conditions.set_conditions[set_idx] {
                condition.update_archetype_component_access(world);
                if !condition
                    .archetype_component_access()
                    .is_compatible(&self.active_access)
                {
                    return false;
                }
            }
        }

        for condition in &mut conditions.system_conditions[system_index] {
            condition.update_archetype_component_access(world);
            if !condition
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }
        }

        // A system already marked as skipped will not run, so its own access is irrelevant.
        if !self.skipped_systems.contains(system_index) {
            system.update_archetype_component_access(world);
            if !system
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }

            // Cache the access so it can be merged into `active_access` while
            // the system runs.
            self.system_task_metadata[system_index]
                .archetype_component_access
                .clone_from(system.archetype_component_access());
        }

        true
    }

    /// Evaluates the system's set and system conditions and validates its
    /// parameters, returning whether the system should actually run.
    ///
    /// As side effects, every set evaluated here is recorded in `evaluated_sets`,
    /// and systems that will not run are added to `skipped_systems`.
    ///
    /// # Safety
    ///
    /// This runs conditions and validates parameters through `world` using
    /// unsafe APIs, so the caller must ensure that accessing the world data they
    /// need is currently sound. In the visible code it is only called after
    /// `can_run` returned true for the same system.
    unsafe fn should_run(
        &mut self,
        system_index: usize,
        system: &mut ScheduleSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let mut should_run = !self.skipped_systems.contains(system_index);
        // NOTE(review): this uses `default_error_handler()`, whereas errors returned
        // by running systems go to the `error_handler` passed to `run` — confirm
        // whether the two are meant to differ.
        let error_handler = default_error_handler();

        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
            // Each set's conditions are evaluated at most once per run.
            if self.evaluated_sets.contains(set_idx) {
                continue;
            }

            // SAFETY: the caller upholds the world-access requirements for
            // evaluating these conditions (see `# Safety`).
            let set_conditions_met = unsafe {
                evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world)
            };

            // A failed set condition skips every system in that set.
            if !set_conditions_met {
                self.skipped_systems
                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
            }

            should_run &= set_conditions_met;
            self.evaluated_sets.insert(set_idx);
        }

        // SAFETY: the caller upholds the world-access requirements for
        // evaluating these conditions (see `# Safety`).
        let system_conditions_met = unsafe {
            evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world)
        };

        if !system_conditions_met {
            self.skipped_systems.insert(system_index);
        }

        should_run &= system_conditions_met;

        // Parameters are only validated for systems that would otherwise run.
        if should_run {
            // SAFETY: the caller upholds the world-access requirements for
            // validating this system's parameters (see `# Safety`).
            let valid_params = match unsafe { system.validate_param_unsafe(world) } {
                Ok(()) => true,
                Err(e) => {
                    // Errors flagged as `skipped` are not reported to the handler.
                    if !e.skipped {
                        error_handler(
                            e.into(),
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                    false
                }
            };
            if !valid_params {
                self.skipped_systems.insert(system_index);
            }

            should_run &= valid_params;
        }

        should_run
    }

    /// Spawns a task that runs a non-exclusive system and reports its completion.
    ///
    /// # Safety
    ///
    /// This creates a mutable reference to the system and runs it through the
    /// unsafe world cell, so the caller must ensure that no other reference to
    /// the system exists and that the system's world access does not conflict
    /// with anything currently running.
    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: the caller guarantees that this system is not referenced elsewhere.
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Copy the context so the task can own it.
        let context = *context;

        let system_meta = &self.system_task_metadata[system_index];

        let task = async move {
            // Catch panics so the completion is still reported and the payload
            // can be re-raised by `run`.
            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                // SAFETY: the caller guarantees that running this system through
                // the world cell does not conflict with other running systems.
                unsafe {
                    if let Err(err) = __rust_begin_short_backtrace::run_unsafe(
                        system,
                        context.environment.world_cell,
                    ) {
                        (context.error_handler)(
                            err,
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                };
            }));
            context.system_completed(system_index, res, system);
        };

        // Record this system's access so that systems considered later in the
        // same tick are checked against it.
        self.active_access
            .extend(&system_meta.archetype_component_access);

        if system_meta.is_send {
            context.scope.spawn(task);
        } else {
            // Non-`Send` systems are spawned on the external executor, one at a time.
            self.local_thread_running = true;
            context.scope.spawn_on_external(task);
        }
    }

    /// Spawns a task that runs an exclusive system, or applies deferred buffers
    /// when the system is an apply-deferred marker, with mutable world access.
    ///
    /// # Safety
    ///
    /// This obtains `&mut World` from the world cell and a mutable reference to
    /// the system, so the caller must ensure no other system is running or borrowed.
    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: the caller guarantees that this system is not referenced elsewhere.
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Copy the context so the task can own it.
        let context = *context;

        if is_apply_deferred(system) {
            // Hand the task its own copy of the systems to apply, and reset the
            // executor's set.
            let unapplied_systems = self.unapplied_systems.clone();
            self.unapplied_systems.clear();
            let task = async move {
                // SAFETY: the caller guarantees that nothing else is accessing the world.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        } else {
            let task = async move {
                // SAFETY: the caller guarantees that nothing else is accessing the world.
                let world = unsafe { context.environment.world_cell.world_mut() };
                // Catch panics so the completion is still reported and the
                // payload can be re-raised by `run`.
                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    if let Err(err) = __rust_begin_short_backtrace::run(system, world) {
                        (context.error_handler)(
                            err,
                            ErrorContext::System {
                                name: system.name(),
                                last_run: system.get_last_run(),
                            },
                        );
                    }
                }));
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        }

        self.exclusive_running = true;
        self.local_thread_running = true;
    }

    /// Updates the bookkeeping for a system that finished running and signals
    /// its dependents.
    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
        let SystemResult { system_index, .. } = result;

        if self.system_task_metadata[system_index].is_exclusive {
            self.exclusive_running = false;
        }

        if !self.system_task_metadata[system_index].is_send {
            self.local_thread_running = false;
        }

        debug_assert!(self.num_running_systems >= 1);
        self.num_running_systems -= 1;
        self.running_systems.remove(system_index);
        self.completed_systems.insert(system_index);
        // The system ran, so its deferred buffers still need to be applied.
        self.unapplied_systems.insert(system_index);

        self.signal_dependents(system_index);
    }

    /// Marks a system that will not run as completed and signals its dependents.
    ///
    /// Unlike a finished system, it is not added to `unapplied_systems`.
    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
        self.completed_systems.insert(system_index);
        self.signal_dependents(system_index);
    }

    /// Decrements the remaining-dependency count of each dependent of the system
    /// and marks dependents that reach zero as ready.
    fn signal_dependents(&mut self, system_index: usize) {
        for &dep_idx in &self.system_task_metadata[system_index].dependents {
            let remaining = &mut self.num_dependencies_remaining[dep_idx];
            debug_assert!(*remaining >= 1);
            *remaining -= 1;
            // Dependents already marked completed must not become ready again.
            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
                self.ready_systems.insert(dep_idx);
            }
        }
    }

    /// Recomputes `active_access` as the union of the cached access of every
    /// running system.
    fn rebuild_active_access(&mut self) {
        self.active_access.clear();
        for index in self.running_systems.ones() {
            let system_meta = &self.system_task_metadata[index];
            self.active_access
                .extend(&system_meta.archetype_component_access);
        }
    }
}
754
755fn apply_deferred(
756 unapplied_systems: &FixedBitSet,
757 systems: &[SyncUnsafeCell<ScheduleSystem>],
758 world: &mut World,
759) -> Result<(), Box<dyn Any + Send>> {
760 for system_index in unapplied_systems.ones() {
761 let system = unsafe { &mut *systems[system_index].get() };
763 let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
764 system.apply_deferred(world);
765 }));
766 if let Err(payload) = res {
767 #[cfg(feature = "std")]
768 #[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
769 {
770 eprintln!(
771 "Encountered a panic when applying buffers for system `{}`!",
772 &*system.name()
773 );
774 }
775 return Err(payload);
776 }
777 }
778 Ok(())
779}
780
781unsafe fn evaluate_and_fold_conditions(
787 conditions: &mut [BoxedCondition],
788 world: UnsafeWorldCell,
789) -> bool {
790 let error_handler = default_error_handler();
791
792 #[expect(
793 clippy::unnecessary_fold,
794 reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
795 )]
796 conditions
797 .iter_mut()
798 .map(|condition| {
799 match unsafe { condition.validate_param_unsafe(world) } {
804 Ok(()) => (),
805 Err(e) => {
806 if !e.skipped {
807 error_handler(
808 e.into(),
809 ErrorContext::System {
810 name: condition.name(),
811 last_run: condition.get_last_run(),
812 },
813 );
814 }
815 return false;
816 }
817 }
818 unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
823 })
824 .fold(true, |acc, res| acc && res)
825}
826
/// Resource holding a shared [`ThreadExecutor`] handle.
///
/// When present in the world, [`MultiThreadedExecutor::run`] passes it to the
/// task pool scope as the external executor, which is where non-`Send` system
/// tasks are spawned.
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
830
831impl Default for MainThreadExecutor {
832 fn default() -> Self {
833 Self::new()
834 }
835}
836
837impl MainThreadExecutor {
838 pub fn new() -> Self {
840 MainThreadExecutor(TaskPool::get_thread_executor())
841 }
842}
843
#[cfg(test)]
mod tests {
    use crate::{
        prelude::Resource,
        schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},
        system::Commands,
        world::World,
    };

    /// Marker resource inserted by the second system in `skipped_systems_notify_dependents`.
    #[derive(Resource)]
    struct R;

    /// A system skipped by its run condition must still signal its dependents,
    /// otherwise the chained system would never run.
    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                // Always skipped: its run condition returns `false`.
                (|| {}).run_if(|| false),
                // Chained after the skipped system; inserts `R` when it runs.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }

    /// Runs two chained systems that take `Commands` on the multi-threaded executor.
    /// The test has no assertions; judging by its name it exists to exercise
    /// `spawn_exclusive_system_task` under Miri.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
        schedule.run(&mut world);
    }
}