async_task/
task.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::Ordering;
8use core::task::{Context, Poll};
9
10use crate::header::Header;
11use crate::raw::Panic;
12use crate::runnable::ScheduleInfo;
13use crate::state::*;
14
15/// A spawned task.
16///
17/// A [`Task`] can be awaited to retrieve the output of its future.
18///
19/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
20/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
21/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
22/// method.
23///
24/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
25/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
26/// [`run()`][`super::Runnable::run()`].
27///
28/// # Examples
29///
30/// ```
31/// use smol::{future, Executor};
32/// use std::thread;
33///
34/// let ex = Executor::new();
35///
36/// // Spawn a future onto the executor.
37/// let task = ex.spawn(async {
38///     println!("Hello from a task!");
39///     1 + 2
40/// });
41///
42/// // Run an executor thread.
43/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
44///
45/// // Wait for the task's output.
46/// assert_eq!(future::block_on(task), 3);
47/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T, M = ()> {
    /// A raw, type-erased task pointer.
    ///
    /// The methods of `Task` cast it to `*const Header<M>` to reach the task's state, vtable
    /// and metadata.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic types `T` (the output produced when the task is awaited)
    /// and `M` (the metadata returned by `metadata()`), neither of which is stored inline.
    pub(crate) _marker: PhantomData<(T, M)>,
}
56
// `NonNull<()>` is neither `Send` nor `Sync`, so both auto traits are opted into by hand.
//
// Moving a `Task` to another thread lets that thread take the output out of the task, hence
// `T: Send`.
// NOTE(review): `M: Send + Sync` presumably reflects that the metadata stored in the header is
// shared with the other handles to the same task — confirm against `Header`/`Runnable`.
unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {}
// None of the `&self` methods in this file hands out a `T` (only `&M` via `metadata()`), which
// is why `Sync` places no bound on `T`.
unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {}

// Without this impl `Unpin` would be inherited from `PhantomData<(T, M)>`; the handle only
// holds a pointer, so it is declared `Unpin` for every `T` and `M`.
impl<T, M> Unpin for Task<T, M> {}

// The raw pointer field would otherwise stop these auto traits from being derived; both are
// only available with the `std` feature because the traits live in `std::panic`.
#[cfg(feature = "std")]
impl<T, M> std::panic::UnwindSafe for Task<T, M> {}
#[cfg(feature = "std")]
impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {}
66
67impl<T, M> Task<T, M> {
68    /// Detaches the task to let it keep running in the background.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use smol::{Executor, Timer};
74    /// use std::time::Duration;
75    ///
76    /// let ex = Executor::new();
77    ///
    /// // Spawn a daemon future.
79    /// ex.spawn(async {
80    ///     loop {
81    ///         println!("I'm a daemon task looping forever.");
82    ///         Timer::after(Duration::from_secs(1)).await;
83    ///     }
84    /// })
85    /// .detach();
86    /// ```
87    pub fn detach(self) {
88        let mut this = self;
89        let _out = this.set_detached();
90        mem::forget(this);
91    }
92
93    /// Cancels the task and waits for it to stop running.
94    ///
95    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
96    /// it didn't complete.
97    ///
98    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
99    /// canceling because it also waits for the task to stop running.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// # if cfg!(miri) { return; } // Miri does not support epoll
105    /// use smol::{future, Executor, Timer};
106    /// use std::thread;
107    /// use std::time::Duration;
108    ///
109    /// let ex = Executor::new();
110    ///
    /// // Spawn a daemon future.
112    /// let task = ex.spawn(async {
113    ///     loop {
114    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
115    ///         Timer::after(Duration::from_secs(1)).await;
116    ///     }
117    /// });
118    ///
119    /// // Run an executor thread.
120    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
121    ///
122    /// future::block_on(async {
123    ///     Timer::after(Duration::from_secs(3)).await;
124    ///     task.cancel().await;
125    /// });
126    /// ```
127    pub async fn cancel(self) -> Option<T> {
128        let mut this = self;
129        this.set_canceled();
130        this.fallible().await
131    }
132
133    /// Converts this task into a [`FallibleTask`].
134    ///
135    /// Like [`Task`], a fallible task will poll the task's output until it is
136    /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
137    /// dropped without being run. Resolves to the task's output when completed,
138    /// or [`None`] if it didn't complete.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use smol::{future, Executor};
144    /// use std::thread;
145    ///
146    /// let ex = Executor::new();
147    ///
148    /// // Spawn a future onto the executor.
149    /// let task = ex.spawn(async {
150    ///     println!("Hello from a task!");
151    ///     1 + 2
152    /// })
153    /// .fallible();
154    ///
155    /// // Run an executor thread.
156    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
157    ///
158    /// // Wait for the task's output.
159    /// assert_eq!(future::block_on(task), Some(3));
160    /// ```
161    ///
162    /// ```
163    /// use smol::future;
164    ///
165    /// // Schedule function which drops the runnable without running it.
166    /// let schedule = move |runnable| drop(runnable);
167    ///
168    /// // Create a task with the future and the schedule function.
169    /// let (runnable, task) = async_task::spawn(async {
170    ///     println!("Hello from a task!");
171    ///     1 + 2
172    /// }, schedule);
173    /// runnable.schedule();
174    ///
175    /// // Wait for the task's output.
176    /// assert_eq!(future::block_on(task.fallible()), None);
177    /// ```
178    pub fn fallible(self) -> FallibleTask<T, M> {
179        FallibleTask { task: self }
180    }
181
182    /// Puts the task in canceled state.
183    fn set_canceled(&mut self) {
184        let ptr = self.ptr.as_ptr();
185        let header = ptr as *const Header<M>;
186
187        unsafe {
188            let mut state = (*header).state.load(Ordering::Acquire);
189
190            loop {
191                // If the task has been completed or closed, it can't be canceled.
192                if state & (COMPLETED | CLOSED) != 0 {
193                    break;
194                }
195
196                // If the task is not scheduled nor running, we'll need to schedule it.
197                let new = if state & (SCHEDULED | RUNNING) == 0 {
198                    (state | SCHEDULED | CLOSED) + REFERENCE
199                } else {
200                    state | CLOSED
201                };
202
203                // Mark the task as closed.
204                match (*header).state.compare_exchange_weak(
205                    state,
206                    new,
207                    Ordering::AcqRel,
208                    Ordering::Acquire,
209                ) {
210                    Ok(_) => {
211                        // If the task is not scheduled nor running, schedule it one more time so
212                        // that its future gets dropped by the executor.
213                        if state & (SCHEDULED | RUNNING) == 0 {
214                            ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
215                        }
216
217                        // Notify the awaiter that the task has been closed.
218                        if state & AWAITER != 0 {
219                            (*header).notify(None);
220                        }
221
222                        break;
223                    }
224                    Err(s) => state = s,
225                }
226            }
227        }
228    }
229
    /// Puts the task in detached state.
    ///
    /// Clears the `TASK` flag in the task's state. If the task had completed but its output had
    /// not been taken yet, the task is closed and the output is returned so that the caller
    /// decides where it gets dropped; otherwise `None` is returned.
    ///
    /// When no other reference to the task remains, the task is either scheduled one last time
    /// (if it was not closed yet) or destroyed.
    fn set_detached(&mut self) -> Option<Result<T, Panic>> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;

        unsafe {
            // A place where the output will be stored in case it needs to be dropped.
            let mut output = None;

            // Optimistically assume the `Task` is being detached just after creating the task.
            // This is a common case so if the `Task` is detached, the overhead of it is only one
            // compare-exchange operation.
            //
            // The weak compare-exchange may also fail spuriously; that is harmless because every
            // failure falls through to the retry loop below with the observed state.
            if let Err(mut state) = (*header).state.compare_exchange_weak(
                SCHEDULED | TASK | REFERENCE,
                SCHEDULED | REFERENCE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                loop {
                    // If the task has been completed but not yet closed, that means its output
                    // must be dropped.
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
                        // Mark the task as closed in order to grab its output.
                        match (*header).state.compare_exchange_weak(
                            state,
                            state | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // Read the output.
                                output = Some(
                                    (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
                                        .read(),
                                );

                                // Update the state variable because we're continuing the loop.
                                // The next iteration takes the `else` branch and clears `TASK`.
                                state |= CLOSED;
                            }
                            Err(s) => state = s,
                        }
                    } else {
                        // If this is the last reference to the task and it's not closed, then
                        // close it and schedule one more time so that its future gets dropped by
                        // the executor.
                        //
                        // NOTE(review): `!(REFERENCE - 1)` is read here as the mask of the
                        // reference-count bits (everything at or above `REFERENCE`) — confirm
                        // against the flag layout in `crate::state`.
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                            SCHEDULED | CLOSED | REFERENCE
                        } else {
                            state & !TASK
                        };

                        // Unset the `TASK` flag.
                        match (*header).state.compare_exchange_weak(
                            state,
                            new,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // If this is the last reference to the task, we need to either
                                // schedule dropping its future or destroy it.
                                if state & !(REFERENCE - 1) == 0 {
                                    if state & CLOSED == 0 {
                                        ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
                                    } else {
                                        ((*header).vtable.destroy)(ptr);
                                    }
                                }

                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }

            output
        }
    }
310
    /// Polls the task to retrieve its output.
    ///
    /// Returns `Some` if the task has completed or `None` if it was closed. Returns
    /// `Poll::Pending` (after registering `cx`'s waker in the header) while the task is neither
    /// completed nor closed, and also while a closed task is still scheduled or running.
    ///
    /// A task becomes closed in the following cases:
    ///
    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
    /// 2. Its output gets awaited by the `Task`.
    /// 3. It panics while polling the future.
    /// 4. It is completed and the `Task` gets dropped.
    ///
    /// If the stored output is a panic, it is re-raised on the polling thread with
    /// `std::panic::resume_unwind` (with the `std` feature enabled).
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;

        unsafe {
            let mut state = (*header).state.load(Ordering::Acquire);

            loop {
                // If the task has been closed, notify the awaiter and return `None`.
                if state & CLOSED != 0 {
                    // If the task is scheduled or running, we need to wait until its future is
                    // dropped.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        // Replace the waker with one associated with the current task.
                        (*header).register(cx.waker());

                        // Reload the state after registering. It is possible changes occurred just
                        // before registration so we need to check for that.
                        state = (*header).state.load(Ordering::Acquire);

                        // If the task is still scheduled or running, we need to wait because its
                        // future is not dropped yet.
                        if state & (SCHEDULED | RUNNING) != 0 {
                            return Poll::Pending;
                        }
                    }

                    // Even though the awaiter is most likely the current task, it could also be
                    // another task.
                    (*header).notify(Some(cx.waker()));
                    return Poll::Ready(None);
                }

                // If the task is not completed, register the current task.
                if state & COMPLETED == 0 {
                    // Replace the waker with one associated with the current task.
                    (*header).register(cx.waker());

                    // Reload the state after registering. It is possible that the task became
                    // completed or closed just before registration so we need to check for that.
                    state = (*header).state.load(Ordering::Acquire);

                    // If the task has been closed, restart so that the closed-task branch at the
                    // top of the loop handles it.
                    if state & CLOSED != 0 {
                        continue;
                    }

                    // If the task is still not completed, we're blocked on it.
                    if state & COMPLETED == 0 {
                        return Poll::Pending;
                    }
                }

                // Since the task is now completed, mark it as closed in order to grab its output.
                // Only the caller whose compare-exchange succeeds goes on to read the output; a
                // failed exchange re-evaluates the freshly observed state from the top.
                match (*header).state.compare_exchange(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Notify the awaiter. Even though the awaiter is most likely the current
                        // task, it could also be another task.
                        if state & AWAITER != 0 {
                            (*header).notify(Some(cx.waker()));
                        }

                        // Take the output from the task.
                        let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
                        let output = output.read();

                        // Propagate the panic if the task panicked.
                        let output = match output {
                            Ok(output) => output,
                            Err(panic) => {
                                #[cfg(feature = "std")]
                                std::panic::resume_unwind(panic);

                                // NOTE(review): an empty match only type-checks if `Panic` is
                                // uninhabited without `std` — confirm in `crate::raw`.
                                #[cfg(not(feature = "std"))]
                                match panic {}
                            }
                        };

                        return Poll::Ready(Some(output));
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
411
412    fn header(&self) -> &Header<M> {
413        let ptr = self.ptr.as_ptr();
414        let header = ptr as *const Header<M>;
415        unsafe { &*header }
416    }
417
418    /// Returns `true` if the current task is finished.
419    ///
420    /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
421    pub fn is_finished(&self) -> bool {
422        let ptr = self.ptr.as_ptr();
423        let header = ptr as *const Header<M>;
424
425        unsafe {
426            let state = (*header).state.load(Ordering::Acquire);
427            state & (CLOSED | COMPLETED) != 0
428        }
429    }
430
431    /// Get the metadata associated with this task.
432    ///
433    /// Tasks can be created with a metadata object associated with them; by default, this
434    /// is a `()` value. See the [`Builder::metadata()`] method for more information.
435    pub fn metadata(&self) -> &M {
436        &self.header().metadata
437    }
438}
439
440impl<T, M> Drop for Task<T, M> {
441    fn drop(&mut self) {
442        self.set_canceled();
443        self.set_detached();
444    }
445}
446
447impl<T, M> Future for Task<T, M> {
448    type Output = T;
449
450    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451        match self.poll_task(cx) {
452            Poll::Ready(t) => Poll::Ready(t.expect("Task polled after completion")),
453            Poll::Pending => Poll::Pending,
454        }
455    }
456}
457
458impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> {
459    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460        f.debug_struct("Task")
461            .field("header", self.header())
462            .finish()
463    }
464}
465
466/// A spawned task with a fallible response.
467///
468/// This type behaves like [`Task`], however it produces an `Option<T>` when
469/// polled and will return `None` if the executor dropped its
470/// [`Runnable`][`super::Runnable`] without being run.
471///
472/// This can be useful to avoid the panic produced when polling the `Task`
473/// future if the executor dropped its `Runnable`.
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct FallibleTask<T, M = ()> {
    /// The wrapped task; the methods and trait impls of `FallibleTask` delegate to it.
    task: Task<T, M>,
}
478
479impl<T, M> FallibleTask<T, M> {
480    /// Detaches the task to let it keep running in the background.
481    ///
482    /// # Examples
483    ///
484    /// ```
485    /// use smol::{Executor, Timer};
486    /// use std::time::Duration;
487    ///
488    /// let ex = Executor::new();
489    ///
    /// // Spawn a daemon future.
491    /// ex.spawn(async {
492    ///     loop {
493    ///         println!("I'm a daemon task looping forever.");
494    ///         Timer::after(Duration::from_secs(1)).await;
495    ///     }
496    /// })
497    /// .fallible()
498    /// .detach();
499    /// ```
500    pub fn detach(self) {
501        self.task.detach()
502    }
503
504    /// Cancels the task and waits for it to stop running.
505    ///
506    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
507    /// it didn't complete.
508    ///
509    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
510    /// canceling because it also waits for the task to stop running.
511    ///
512    /// # Examples
513    ///
514    /// ```
515    /// # if cfg!(miri) { return; } // Miri does not support epoll
516    /// use smol::{future, Executor, Timer};
517    /// use std::thread;
518    /// use std::time::Duration;
519    ///
520    /// let ex = Executor::new();
521    ///
    /// // Spawn a daemon future.
523    /// let task = ex.spawn(async {
524    ///     loop {
525    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
526    ///         Timer::after(Duration::from_secs(1)).await;
527    ///     }
528    /// })
529    /// .fallible();
530    ///
531    /// // Run an executor thread.
532    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
533    ///
534    /// future::block_on(async {
535    ///     Timer::after(Duration::from_secs(3)).await;
536    ///     task.cancel().await;
537    /// });
538    /// ```
539    pub async fn cancel(self) -> Option<T> {
540        self.task.cancel().await
541    }
542
543    /// Returns `true` if the current task is finished.
544    ///
545    /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
546    pub fn is_finished(&self) -> bool {
547        self.task.is_finished()
548    }
549}
550
551impl<T, M> Future for FallibleTask<T, M> {
552    type Output = Option<T>;
553
554    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
555        self.task.poll_task(cx)
556    }
557}
558
559impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> {
560    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561        f.debug_struct("FallibleTask")
562            .field("header", self.task.header())
563            .finish()
564    }
565}