bevy_ecs/system/combinator.rs
1use alloc::{format, vec::Vec};
2use bevy_utils::prelude::DebugName;
3use core::marker::PhantomData;
4
5use crate::{
6 change_detection::{CheckChangeTicks, Tick},
7 error::ErrorContext,
8 prelude::World,
9 query::FilteredAccessSet,
10 schedule::InternedSystemSet,
11 system::{input::SystemInput, SystemIn},
12 world::unsafe_world_cell::UnsafeWorldCell,
13};
14
15use super::{IntoSystem, ReadOnlySystem, RunSystemError, System};
16
17/// Customizes the behavior of a [`CombinatorSystem`].
18///
19/// # Examples
20///
21/// ```
22/// use bevy_ecs::prelude::*;
23/// use bevy_ecs::system::{CombinatorSystem, Combine, RunSystemError};
24///
25/// // A system combinator that performs an exclusive-or (XOR)
26/// // operation on the output of two systems.
27/// pub type Xor<A, B> = CombinatorSystem<XorMarker, A, B>;
28///
29/// // This struct is used to customize the behavior of our combinator.
30/// pub struct XorMarker;
31///
32/// impl<A, B> Combine<A, B> for XorMarker
33/// where
34/// A: System<In = (), Out = bool>,
35/// B: System<In = (), Out = bool>,
36/// {
37/// type In = ();
38/// type Out = bool;
39///
40/// fn combine<T>(
41/// _input: Self::In,
42/// data: &mut T,
43/// a: impl FnOnce(A::In, &mut T) -> Result<A::Out, RunSystemError>,
44/// b: impl FnOnce(B::In, &mut T) -> Result<B::Out, RunSystemError>,
45/// ) -> Result<Self::Out, RunSystemError> {
46/// Ok(a((), data).unwrap_or(false) ^ b((), data).unwrap_or(false))
47/// }
48/// }
49///
50/// # #[derive(Resource, PartialEq, Eq)] struct A(u32);
51/// # #[derive(Resource, PartialEq, Eq)] struct B(u32);
52/// # #[derive(Resource, Default)] struct RanFlag(bool);
53/// # let mut world = World::new();
54/// # world.init_resource::<RanFlag>();
55/// #
56/// # let mut app = Schedule::default();
57/// app.add_systems(my_system.run_if(Xor::new(
58/// IntoSystem::into_system(resource_equals(A(1))),
59/// IntoSystem::into_system(resource_equals(B(1))),
60/// // The name of the combined system.
61/// "a ^ b".into(),
62/// )));
63/// # fn my_system(mut flag: ResMut<RanFlag>) { flag.0 = true; }
64/// #
65/// # world.insert_resource(A(0));
66/// # world.insert_resource(B(0));
67/// # app.run(&mut world);
68/// # // Neither condition passes, so the system does not run.
69/// # assert!(!world.resource::<RanFlag>().0);
70/// #
71/// # world.insert_resource(A(1));
72/// # app.run(&mut world);
73/// # // Only the first condition passes, so the system runs.
74/// # assert!(world.resource::<RanFlag>().0);
75/// # world.resource_mut::<RanFlag>().0 = false;
76/// #
77/// # world.insert_resource(B(1));
78/// # app.run(&mut world);
79/// # // Both conditions pass, so the system does not run.
80/// # assert!(!world.resource::<RanFlag>().0);
81/// #
82/// # world.insert_resource(A(0));
83/// # app.run(&mut world);
84/// # // Only the second condition passes, so the system runs.
85/// # assert!(world.resource::<RanFlag>().0);
86/// # world.resource_mut::<RanFlag>().0 = false;
87/// ```
88#[diagnostic::on_unimplemented(
89 message = "`{Self}` can not combine systems `{A}` and `{B}`",
90 label = "invalid system combination",
91 note = "the inputs and outputs of `{A}` and `{B}` are not compatible with this combiner"
92)]
93pub trait Combine<A: System, B: System> {
94 /// The [input](System::In) type for a [`CombinatorSystem`].
95 type In: SystemInput;
96
97 /// The [output](System::Out) type for a [`CombinatorSystem`].
98 type Out;
99
100 /// When used in a [`CombinatorSystem`], this function customizes how
101 /// the two composite systems are invoked and their outputs are combined.
102 ///
103 /// See the trait-level docs for [`Combine`] for an example implementation.
104 fn combine<T>(
105 input: <Self::In as SystemInput>::Inner<'_>,
106 data: &mut T,
107 a: impl FnOnce(SystemIn<'_, A>, &mut T) -> Result<A::Out, RunSystemError>,
108 b: impl FnOnce(SystemIn<'_, B>, &mut T) -> Result<B::Out, RunSystemError>,
109 ) -> Result<Self::Out, RunSystemError>;
110}
111
112/// A [`System`] defined by combining two other systems.
113/// The behavior of this combinator is specified by implementing the [`Combine`] trait.
114/// For a full usage example, see the docs for [`Combine`].
115pub struct CombinatorSystem<Func, A, B> {
116 _marker: PhantomData<fn() -> Func>,
117 a: A,
118 b: B,
119 name: DebugName,
120}
121
122impl<Func, A, B> CombinatorSystem<Func, A, B> {
123 /// Creates a new system that combines two inner systems.
124 ///
125 /// The returned system will only be usable if `Func` implements [`Combine<A, B>`].
126 pub fn new(a: A, b: B, name: DebugName) -> Self {
127 Self {
128 _marker: PhantomData,
129 a,
130 b,
131 name,
132 }
133 }
134}
135
136impl<A, B, Func> System for CombinatorSystem<Func, A, B>
137where
138 Func: Combine<A, B> + 'static,
139 A: System,
140 B: System,
141{
142 type In = Func::In;
143 type Out = Func::Out;
144
145 fn name(&self) -> DebugName {
146 self.name.clone()
147 }
148
149 #[inline]
150 fn flags(&self) -> super::SystemStateFlags {
151 self.a.flags() | self.b.flags()
152 }
153
154 unsafe fn run_unsafe(
155 &mut self,
156 input: SystemIn<'_, Self>,
157 world: UnsafeWorldCell,
158 ) -> Result<Self::Out, RunSystemError> {
159 struct PrivateUnsafeWorldCell<'w>(UnsafeWorldCell<'w>);
160
161 // Since control over handling system run errors is passed on to the
162 // implementation of `Func::combine`, which may run the two closures
163 // however it wants, errors must be intercepted here if they should be
164 // handled by the world's error handler.
165 unsafe fn run_system<S: System>(
166 system: &mut S,
167 input: SystemIn<S>,
168 world: &mut PrivateUnsafeWorldCell,
169 ) -> Result<S::Out, RunSystemError> {
170 // SAFETY: see comment on `Func::combine` call
171 match unsafe { system.run_unsafe(input, world.0) } {
172 // let the world's fallback error handler handle the error if `Failed(_)`
173 Err(RunSystemError::Failed(err)) => {
174 // SAFETY: We registered access to FallbackErrorHandler in `initialize`.
175 (unsafe { world.0.fallback_error_handler() })(
176 err,
177 ErrorContext::System {
178 name: system.name(),
179 last_run: system.get_last_run(),
180 },
181 );
182
183 // Since the error handler takes the error by value, create a new error:
184 // The original error has already been handled, including
185 // the reason for the failure here isn't important.
186 Err(format!("System `{}` failed", system.name()).into())
187 }
188 // `Skipped(_)` and `Ok(_)` are passed through:
189 // system skipping is not an error, and isn't passed to the
190 // world's error handler by the executors.
191 result @ (Ok(_) | Err(RunSystemError::Skipped(_))) => result,
192 }
193 }
194
195 Func::combine(
196 input,
197 &mut PrivateUnsafeWorldCell(world),
198 // SAFETY: The world accesses for both underlying systems have been registered,
199 // so the caller will guarantee that no other systems will conflict with (`a` or `b`) and the `FallbackErrorHandler` resource.
200 // If either system has `is_exclusive()`, then the combined system also has `is_exclusive`.
201 // Since we require a `combine` to pass in a mutable reference to `world` and that's a private type
202 // passed to a function as an unbound non-'static generic argument, they can never be called in parallel
203 // or re-entrantly because that would require forging another instance of `PrivateUnsafeWorldCell`.
204 // This means that the world accesses in the two closures will not conflict with each other.
205 // The closure's access to the FallbackErrorHandler does not
206 // conflict with any potential access to the FallbackErrorHandler by
207 // the systems since the closures are not run in parallel.
208 |input, world| unsafe { run_system(&mut self.a, input, world) },
209 // SAFETY: See the comment above.
210 |input, world| unsafe { run_system(&mut self.b, input, world) },
211 )
212 }
213
214 #[cfg(feature = "hotpatching")]
215 #[inline]
216 fn refresh_hotpatch(&mut self) {
217 self.a.refresh_hotpatch();
218 self.b.refresh_hotpatch();
219 }
220
221 #[inline]
222 fn apply_deferred(&mut self, world: &mut World) {
223 self.a.apply_deferred(world);
224 self.b.apply_deferred(world);
225 }
226
227 #[inline]
228 fn queue_deferred(&mut self, mut world: crate::world::DeferredWorld) {
229 self.a.queue_deferred(world.reborrow());
230 self.b.queue_deferred(world);
231 }
232
233 fn initialize(&mut self, world: &mut World) -> FilteredAccessSet {
234 let mut a_access = self.a.initialize(world);
235 let b_access = self.b.initialize(world);
236 a_access.extend(b_access);
237
238 // We might need to read the fallback error handler after the component
239 // systems have run to report failures.
240 let error_resource = world.register_component::<crate::error::FallbackErrorHandler>();
241 a_access.add_resource_read(error_resource);
242 a_access
243 }
244
245 fn check_change_tick(&mut self, check: CheckChangeTicks) {
246 self.a.check_change_tick(check);
247 self.b.check_change_tick(check);
248 }
249
250 fn default_system_sets(&self) -> Vec<InternedSystemSet> {
251 let mut default_sets = self.a.default_system_sets();
252 default_sets.append(&mut self.b.default_system_sets());
253 default_sets
254 }
255
256 fn get_last_run(&self) -> Tick {
257 self.a.get_last_run()
258 }
259
260 fn set_last_run(&mut self, last_run: Tick) {
261 self.a.set_last_run(last_run);
262 self.b.set_last_run(last_run);
263 }
264}
265
266// SAFETY: Both systems are read-only, so any system created by combining them will only read from the world.
267unsafe impl<Func, A, B> ReadOnlySystem for CombinatorSystem<Func, A, B>
268where
269 Func: Combine<A, B> + 'static,
270 A: ReadOnlySystem,
271 B: ReadOnlySystem,
272{
273}
274
275impl<Func, A, B> Clone for CombinatorSystem<Func, A, B>
276where
277 A: Clone,
278 B: Clone,
279{
280 /// Clone the combined system. The cloned instance must be `.initialize()`d before it can run.
281 fn clone(&self) -> Self {
282 CombinatorSystem::new(self.a.clone(), self.b.clone(), self.name.clone())
283 }
284}
285
/// Not-yet-converted form of a pipe: an [`IntoSystem`] that turns into a
/// [`PipeSystem`] once converted.
#[derive(Clone)]
pub struct IntoPipeSystem<A, B> {
    // Value that becomes the first system of the pipe.
    a: A,
    // Value that becomes the second system of the pipe.
    b: B,
}
292
293impl<A, B> IntoPipeSystem<A, B> {
294 /// Creates a new [`IntoSystem`] that pipes two inner systems.
295 pub const fn new(a: A, b: B) -> Self {
296 Self { a, b }
297 }
298}
299
300#[doc(hidden)]
301pub struct IsPipeSystemMarker;
302
303impl<A, B, IA, OA, IB, OB, MA, MB> IntoSystem<IA, OB, (IsPipeSystemMarker, OA, IB, MA, MB)>
304 for IntoPipeSystem<A, B>
305where
306 IA: SystemInput,
307 A: IntoSystem<IA, OA, MA>,
308 B: IntoSystem<IB, OB, MB>,
309 for<'a> IB: SystemInput<Inner<'a> = OA>,
310{
311 type System = PipeSystem<A::System, B::System>;
312
313 fn into_system(this: Self) -> Self::System {
314 let system_a = IntoSystem::into_system(this.a);
315 let system_b = IntoSystem::into_system(this.b);
316 let name = format!("Pipe({}, {})", system_a.name(), system_b.name());
317 PipeSystem::new(system_a, system_b, DebugName::owned(name))
318 }
319}
320
321/// A [`System`] created by piping the output of the first system into the input of the second.
322///
323/// This can be repeated indefinitely, but system pipes cannot branch: the output is consumed by the receiving system.
324///
325/// Given two systems `A` and `B`, A may be piped into `B` as `A.pipe(B)` if the output type of `A` is
326/// equal to the input type of `B`.
327///
328/// Note that for [`FunctionSystem`](crate::system::FunctionSystem)s the output is the return value
329/// of the function and the input is the first [`SystemParam`](crate::system::SystemParam) if it is
330/// tagged with [`In`](crate::system::In) or `()` if the function has no designated input parameter.
331///
332/// # Examples
333///
334/// ```
335/// use std::num::ParseIntError;
336///
337/// use bevy_ecs::prelude::*;
338///
339/// fn main() {
340/// let mut world = World::default();
341/// world.insert_resource(Message("42".to_string()));
342///
343/// // pipe the `parse_message_system`'s output into the `filter_system`s input
344/// let mut piped_system = IntoSystem::into_system(parse_message_system.pipe(filter_system));
345/// piped_system.initialize(&mut world);
346/// assert_eq!(piped_system.run((), &mut world).unwrap(), Some(42));
347/// }
348///
349/// #[derive(Resource)]
350/// struct Message(String);
351///
352/// fn parse_message_system(message: Res<Message>) -> Result<usize, ParseIntError> {
353/// message.0.parse::<usize>()
354/// }
355///
356/// fn filter_system(In(result): In<Result<usize, ParseIntError>>) -> Option<usize> {
357/// result.ok().filter(|&n| n < 100)
358/// }
359/// ```
360pub struct PipeSystem<A, B> {
361 a: A,
362 b: B,
363 name: DebugName,
364}
365
366impl<A, B> PipeSystem<A, B>
367where
368 A: System,
369 B: System,
370 for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
371{
372 /// Creates a new system that pipes two inner systems.
373 pub fn new(a: A, b: B, name: DebugName) -> Self {
374 Self { a, b, name }
375 }
376}
377
378impl<A, B> System for PipeSystem<A, B>
379where
380 A: System,
381 B: System,
382 for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
383{
384 type In = A::In;
385 type Out = B::Out;
386
387 fn name(&self) -> DebugName {
388 self.name.clone()
389 }
390
391 #[inline]
392 fn flags(&self) -> super::SystemStateFlags {
393 self.a.flags() | self.b.flags()
394 }
395
396 unsafe fn run_unsafe(
397 &mut self,
398 input: SystemIn<'_, Self>,
399 world: UnsafeWorldCell,
400 ) -> Result<Self::Out, RunSystemError> {
401 // SAFETY: Upheld by caller
402 unsafe {
403 let value = self.a.run_unsafe(input, world)?;
404 self.b.run_unsafe(value, world)
405 }
406 }
407
408 #[cfg(feature = "hotpatching")]
409 #[inline]
410 fn refresh_hotpatch(&mut self) {
411 self.a.refresh_hotpatch();
412 self.b.refresh_hotpatch();
413 }
414
415 fn apply_deferred(&mut self, world: &mut World) {
416 self.a.apply_deferred(world);
417 self.b.apply_deferred(world);
418 }
419
420 fn queue_deferred(&mut self, mut world: crate::world::DeferredWorld) {
421 self.a.queue_deferred(world.reborrow());
422 self.b.queue_deferred(world);
423 }
424
425 fn initialize(&mut self, world: &mut World) -> FilteredAccessSet {
426 let mut a_access = self.a.initialize(world);
427 let b_access = self.b.initialize(world);
428 a_access.extend(b_access);
429 a_access
430 }
431
432 fn check_change_tick(&mut self, check: CheckChangeTicks) {
433 self.a.check_change_tick(check);
434 self.b.check_change_tick(check);
435 }
436
437 fn default_system_sets(&self) -> Vec<InternedSystemSet> {
438 let mut default_sets = self.a.default_system_sets();
439 default_sets.append(&mut self.b.default_system_sets());
440 default_sets
441 }
442
443 fn get_last_run(&self) -> Tick {
444 self.a.get_last_run()
445 }
446
447 fn set_last_run(&mut self, last_run: Tick) {
448 self.a.set_last_run(last_run);
449 self.b.set_last_run(last_run);
450 }
451}
452
453// SAFETY: Both systems are read-only, so any system created by piping them will only read from the world.
454unsafe impl<A, B> ReadOnlySystem for PipeSystem<A, B>
455where
456 A: ReadOnlySystem,
457 B: ReadOnlySystem,
458 for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
459{
460}
461
#[cfg(test)]
mod tests {
    use bevy_utils::prelude::DebugName;

    use crate::{
        error::FallbackErrorHandler,
        prelude::*,
        schedule::OrElseMarker,
        system::{assert_system_does_not_conflict, CombinatorSystem},
    };

    /// A combinator whose inner system mutably accesses the fallback error handler
    /// must not conflict with itself, but must conflict with other systems that
    /// write that resource.
    #[test]
    fn combinator_with_error_handler_access() {
        fn handler_writer(_: ResMut<FallbackErrorHandler>) {}
        fn always_true() -> bool {
            true
        }
        fn true_with_handler(_: ResMut<FallbackErrorHandler>) -> bool {
            true
        }
        fn discard(_: In<bool>) {}

        let mut world = World::new();
        world.insert_resource(FallbackErrorHandler::default());

        let combined = CombinatorSystem::<OrElseMarker, _, _>::new(
            IntoSystem::into_system(always_true),
            IntoSystem::into_system(true_with_handler),
            DebugName::borrowed("a OR b"),
        );

        // The combinator must not conflict with itself even though one of its inner
        // systems mutably accesses the error handler resource.
        assert_system_does_not_conflict(combined.clone());

        let mut schedule = Schedule::default();
        schedule.add_systems((handler_writer, combined.pipe(discard)));
        schedule.initialize(&mut world).unwrap();

        // `handler_writer` has to conflict with the combinator, because the combinator
        // reads the error handler resource.
        let conflicts = schedule.graph().conflicting_systems();
        assert!(!conflicts.is_empty());

        schedule.run(&mut world);
    }

    /// An exclusive system can be used as the first half of a pipe.
    #[test]
    fn exclusive_system_piping_is_possible() {
        fn my_exclusive_system(_world: &mut World) -> u32 {
            1
        }

        fn out_pipe(input: In<u32>) {
            assert!(input.0 == 1);
        }

        let mut schedule = Schedule::default();
        schedule.add_systems(my_exclusive_system.pipe(out_pipe));

        let mut world = World::new();
        schedule.run(&mut world);
    }
}