bevy_utils/parallel_queue.rs
1use alloc::vec::Vec;
2use core::{cell::RefCell, ops::DerefMut};
3use thread_local::ThreadLocal;
4
5/// A cohesive set of thread-local values of a given type.
6///
7/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
8pub struct Parallel<T: Send> {
9 locals: ThreadLocal<RefCell<T>>,
10}
11
12impl<T: Send> Parallel<T> {
13 /// Gets a mutable iterator over all of the per-thread queues.
14 pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
15 self.locals.iter_mut().map(RefCell::get_mut)
16 }
17
18 /// Clears all of the stored thread local values.
19 pub fn clear(&mut self) {
20 self.locals.clear();
21 }
22
23 /// Retrieves the thread-local value for the current thread and runs `f` on it.
24 ///
25 /// If there is no thread-local value, it will be initialized to the result
26 /// of `create`.
27 pub fn scope_or<R>(&self, create: impl FnOnce() -> T, f: impl FnOnce(&mut T) -> R) -> R {
28 f(&mut self.borrow_local_mut_or(create))
29 }
30
31 /// Mutably borrows the thread-local value.
32 ///
33 /// If there is no thread-local value, it will be initialized to the result
34 /// of `create`.
35 pub fn borrow_local_mut_or(
36 &self,
37 create: impl FnOnce() -> T,
38 ) -> impl DerefMut<Target = T> + '_ {
39 self.locals.get_or(|| RefCell::new(create())).borrow_mut()
40 }
41}
42
43impl<T: Default + Send> Parallel<T> {
44 /// Retrieves the thread-local value for the current thread and runs `f` on it.
45 ///
46 /// If there is no thread-local value, it will be initialized to its default.
47 pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
48 self.scope_or(Default::default, f)
49 }
50
51 /// Mutably borrows the thread-local value.
52 ///
53 /// If there is no thread-local value, it will be initialized to its default.
54 pub fn borrow_local_mut(&self) -> impl DerefMut<Target = T> + '_ {
55 self.borrow_local_mut_or(Default::default)
56 }
57}
58
59impl<T, I> Parallel<I>
60where
61 I: IntoIterator<Item = T> + Default + Send + 'static,
62{
63 /// Drains all enqueued items from all threads and returns an iterator over them.
64 ///
65 /// Unlike [`Vec::drain`], this will piecemeal remove chunks of the data stored.
66 /// If iteration is terminated part way, the rest of the enqueued items in the same
67 /// chunk will be dropped, and the rest of the undrained elements will remain.
68 ///
69 /// The ordering is not guaranteed.
70 pub fn drain(&mut self) -> impl Iterator<Item = T> + '_ {
71 self.locals.iter_mut().flat_map(|item| item.take())
72 }
73}
74
75impl<T: Send> Parallel<Vec<T>> {
76 /// Collect all enqueued items from all threads and appends them to the end of a
77 /// single Vec.
78 ///
79 /// The ordering is not guaranteed.
80 pub fn drain_into(&mut self, out: &mut Vec<T>) {
81 let size = self
82 .locals
83 .iter_mut()
84 .map(|queue| queue.get_mut().len())
85 .sum();
86 out.reserve(size);
87 for queue in self.locals.iter_mut() {
88 out.append(queue.get_mut());
89 }
90 }
91}
92
93// `Default` is manually implemented to avoid the `T: Default` bound.
94impl<T: Send> Default for Parallel<T> {
95 fn default() -> Self {
96 Self {
97 locals: ThreadLocal::default(),
98 }
99 }
100}