async_task/
runnable.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
7use core::task::Waker;
8
9use alloc::boxed::Box;
10
11use crate::header::Header;
12use crate::raw::RawTask;
13use crate::state::*;
14use crate::Task;
15
16mod sealed {
17    use super::*;
18    pub trait Sealed<M> {}
19
20    impl<M, F> Sealed<M> for F where F: Fn(Runnable<M>) {}
21
22    impl<M, F> Sealed<M> for WithInfo<F> where F: Fn(Runnable<M>, ScheduleInfo) {}
23}
24
25/// A builder that creates a new task.
26#[derive(Debug)]
27pub struct Builder<M> {
28    /// The metadata associated with the task.
29    pub(crate) metadata: M,
30
31    /// Whether or not a panic that occurs in the task should be propagated.
32    #[cfg(feature = "std")]
33    pub(crate) propagate_panic: bool,
34}
35
36impl<M: Default> Default for Builder<M> {
37    fn default() -> Self {
38        Builder::new().metadata(M::default())
39    }
40}
41
42/// Extra scheduling information that can be passed to the scheduling function.
43///
44/// The data source of this struct is directly from the actual implementation
45/// of the crate itself, different from [`Runnable`]'s metadata, which is
46/// managed by the caller.
47///
48/// # Examples
49///
50/// ```
51/// use async_task::{Runnable, ScheduleInfo, WithInfo};
52/// use std::sync::{Arc, Mutex};
53///
54/// // The future inside the task.
55/// let future = async {
56///     println!("Hello, world!");
57/// };
58///
59/// // If the task gets woken up while running, it will be sent into this channel.
60/// let (s, r) = flume::unbounded();
61/// // Otherwise, it will be placed into this slot.
62/// let lifo_slot = Arc::new(Mutex::new(None));
63/// let schedule = move |runnable: Runnable, info: ScheduleInfo| {
64///     if info.woken_while_running {
65///         s.send(runnable).unwrap()
66///     } else {
67///         let last = lifo_slot.lock().unwrap().replace(runnable);
68///         if let Some(last) = last {
69///             s.send(last).unwrap()
70///         }
71///     }
72/// };
73///
74/// // Create the actual scheduler to be spawned with some future.
75/// let scheduler = WithInfo(schedule);
76/// // Create a task with the future and the scheduler.
77/// let (runnable, task) = async_task::spawn(future, scheduler);
78/// ```
79#[derive(Debug, Copy, Clone)]
80#[non_exhaustive]
81pub struct ScheduleInfo {
82    /// Indicates whether the task gets woken up while running.
83    ///
84    /// It is set to true usually because the task has yielded itself to the
85    /// scheduler.
86    pub woken_while_running: bool,
87}
88
89impl ScheduleInfo {
90    pub(crate) fn new(woken_while_running: bool) -> Self {
91        ScheduleInfo {
92            woken_while_running,
93        }
94    }
95}
96
97/// The trait for scheduling functions.
98pub trait Schedule<M = ()>: sealed::Sealed<M> {
99    /// The actual scheduling procedure.
100    fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo);
101}
102
103impl<M, F> Schedule<M> for F
104where
105    F: Fn(Runnable<M>),
106{
107    fn schedule(&self, runnable: Runnable<M>, _: ScheduleInfo) {
108        self(runnable)
109    }
110}
111
112/// Pass a scheduling function with more scheduling information - a.k.a.
113/// [`ScheduleInfo`].
114///
115/// Sometimes, it's useful to pass the runnable's state directly to the
116/// scheduling function, such as whether it's woken up while running. The
117/// scheduler can thus use the information to determine its scheduling
118/// strategy.
119///
120/// The data source of [`ScheduleInfo`] is directly from the actual
121/// implementation of the crate itself, different from [`Runnable`]'s metadata,
122/// which is managed by the caller.
123///
124/// # Examples
125///
126/// ```
127/// use async_task::{ScheduleInfo, WithInfo};
128/// use std::sync::{Arc, Mutex};
129///
130/// // The future inside the task.
131/// let future = async {
132///     println!("Hello, world!");
133/// };
134///
135/// // If the task gets woken up while running, it will be sent into this channel.
136/// let (s, r) = flume::unbounded();
137/// // Otherwise, it will be placed into this slot.
138/// let lifo_slot = Arc::new(Mutex::new(None));
139/// let schedule = move |runnable, info: ScheduleInfo| {
140///     if info.woken_while_running {
141///         s.send(runnable).unwrap()
142///     } else {
143///         let last = lifo_slot.lock().unwrap().replace(runnable);
144///         if let Some(last) = last {
145///             s.send(last).unwrap()
146///         }
147///     }
148/// };
149///
150/// // Create a task with the future and the schedule function.
151/// let (runnable, task) = async_task::spawn(future, WithInfo(schedule));
152/// ```
153#[derive(Debug)]
154pub struct WithInfo<F>(pub F);
155
156impl<F> From<F> for WithInfo<F> {
157    fn from(value: F) -> Self {
158        WithInfo(value)
159    }
160}
161
162impl<M, F> Schedule<M> for WithInfo<F>
163where
164    F: Fn(Runnable<M>, ScheduleInfo),
165{
166    fn schedule(&self, runnable: Runnable<M>, info: ScheduleInfo) {
167        (self.0)(runnable, info)
168    }
169}
170
171impl Builder<()> {
172    /// Creates a new task builder.
173    ///
174    /// By default, this task builder has no metadata. Use the [`metadata`] method to
175    /// set the metadata.
176    ///
177    /// # Examples
178    ///
179    /// ```
180    /// use async_task::Builder;
181    ///
182    /// let (runnable, task) = Builder::new().spawn(|()| async {}, |_| {});
183    /// ```
184    pub fn new() -> Builder<()> {
185        Builder {
186            metadata: (),
187            #[cfg(feature = "std")]
188            propagate_panic: false,
189        }
190    }
191
192    /// Adds metadata to the task.
193    ///
194    /// In certain cases, it may be useful to associate some metadata with a task. For instance,
195    /// you may want to associate a name with a task, or a priority for a priority queue. This
196    /// method allows the user to attach arbitrary metadata to a task that is available through
197    /// the [`Runnable`] or the [`Task`].
198    ///
199    /// # Examples
200    ///
201    /// This example creates an executor that associates a "priority" number with each task, and
202    /// then runs the tasks in order of priority.
203    ///
204    /// ```
205    /// use async_task::{Builder, Runnable};
206    /// use once_cell::sync::Lazy;
207    /// use std::cmp;
208    /// use std::collections::BinaryHeap;
209    /// use std::sync::Mutex;
210    ///
211    /// # smol::future::block_on(async {
212    /// /// A wrapper around a `Runnable<usize>` that implements `Ord` so that it can be used in a
213    /// /// priority queue.
214    /// struct TaskWrapper(Runnable<usize>);
215    ///
216    /// impl PartialEq for TaskWrapper {
217    ///     fn eq(&self, other: &Self) -> bool {
218    ///         self.0.metadata() == other.0.metadata()
219    ///     }
220    /// }
221    ///
222    /// impl Eq for TaskWrapper {}
223    ///
224    /// impl PartialOrd for TaskWrapper {
225    ///    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
226    ///       Some(self.cmp(other))
227    ///    }
228    /// }
229    ///
230    /// impl Ord for TaskWrapper {
231    ///    fn cmp(&self, other: &Self) -> cmp::Ordering {
232    ///        self.0.metadata().cmp(other.0.metadata())
233    ///    }
234    /// }
235    ///
236    /// static EXECUTOR: Lazy<Mutex<BinaryHeap<TaskWrapper>>> = Lazy::new(|| {
237    ///     Mutex::new(BinaryHeap::new())
238    /// });
239    ///
240    /// let schedule = |runnable| {
241    ///     EXECUTOR.lock().unwrap().push(TaskWrapper(runnable));
242    /// };
243    ///
244    /// // Spawn a few tasks with different priorities.
245    /// let spawn_task = move |priority| {
246    ///     let (runnable, task) = Builder::new().metadata(priority).spawn(
247    ///         move |_| async move { priority },
248    ///         schedule,
249    ///     );
250    ///     runnable.schedule();
251    ///     task
252    /// };
253    ///
254    /// let t1 = spawn_task(1);
255    /// let t2 = spawn_task(2);
256    /// let t3 = spawn_task(3);
257    ///
258    /// // Run the tasks in order of priority.
259    /// let mut metadata_seen = vec![];
260    /// while let Some(TaskWrapper(runnable)) = EXECUTOR.lock().unwrap().pop() {
261    ///     metadata_seen.push(*runnable.metadata());
262    ///     runnable.run();
263    /// }
264    ///
265    /// assert_eq!(metadata_seen, vec![3, 2, 1]);
266    /// assert_eq!(t1.await, 1);
267    /// assert_eq!(t2.await, 2);
268    /// assert_eq!(t3.await, 3);
269    /// # });
270    /// ```
271    pub fn metadata<M>(self, metadata: M) -> Builder<M> {
272        Builder {
273            metadata,
274            #[cfg(feature = "std")]
275            propagate_panic: self.propagate_panic,
276        }
277    }
278}
279
280impl<M> Builder<M> {
281    /// Propagates panics that occur in the task.
282    ///
283    /// When this is `true`, panics that occur in the task will be propagated to the caller of
284    /// the [`Task`]. When this is false, no special action is taken when a panic occurs in the
285    /// task, meaning that the caller of [`Runnable::run`] will observe a panic.
286    ///
287    /// This is only available when the `std` feature is enabled. By default, this is `false`.
288    ///
289    /// # Examples
290    ///
291    /// ```
292    /// use async_task::Builder;
293    /// use futures_lite::future::poll_fn;
294    /// use std::future::Future;
295    /// use std::panic;
296    /// use std::pin::Pin;
297    /// use std::task::{Context, Poll};
298    ///
299    /// fn did_panic<F: FnOnce()>(f: F) -> bool {
300    ///     panic::catch_unwind(panic::AssertUnwindSafe(f)).is_err()
301    /// }
302    ///
303    /// # smol::future::block_on(async {
304    /// let (runnable1, mut task1) = Builder::new()
305    ///    .propagate_panic(true)
306    ///    .spawn(|()| async move { panic!() }, |_| {});
307    ///
308    /// let (runnable2, mut task2) = Builder::new()
309    ///    .propagate_panic(false)
310    ///    .spawn(|()| async move { panic!() }, |_| {});
311    ///
312    /// assert!(!did_panic(|| { runnable1.run(); }));
313    /// assert!(did_panic(|| { runnable2.run(); }));
314    ///
315    /// let waker = poll_fn(|cx| Poll::Ready(cx.waker().clone())).await;
316    /// let mut cx = Context::from_waker(&waker);
317    /// assert!(did_panic(|| { let _ = Pin::new(&mut task1).poll(&mut cx); }));
318    /// assert!(did_panic(|| { let _ = Pin::new(&mut task2).poll(&mut cx); }));
319    /// # });
320    /// ```
321    #[cfg(feature = "std")]
322    pub fn propagate_panic(self, propagate_panic: bool) -> Builder<M> {
323        Builder {
324            metadata: self.metadata,
325            propagate_panic,
326        }
327    }
328
329    /// Creates a new task.
330    ///
331    /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
332    /// output.
333    ///
334    /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
335    /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
336    /// again.
337    ///
338    /// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
339    /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
340    /// should push it into a task queue so that it can be processed later.
341    ///
342    /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
343    /// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
344    ///
345    /// # Examples
346    ///
347    /// ```
348    /// use async_task::Builder;
349    ///
350    /// // The future inside the task.
351    /// let future = async {
352    ///     println!("Hello, world!");
353    /// };
354    ///
355    /// // A function that schedules the task when it gets woken up.
356    /// let (s, r) = flume::unbounded();
357    /// let schedule = move |runnable| s.send(runnable).unwrap();
358    ///
359    /// // Create a task with the future and the schedule function.
360    /// let (runnable, task) = Builder::new().spawn(|()| future, schedule);
361    /// ```
362    pub fn spawn<F, Fut, S>(self, future: F, schedule: S) -> (Runnable<M>, Task<Fut::Output, M>)
363    where
364        F: FnOnce(&M) -> Fut,
365        Fut: Future + Send + 'static,
366        Fut::Output: Send + 'static,
367        S: Schedule<M> + Send + Sync + 'static,
368    {
369        unsafe { self.spawn_unchecked(future, schedule) }
370    }
371
372    /// Creates a new thread-local task.
373    ///
374    /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the
375    /// [`Runnable`] is used or dropped on another thread, a panic will occur.
376    ///
377    /// This function is only available when the `std` feature for this crate is enabled.
378    ///
379    /// # Examples
380    ///
381    /// ```
382    /// use async_task::{Builder, Runnable};
383    /// use flume::{Receiver, Sender};
384    /// use std::rc::Rc;
385    ///
386    /// thread_local! {
387    ///     // A queue that holds scheduled tasks.
388    ///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
389    /// }
390    ///
391    /// // Make a non-Send future.
392    /// let msg: Rc<str> = "Hello, world!".into();
393    /// let future = async move {
394    ///     println!("{}", msg);
395    /// };
396    ///
397    /// // A function that schedules the task when it gets woken up.
398    /// let s = QUEUE.with(|(s, _)| s.clone());
399    /// let schedule = move |runnable| s.send(runnable).unwrap();
400    ///
401    /// // Create a task with the future and the schedule function.
402    /// let (runnable, task) = Builder::new().spawn_local(move |()| future, schedule);
403    /// ```
404    #[cfg(feature = "std")]
405    pub fn spawn_local<F, Fut, S>(
406        self,
407        future: F,
408        schedule: S,
409    ) -> (Runnable<M>, Task<Fut::Output, M>)
410    where
411        F: FnOnce(&M) -> Fut,
412        Fut: Future + 'static,
413        Fut::Output: 'static,
414        S: Schedule<M> + Send + Sync + 'static,
415    {
416        use std::mem::ManuallyDrop;
417        use std::pin::Pin;
418        use std::task::{Context, Poll};
419        use std::thread::{self, ThreadId};
420
421        #[inline]
422        fn thread_id() -> ThreadId {
423            std::thread_local! {
424                static ID: ThreadId = thread::current().id();
425            }
426            ID.try_with(|id| *id)
427                .unwrap_or_else(|_| thread::current().id())
428        }
429
430        struct Checked<F> {
431            id: ThreadId,
432            inner: ManuallyDrop<F>,
433        }
434
435        impl<F> Drop for Checked<F> {
436            fn drop(&mut self) {
437                assert!(
438                    self.id == thread_id(),
439                    "local task dropped by a thread that didn't spawn it"
440                );
441                unsafe {
442                    ManuallyDrop::drop(&mut self.inner);
443                }
444            }
445        }
446
447        impl<F: Future> Future for Checked<F> {
448            type Output = F::Output;
449
450            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451                assert!(
452                    self.id == thread_id(),
453                    "local task polled by a thread that didn't spawn it"
454                );
455                unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
456            }
457        }
458
459        // Wrap the future into one that checks which thread it's on.
460        let future = move |meta| {
461            let future = future(meta);
462
463            Checked {
464                id: thread_id(),
465                inner: ManuallyDrop::new(future),
466            }
467        };
468
469        unsafe { self.spawn_unchecked(future, schedule) }
470    }
471
472    /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
473    ///
474    /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
475    /// `'static` on `future` and `schedule`.
476    ///
477    /// # Safety
478    ///
479    /// - If `Fut` is not [`Send`], its [`Runnable`] must be used and dropped on the original
480    ///   thread.
481    /// - If `Fut` is not `'static`, borrowed non-metadata variables must outlive its [`Runnable`].
482    /// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
483    ///   must be used and dropped on the original thread.
484    /// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
485    ///   [`Runnable`]'s [`Waker`].
486    ///
487    /// # Examples
488    ///
489    /// ```
490    /// use async_task::Builder;
491    ///
492    /// // The future inside the task.
493    /// let future = async {
494    ///     println!("Hello, world!");
495    /// };
496    ///
497    /// // If the task gets woken up, it will be sent into this channel.
498    /// let (s, r) = flume::unbounded();
499    /// let schedule = move |runnable| s.send(runnable).unwrap();
500    ///
501    /// // Create a task with the future and the schedule function.
502    /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked(move |()| future, schedule) };
503    /// ```
504    pub unsafe fn spawn_unchecked<'a, F, Fut, S>(
505        self,
506        future: F,
507        schedule: S,
508    ) -> (Runnable<M>, Task<Fut::Output, M>)
509    where
510        F: FnOnce(&'a M) -> Fut,
511        Fut: Future + 'a,
512        S: Schedule<M>,
513        M: 'a,
514    {
515        // Allocate large futures on the heap.
516        let ptr = if mem::size_of::<Fut>() >= 2048 {
517            let future = |meta| {
518                let future = future(meta);
519                Box::pin(future)
520            };
521
522            RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, self)
523        } else {
524            RawTask::<Fut, Fut::Output, S, M>::allocate(future, schedule, self)
525        };
526
527        let runnable = Runnable::from_raw(ptr);
528        let task = Task {
529            ptr,
530            _marker: PhantomData,
531        };
532        (runnable, task)
533    }
534}
535
536/// Creates a new task.
537///
538/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
539/// output.
540///
541/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
542/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
543/// again.
544///
545/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
546/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
547/// should push it into a task queue so that it can be processed later.
548///
549/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
550/// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
551///
552/// # Examples
553///
554/// ```
555/// // The future inside the task.
556/// let future = async {
557///     println!("Hello, world!");
558/// };
559///
560/// // A function that schedules the task when it gets woken up.
561/// let (s, r) = flume::unbounded();
562/// let schedule = move |runnable| s.send(runnable).unwrap();
563///
564/// // Create a task with the future and the schedule function.
565/// let (runnable, task) = async_task::spawn(future, schedule);
566/// ```
567pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
568where
569    F: Future + Send + 'static,
570    F::Output: Send + 'static,
571    S: Schedule + Send + Sync + 'static,
572{
573    unsafe { spawn_unchecked(future, schedule) }
574}
575
576/// Creates a new thread-local task.
577///
578/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the
579/// [`Runnable`] is used or dropped on another thread, a panic will occur.
580///
581/// This function is only available when the `std` feature for this crate is enabled.
582///
583/// # Examples
584///
585/// ```
586/// use async_task::Runnable;
587/// use flume::{Receiver, Sender};
588/// use std::rc::Rc;
589///
590/// thread_local! {
591///     // A queue that holds scheduled tasks.
592///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
593/// }
594///
595/// // Make a non-Send future.
596/// let msg: Rc<str> = "Hello, world!".into();
597/// let future = async move {
598///     println!("{}", msg);
599/// };
600///
601/// // A function that schedules the task when it gets woken up.
602/// let s = QUEUE.with(|(s, _)| s.clone());
603/// let schedule = move |runnable| s.send(runnable).unwrap();
604///
605/// // Create a task with the future and the schedule function.
606/// let (runnable, task) = async_task::spawn_local(future, schedule);
607/// ```
608#[cfg(feature = "std")]
609pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
610where
611    F: Future + 'static,
612    F::Output: 'static,
613    S: Schedule + Send + Sync + 'static,
614{
615    Builder::new().spawn_local(move |()| future, schedule)
616}
617
618/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
619///
620/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
621/// `'static` on `future` and `schedule`.
622///
623/// # Safety
624///
625/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
626///   thread.
627/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
628/// - If `schedule` is not [`Send`] and [`Sync`], all instances of the [`Runnable`]'s [`Waker`]
629///   must be used and dropped on the original thread.
630/// - If `schedule` is not `'static`, borrowed variables must outlive all instances of the
631///   [`Runnable`]'s [`Waker`].
632///
633/// # Examples
634///
635/// ```
636/// // The future inside the task.
637/// let future = async {
638///     println!("Hello, world!");
639/// };
640///
641/// // If the task gets woken up, it will be sent into this channel.
642/// let (s, r) = flume::unbounded();
643/// let schedule = move |runnable| s.send(runnable).unwrap();
644///
645/// // Create a task with the future and the schedule function.
646/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
647/// ```
648pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
649where
650    F: Future,
651    S: Schedule,
652{
653    Builder::new().spawn_unchecked(move |()| future, schedule)
654}
655
656/// A handle to a runnable task.
657///
658/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
659/// scheduled for running.
660///
661/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
662/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
663/// again.
664///
665/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
666/// awaiting the [`Task`] after that will result in a panic.
667///
668/// # Examples
669///
670/// ```
671/// use async_task::Runnable;
672/// use once_cell::sync::Lazy;
673/// use std::{panic, thread};
674///
675/// // A simple executor.
676/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
677///     let (sender, receiver) = flume::unbounded::<Runnable>();
678///     thread::spawn(|| {
679///         for runnable in receiver {
680///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
681///         }
682///     });
683///     sender
684/// });
685///
686/// // Create a task with a simple future.
687/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
688/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
689///
690/// // Schedule the task and await its output.
691/// runnable.schedule();
692/// assert_eq!(smol::future::block_on(task), 3);
693/// ```
694pub struct Runnable<M = ()> {
695    /// A pointer to the heap-allocated task.
696    pub(crate) ptr: NonNull<()>,
697
698    /// A marker capturing generic type `M`.
699    pub(crate) _marker: PhantomData<M>,
700}
701
702unsafe impl<M: Send + Sync> Send for Runnable<M> {}
703unsafe impl<M: Send + Sync> Sync for Runnable<M> {}
704
705#[cfg(feature = "std")]
706impl<M> std::panic::UnwindSafe for Runnable<M> {}
707#[cfg(feature = "std")]
708impl<M> std::panic::RefUnwindSafe for Runnable<M> {}
709
710impl<M> Runnable<M> {
711    /// Get the metadata associated with this task.
712    ///
713    /// Tasks can be created with a metadata object associated with them; by default, this
714    /// is a `()` value. See the [`Builder::metadata()`] method for more information.
715    pub fn metadata(&self) -> &M {
716        &self.header().metadata
717    }
718
719    /// Schedules the task.
720    ///
721    /// This is a convenience method that passes the [`Runnable`] to the schedule function.
722    ///
723    /// # Examples
724    ///
725    /// ```
726    /// // A function that schedules the task when it gets woken up.
727    /// let (s, r) = flume::unbounded();
728    /// let schedule = move |runnable| s.send(runnable).unwrap();
729    ///
730    /// // Create a task with a simple future and the schedule function.
731    /// let (runnable, task) = async_task::spawn(async {}, schedule);
732    ///
733    /// // Schedule the task.
734    /// assert_eq!(r.len(), 0);
735    /// runnable.schedule();
736    /// assert_eq!(r.len(), 1);
737    /// ```
738    pub fn schedule(self) {
739        let ptr = self.ptr.as_ptr();
740        let header = ptr as *const Header<M>;
741        mem::forget(self);
742
743        unsafe {
744            ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
745        }
746    }
747
748    /// Runs the task by polling its future.
749    ///
750    /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
751    /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
752    /// [`Runnable`] vanishes until the task is woken.
753    /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
754    /// it woke itself and then gave the control back to the executor.
755    ///
756    /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
757    /// this method simply destroys the task.
758    ///
759    /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
760    /// after that will also result in a panic.
761    ///
762    /// # Examples
763    ///
764    /// ```
765    /// // A function that schedules the task when it gets woken up.
766    /// let (s, r) = flume::unbounded();
767    /// let schedule = move |runnable| s.send(runnable).unwrap();
768    ///
769    /// // Create a task with a simple future and the schedule function.
770    /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
771    ///
772    /// // Run the task and check its output.
773    /// runnable.run();
774    /// assert_eq!(smol::future::block_on(task), 3);
775    /// ```
776    pub fn run(self) -> bool {
777        let ptr = self.ptr.as_ptr();
778        let header = ptr as *const Header<M>;
779        mem::forget(self);
780
781        unsafe { ((*header).vtable.run)(ptr) }
782    }
783
784    /// Returns a waker associated with this task.
785    ///
786    /// # Examples
787    ///
788    /// ```
789    /// use smol::future;
790    ///
791    /// // A function that schedules the task when it gets woken up.
792    /// let (s, r) = flume::unbounded();
793    /// let schedule = move |runnable| s.send(runnable).unwrap();
794    ///
795    /// // Create a task with a simple future and the schedule function.
796    /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
797    ///
798    /// // Take a waker and run the task.
799    /// let waker = runnable.waker();
800    /// runnable.run();
801    ///
802    /// // Reschedule the task by waking it.
803    /// assert_eq!(r.len(), 0);
804    /// waker.wake();
805    /// assert_eq!(r.len(), 1);
806    /// ```
807    pub fn waker(&self) -> Waker {
808        let ptr = self.ptr.as_ptr();
809        let header = ptr as *const Header<M>;
810
811        unsafe {
812            let raw_waker = ((*header).vtable.clone_waker)(ptr);
813            Waker::from_raw(raw_waker)
814        }
815    }
816
817    fn header(&self) -> &Header<M> {
818        unsafe { &*(self.ptr.as_ptr() as *const Header<M>) }
819    }
820
821    /// Converts this task into a raw pointer.
822    ///
823    /// To avoid a memory leak the pointer must be converted back to a Runnable using [`Runnable<M>::from_raw`][from_raw].
824    ///
825    /// `into_raw` does not change the state of the [`Task`], but there is no guarantee that it will be in the same state after calling [`Runnable<M>::from_raw`][from_raw],
826    /// as the corresponding [`Task`] might have been dropped or cancelled.
827    ///
828    /// # Examples
829    ///
830    /// ```rust
831    /// use async_task::{Runnable, spawn};
832
833    /// let (runnable, task) = spawn(async {}, |_| {});
834    /// let runnable_pointer = runnable.into_raw();
835    ///
836    /// unsafe {
837    ///     // Convert back to an `Runnable` to prevent leak.
838    ///     let runnable = Runnable::<()>::from_raw(runnable_pointer);
839    ///     runnable.run();
840    ///     // Further calls to `Runnable::from_raw(runnable_pointer)` would be memory-unsafe.
841    /// }
842    /// // The memory was freed when `x` went out of scope above, so `runnable_pointer` is now dangling!
843    /// ```
844    /// [from_raw]: #method.from_raw
845    pub fn into_raw(self) -> NonNull<()> {
846        let ptr = self.ptr;
847        mem::forget(self);
848        ptr
849    }
850
851    /// Converts a raw pointer into a Runnable.
852    ///
853    /// # Safety
854    ///
855    /// This method should only be used with raw pointers returned from [`Runnable<M>::into_raw`][into_raw].
856    /// It is not safe to use the provided pointer once it is passed to `from_raw`.
857    /// Crucially, it is unsafe to call `from_raw` multiple times with the same pointer - even if the resulting [`Runnable`] is not used -
858    /// as internally `async-task` uses reference counting.
859    ///
860    /// It is however safe to call [`Runnable<M>::into_raw`][into_raw] on a [`Runnable`] created with `from_raw` or
861    /// after the [`Task`] associated with a given Runnable has been dropped or cancelled.
862    ///
863    /// The state of the [`Runnable`] created with `from_raw` is not specified.
864    ///
865    /// # Examples
866    ///
867    /// ```rust
868    /// use async_task::{Runnable, spawn};
869
870    /// let (runnable, task) = spawn(async {}, |_| {});
871    /// let runnable_pointer = runnable.into_raw();
872    ///
873    /// drop(task);
874    /// unsafe {
875    ///     // Convert back to an `Runnable` to prevent leak.
876    ///     let runnable = Runnable::<()>::from_raw(runnable_pointer);
877    ///     let did_poll = runnable.run();
878    ///     assert!(!did_poll);
879    ///     // Further calls to `Runnable::from_raw(runnable_pointer)` would be memory-unsafe.
880    /// }
881    /// // The memory was freed when `x` went out of scope above, so `runnable_pointer` is now dangling!
882    /// ```
883
884    /// [into_raw]: #method.into_raw
885    pub unsafe fn from_raw(ptr: NonNull<()>) -> Self {
886        Self {
887            ptr,
888            _marker: Default::default(),
889        }
890    }
891}
892
893impl<M> Drop for Runnable<M> {
894    fn drop(&mut self) {
895        let ptr = self.ptr.as_ptr();
896        let header = self.header();
897
898        unsafe {
899            let mut state = header.state.load(Ordering::Acquire);
900
901            loop {
902                // If the task has been completed or closed, it can't be canceled.
903                if state & (COMPLETED | CLOSED) != 0 {
904                    break;
905                }
906
907                // Mark the task as closed.
908                match header.state.compare_exchange_weak(
909                    state,
910                    state | CLOSED,
911                    Ordering::AcqRel,
912                    Ordering::Acquire,
913                ) {
914                    Ok(_) => break,
915                    Err(s) => state = s,
916                }
917            }
918
919            // Drop the future.
920            (header.vtable.drop_future)(ptr);
921
922            // Mark the task as unscheduled.
923            let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel);
924
925            // Notify the awaiter that the future has been dropped.
926            if state & AWAITER != 0 {
927                (*header).notify(None);
928            }
929
930            // Drop the task reference.
931            (header.vtable.drop_ref)(ptr);
932        }
933    }
934}
935
936impl<M: fmt::Debug> fmt::Debug for Runnable<M> {
937    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
938        let ptr = self.ptr.as_ptr();
939        let header = ptr as *const Header<M>;
940
941        f.debug_struct("Runnable")
942            .field("header", unsafe { &(*header) })
943            .finish()
944    }
945}