wgpu/util/belt.rs
use crate::{
    util::align_to, Buffer, BufferAddress, BufferDescriptor, BufferSize, BufferUsages,
    BufferViewMut, CommandEncoder, Device, MapMode,
};
use std::fmt;
use std::sync::{mpsc, Arc};

struct Chunk {
    buffer: Arc<Buffer>,
    size: BufferAddress,
    offset: BufferAddress,
}

/// `Sync` wrapper that works by providing only exclusive access.
///
/// See <https://doc.rust-lang.org/nightly/std/sync/struct.Exclusive.html>
struct Exclusive<T>(T);

unsafe impl<T> Sync for Exclusive<T> {}

impl<T> Exclusive<T> {
    fn new(value: T) -> Self {
        Self(value)
    }

    fn get_mut(&mut self) -> &mut T {
        &mut self.0
    }
}

/// Efficiently performs many buffer writes by sharing and reusing temporary buffers.
///
/// Internally it uses a ring buffer of staging buffers that are sub-allocated.
/// Its advantage over [`Queue::write_buffer()`] is that it returns a mutable slice,
/// which you can fill directly and thereby avoid an extra data copy.
///
/// Using a staging belt is slightly complicated, and generally goes as follows:
/// 1. Write to the buffers that need updating via [`StagingBelt::write_buffer()`].
/// 2. Call [`StagingBelt::finish()`].
/// 3. Submit all command encoders that were used in step 1.
/// 4. Call [`StagingBelt::recall()`].
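///
/// # Example
///
/// A minimal sketch of these steps; the `device`, `queue`, and destination `target`
/// buffer are assumed to already exist, and `data` is assumed to be non-empty with a
/// length that is a multiple of `wgpu::COPY_BUFFER_ALIGNMENT`:
///
/// ```no_run
/// use wgpu::util::StagingBelt;
/// use wgpu::{Buffer, BufferSize, CommandEncoderDescriptor, Device, Queue};
///
/// fn upload(device: &Device, queue: &Queue, target: &Buffer, data: &[u8]) {
///     let mut belt = StagingBelt::new(1024);
///     let mut encoder =
///         device.create_command_encoder(&CommandEncoderDescriptor { label: None });
///
///     // 1. Fill a mapped staging slice that will be copied into `target`.
///     belt.write_buffer(
///         &mut encoder,
///         target,
///         0,
///         BufferSize::new(data.len() as u64).unwrap(),
///         device,
///     )
///     .copy_from_slice(data);
///
///     // 2. Close the staging buffers so the copies can be submitted.
///     belt.finish();
///
///     // 3. Submit the encoder that recorded the copy commands.
///     queue.submit(Some(encoder.finish()));
///
///     // 4. Recall the staging buffers for reuse once the GPU is done with them.
///     belt.recall();
/// }
/// ```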
///
/// [`Queue::write_buffer()`]: crate::Queue::write_buffer
pub struct StagingBelt {
    chunk_size: BufferAddress,
    /// Chunks into which we are accumulating data to be transferred.
    active_chunks: Vec<Chunk>,
    /// Chunks that have scheduled transfers already; they are unmapped and some
    /// command encoder has one or more `copy_buffer_to_buffer` commands with them
    /// as source.
    closed_chunks: Vec<Chunk>,
    /// Chunks that are back from the GPU and ready to be mapped for write and put
    /// into `active_chunks`.
    free_chunks: Vec<Chunk>,
    /// When closed chunks are mapped again, the map callback sends them here.
    sender: Exclusive<mpsc::Sender<Chunk>>,
    /// Free chunks are received here to be put on `self.free_chunks`.
    receiver: Exclusive<mpsc::Receiver<Chunk>>,
}

impl StagingBelt {
    /// Create a new staging belt.
    ///
    /// The `chunk_size` is the unit of internal buffer allocation; writes will be
    /// sub-allocated within each chunk. Therefore, for optimal use of memory, the
    /// chunk size should be:
    ///
    /// * larger than the largest single [`StagingBelt::write_buffer()`] operation;
    /// * between a quarter of and the full amount of data uploaded per submission
    ///   (per [`StagingBelt::finish()`]); and
    /// * bigger is better, within these bounds.
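    ///
    /// For example, if the largest single write is 1 KiB and roughly 4 KiB of data is
    /// uploaded per submission, a chunk size in the 1-4 KiB range would fit these
    /// guidelines (the numbers here are purely illustrative):
    ///
    /// ```no_run
    /// use wgpu::util::StagingBelt;
    ///
    /// let mut belt = StagingBelt::new(4096);
    /// ```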
    pub fn new(chunk_size: BufferAddress) -> Self {
        let (sender, receiver) = std::sync::mpsc::channel();
        StagingBelt {
            chunk_size,
            active_chunks: Vec::new(),
            closed_chunks: Vec::new(),
            free_chunks: Vec::new(),
            sender: Exclusive::new(sender),
            receiver: Exclusive::new(receiver),
        }
    }

    /// Allocate a staging belt slice of `size` bytes to be uploaded into the `target` buffer
    /// at the specified offset.
    ///
    /// The upload will be recorded into the provided command encoder. This encoder
    /// must be submitted after [`StagingBelt::finish()`] is called and before
    /// [`StagingBelt::recall()`] is called.
    ///
    /// If `size` is greater than the size of every free internal buffer, a new buffer
    /// will be allocated for it. Therefore, the `chunk_size` passed to [`StagingBelt::new()`]
    /// should ideally be larger than every such size.
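    ///
    /// A minimal sketch of filling the returned slice; the `belt`, `encoder`, `target`, and
    /// `device` bindings are assumed to already exist, and the 4-byte write keeps the copy
    /// size a multiple of `wgpu::COPY_BUFFER_ALIGNMENT`:
    ///
    /// ```no_run
    /// # fn sketch(
    /// #     belt: &mut wgpu::util::StagingBelt,
    /// #     encoder: &mut wgpu::CommandEncoder,
    /// #     target: &wgpu::Buffer,
    /// #     device: &wgpu::Device,
    /// # ) {
    /// let bytes = 42u32.to_le_bytes();
    /// belt.write_buffer(encoder, target, 0, wgpu::BufferSize::new(4).unwrap(), device)
    ///     .copy_from_slice(&bytes);
    /// # }
    /// ```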
    pub fn write_buffer(
        &mut self,
        encoder: &mut CommandEncoder,
        target: &Buffer,
        offset: BufferAddress,
        size: BufferSize,
        device: &Device,
    ) -> BufferViewMut<'_> {
        // Prefer an active chunk with enough space left; otherwise take a free chunk
        // that is large enough, and only allocate a new buffer as a last resort.
        let mut chunk = if let Some(index) = self
            .active_chunks
            .iter()
            .position(|chunk| chunk.offset + size.get() <= chunk.size)
        {
            self.active_chunks.swap_remove(index)
        } else {
            self.receive_chunks(); // ensure self.free_chunks is up to date

            if let Some(index) = self
                .free_chunks
                .iter()
                .position(|chunk| size.get() <= chunk.size)
            {
                self.free_chunks.swap_remove(index)
            } else {
                let size = self.chunk_size.max(size.get());
                Chunk {
                    #[allow(clippy::arc_with_non_send_sync)] // False positive on emscripten
                    buffer: Arc::new(device.create_buffer(&BufferDescriptor {
                        label: Some("(wgpu internal) StagingBelt staging buffer"),
                        size,
                        usage: BufferUsages::MAP_WRITE | BufferUsages::COPY_SRC,
                        mapped_at_creation: true,
                    })),
                    size,
                    offset: 0,
                }
            }
        };

        encoder.copy_buffer_to_buffer(&chunk.buffer, chunk.offset, target, offset, size.get());
        let old_offset = chunk.offset;
        // Advance the write cursor, keeping it aligned for the next mapped-range request.
        chunk.offset = align_to(chunk.offset + size.get(), crate::MAP_ALIGNMENT);

        // Move the chunk into `active_chunks` first, then borrow it back from there so
        // that the returned mapped view is tied to `self` rather than to a local value.
        self.active_chunks.push(chunk);
        self.active_chunks
            .last()
            .unwrap()
            .buffer
            .slice(old_offset..old_offset + size.get())
            .get_mapped_range_mut()
    }

    /// Prepare currently mapped buffers for use in a submission.
    ///
    /// This must be called before the command encoder(s) provided to
    /// [`StagingBelt::write_buffer()`] are submitted.
    ///
    /// At this point, all the partially used staging buffers are closed (cannot be used for
    /// further writes) until after [`StagingBelt::recall()`] is called *and* the GPU is done
    /// copying the data from them.
    pub fn finish(&mut self) {
        for chunk in self.active_chunks.drain(..) {
            chunk.buffer.unmap();
            self.closed_chunks.push(chunk);
        }
    }

    /// Recall all of the closed buffers back to be reused.
    ///
    /// This must only be called after the command encoder(s) provided to
    /// [`StagingBelt::write_buffer()`] are submitted. Additional calls are harmless.
    /// Not calling this as soon as possible may result in increased buffer memory usage.
    pub fn recall(&mut self) {
        self.receive_chunks();

        for chunk in self.closed_chunks.drain(..) {
            let sender = self.sender.get_mut().clone();
            // Clone the `Arc` so the slice does not borrow `chunk`, allowing the chunk
            // itself to be moved into the map callback and sent back once mapping is done.
            chunk
                .buffer
                .clone()
                .slice(..)
                .map_async(MapMode::Write, move |_| {
                    let _ = sender.send(chunk);
                });
        }
    }

    /// Move all chunks that the GPU is done with (and are now mapped again)
    /// from `self.receiver` to `self.free_chunks`.
    fn receive_chunks(&mut self) {
        while let Ok(mut chunk) = self.receiver.get_mut().try_recv() {
            chunk.offset = 0;
            self.free_chunks.push(chunk);
        }
    }
}

impl fmt::Debug for StagingBelt {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("StagingBelt")
            .field("chunk_size", &self.chunk_size)
            .field("active_chunks", &self.active_chunks.len())
            .field("closed_chunks", &self.closed_chunks.len())
            .field("free_chunks", &self.free_chunks.len())
            .finish_non_exhaustive()
    }
}