bevy_ecs/schedule/
auto_insert_apply_deferred.rs

1use alloc::{boxed::Box, collections::BTreeSet, vec::Vec};
2
3use bevy_platform::collections::HashMap;
4
5use crate::system::IntoSystem;
6use crate::world::World;
7
8use super::{
9    is_apply_deferred, ApplyDeferred, DiGraph, Direction, NodeId, ReportCycles, ScheduleBuildError,
10    ScheduleBuildPass, ScheduleGraph, SystemNode,
11};
12
13/// A [`ScheduleBuildPass`] that inserts [`ApplyDeferred`] systems into the schedule graph
14/// when there are [`Deferred`](crate::prelude::Deferred)
15/// in one system and there are ordering dependencies on that system. [`Commands`](crate::system::Commands) is one
16/// such deferred buffer.
17///
18/// This pass is typically automatically added to the schedule. You can disable this by setting
19/// [`ScheduleBuildSettings::auto_insert_apply_deferred`](crate::schedule::ScheduleBuildSettings::auto_insert_apply_deferred)
20/// to `false`. You may want to disable this if you only want to sync deferred params at the end of the schedule,
21/// or want to manually insert all your sync points.
22#[derive(Debug, Default)]
23pub struct AutoInsertApplyDeferredPass {
24    /// Dependency edges that will **not** automatically insert an instance of `ApplyDeferred` on the edge.
25    no_sync_edges: BTreeSet<(NodeId, NodeId)>,
26    auto_sync_node_ids: HashMap<u32, NodeId>,
27}
28
29/// If added to a dependency edge, the edge will not be considered for auto sync point insertions.
30pub struct IgnoreDeferred;
31
32impl AutoInsertApplyDeferredPass {
33    /// Returns the `NodeId` of the cached auto sync point. Will create
34    /// a new one if needed.
35    fn get_sync_point(&mut self, graph: &mut ScheduleGraph, distance: u32) -> NodeId {
36        self.auto_sync_node_ids
37            .get(&distance)
38            .copied()
39            .or_else(|| {
40                let node_id = self.add_auto_sync(graph);
41                self.auto_sync_node_ids.insert(distance, node_id);
42                Some(node_id)
43            })
44            .unwrap()
45    }
46    /// add an [`ApplyDeferred`] system with no config
47    fn add_auto_sync(&mut self, graph: &mut ScheduleGraph) -> NodeId {
48        let id = NodeId::System(graph.systems.len());
49
50        graph
51            .systems
52            .push(SystemNode::new(Box::new(IntoSystem::into_system(
53                ApplyDeferred,
54            ))));
55        graph.system_conditions.push(Vec::new());
56
57        // ignore ambiguities with auto sync points
58        // They aren't under user control, so no one should know or care.
59        graph.ambiguous_with_all.insert(id);
60
61        id
62    }
63}
64
65impl ScheduleBuildPass for AutoInsertApplyDeferredPass {
66    type EdgeOptions = IgnoreDeferred;
67
68    fn add_dependency(&mut self, from: NodeId, to: NodeId, options: Option<&Self::EdgeOptions>) {
69        if options.is_some() {
70            self.no_sync_edges.insert((from, to));
71        }
72    }
73
74    fn build(
75        &mut self,
76        _world: &mut World,
77        graph: &mut ScheduleGraph,
78        dependency_flattened: &mut DiGraph,
79    ) -> Result<(), ScheduleBuildError> {
80        let mut sync_point_graph = dependency_flattened.clone();
81        let topo = graph.topsort_graph(dependency_flattened, ReportCycles::Dependency)?;
82
83        fn set_has_conditions(graph: &ScheduleGraph, node: NodeId) -> bool {
84            !graph.set_conditions_at(node).is_empty()
85                || graph
86                    .hierarchy()
87                    .graph()
88                    .edges_directed(node, Direction::Incoming)
89                    .any(|(parent, _)| set_has_conditions(graph, parent))
90        }
91
92        fn system_has_conditions(graph: &ScheduleGraph, node: NodeId) -> bool {
93            assert!(node.is_system());
94            !graph.system_conditions[node.index()].is_empty()
95                || graph
96                    .hierarchy()
97                    .graph()
98                    .edges_directed(node, Direction::Incoming)
99                    .any(|(parent, _)| set_has_conditions(graph, parent))
100        }
101
102        let mut system_has_conditions_cache = HashMap::<usize, bool>::default();
103        let mut is_valid_explicit_sync_point = |system: NodeId| {
104            let index = system.index();
105            is_apply_deferred(graph.systems[index].get().unwrap())
106                && !*system_has_conditions_cache
107                    .entry(index)
108                    .or_insert_with(|| system_has_conditions(graph, system))
109        };
110
111        // Calculate the distance for each node.
112        // The "distance" is the number of sync points between a node and the beginning of the graph.
113        // Also store if a preceding edge would have added a sync point but was ignored to add it at
114        // a later edge that is not ignored.
115        let mut distances_and_pending_sync: HashMap<usize, (u32, bool)> =
116            HashMap::with_capacity_and_hasher(topo.len(), Default::default());
117
118        // Keep track of any explicit sync nodes for a specific distance.
119        let mut distance_to_explicit_sync_node: HashMap<u32, NodeId> = HashMap::default();
120
121        // Determine the distance for every node and collect the explicit sync points.
122        for node in &topo {
123            let (node_distance, mut node_needs_sync) = distances_and_pending_sync
124                .get(&node.index())
125                .copied()
126                .unwrap_or_default();
127
128            if is_valid_explicit_sync_point(*node) {
129                // The distance of this sync point does not change anymore as the iteration order
130                // makes sure that this node is no unvisited target of another node.
131                // Because of this, the sync point can be stored for this distance to be reused as
132                // automatically added sync points later.
133                distance_to_explicit_sync_node.insert(node_distance, *node);
134
135                // This node just did a sync, so the only reason to do another sync is if one was
136                // explicitly scheduled afterwards.
137                node_needs_sync = false;
138            } else if !node_needs_sync {
139                // No previous node has postponed sync points to add so check if the system itself
140                // has deferred params that require a sync point to apply them.
141                node_needs_sync = graph.systems[node.index()].get().unwrap().has_deferred();
142            }
143
144            for target in dependency_flattened.neighbors_directed(*node, Direction::Outgoing) {
145                let (target_distance, target_pending_sync) = distances_and_pending_sync
146                    .entry(target.index())
147                    .or_default();
148
149                let mut edge_needs_sync = node_needs_sync;
150                if node_needs_sync
151                    && !graph.systems[target.index()].get().unwrap().is_exclusive()
152                    && self.no_sync_edges.contains(&(*node, target))
153                {
154                    // The node has deferred params to apply, but this edge is ignoring sync points.
155                    // Mark the target as 'delaying' those commands to a future edge and the current
156                    // edge as not needing a sync point.
157                    *target_pending_sync = true;
158                    edge_needs_sync = false;
159                }
160
161                let mut weight = 0;
162                if edge_needs_sync || is_valid_explicit_sync_point(target) {
163                    // The target distance grows if a sync point is added between it and the node.
164                    // Also raise the distance if the target is a sync point itself so it then again
165                    // raises the distance of following nodes as that is what the distance is about.
166                    weight = 1;
167                }
168
169                // The target cannot have fewer sync points in front of it than the preceding node.
170                *target_distance = (node_distance + weight).max(*target_distance);
171            }
172        }
173
174        // Find any edges which have a different number of sync points between them and make sure
175        // there is a sync point between them.
176        for node in &topo {
177            let (node_distance, _) = distances_and_pending_sync
178                .get(&node.index())
179                .copied()
180                .unwrap_or_default();
181
182            for target in dependency_flattened.neighbors_directed(*node, Direction::Outgoing) {
183                let (target_distance, _) = distances_and_pending_sync
184                    .get(&target.index())
185                    .copied()
186                    .unwrap_or_default();
187
188                if node_distance == target_distance {
189                    // These nodes are the same distance, so they don't need an edge between them.
190                    continue;
191                }
192
193                if is_apply_deferred(graph.systems[target.index()].get().unwrap()) {
194                    // We don't need to insert a sync point since ApplyDeferred is a sync point
195                    // already!
196                    continue;
197                }
198
199                let sync_point = distance_to_explicit_sync_node
200                    .get(&target_distance)
201                    .copied()
202                    .unwrap_or_else(|| self.get_sync_point(graph, target_distance));
203
204                sync_point_graph.add_edge(*node, sync_point);
205                sync_point_graph.add_edge(sync_point, target);
206
207                // The edge without the sync point is now redundant.
208                sync_point_graph.remove_edge(*node, target);
209            }
210        }
211
212        *dependency_flattened = sync_point_graph;
213        Ok(())
214    }
215
216    fn collapse_set(
217        &mut self,
218        set: NodeId,
219        systems: &[NodeId],
220        dependency_flattened: &DiGraph,
221    ) -> impl Iterator<Item = (NodeId, NodeId)> {
222        if systems.is_empty() {
223            // collapse dependencies for empty sets
224            for a in dependency_flattened.neighbors_directed(set, Direction::Incoming) {
225                for b in dependency_flattened.neighbors_directed(set, Direction::Outgoing) {
226                    if self.no_sync_edges.contains(&(a, set))
227                        && self.no_sync_edges.contains(&(set, b))
228                    {
229                        self.no_sync_edges.insert((a, b));
230                    }
231                }
232            }
233        } else {
234            for a in dependency_flattened.neighbors_directed(set, Direction::Incoming) {
235                for &sys in systems {
236                    if self.no_sync_edges.contains(&(a, set)) {
237                        self.no_sync_edges.insert((a, sys));
238                    }
239                }
240            }
241
242            for b in dependency_flattened.neighbors_directed(set, Direction::Outgoing) {
243                for &sys in systems {
244                    if self.no_sync_edges.contains(&(set, b)) {
245                        self.no_sync_edges.insert((sys, b));
246                    }
247                }
248            }
249        }
250        core::iter::empty()
251    }
252}