bevy_utils/
parallel_queue.rs

1use alloc::vec::Vec;
2use core::{cell::RefCell, ops::DerefMut};
3use thread_local::ThreadLocal;
4
5/// A cohesive set of thread-local values of a given type.
6///
7/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
8#[derive(Default)]
9pub struct Parallel<T: Send> {
10    locals: ThreadLocal<RefCell<T>>,
11}
12
13/// A scope guard of a `Parallel`, when this struct is dropped ,the value will writeback to its `Parallel`
14impl<T: Send> Parallel<T> {
15    /// Gets a mutable iterator over all of the per-thread queues.
16    pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
17        self.locals.iter_mut().map(RefCell::get_mut)
18    }
19
20    /// Clears all of the stored thread local values.
21    pub fn clear(&mut self) {
22        self.locals.clear();
23    }
24}
25
26impl<T: Default + Send> Parallel<T> {
27    /// Retrieves the thread-local value for the current thread and runs `f` on it.
28    ///
29    /// If there is no thread-local value, it will be initialized to its default.
30    pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
31        let mut cell = self.locals.get_or_default().borrow_mut();
32        let ret = f(cell.deref_mut());
33        ret
34    }
35
36    /// Mutably borrows the thread-local value.
37    ///
38    /// If there is no thread-local value, it will be initialized to it's default.
39    pub fn borrow_local_mut(&self) -> impl DerefMut<Target = T> + '_ {
40        self.locals.get_or_default().borrow_mut()
41    }
42}
43
44impl<T, I> Parallel<I>
45where
46    I: IntoIterator<Item = T> + Default + Send + 'static,
47{
48    /// Drains all enqueued items from all threads and returns an iterator over them.
49    ///
50    /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored.
51    /// If iteration is terminated part way, the rest of the enqueued items in the same
52    /// chunk will be dropped, and the rest of the undrained elements will remain.
53    ///
54    /// The ordering is not guaranteed.
55    pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
56        self.locals.iter_mut().flat_map(|item| item.take())
57    }
58}
59
60#[cfg(feature = "alloc")]
61impl<T: Send> Parallel<Vec<T>> {
62    /// Collect all enqueued items from all threads and appends them to the end of a
63    /// single Vec.
64    ///
65    /// The ordering is not guaranteed.
66    pub fn drain_into(&mut self, out: &mut Vec<T>) {
67        let size = self
68            .locals
69            .iter_mut()
70            .map(|queue| queue.get_mut().len())
71            .sum();
72        out.reserve(size);
73        for queue in self.locals.iter_mut() {
74            out.append(queue.get_mut());
75        }
76    }
77}