use alloc::sync::Arc;
use core::any::Any;
use std::sync::{Mutex, MutexGuard};

use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
#[cfg(feature = "trace")]
use bevy_utils::tracing::info_span;
#[cfg(feature = "trace")]
use bevy_utils::tracing::Span;
use bevy_utils::{default, syncunsafecell::SyncUnsafeCell};
use core::panic::AssertUnwindSafe;

use concurrent_queue::ConcurrentQueue;
use fixedbitset::FixedBitSet;

use crate::{
    archetype::ArchetypeComponentId,
    prelude::Resource,
    query::Access,
    schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
    system::BoxedSystem,
    world::{unsafe_world_cell::UnsafeWorldCell, World},
};

use crate as bevy_ecs;

use super::__rust_begin_short_backtrace;

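/// Borrowed data and cached state shared by the executor and the worker tasks
/// it spawns for a single schedule run.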
struct Environment<'env, 'sys> {
    executor: &'env MultiThreadedExecutor,
    systems: &'sys [SyncUnsafeCell<BoxedSystem>],
    conditions: SyncUnsafeCell<Conditions<'sys>>,
    world_cell: UnsafeWorldCell<'env>,
}

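/// The run conditions of all systems and system sets, borrowed from the
/// [`SystemSchedule`] for the duration of a run.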
struct Conditions<'a> {
    system_conditions: &'a mut [Vec<BoxedCondition>],
    set_conditions: &'a mut [Vec<BoxedCondition>],
    sets_with_conditions_of_systems: &'a [FixedBitSet],
    systems_in_sets_with_conditions: &'a [FixedBitSet],
}

impl<'env, 'sys> Environment<'env, 'sys> {
    fn new(
        executor: &'env MultiThreadedExecutor,
        schedule: &'sys mut SystemSchedule,
        world: &'env mut World,
    ) -> Self {
        Environment {
            executor,
            systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
            conditions: SyncUnsafeCell::new(Conditions {
                system_conditions: &mut schedule.system_conditions,
                set_conditions: &mut schedule.set_conditions,
                sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
                systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
            }),
            world_cell: world.as_unsafe_world_cell(),
        }
    }
}

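/// Per-system data used by the [`MultiThreadedExecutor`].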
struct SystemTaskMetadata {
    /// The `ArchetypeComponentId` access of the system.
    archetype_component_access: Access<ArchetypeComponentId>,
    /// Indices of the systems that directly depend on this system.
    dependents: Vec<usize>,
    /// Is `true` if the system does not access `!Send` data.
    is_send: bool,
    /// Is `true` if the system is exclusive and requires `&mut World` access.
    is_exclusive: bool,
}

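/// A message sent from a finished system task back to the executor.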
struct SystemResult {
    system_index: usize,
}

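/// Runs the schedule using a thread pool; systems whose data access does not
/// conflict can run concurrently.
///
/// A schedule opts into this executor through [`ExecutorKind`] (the tests at
/// the bottom of this module do the same), for example:
///
/// ```ignore
/// let mut world = World::new();
/// let mut schedule = Schedule::default();
/// schedule.set_executor_kind(ExecutorKind::MultiThreaded);
/// schedule.run(&mut world);
/// ```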
pub struct MultiThreadedExecutor {
    /// The running state, protected by a mutex so that a reference to the
    /// executor can be shared across tasks.
    state: Mutex<ExecutorState>,
    /// Queue of system completion events.
    system_completion: ConcurrentQueue<SystemResult>,
    /// When `true`, deferred system buffers are applied after all systems have run.
    apply_final_deferred: bool,
    /// When set, tells the executor that a system task has panicked.
    panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
    /// Systems that have no dependencies and can start immediately.
    starting_systems: FixedBitSet,
    /// Cached tracing span.
    #[cfg(feature = "trace")]
    executor_span: Span,
}

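/// The state of the executor while a schedule is running.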
pub struct ExecutorState {
    /// Metadata for scheduling and running system tasks.
    system_task_metadata: Vec<SystemTaskMetadata>,
    /// Union of the accesses of all currently running systems.
    active_access: Access<ArchetypeComponentId>,
    /// Is `true` if a `!Send` system is running on the local thread.
    local_thread_running: bool,
    /// Is `true` if an exclusive system is running.
    exclusive_running: bool,
    /// The number of systems that are currently running.
    num_running_systems: usize,
    /// For each system, the number of its dependencies that have not yet completed.
    num_dependencies_remaining: Vec<usize>,
    /// System sets whose conditions have already been evaluated this run.
    evaluated_sets: FixedBitSet,
    /// Systems with no remaining dependencies that are waiting to run.
    ready_systems: FixedBitSet,
    /// Scratch copy of `ready_systems`, kept around to reuse its allocation.
    ready_systems_copy: FixedBitSet,
    /// Systems that are currently running.
    running_systems: FixedBitSet,
    /// Systems that were skipped this run.
    skipped_systems: FixedBitSet,
    /// Systems that have either run or been skipped.
    completed_systems: FixedBitSet,
    /// Systems that have run but have not had their deferred buffers applied.
    unapplied_systems: FixedBitSet,
}

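/// References to data required by the executor.
/// This is copied into each system task so that tasks can invoke the executor when they complete.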
#[derive(Copy, Clone)]
struct Context<'scope, 'env, 'sys> {
    environment: &'env Environment<'env, 'sys>,
    scope: &'scope Scope<'scope, 'env, ()>,
}

impl Default for MultiThreadedExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl SystemExecutor for MultiThreadedExecutor {
    fn kind(&self) -> ExecutorKind {
        ExecutorKind::MultiThreaded
    }

    fn init(&mut self, schedule: &SystemSchedule) {
        let state = self.state.get_mut().unwrap();
        let sys_count = schedule.system_ids.len();
        let set_count = schedule.set_ids.len();

        self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
        self.starting_systems = FixedBitSet::with_capacity(sys_count);
        state.evaluated_sets = FixedBitSet::with_capacity(set_count);
        state.ready_systems = FixedBitSet::with_capacity(sys_count);
        state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
        state.running_systems = FixedBitSet::with_capacity(sys_count);
        state.completed_systems = FixedBitSet::with_capacity(sys_count);
        state.skipped_systems = FixedBitSet::with_capacity(sys_count);
        state.unapplied_systems = FixedBitSet::with_capacity(sys_count);

        state.system_task_metadata = Vec::with_capacity(sys_count);
        for index in 0..sys_count {
            state.system_task_metadata.push(SystemTaskMetadata {
                archetype_component_access: default(),
                dependents: schedule.system_dependents[index].clone(),
                is_send: schedule.systems[index].is_send(),
                is_exclusive: schedule.systems[index].is_exclusive(),
            });
            if schedule.system_dependencies[index] == 0 {
                self.starting_systems.insert(index);
            }
        }

        state.num_dependencies_remaining = Vec::with_capacity(sys_count);
    }

    fn run(
        &mut self,
        schedule: &mut SystemSchedule,
        world: &mut World,
        _skip_systems: Option<&FixedBitSet>,
    ) {
        let state = self.state.get_mut().unwrap();
        if schedule.systems.is_empty() {
            return;
        }
        state.num_running_systems = 0;
        state
            .num_dependencies_remaining
            .clone_from(&schedule.system_dependencies);
        state.ready_systems.clone_from(&self.starting_systems);

        // If stepping is enabled, mark the stepping-skipped systems as completed
        // and signal their dependents so that those systems are not blocked.
        #[cfg(feature = "bevy_debug_stepping")]
        if let Some(skipped_systems) = _skip_systems {
            debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
            state.completed_systems |= skipped_systems;

            for system_index in skipped_systems.ones() {
                state.signal_dependents(system_index);
                state.ready_systems.remove(system_index);
            }
        }

        let thread_executor = world
            .get_resource::<MainThreadExecutor>()
            .map(|e| e.0.clone());
        let thread_executor = thread_executor.as_deref();

        let environment = &Environment::new(self, schedule, world);

        ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
            false,
            thread_executor,
            |scope| {
                let context = Context { environment, scope };

                // Kick off the first batch of ready systems; further ticks happen
                // as system tasks complete.
                context.tick_executor();
            },
        );

        // End the borrows of `self` and `world` held by `environment` by copying
        // out the reference to the systems.
        let systems = environment.systems;

        let state = self.state.get_mut().unwrap();
        if self.apply_final_deferred {
            let res = apply_deferred(&state.unapplied_systems, systems, world);
            if let Err(payload) = res {
                let panic_payload = self.panic_payload.get_mut().unwrap();
                *panic_payload = Some(payload);
            }
            state.unapplied_systems.clear();
        }

        // If a system panicked while running, resume unwinding on the caller's thread.
        let payload = self.panic_payload.get_mut().unwrap();
        if let Some(payload) = payload.take() {
            std::panic::resume_unwind(payload);
        }

        debug_assert!(state.ready_systems.is_clear());
        debug_assert!(state.running_systems.is_clear());
        state.active_access.clear();
        state.evaluated_sets.clear();
        state.skipped_systems.clear();
        state.completed_systems.clear();
    }

    fn set_apply_final_deferred(&mut self, value: bool) {
        self.apply_final_deferred = value;
    }
}

impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
    fn system_completed(
        &self,
        system_index: usize,
        res: Result<(), Box<dyn Any + Send>>,
        system: &BoxedSystem,
    ) {
        // Notify the executor that the system finished, then record any panic payload.
        self.environment
            .executor
            .system_completion
            .push(SystemResult { system_index })
            .unwrap_or_else(|error| unreachable!("{}", error));
        if let Err(payload) = res {
            eprintln!("Encountered a panic in system `{}`!", &*system.name());
            {
                let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
                *panic_payload = Some(payload);
            }
        }
        self.tick_executor();
    }

    fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
        let guard = self.environment.executor.state.try_lock().ok()?;
        // SAFETY: This is an exclusive access: `conditions` is only ever dereferenced
        // here, while the lock on the executor state is held.
        let conditions = unsafe { &mut *self.environment.conditions.get() };
        Some((conditions, guard))
    }

    fn tick_executor(&self) {
        // Loop until either another thread holds the state lock (that thread will
        // process the completions we just pushed) or the completion queue is drained.
        loop {
            let Some((conditions, mut guard)) = self.try_lock() else {
                return;
            };
            guard.tick(self, conditions);
            // Drop the guard before checking for more completions so that another
            // thread can acquire the lock and make progress.
            drop(guard);
            if self.environment.executor.system_completion.is_empty() {
                return;
            }
        }
    }
}

impl MultiThreadedExecutor {
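    /// Creates a new multi-threaded executor for use with a [`Schedule`].
    ///
    /// [`Schedule`]: crate::schedule::Schedule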
    pub fn new() -> Self {
        Self {
            state: Mutex::new(ExecutorState::new()),
            system_completion: ConcurrentQueue::unbounded(),
            starting_systems: FixedBitSet::new(),
            apply_final_deferred: true,
            panic_payload: Mutex::new(None),
            #[cfg(feature = "trace")]
            executor_span: info_span!("multithreaded executor"),
        }
    }
}

impl ExecutorState {
    fn new() -> Self {
        Self {
            system_task_metadata: Vec::new(),
            num_running_systems: 0,
            num_dependencies_remaining: Vec::new(),
            active_access: default(),
            local_thread_running: false,
            exclusive_running: false,
            evaluated_sets: FixedBitSet::new(),
            ready_systems: FixedBitSet::new(),
            ready_systems_copy: FixedBitSet::new(),
            running_systems: FixedBitSet::new(),
            skipped_systems: FixedBitSet::new(),
            completed_systems: FixedBitSet::new(),
            unapplied_systems: FixedBitSet::new(),
        }
    }

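    /// Processes any newly received system completion events, then tries to
    /// spawn more system tasks.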
    fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
        #[cfg(feature = "trace")]
        let _span = context.environment.executor.executor_span.enter();

        for result in context.environment.executor.system_completion.try_iter() {
            self.finish_system_and_handle_dependents(result);
        }

        self.rebuild_active_access();

        // SAFETY: `self.ready_systems` does not contain any running systems, and
        // `world_cell` was created from an exclusive world borrow in `run`.
        unsafe {
            self.spawn_system_tasks(context, conditions);
        }
    }

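    /// # Safety
    /// - Caller must ensure that `self.ready_systems` does not contain any systems
    ///   that have been mutably borrowed (including the systems currently running).
    /// - `context.environment.world_cell` must have permission to access all world
    ///   data (not counting any world data claimed by systems currently running
    ///   on this executor).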
    unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
        if self.exclusive_running {
            return;
        }

        let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);

        // Skipping systems may make their dependents ready immediately; if that
        // happens, loop again so those dependents are spawned in the same tick.
        let mut check_for_new_ready_systems = true;
        while check_for_new_ready_systems {
            check_for_new_ready_systems = false;

            ready_systems.clone_from(&self.ready_systems);

            for system_index in ready_systems.ones() {
                debug_assert!(!self.running_systems.contains(system_index));
                // SAFETY: The caller guarantees that this system is not running,
                // so no other mutable reference to it exists.
                let system = unsafe { &mut *context.environment.systems[system_index].get() };

                if !self.can_run(
                    system_index,
                    system,
                    conditions,
                    context.environment.world_cell,
                ) {
                    continue;
                }

                self.ready_systems.remove(system_index);

                // SAFETY: `can_run` returned true, which means it called
                // `update_archetype_component_access` for each of this system's
                // conditions and verified they do not conflict with running systems.
                if unsafe {
                    !self.should_run(
                        system_index,
                        system,
                        conditions,
                        context.environment.world_cell,
                    )
                } {
                    self.skip_system_and_signal_dependents(system_index);
                    // Skipping may have made some of this system's dependents ready.
                    check_for_new_ready_systems = true;
                    continue;
                }

                self.running_systems.insert(system_index);
                self.num_running_systems += 1;

                if self.system_task_metadata[system_index].is_exclusive {
                    // SAFETY: `can_run` returned true for this exclusive system,
                    // which means no other systems are currently running.
                    unsafe {
                        self.spawn_exclusive_system_task(context, system_index);
                    }
                    check_for_new_ready_systems = false;
                    break;
                }

                // SAFETY:
                // - No other reference to this system exists.
                // - `can_run` called `update_archetype_component_access` for this system
                //   and returned true, so no running system conflicts with its access.
                unsafe {
                    self.spawn_system_task(context, system_index);
                }
            }
        }

        // Hand the scratch buffer back so the next tick can reuse its allocation.
        self.ready_systems_copy = ready_systems;
    }

    fn can_run(
        &mut self,
        system_index: usize,
        system: &mut BoxedSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let system_meta = &self.system_task_metadata[system_index];
        if system_meta.is_exclusive && self.num_running_systems > 0 {
            return false;
        }

        if !system_meta.is_send && self.local_thread_running {
            return false;
        }

        for set_idx in conditions.sets_with_conditions_of_systems[system_index]
            .difference(&self.evaluated_sets)
        {
            for condition in &mut conditions.set_conditions[set_idx] {
                condition.update_archetype_component_access(world);
                if !condition
                    .archetype_component_access()
                    .is_compatible(&self.active_access)
                {
                    return false;
                }
            }
        }

        for condition in &mut conditions.system_conditions[system_index] {
            condition.update_archetype_component_access(world);
            if !condition
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }
        }

        if !self.skipped_systems.contains(system_index) {
            system.update_archetype_component_access(world);
            if !system
                .archetype_component_access()
                .is_compatible(&self.active_access)
            {
                return false;
            }

            self.system_task_metadata[system_index]
                .archetype_component_access
                .clone_from(system.archetype_component_access());
        }

        true
    }

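    /// # Safety
    /// - `world` must have permission to read any world data required by the
    ///   system's run conditions, including conditions on any of the system's sets.
    /// - `update_archetype_component_access` must have been called with `world`
    ///   for the system as well as for the conditions of the system and its sets.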
    unsafe fn should_run(
        &mut self,
        system_index: usize,
        system: &mut BoxedSystem,
        conditions: &mut Conditions,
        world: UnsafeWorldCell,
    ) -> bool {
        let mut should_run = !self.skipped_systems.contains(system_index);

        for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
            if self.evaluated_sets.contains(set_idx) {
                continue;
            }

            // Evaluate the conditions of this system set.
            // SAFETY: The caller ensures that `world` has permission to read any data
            // required by these conditions, and that their archetype-component access
            // has been updated.
            let set_conditions_met = unsafe {
                evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world)
            };

            if !set_conditions_met {
                self.skipped_systems
                    .union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
            }

            should_run &= set_conditions_met;
            self.evaluated_sets.insert(set_idx);
        }

        // Evaluate the system's own conditions.
        // SAFETY: Same as above.
        let system_conditions_met = unsafe {
            evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world)
        };

        if !system_conditions_met {
            self.skipped_systems.insert(system_index);
        }

        should_run &= system_conditions_met;

        if should_run {
            // SAFETY: The caller ensures that `world` has permission to read any data
            // required by the system's parameters.
            let valid_params = unsafe { system.validate_param_unsafe(world) };
            if !valid_params {
                self.skipped_systems.insert(system_index);
            }
            should_run &= valid_params;
        }

        should_run
    }

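    /// # Safety
    /// - Caller must not alias systems that are currently running.
    /// - `context.environment.world_cell` must have permission to access all world
    ///   data (not counting any world data claimed by systems currently running
    ///   on this executor).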
    unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: This system is not running; no other reference to it exists.
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Take a copy of the context so the `async` block can own it.
        let context = *context;

        let system_meta = &self.system_task_metadata[system_index];

        let task = async move {
            let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                // SAFETY: The executor only spawns this task after verifying that the
                // system's access is compatible with every running system, and its
                // archetype-component access has been updated by `can_run`.
                unsafe {
                    __rust_begin_short_backtrace::run_unsafe(
                        &mut **system,
                        context.environment.world_cell,
                    );
                };
            }));
            context.system_completed(system_index, res, system);
        };

        self.active_access
            .extend(&system_meta.archetype_component_access);

        if system_meta.is_send {
            context.scope.spawn(task);
        } else {
            self.local_thread_running = true;
            context.scope.spawn_on_external(task);
        }
    }

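    /// # Safety
    /// Caller must ensure no systems are currently borrowed, as the spawned task
    /// takes exclusive (`&mut World`) access.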
    unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
        // SAFETY: This system is not running; no other reference to it exists.
        let system = unsafe { &mut *context.environment.systems[system_index].get() };
        // Take a copy of the context so the `async` block can own it.
        let context = *context;

        if is_apply_deferred(system) {
            let unapplied_systems = self.unapplied_systems.clone();
            self.unapplied_systems.clear();
            let task = async move {
                // SAFETY: `can_run` returned true for this exclusive system, so no
                // other systems are running and we have exclusive world access.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        } else {
            let task = async move {
                // SAFETY: `can_run` returned true for this exclusive system, so no
                // other systems are running and we have exclusive world access.
                let world = unsafe { context.environment.world_cell.world_mut() };
                let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
                    __rust_begin_short_backtrace::run(&mut **system, world);
                }));
                context.system_completed(system_index, res, system);
            };

            context.scope.spawn_on_scope(task);
        }

        self.exclusive_running = true;
        self.local_thread_running = true;
    }

    fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
        let SystemResult { system_index, .. } = result;

        if self.system_task_metadata[system_index].is_exclusive {
            self.exclusive_running = false;
        }

        if !self.system_task_metadata[system_index].is_send {
            self.local_thread_running = false;
        }

        debug_assert!(self.num_running_systems >= 1);
        self.num_running_systems -= 1;
        self.running_systems.remove(system_index);
        self.completed_systems.insert(system_index);
        self.unapplied_systems.insert(system_index);

        self.signal_dependents(system_index);
    }

    fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
        self.completed_systems.insert(system_index);
        self.signal_dependents(system_index);
    }

    fn signal_dependents(&mut self, system_index: usize) {
        for &dep_idx in &self.system_task_metadata[system_index].dependents {
            let remaining = &mut self.num_dependencies_remaining[dep_idx];
            debug_assert!(*remaining >= 1);
            *remaining -= 1;
            if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
                self.ready_systems.insert(dep_idx);
            }
        }
    }

    fn rebuild_active_access(&mut self) {
        self.active_access.clear();
        for index in self.running_systems.ones() {
            let system_meta = &self.system_task_metadata[index];
            self.active_access
                .extend(&system_meta.archetype_component_access);
        }
    }
}

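/// Applies the deferred buffers (e.g. `Commands`) of the given systems to the world,
/// returning the payload of the first panic encountered, if any.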
fn apply_deferred(
    unapplied_systems: &FixedBitSet,
    systems: &[SyncUnsafeCell<BoxedSystem>],
    world: &mut World,
) -> Result<(), Box<dyn Any + Send>> {
    for system_index in unapplied_systems.ones() {
        // SAFETY: none of these systems are running; no other references to them exist.
        let system = unsafe { &mut *systems[system_index].get() };
        let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
            system.apply_deferred(world);
        }));
        if let Err(payload) = res {
            eprintln!(
                "Encountered a panic when applying buffers for system `{}`!",
                &*system.name()
            );
            return Err(payload);
        }
    }
    Ok(())
}

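/// Evaluates the given run conditions and folds the results with `&&`.
///
/// # Safety
/// - `world` must have permission to read any world data required by `conditions`.
/// - `update_archetype_component_access` must have been called with `world` for
///   each condition in `conditions`.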
unsafe fn evaluate_and_fold_conditions(
    conditions: &mut [BoxedCondition],
    world: UnsafeWorldCell,
) -> bool {
    // Not short-circuiting is intentional: every condition is evaluated even if an
    // earlier one already returned false.
    #[allow(clippy::unnecessary_fold)]
    conditions
        .iter_mut()
        .map(|condition| {
            // SAFETY: The caller ensures `world` has the read access needed to
            // validate and run this condition.
            if !unsafe { condition.validate_param_unsafe(world) } {
                return false;
            }
            unsafe { __rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world) }
        })
        .fold(true, |acc, res| acc && res)
}

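/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread.
///
/// A minimal sketch of making it available to schedules (assuming you construct
/// the `World` yourself):
///
/// ```ignore
/// let mut world = World::new();
/// world.insert_resource(MainThreadExecutor::new());
/// ```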
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
    fn default() -> Self {
        Self::new()
    }
}

impl MainThreadExecutor {
    /// Creates a new executor that can be used to run systems on the main thread.
    pub fn new() -> Self {
        MainThreadExecutor(TaskPool::get_thread_executor())
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        self as bevy_ecs,
        prelude::Resource,
        schedule::{ExecutorKind, IntoSystemConfigs, Schedule},
        system::Commands,
        world::World,
    };

    #[derive(Resource)]
    struct R;

    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                (|| {}).run_if(|| false),
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }

    // Exercises the `spawn_exclusive_system_task` path: chaining systems that use
    // `Commands` inserts an exclusive `apply_deferred` system between them.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
        schedule.run(&mut world);
    }
}