bevy_ecs/system/
combinator.rs

1use alloc::{borrow::Cow, format, vec::Vec};
2use core::marker::PhantomData;
3
4use crate::{
5    archetype::ArchetypeComponentId,
6    component::{ComponentId, Tick},
7    prelude::World,
8    query::Access,
9    schedule::InternedSystemSet,
10    system::{input::SystemInput, SystemIn, SystemParamValidationError},
11    world::unsafe_world_cell::UnsafeWorldCell,
12};
13
14use super::{IntoSystem, ReadOnlySystem, System};
15
/// Customizes the behavior of a [`CombinatorSystem`].
///
/// An implementor acts as a type-level marker: [`CombinatorSystem`] never stores a
/// value of it and only calls the associated function [`Combine::combine`] to decide
/// how the two inner systems are run and how their outputs become one output.
///
/// # Examples
///
/// ```
/// use bevy_ecs::prelude::*;
/// use bevy_ecs::system::{CombinatorSystem, Combine};
///
/// // A system combinator that performs an exclusive-or (XOR)
/// // operation on the output of two systems.
/// pub type Xor<A, B> = CombinatorSystem<XorMarker, A, B>;
///
/// // This struct is used to customize the behavior of our combinator.
/// pub struct XorMarker;
///
/// impl<A, B> Combine<A, B> for XorMarker
/// where
///     A: System<In = (), Out = bool>,
///     B: System<In = (), Out = bool>,
/// {
///     type In = ();
///     type Out = bool;
///
///     fn combine(
///         _input: Self::In,
///         a: impl FnOnce(A::In) -> A::Out,
///         b: impl FnOnce(B::In) -> B::Out,
///     ) -> Self::Out {
///         a(()) ^ b(())
///     }
/// }
///
/// # #[derive(Resource, PartialEq, Eq)] struct A(u32);
/// # #[derive(Resource, PartialEq, Eq)] struct B(u32);
/// # #[derive(Resource, Default)] struct RanFlag(bool);
/// # let mut world = World::new();
/// # world.init_resource::<RanFlag>();
/// #
/// # let mut app = Schedule::default();
/// app.add_systems(my_system.run_if(Xor::new(
///     IntoSystem::into_system(resource_equals(A(1))),
///     IntoSystem::into_system(resource_equals(B(1))),
///     // The name of the combined system.
///     std::borrow::Cow::Borrowed("a ^ b"),
/// )));
/// # fn my_system(mut flag: ResMut<RanFlag>) { flag.0 = true; }
/// #
/// # world.insert_resource(A(0));
/// # world.insert_resource(B(0));
/// # app.run(&mut world);
/// # // Neither condition passes, so the system does not run.
/// # assert!(!world.resource::<RanFlag>().0);
/// #
/// # world.insert_resource(A(1));
/// # app.run(&mut world);
/// # // Only the first condition passes, so the system runs.
/// # assert!(world.resource::<RanFlag>().0);
/// # world.resource_mut::<RanFlag>().0 = false;
/// #
/// # world.insert_resource(B(1));
/// # app.run(&mut world);
/// # // Both conditions pass, so the system does not run.
/// # assert!(!world.resource::<RanFlag>().0);
/// #
/// # world.insert_resource(A(0));
/// # app.run(&mut world);
/// # // Only the second condition passes, so the system runs.
/// # assert!(world.resource::<RanFlag>().0);
/// # world.resource_mut::<RanFlag>().0 = false;
/// ```
#[diagnostic::on_unimplemented(
    message = "`{Self}` can not combine systems `{A}` and `{B}`",
    label = "invalid system combination",
    note = "the inputs and outputs of `{A}` and `{B}` are not compatible with this combiner"
)]
pub trait Combine<A: System, B: System> {
    /// The [input](System::In) type for a [`CombinatorSystem`].
    type In: SystemInput;

    /// The [output](System::Out) type for a [`CombinatorSystem`].
    type Out;

    /// When used in a [`CombinatorSystem`], this function customizes how
    /// the two composite systems are invoked and their outputs are combined.
    ///
    /// * `input` is the input that was handed to the combined system.
    /// * `a` runs the first system with the input it is given and returns that system's output.
    /// * `b` does the same for the second system.
    ///
    /// Both closures are [`FnOnce`], so each inner system can be run at most once per
    /// call; the signature does not force an implementation to call either of them.
    ///
    /// See the trait-level docs for [`Combine`] for an example implementation.
    fn combine(
        input: <Self::In as SystemInput>::Inner<'_>,
        a: impl FnOnce(SystemIn<'_, A>) -> A::Out,
        b: impl FnOnce(SystemIn<'_, B>) -> B::Out,
    ) -> Self::Out;
}
108
109/// A [`System`] defined by combining two other systems.
110/// The behavior of this combinator is specified by implementing the [`Combine`] trait.
111/// For a full usage example, see the docs for [`Combine`].
112pub struct CombinatorSystem<Func, A, B> {
113    _marker: PhantomData<fn() -> Func>,
114    a: A,
115    b: B,
116    name: Cow<'static, str>,
117    component_access: Access<ComponentId>,
118    archetype_component_access: Access<ArchetypeComponentId>,
119}
120
121impl<Func, A, B> CombinatorSystem<Func, A, B> {
122    /// Creates a new system that combines two inner systems.
123    ///
124    /// The returned system will only be usable if `Func` implements [`Combine<A, B>`].
125    pub const fn new(a: A, b: B, name: Cow<'static, str>) -> Self {
126        Self {
127            _marker: PhantomData,
128            a,
129            b,
130            name,
131            component_access: Access::new(),
132            archetype_component_access: Access::new(),
133        }
134    }
135}
136
137impl<A, B, Func> System for CombinatorSystem<Func, A, B>
138where
139    Func: Combine<A, B> + 'static,
140    A: System,
141    B: System,
142{
143    type In = Func::In;
144    type Out = Func::Out;
145
146    fn name(&self) -> Cow<'static, str> {
147        self.name.clone()
148    }
149
150    fn component_access(&self) -> &Access<ComponentId> {
151        &self.component_access
152    }
153
154    fn archetype_component_access(&self) -> &Access<ArchetypeComponentId> {
155        &self.archetype_component_access
156    }
157
158    fn is_send(&self) -> bool {
159        self.a.is_send() && self.b.is_send()
160    }
161
162    fn is_exclusive(&self) -> bool {
163        self.a.is_exclusive() || self.b.is_exclusive()
164    }
165
166    fn has_deferred(&self) -> bool {
167        self.a.has_deferred() || self.b.has_deferred()
168    }
169
170    unsafe fn run_unsafe(
171        &mut self,
172        input: SystemIn<'_, Self>,
173        world: UnsafeWorldCell,
174    ) -> Self::Out {
175        Func::combine(
176            input,
177            // SAFETY: The world accesses for both underlying systems have been registered,
178            // so the caller will guarantee that no other systems will conflict with `a` or `b`.
179            // If either system has `is_exclusive()`, then the combined system also has `is_exclusive`.
180            // Since these closures are `!Send + !Sync + !'static`, they can never be called
181            // in parallel, so their world accesses will not conflict with each other.
182            // Additionally, `update_archetype_component_access` has been called,
183            // which forwards to the implementations for `self.a` and `self.b`.
184            |input| unsafe { self.a.run_unsafe(input, world) },
185            // SAFETY: See the comment above.
186            |input| unsafe { self.b.run_unsafe(input, world) },
187        )
188    }
189
190    #[inline]
191    fn apply_deferred(&mut self, world: &mut World) {
192        self.a.apply_deferred(world);
193        self.b.apply_deferred(world);
194    }
195
196    #[inline]
197    fn queue_deferred(&mut self, mut world: crate::world::DeferredWorld) {
198        self.a.queue_deferred(world.reborrow());
199        self.b.queue_deferred(world);
200    }
201
202    #[inline]
203    unsafe fn validate_param_unsafe(
204        &mut self,
205        world: UnsafeWorldCell,
206    ) -> Result<(), SystemParamValidationError> {
207        // SAFETY: Delegate to other `System` implementations.
208        unsafe { self.a.validate_param_unsafe(world) }
209    }
210
211    fn initialize(&mut self, world: &mut World) {
212        self.a.initialize(world);
213        self.b.initialize(world);
214        self.component_access.extend(self.a.component_access());
215        self.component_access.extend(self.b.component_access());
216    }
217
218    fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
219        self.a.update_archetype_component_access(world);
220        self.b.update_archetype_component_access(world);
221
222        self.archetype_component_access
223            .extend(self.a.archetype_component_access());
224        self.archetype_component_access
225            .extend(self.b.archetype_component_access());
226    }
227
228    fn check_change_tick(&mut self, change_tick: Tick) {
229        self.a.check_change_tick(change_tick);
230        self.b.check_change_tick(change_tick);
231    }
232
233    fn default_system_sets(&self) -> Vec<InternedSystemSet> {
234        let mut default_sets = self.a.default_system_sets();
235        default_sets.append(&mut self.b.default_system_sets());
236        default_sets
237    }
238
239    fn get_last_run(&self) -> Tick {
240        self.a.get_last_run()
241    }
242
243    fn set_last_run(&mut self, last_run: Tick) {
244        self.a.set_last_run(last_run);
245        self.b.set_last_run(last_run);
246    }
247}
248
/// SAFETY: Both systems are read-only, so any system created by combining them will only read from the world.
/// The combinator's `run_unsafe` passes the world only to `a` and `b`; the combiner function never receives it.
unsafe impl<Func, A, B> ReadOnlySystem for CombinatorSystem<Func, A, B>
where
    Func: Combine<A, B> + 'static,
    A: ReadOnlySystem,
    B: ReadOnlySystem,
{
}
257
258impl<Func, A, B> Clone for CombinatorSystem<Func, A, B>
259where
260    A: Clone,
261    B: Clone,
262{
263    /// Clone the combined system. The cloned instance must be `.initialize()`d before it can run.
264    fn clone(&self) -> Self {
265        CombinatorSystem::new(self.a.clone(), self.b.clone(), self.name.clone())
266    }
267}
268
/// An [`IntoSystem`] that produces a [`PipeSystem`] once converted.
pub struct IntoPipeSystem<A, B> {
    // Value that becomes the first system of the pipe.
    a: A,
    // Value that becomes the second system of the pipe.
    b: B,
}

impl<A, B> IntoPipeSystem<A, B> {
    /// Wraps `a` and `b` into an [`IntoSystem`] that pipes the former into the latter.
    pub const fn new(a: A, b: B) -> Self {
        IntoPipeSystem { a, b }
    }
}
281
// Unit type that appears as the first element of the `IntoSystem` marker tuple used
// by the `IntoPipeSystem` impl; it carries no data.
#[doc(hidden)]
pub struct IsPipeSystemMarker;
284
285impl<A, B, IA, OA, IB, OB, MA, MB> IntoSystem<IA, OB, (IsPipeSystemMarker, OA, IB, MA, MB)>
286    for IntoPipeSystem<A, B>
287where
288    IA: SystemInput,
289    A: IntoSystem<IA, OA, MA>,
290    B: IntoSystem<IB, OB, MB>,
291    for<'a> IB: SystemInput<Inner<'a> = OA>,
292{
293    type System = PipeSystem<A::System, B::System>;
294
295    fn into_system(this: Self) -> Self::System {
296        let system_a = IntoSystem::into_system(this.a);
297        let system_b = IntoSystem::into_system(this.b);
298        let name = format!("Pipe({}, {})", system_a.name(), system_b.name());
299        PipeSystem::new(system_a, system_b, Cow::Owned(name))
300    }
301}
302
303/// A [`System`] created by piping the output of the first system into the input of the second.
304///
305/// This can be repeated indefinitely, but system pipes cannot branch: the output is consumed by the receiving system.
306///
307/// Given two systems `A` and `B`, A may be piped into `B` as `A.pipe(B)` if the output type of `A` is
308/// equal to the input type of `B`.
309///
310/// Note that for [`FunctionSystem`](crate::system::FunctionSystem)s the output is the return value
311/// of the function and the input is the first [`SystemParam`](crate::system::SystemParam) if it is
312/// tagged with [`In`](crate::system::In) or `()` if the function has no designated input parameter.
313///
314/// # Examples
315///
316/// ```
317/// use std::num::ParseIntError;
318///
319/// use bevy_ecs::prelude::*;
320///
321/// fn main() {
322///     let mut world = World::default();
323///     world.insert_resource(Message("42".to_string()));
324///
325///     // pipe the `parse_message_system`'s output into the `filter_system`s input
326///     let mut piped_system = IntoSystem::into_system(parse_message_system.pipe(filter_system));
327///     piped_system.initialize(&mut world);
328///     assert_eq!(piped_system.run((), &mut world), Some(42));
329/// }
330///
331/// #[derive(Resource)]
332/// struct Message(String);
333///
334/// fn parse_message_system(message: Res<Message>) -> Result<usize, ParseIntError> {
335///     message.0.parse::<usize>()
336/// }
337///
338/// fn filter_system(In(result): In<Result<usize, ParseIntError>>) -> Option<usize> {
339///     result.ok().filter(|&n| n < 100)
340/// }
341/// ```
342pub struct PipeSystem<A, B> {
343    a: A,
344    b: B,
345    name: Cow<'static, str>,
346    component_access: Access<ComponentId>,
347    archetype_component_access: Access<ArchetypeComponentId>,
348}
349
350impl<A, B> PipeSystem<A, B>
351where
352    A: System,
353    B: System,
354    for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
355{
356    /// Creates a new system that pipes two inner systems.
357    pub const fn new(a: A, b: B, name: Cow<'static, str>) -> Self {
358        Self {
359            a,
360            b,
361            name,
362            component_access: Access::new(),
363            archetype_component_access: Access::new(),
364        }
365    }
366}
367
368impl<A, B> System for PipeSystem<A, B>
369where
370    A: System,
371    B: System,
372    for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
373{
374    type In = A::In;
375    type Out = B::Out;
376
377    fn name(&self) -> Cow<'static, str> {
378        self.name.clone()
379    }
380
381    fn component_access(&self) -> &Access<ComponentId> {
382        &self.component_access
383    }
384
385    fn archetype_component_access(&self) -> &Access<ArchetypeComponentId> {
386        &self.archetype_component_access
387    }
388
389    fn is_send(&self) -> bool {
390        self.a.is_send() && self.b.is_send()
391    }
392
393    fn is_exclusive(&self) -> bool {
394        self.a.is_exclusive() || self.b.is_exclusive()
395    }
396
397    fn has_deferred(&self) -> bool {
398        self.a.has_deferred() || self.b.has_deferred()
399    }
400
401    unsafe fn run_unsafe(
402        &mut self,
403        input: SystemIn<'_, Self>,
404        world: UnsafeWorldCell,
405    ) -> Self::Out {
406        let value = self.a.run_unsafe(input, world);
407        self.b.run_unsafe(value, world)
408    }
409
410    fn apply_deferred(&mut self, world: &mut World) {
411        self.a.apply_deferred(world);
412        self.b.apply_deferred(world);
413    }
414
415    fn queue_deferred(&mut self, mut world: crate::world::DeferredWorld) {
416        self.a.queue_deferred(world.reborrow());
417        self.b.queue_deferred(world);
418    }
419
420    /// This method uses "early out" logic: if the first system fails validation,
421    /// the second system is not validated.
422    ///
423    /// Because the system validation is performed upfront, this can lead to situations
424    /// where later systems pass validation, but fail at runtime due to changes made earlier
425    /// in the piped systems.
426    // TODO: ensure that systems are only validated just before they are run.
427    // Fixing this will require fundamentally rethinking how piped systems work:
428    // they're currently treated as a single system from the perspective of the scheduler.
429    // See https://github.com/bevyengine/bevy/issues/18796
430    unsafe fn validate_param_unsafe(
431        &mut self,
432        world: UnsafeWorldCell,
433    ) -> Result<(), SystemParamValidationError> {
434        // SAFETY: Delegate to the `System` implementation for `a`.
435        unsafe { self.a.validate_param_unsafe(world) }?;
436
437        // SAFETY: Delegate to the `System` implementation for `b`.
438        unsafe { self.b.validate_param_unsafe(world) }?;
439
440        Ok(())
441    }
442
443    fn initialize(&mut self, world: &mut World) {
444        self.a.initialize(world);
445        self.b.initialize(world);
446        self.component_access.extend(self.a.component_access());
447        self.component_access.extend(self.b.component_access());
448    }
449
450    fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
451        self.a.update_archetype_component_access(world);
452        self.b.update_archetype_component_access(world);
453
454        self.archetype_component_access
455            .extend(self.a.archetype_component_access());
456        self.archetype_component_access
457            .extend(self.b.archetype_component_access());
458    }
459
460    fn check_change_tick(&mut self, change_tick: Tick) {
461        self.a.check_change_tick(change_tick);
462        self.b.check_change_tick(change_tick);
463    }
464
465    fn default_system_sets(&self) -> Vec<InternedSystemSet> {
466        let mut default_sets = self.a.default_system_sets();
467        default_sets.append(&mut self.b.default_system_sets());
468        default_sets
469    }
470
471    fn get_last_run(&self) -> Tick {
472        self.a.get_last_run()
473    }
474
475    fn set_last_run(&mut self, last_run: Tick) {
476        self.a.set_last_run(last_run);
477        self.b.set_last_run(last_run);
478    }
479}
480
/// SAFETY: Both systems are read-only, so any system created by piping them will only read from the world.
/// The pipe's `run_unsafe` passes the world only to `a` and `b`.
unsafe impl<A, B> ReadOnlySystem for PipeSystem<A, B>
where
    A: ReadOnlySystem,
    B: ReadOnlySystem,
    for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
{
}
489
#[cfg(test)]
mod tests {
    use crate::prelude::*;

    /// An exclusive system (one taking `&mut World`) can be the source of a pipe,
    /// and the piped pair can be added to a schedule and run.
    #[test]
    fn exclusive_system_piping_is_possible() {
        // Source of the pipe: exclusive, always yields 1.
        fn my_exclusive_system(_world: &mut World) -> u32 {
            1
        }

        // Sink of the pipe: checks that it received the source's output.
        fn out_pipe(input: In<u32>) {
            assert!(input.0 == 1);
        }

        let piped = my_exclusive_system.pipe(out_pipe);

        let mut schedule = Schedule::default();
        schedule.add_systems(piped);

        let mut world = World::new();
        schedule.run(&mut world);
    }
}