bevy_asset/io/
processor_gated.rs1use crate::{
2 io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},
3 processor::{AssetProcessorData, ProcessStatus},
4 AssetPath,
5};
6use alloc::sync::Arc;
7use async_lock::RwLockReadGuardArc;
8use bevy_utils::tracing::trace;
9use core::{pin::Pin, task::Poll};
10use futures_io::AsyncRead;
11use std::path::Path;
12
13use super::{AsyncSeekForward, ErasedAssetReader};
14
15pub struct ProcessorGatedReader {
20 reader: Box<dyn ErasedAssetReader>,
21 source: AssetSourceId<'static>,
22 processor_data: Arc<AssetProcessorData>,
23}
24
25impl ProcessorGatedReader {
26 pub fn new(
28 source: AssetSourceId<'static>,
29 reader: Box<dyn ErasedAssetReader>,
30 processor_data: Arc<AssetProcessorData>,
31 ) -> Self {
32 Self {
33 source,
34 processor_data,
35 reader,
36 }
37 }
38
39 async fn get_transaction_lock(
42 &self,
43 path: &AssetPath<'static>,
44 ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
45 let infos = self.processor_data.asset_infos.read().await;
46 let info = infos
47 .get(path)
48 .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
49 Ok(info.file_transaction_lock.read_arc().await)
50 }
51}
52
53impl AssetReader for ProcessorGatedReader {
54 async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
55 let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
56 trace!("Waiting for processing to finish before reading {asset_path}");
57 let process_result = self
58 .processor_data
59 .wait_until_processed(asset_path.clone())
60 .await;
61 match process_result {
62 ProcessStatus::Processed => {}
63 ProcessStatus::Failed | ProcessStatus::NonExistent => {
64 return Err(AssetReaderError::NotFound(path.to_owned()));
65 }
66 }
67 trace!("Processing finished with {asset_path}, reading {process_result:?}",);
68 let lock = self.get_transaction_lock(&asset_path).await?;
69 let asset_reader = self.reader.read(path).await?;
70 let reader = TransactionLockedReader::new(asset_reader, lock);
71 Ok(reader)
72 }
73
74 async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
75 let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
76 trace!("Waiting for processing to finish before reading meta for {asset_path}",);
77 let process_result = self
78 .processor_data
79 .wait_until_processed(asset_path.clone())
80 .await;
81 match process_result {
82 ProcessStatus::Processed => {}
83 ProcessStatus::Failed | ProcessStatus::NonExistent => {
84 return Err(AssetReaderError::NotFound(path.to_owned()));
85 }
86 }
87 trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
88 let lock = self.get_transaction_lock(&asset_path).await?;
89 let meta_reader = self.reader.read_meta(path).await?;
90 let reader = TransactionLockedReader::new(meta_reader, lock);
91 Ok(reader)
92 }
93
94 async fn read_directory<'a>(
95 &'a self,
96 path: &'a Path,
97 ) -> Result<Box<PathStream>, AssetReaderError> {
98 trace!(
99 "Waiting for processing to finish before reading directory {:?}",
100 path
101 );
102 self.processor_data.wait_until_finished().await;
103 trace!("Processing finished, reading directory {:?}", path);
104 let result = self.reader.read_directory(path).await?;
105 Ok(result)
106 }
107
108 async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
109 trace!(
110 "Waiting for processing to finish before reading directory {:?}",
111 path
112 );
113 self.processor_data.wait_until_finished().await;
114 trace!("Processing finished, getting directory status {:?}", path);
115 let result = self.reader.is_directory(path).await?;
116 Ok(result)
117 }
118}
119
120pub struct TransactionLockedReader<'a> {
122 reader: Box<dyn Reader + 'a>,
123 _file_transaction_lock: RwLockReadGuardArc<()>,
124}
125
126impl<'a> TransactionLockedReader<'a> {
127 fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
128 Self {
129 reader,
130 _file_transaction_lock: file_transaction_lock,
131 }
132 }
133}
134
135impl AsyncRead for TransactionLockedReader<'_> {
136 fn poll_read(
137 mut self: Pin<&mut Self>,
138 cx: &mut core::task::Context<'_>,
139 buf: &mut [u8],
140 ) -> Poll<futures_io::Result<usize>> {
141 Pin::new(&mut self.reader).poll_read(cx, buf)
142 }
143}
144
145impl AsyncSeekForward for TransactionLockedReader<'_> {
146 fn poll_seek_forward(
147 mut self: Pin<&mut Self>,
148 cx: &mut core::task::Context<'_>,
149 offset: u64,
150 ) -> Poll<std::io::Result<u64>> {
151 Pin::new(&mut self.reader).poll_seek_forward(cx, offset)
152 }
153}
154
155impl Reader for TransactionLockedReader<'_> {
156 fn read_to_end<'a>(
157 &'a mut self,
158 buf: &'a mut Vec<u8>,
159 ) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
160 self.reader.read_to_end(buf)
161 }
162}