bevy_ecs/system/
combinator.rs

1use alloc::{format, vec::Vec};
2use bevy_utils::prelude::DebugName;
3use core::marker::PhantomData;
4
5use crate::{
6    change_detection::{CheckChangeTicks, Tick},
7    error::ErrorContext,
8    prelude::World,
9    query::FilteredAccessSet,
10    schedule::InternedSystemSet,
11    system::{input::SystemInput, SystemIn, SystemParamValidationError},
12    world::unsafe_world_cell::UnsafeWorldCell,
13};
14
15use super::{IntoSystem, ReadOnlySystem, RunSystemError, System};
16
/// Customizes the behavior of a [`CombinatorSystem`].
///
/// An implementor is usually a marker type: it carries no data and only
/// decides how the two inner systems of a [`CombinatorSystem`] are invoked and
/// how their outputs are merged into one output.
///
/// # Examples
///
/// ```
/// use bevy_ecs::prelude::*;
/// use bevy_ecs::system::{CombinatorSystem, Combine, RunSystemError};
///
/// // A system combinator that performs an exclusive-or (XOR)
/// // operation on the output of two systems.
/// pub type Xor<A, B> = CombinatorSystem<XorMarker, A, B>;
///
/// // This struct is used to customize the behavior of our combinator.
/// pub struct XorMarker;
///
/// impl<A, B> Combine<A, B> for XorMarker
/// where
///     A: System<In = (), Out = bool>,
///     B: System<In = (), Out = bool>,
/// {
///     type In = ();
///     type Out = bool;
///
///     fn combine<T>(
///         _input: Self::In,
///         data: &mut T,
///         a: impl FnOnce(A::In, &mut T) -> Result<A::Out, RunSystemError>,
///         b: impl FnOnce(B::In, &mut T) -> Result<B::Out, RunSystemError>,
///     ) -> Result<Self::Out, RunSystemError> {
///         Ok(a((), data)? ^ b((), data)?)
///     }
/// }
///
/// # #[derive(Resource, PartialEq, Eq)] struct A(u32);
/// # #[derive(Resource, PartialEq, Eq)] struct B(u32);
/// # #[derive(Resource, Default)] struct RanFlag(bool);
/// # let mut world = World::new();
/// # world.init_resource::<RanFlag>();
/// #
/// # let mut app = Schedule::default();
/// app.add_systems(my_system.run_if(Xor::new(
///     IntoSystem::into_system(resource_equals(A(1))),
///     IntoSystem::into_system(resource_equals(B(1))),
///     // The name of the combined system.
///     "a ^ b".into(),
/// )));
/// # fn my_system(mut flag: ResMut<RanFlag>) { flag.0 = true; }
/// #
/// # world.insert_resource(A(0));
/// # world.insert_resource(B(0));
/// # app.run(&mut world);
/// # // Neither condition passes, so the system does not run.
/// # assert!(!world.resource::<RanFlag>().0);
/// #
/// # world.insert_resource(A(1));
/// # app.run(&mut world);
/// # // Only the first condition passes, so the system runs.
/// # assert!(world.resource::<RanFlag>().0);
/// # world.resource_mut::<RanFlag>().0 = false;
/// #
/// # world.insert_resource(B(1));
/// # app.run(&mut world);
/// # // Both conditions pass, so the system does not run.
/// # assert!(!world.resource::<RanFlag>().0);
/// #
/// # world.insert_resource(A(0));
/// # app.run(&mut world);
/// # // Only the second condition passes, so the system runs.
/// # assert!(world.resource::<RanFlag>().0);
/// # world.resource_mut::<RanFlag>().0 = false;
/// ```
#[diagnostic::on_unimplemented(
    message = "`{Self}` can not combine systems `{A}` and `{B}`",
    label = "invalid system combination",
    note = "the inputs and outputs of `{A}` and `{B}` are not compatible with this combiner"
)]
pub trait Combine<A: System, B: System> {
    /// The [input](System::In) type for a [`CombinatorSystem`].
    type In: SystemInput;

    /// The [output](System::Out) type for a [`CombinatorSystem`].
    type Out;

    /// When used in a [`CombinatorSystem`], this function customizes how
    /// the two composite systems are invoked and their outputs are combined.
    ///
    /// - `input` is the input that was handed to the combined system.
    /// - `data` is an opaque value that has to be forwarded to whichever of
    ///   the two closures gets called.
    /// - `a` and `b` run the first and the second system respectively. Both
    ///   are [`FnOnce`], so each system can be run at most once per call, and
    ///   an implementation is free to skip either of them.
    ///
    /// The returned [`Result`] becomes the result of the combined system.
    ///
    /// See the trait-level docs for [`Combine`] for an example implementation.
    fn combine<T>(
        input: <Self::In as SystemInput>::Inner<'_>,
        data: &mut T,
        a: impl FnOnce(SystemIn<'_, A>, &mut T) -> Result<A::Out, RunSystemError>,
        b: impl FnOnce(SystemIn<'_, B>, &mut T) -> Result<B::Out, RunSystemError>,
    ) -> Result<Self::Out, RunSystemError>;
}
111
112/// A [`System`] defined by combining two other systems.
113/// The behavior of this combinator is specified by implementing the [`Combine`] trait.
114/// For a full usage example, see the docs for [`Combine`].
115pub struct CombinatorSystem<Func, A, B> {
116    _marker: PhantomData<fn() -> Func>,
117    a: A,
118    b: B,
119    name: DebugName,
120}
121
122impl<Func, A, B> CombinatorSystem<Func, A, B> {
123    /// Creates a new system that combines two inner systems.
124    ///
125    /// The returned system will only be usable if `Func` implements [`Combine<A, B>`].
126    pub fn new(a: A, b: B, name: DebugName) -> Self {
127        Self {
128            _marker: PhantomData,
129            a,
130            b,
131            name,
132        }
133    }
134}
135
136impl<A, B, Func> System for CombinatorSystem<Func, A, B>
137where
138    Func: Combine<A, B> + 'static,
139    A: System,
140    B: System,
141{
142    type In = Func::In;
143    type Out = Func::Out;
144
145    fn name(&self) -> DebugName {
146        self.name.clone()
147    }
148
149    #[inline]
150    fn flags(&self) -> super::SystemStateFlags {
151        self.a.flags() | self.b.flags()
152    }
153
154    unsafe fn run_unsafe(
155        &mut self,
156        input: SystemIn<'_, Self>,
157        world: UnsafeWorldCell,
158    ) -> Result<Self::Out, RunSystemError> {
159        struct PrivateUnsafeWorldCell<'w>(UnsafeWorldCell<'w>);
160
161        // Since control over handling system run errors is passed on to the
162        // implementation of `Func::combine`, which may run the two closures
163        // however it wants, errors must be intercepted here if they should be
164        // handled by the world's error handler.
165        unsafe fn run_system<S: System>(
166            system: &mut S,
167            input: SystemIn<S>,
168            world: &mut PrivateUnsafeWorldCell,
169        ) -> Result<S::Out, RunSystemError> {
170            #![deny(unsafe_op_in_unsafe_fn)]
171
172            // SAFETY: see comment on `Func::combine` call
173            match (|| unsafe {
174                system.validate_param_unsafe(world.0)?;
175                system.run_unsafe(input, world.0)
176            })() {
177                // let the world's default error handler handle the error if `Failed(_)`
178                Err(RunSystemError::Failed(err)) => {
179                    // SAFETY: We registered access to DefaultErrorHandler in `initialize`.
180                    (unsafe { world.0.default_error_handler() })(
181                        err,
182                        ErrorContext::System {
183                            name: system.name(),
184                            last_run: system.get_last_run(),
185                        },
186                    );
187
188                    // Since the error handler takes the error by value, create a new error:
189                    // The original error has already been handled, including
190                    // the reason for the failure here isn't important.
191                    Err(format!("System `{}` failed", system.name()).into())
192                }
193                // `Skipped(_)` and `Ok(_)` are passed through:
194                // system skipping is not an error, and isn't passed to the
195                // world's error handler by the executors.
196                result @ (Ok(_) | Err(RunSystemError::Skipped(_))) => result,
197            }
198        }
199
200        Func::combine(
201            input,
202            &mut PrivateUnsafeWorldCell(world),
203            // SAFETY: The world accesses for both underlying systems have been registered,
204            // so the caller will guarantee that no other systems will conflict with (`a` or `b`) and the `DefaultErrorHandler` resource.
205            // If either system has `is_exclusive()`, then the combined system also has `is_exclusive`.
206            // Since we require a `combine` to pass in a mutable reference to `world` and that's a private type
207            // passed to a function as an unbound non-'static generic argument, they can never be called in parallel
208            // or re-entrantly because that would require forging another instance of `PrivateUnsafeWorldCell`.
209            // This means that the world accesses in the two closures will not conflict with each other.
210            // The closure's access to the DefaultErrorHandler does not
211            // conflict with any potential access to the DefaultErrorHandler by
212            // the systems since the closures are not run in parallel.
213            |input, world| unsafe { run_system(&mut self.a, input, world) },
214            // SAFETY: See the comment above.
215            |input, world| unsafe { run_system(&mut self.b, input, world) },
216        )
217    }
218
219    #[cfg(feature = "hotpatching")]
220    #[inline]
221    fn refresh_hotpatch(&mut self) {
222        self.a.refresh_hotpatch();
223        self.b.refresh_hotpatch();
224    }
225
226    #[inline]
227    fn apply_deferred(&mut self, world: &mut World) {
228        self.a.apply_deferred(world);
229        self.b.apply_deferred(world);
230    }
231
232    #[inline]
233    fn queue_deferred(&mut self, mut world: crate::world::DeferredWorld) {
234        self.a.queue_deferred(world.reborrow());
235        self.b.queue_deferred(world);
236    }
237
238    #[inline]
239    unsafe fn validate_param_unsafe(
240        &mut self,
241        _world: UnsafeWorldCell,
242    ) -> Result<(), SystemParamValidationError> {
243        // Both systems are validated in `Self::run_unsafe`, so that we get the
244        // chance to run the second system even if the first one fails to
245        // validate.
246        Ok(())
247    }
248
249    fn initialize(&mut self, world: &mut World) -> FilteredAccessSet {
250        let mut a_access = self.a.initialize(world);
251        let b_access = self.b.initialize(world);
252        a_access.extend(b_access);
253
254        // We might need to read the default error handler after the component
255        // systems have run to report failures.
256        let error_resource = world.register_resource::<crate::error::DefaultErrorHandler>();
257        a_access.add_unfiltered_resource_read(error_resource);
258        a_access
259    }
260
261    fn check_change_tick(&mut self, check: CheckChangeTicks) {
262        self.a.check_change_tick(check);
263        self.b.check_change_tick(check);
264    }
265
266    fn default_system_sets(&self) -> Vec<InternedSystemSet> {
267        let mut default_sets = self.a.default_system_sets();
268        default_sets.append(&mut self.b.default_system_sets());
269        default_sets
270    }
271
272    fn get_last_run(&self) -> Tick {
273        self.a.get_last_run()
274    }
275
276    fn set_last_run(&mut self, last_run: Tick) {
277        self.a.set_last_run(last_run);
278        self.b.set_last_run(last_run);
279    }
280}
281
282/// SAFETY: Both systems are read-only, so any system created by combining them will only read from the world.
283unsafe impl<Func, A, B> ReadOnlySystem for CombinatorSystem<Func, A, B>
284where
285    Func: Combine<A, B> + 'static,
286    A: ReadOnlySystem,
287    B: ReadOnlySystem,
288{
289}
290
291impl<Func, A, B> Clone for CombinatorSystem<Func, A, B>
292where
293    A: Clone,
294    B: Clone,
295{
296    /// Clone the combined system. The cloned instance must be `.initialize()`d before it can run.
297    fn clone(&self) -> Self {
298        CombinatorSystem::new(self.a.clone(), self.b.clone(), self.name.clone())
299    }
300}
301
/// An [`IntoSystem`] that produces a [`PipeSystem`] once it is converted.
#[derive(Clone)]
pub struct IntoPipeSystem<A, B> {
    a: A,
    b: B,
}

impl<A, B> IntoPipeSystem<A, B> {
    /// Creates a new [`IntoSystem`] that pipes the output of `a` into `b`.
    pub const fn new(a: A, b: B) -> Self {
        IntoPipeSystem { a, b }
    }
}
315
316#[doc(hidden)]
317pub struct IsPipeSystemMarker;
318
319impl<A, B, IA, OA, IB, OB, MA, MB> IntoSystem<IA, OB, (IsPipeSystemMarker, OA, IB, MA, MB)>
320    for IntoPipeSystem<A, B>
321where
322    IA: SystemInput,
323    A: IntoSystem<IA, OA, MA>,
324    B: IntoSystem<IB, OB, MB>,
325    for<'a> IB: SystemInput<Inner<'a> = OA>,
326{
327    type System = PipeSystem<A::System, B::System>;
328
329    fn into_system(this: Self) -> Self::System {
330        let system_a = IntoSystem::into_system(this.a);
331        let system_b = IntoSystem::into_system(this.b);
332        let name = format!("Pipe({}, {})", system_a.name(), system_b.name());
333        PipeSystem::new(system_a, system_b, DebugName::owned(name))
334    }
335}
336
337/// A [`System`] created by piping the output of the first system into the input of the second.
338///
339/// This can be repeated indefinitely, but system pipes cannot branch: the output is consumed by the receiving system.
340///
341/// Given two systems `A` and `B`, A may be piped into `B` as `A.pipe(B)` if the output type of `A` is
342/// equal to the input type of `B`.
343///
344/// Note that for [`FunctionSystem`](crate::system::FunctionSystem)s the output is the return value
345/// of the function and the input is the first [`SystemParam`](crate::system::SystemParam) if it is
346/// tagged with [`In`](crate::system::In) or `()` if the function has no designated input parameter.
347///
348/// # Examples
349///
350/// ```
351/// use std::num::ParseIntError;
352///
353/// use bevy_ecs::prelude::*;
354///
355/// fn main() {
356///     let mut world = World::default();
357///     world.insert_resource(Message("42".to_string()));
358///
359///     // pipe the `parse_message_system`'s output into the `filter_system`s input
360///     let mut piped_system = IntoSystem::into_system(parse_message_system.pipe(filter_system));
361///     piped_system.initialize(&mut world);
362///     assert_eq!(piped_system.run((), &mut world).unwrap(), Some(42));
363/// }
364///
365/// #[derive(Resource)]
366/// struct Message(String);
367///
368/// fn parse_message_system(message: Res<Message>) -> Result<usize, ParseIntError> {
369///     message.0.parse::<usize>()
370/// }
371///
372/// fn filter_system(In(result): In<Result<usize, ParseIntError>>) -> Option<usize> {
373///     result.ok().filter(|&n| n < 100)
374/// }
375/// ```
376pub struct PipeSystem<A, B> {
377    a: A,
378    b: B,
379    name: DebugName,
380}
381
382impl<A, B> PipeSystem<A, B>
383where
384    A: System,
385    B: System,
386    for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
387{
388    /// Creates a new system that pipes two inner systems.
389    pub fn new(a: A, b: B, name: DebugName) -> Self {
390        Self { a, b, name }
391    }
392}
393
394impl<A, B> System for PipeSystem<A, B>
395where
396    A: System,
397    B: System,
398    for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
399{
400    type In = A::In;
401    type Out = B::Out;
402
403    fn name(&self) -> DebugName {
404        self.name.clone()
405    }
406
407    #[inline]
408    fn flags(&self) -> super::SystemStateFlags {
409        self.a.flags() | self.b.flags()
410    }
411
412    unsafe fn run_unsafe(
413        &mut self,
414        input: SystemIn<'_, Self>,
415        world: UnsafeWorldCell,
416    ) -> Result<Self::Out, RunSystemError> {
417        // SAFETY: Upheld by caller
418        unsafe {
419            let value = self.a.run_unsafe(input, world)?;
420            // `Self::validate_param_unsafe` already validated the first system,
421            // but we still need to validate the second system once the first one runs.
422            self.b.validate_param_unsafe(world)?;
423            self.b.run_unsafe(value, world)
424        }
425    }
426
427    #[cfg(feature = "hotpatching")]
428    #[inline]
429    fn refresh_hotpatch(&mut self) {
430        self.a.refresh_hotpatch();
431        self.b.refresh_hotpatch();
432    }
433
434    fn apply_deferred(&mut self, world: &mut World) {
435        self.a.apply_deferred(world);
436        self.b.apply_deferred(world);
437    }
438
439    fn queue_deferred(&mut self, mut world: crate::world::DeferredWorld) {
440        self.a.queue_deferred(world.reborrow());
441        self.b.queue_deferred(world);
442    }
443
444    unsafe fn validate_param_unsafe(
445        &mut self,
446        world: UnsafeWorldCell,
447    ) -> Result<(), SystemParamValidationError> {
448        // We only validate parameters for the first system,
449        // since it may make changes to the world that affect
450        // whether the second system has valid parameters.
451        // The second system will be validated in `Self::run_unsafe`.
452        // SAFETY: Delegate to the `System` implementation for `a`.
453        unsafe { self.a.validate_param_unsafe(world) }
454    }
455
456    fn initialize(&mut self, world: &mut World) -> FilteredAccessSet {
457        let mut a_access = self.a.initialize(world);
458        let b_access = self.b.initialize(world);
459        a_access.extend(b_access);
460        a_access
461    }
462
463    fn check_change_tick(&mut self, check: CheckChangeTicks) {
464        self.a.check_change_tick(check);
465        self.b.check_change_tick(check);
466    }
467
468    fn default_system_sets(&self) -> Vec<InternedSystemSet> {
469        let mut default_sets = self.a.default_system_sets();
470        default_sets.append(&mut self.b.default_system_sets());
471        default_sets
472    }
473
474    fn get_last_run(&self) -> Tick {
475        self.a.get_last_run()
476    }
477
478    fn set_last_run(&mut self, last_run: Tick) {
479        self.a.set_last_run(last_run);
480        self.b.set_last_run(last_run);
481    }
482}
483
484/// SAFETY: Both systems are read-only, so any system created by piping them will only read from the world.
485unsafe impl<A, B> ReadOnlySystem for PipeSystem<A, B>
486where
487    A: ReadOnlySystem,
488    B: ReadOnlySystem,
489    for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
490{
491}
492
#[cfg(test)]
mod tests {
    use bevy_utils::prelude::DebugName;

    use crate::{
        error::DefaultErrorHandler,
        prelude::*,
        schedule::OrMarker,
        system::{assert_system_does_not_conflict, CombinatorSystem},
    };

    /// A combinator reads the default error handler on its own behalf. That
    /// read must not clash with an inner system that writes the handler, but
    /// it must be visible as a conflict to unrelated systems that write it.
    #[test]
    fn combinator_with_error_handler_access() {
        fn my_system(_: ResMut<DefaultErrorHandler>) {}
        fn a() -> bool {
            true
        }
        fn b(_: ResMut<DefaultErrorHandler>) -> bool {
            true
        }
        fn asdf(_: In<bool>) {}

        let mut world = World::new();
        world.insert_resource(DefaultErrorHandler::default());

        let combinator = CombinatorSystem::<OrMarker, _, _>::new(
            IntoSystem::into_system(a),
            IntoSystem::into_system(b),
            DebugName::borrowed("a OR b"),
        );

        // The combinator accesses the error handler resource mutably through
        // `b`; that must not make it conflict with itself.
        assert_system_does_not_conflict(combinator.clone());

        let mut schedule = Schedule::default();
        schedule.add_systems((my_system, combinator.pipe(asdf)));
        schedule.initialize(&mut world).unwrap();

        // The combinator reads the error handler resource, so `my_system`,
        // which writes it, has to be reported as conflicting.
        let conflicts = schedule.graph().conflicting_systems();
        assert!(!conflicts.is_empty());

        schedule.run(&mut world);
    }

    /// An exclusive system can be the source of a pipe.
    #[test]
    fn exclusive_system_piping_is_possible() {
        fn my_exclusive_system(_world: &mut World) -> u32 {
            1
        }

        fn out_pipe(In(value): In<u32>) {
            assert!(value == 1);
        }

        let mut schedule = Schedule::default();
        schedule.add_systems(my_exclusive_system.pipe(out_pipe));

        let mut world = World::new();
        schedule.run(&mut world);
    }
}