bevy_utils/
parallel_queue.rs

1use alloc::vec::Vec;
2use core::{cell::RefCell, ops::DerefMut};
3use thread_local::ThreadLocal;
4
5/// A cohesive set of thread-local values of a given type.
6///
7/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
8pub struct Parallel<T: Send> {
9    locals: ThreadLocal<RefCell<T>>,
10}
11
12impl<T: Send> Parallel<T> {
13    /// Gets a mutable iterator over all of the per-thread queues.
14    pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
15        self.locals.iter_mut().map(RefCell::get_mut)
16    }
17
18    /// Clears all of the stored thread local values.
19    pub fn clear(&mut self) {
20        self.locals.clear();
21    }
22
23    /// Retrieves the thread-local value for the current thread and runs `f` on it.
24    ///
25    /// If there is no thread-local value, it will be initialized to the result
26    /// of `create`.
27    pub fn scope_or<R>(&self, create: impl FnOnce() -> T, f: impl FnOnce(&mut T) -> R) -> R {
28        f(&mut self.borrow_local_mut_or(create))
29    }
30
31    /// Mutably borrows the thread-local value.
32    ///
33    /// If there is no thread-local value, it will be initialized to the result
34    /// of `create`.
35    pub fn borrow_local_mut_or(
36        &self,
37        create: impl FnOnce() -> T,
38    ) -> impl DerefMut<Target = T> + '_ {
39        self.locals.get_or(|| RefCell::new(create())).borrow_mut()
40    }
41}
42
43impl<T: Default + Send> Parallel<T> {
44    /// Retrieves the thread-local value for the current thread and runs `f` on it.
45    ///
46    /// If there is no thread-local value, it will be initialized to its default.
47    pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
48        self.scope_or(Default::default, f)
49    }
50
51    /// Mutably borrows the thread-local value.
52    ///
53    /// If there is no thread-local value, it will be initialized to its default.
54    pub fn borrow_local_mut(&self) -> impl DerefMut<Target = T> + '_ {
55        self.borrow_local_mut_or(Default::default)
56    }
57}
58
59impl<T, I> Parallel<I>
60where
61    I: IntoIterator<Item = T> + Default + Send + 'static,
62{
63    /// Drains all enqueued items from all threads and returns an iterator over them.
64    ///
65    /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored.
66    /// If iteration is terminated part way, the rest of the enqueued items in the same
67    /// chunk will be dropped, and the rest of the undrained elements will remain.
68    ///
69    /// The ordering is not guaranteed.
70    pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
71        self.locals.iter_mut().flat_map(|item| item.take())
72    }
73}
74
75impl<T: Send> Parallel<Vec<T>> {
76    /// Collect all enqueued items from all threads and appends them to the end of a
77    /// single Vec.
78    ///
79    /// The ordering is not guaranteed.
80    pub fn drain_into(&mut self, out: &mut Vec<T>) {
81        let size = self
82            .locals
83            .iter_mut()
84            .map(|queue| queue.get_mut().len())
85            .sum();
86        out.reserve(size);
87        for queue in self.locals.iter_mut() {
88            out.append(queue.get_mut());
89        }
90    }
91}
92
93// `Default` is manually implemented to avoid the `T: Default` bound.
94impl<T: Send> Default for Parallel<T> {
95    fn default() -> Self {
96        Self {
97            locals: ThreadLocal::default(),
98        }
99    }
100}