bevy_tasks/
task.rs

1use alloc::fmt;
2use core::{
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use crate::cfg;
9
/// Wraps `async_executor::Task`, a spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`Task::cancel()`] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
#[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."]
pub struct Task<T>(
    // The wrapped handle is selected per platform by `cfg::web!`:
    // - web: the receiving end of a channel that carries either the future's output or the
    //   payload of a panic caught while polling it (`Result<T, Panic>`);
    // - everywhere else: the underlying `async_task::Task`.
    cfg::web! {
        if {
            async_channel::Receiver<Result<T, Panic>>
        } else {
            async_task::Task<T>
        }
    },
);
28
29// Custom constructors for web and non-web platforms
30cfg::web! {
31    if {
32        impl<T: 'static> Task<T> {
33            /// Creates a new task by passing the given future to the web
34            /// runtime as a promise.
35            pub(crate) fn wrap_future(future: impl Future<Output = T> + 'static) -> Self {
36                use bevy_platform::exports::wasm_bindgen_futures::spawn_local;
37                let (sender, receiver) = async_channel::bounded(1);
38                spawn_local(async move {
39                    // Catch any panics that occur when polling the future so they can
40                    // be propagated back to the task handle.
41                    let value = CatchUnwind(AssertUnwindSafe(future)).await;
42                    let _ = sender.send(value).await;
43                });
44                Self(receiver)
45            }
46        }
47    } else {
48        impl<T> Task<T> {
49            /// Creates a new task from a given `async_executor::Task`
50            pub(crate) fn new(task: async_task::Task<T>) -> Self {
51                Self(task)
52            }
53        }
54    }
55}
56
57impl<T> Task<T> {
58    /// Detaches the task to let it keep running in the background.
59    ///
60    /// # Platform-Specific Behavior
61    ///
62    /// When building for the web, this method has no effect.
63    pub fn detach(self) {
64        cfg::web! {
65            if {
66                // Tasks are already treated as detached on the web.
67            } else {
68                self.0.detach();
69            }
70        }
71    }
72
73    /// Cancels the task and waits for it to stop running.
74    ///
75    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
76    /// it didn't complete.
77    ///
78    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
79    /// canceling because it also waits for the task to stop running.
80    ///
81    /// # Platform-Specific Behavior
82    ///
83    /// Canceling tasks is unsupported on the web, and this is the same as awaiting the task.
84    pub async fn cancel(self) -> Option<T> {
85        cfg::web! {
86            if {
87                // Await the task and handle any panics.
88                match self.0.recv().await {
89                    Ok(Ok(value)) => Some(value),
90                    Err(_) => None,
91                    Ok(Err(panic)) => {
92                        // drop this to prevent the panic payload from resuming the panic on drop.
93                        // this also leaks the box but I'm not sure how to avoid that
94                        core::mem::forget(panic);
95                        None
96                    }
97                }
98            } else {
99                // Wait for the task to become canceled
100                self.0.cancel().await
101            }
102        }
103    }
104
105    /// Returns `true` if the current task is finished.
106    ///
107    /// Unlike poll, it doesn't resolve the final value, it just checks if the task has finished.
108    /// Note that in a multithreaded environment, this task can be finished immediately after calling this function.
109    pub fn is_finished(&self) -> bool {
110        cfg::web! {
111            if {
112                // We treat the task as unfinished until the result is sent over the channel.
113                !self.0.is_empty()
114            } else {
115                // Defer to the `async_task` implementation.
116                self.0.is_finished()
117            }
118        }
119    }
120}
121
122impl<T> Future for Task<T> {
123    type Output = T;
124
125    cfg::web! {
126        if {
127            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128                // `recv()` returns a future, so we just poll that and hand the result.
129                let recv = core::pin::pin!(self.0.recv());
130                match recv.poll(cx) {
131                    Poll::Ready(Ok(Ok(value))) => Poll::Ready(value),
132                    // NOTE: Propagating the panic here sorta has parity with the async_executor behavior.
133                    // For those tasks, polling them after a panic returns a `None` which gets `unwrap`ed, so
134                    // using `resume_unwind` here is essentially keeping the same behavior while adding more information.
135                    Poll::Ready(Ok(Err(_panic))) => crate::cfg::switch! {{
136                        crate::cfg::std => {
137                            std::panic::resume_unwind(_panic)
138                        }
139                        _ => {
140                            unreachable!("catching a panic is only possible with std")
141                        }
142                    }},
143                    Poll::Ready(Err(_)) => panic!("Polled a task after it finished running"),
144                    Poll::Pending => Poll::Pending,
145                }
146            }
147        } else {
148            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149                // `async_task` has `Task` implement `Future`, so we just poll it.
150                Pin::new(&mut self.0).poll(cx)
151            }
152        }
153    }
154}
155
// `Task<T>` is unconditionally `Unpin`, regardless of `T`: all variants of the wrapped handle
// (the web channel receiver and the `async_task` task) are expected to implement `Unpin`.
// The non-web `Future::poll` depends on this to borrow `self.0` mutably through
// `Pin<&mut Self>`.
impl<T> Unpin for Task<T> {}
158
159// Derive doesn't work for macro types, so we have to implement this manually.
160impl<T> fmt::Debug for Task<T> {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        self.0.fmt(f)
163    }
164}
165
166// Utilities for catching unwinds on the web.
167cfg::web! {
168    use alloc::boxed::Box;
169    use core::{
170        panic::{AssertUnwindSafe, UnwindSafe},
171        any::Any,
172    };
173
174    type Panic = Box<dyn Any + Send + 'static>;
175
176    #[pin_project::pin_project]
177    struct CatchUnwind<F: UnwindSafe>(#[pin] F);
178
179    impl<F: Future + UnwindSafe> Future for CatchUnwind<F> {
180        type Output = Result<F::Output, Panic>;
181        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
182            let f = AssertUnwindSafe(|| self.project().0.poll(cx));
183
184            let result = cfg::std! {
185                if {
186                    std::panic::catch_unwind(f)?
187                } else {
188                    f()
189                }
190            };
191
192            result.map(Ok)
193        }
194    }
195}
196
#[cfg(test)]
mod tests {
    use crate::Task;

    /// Compile-time check: this test only builds if `Task<()>` is `Sync`.
    #[test]
    fn task_is_sync() {
        fn assert_sync<T>()
        where
            T: Sync,
        {
        }

        assert_sync::<Task<()>>();
    }

    /// Compile-time check: this test only builds if `Task<()>` is `Send`.
    #[test]
    fn task_is_send() {
        fn assert_send<T>()
        where
            T: Send,
        {
        }

        assert_send::<Task<()>>();
    }
}