bevy_asset/io/
processor_gated.rs

1use crate::{
2    io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader},
3    processor::{AssetProcessorData, ProcessStatus},
4    AssetPath,
5};
6use alloc::sync::Arc;
7use async_lock::RwLockReadGuardArc;
8use bevy_utils::tracing::trace;
9use core::{pin::Pin, task::Poll};
10use futures_io::AsyncRead;
11use std::path::Path;
12
13use super::{AsyncSeekForward, ErasedAssetReader};
14
15/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
16/// given path until that path has been processed by [`AssetProcessor`].
17///
18/// [`AssetProcessor`]: crate::processor::AssetProcessor
19pub struct ProcessorGatedReader {
20    reader: Box<dyn ErasedAssetReader>,
21    source: AssetSourceId<'static>,
22    processor_data: Arc<AssetProcessorData>,
23}
24
25impl ProcessorGatedReader {
26    /// Creates a new [`ProcessorGatedReader`].
27    pub fn new(
28        source: AssetSourceId<'static>,
29        reader: Box<dyn ErasedAssetReader>,
30        processor_data: Arc<AssetProcessorData>,
31    ) -> Self {
32        Self {
33            source,
34            processor_data,
35            reader,
36        }
37    }
38
39    /// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
40    /// while it is held.
41    async fn get_transaction_lock(
42        &self,
43        path: &AssetPath<'static>,
44    ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
45        let infos = self.processor_data.asset_infos.read().await;
46        let info = infos
47            .get(path)
48            .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
49        Ok(info.file_transaction_lock.read_arc().await)
50    }
51}
52
53impl AssetReader for ProcessorGatedReader {
54    async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
55        let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
56        trace!("Waiting for processing to finish before reading {asset_path}");
57        let process_result = self
58            .processor_data
59            .wait_until_processed(asset_path.clone())
60            .await;
61        match process_result {
62            ProcessStatus::Processed => {}
63            ProcessStatus::Failed | ProcessStatus::NonExistent => {
64                return Err(AssetReaderError::NotFound(path.to_owned()));
65            }
66        }
67        trace!("Processing finished with {asset_path}, reading {process_result:?}",);
68        let lock = self.get_transaction_lock(&asset_path).await?;
69        let asset_reader = self.reader.read(path).await?;
70        let reader = TransactionLockedReader::new(asset_reader, lock);
71        Ok(reader)
72    }
73
74    async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
75        let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
76        trace!("Waiting for processing to finish before reading meta for {asset_path}",);
77        let process_result = self
78            .processor_data
79            .wait_until_processed(asset_path.clone())
80            .await;
81        match process_result {
82            ProcessStatus::Processed => {}
83            ProcessStatus::Failed | ProcessStatus::NonExistent => {
84                return Err(AssetReaderError::NotFound(path.to_owned()));
85            }
86        }
87        trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
88        let lock = self.get_transaction_lock(&asset_path).await?;
89        let meta_reader = self.reader.read_meta(path).await?;
90        let reader = TransactionLockedReader::new(meta_reader, lock);
91        Ok(reader)
92    }
93
94    async fn read_directory<'a>(
95        &'a self,
96        path: &'a Path,
97    ) -> Result<Box<PathStream>, AssetReaderError> {
98        trace!(
99            "Waiting for processing to finish before reading directory {:?}",
100            path
101        );
102        self.processor_data.wait_until_finished().await;
103        trace!("Processing finished, reading directory {:?}", path);
104        let result = self.reader.read_directory(path).await?;
105        Ok(result)
106    }
107
108    async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
109        trace!(
110            "Waiting for processing to finish before reading directory {:?}",
111            path
112        );
113        self.processor_data.wait_until_finished().await;
114        trace!("Processing finished, getting directory status {:?}", path);
115        let result = self.reader.is_directory(path).await?;
116        Ok(result)
117    }
118}
119
120/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.
121pub struct TransactionLockedReader<'a> {
122    reader: Box<dyn Reader + 'a>,
123    _file_transaction_lock: RwLockReadGuardArc<()>,
124}
125
126impl<'a> TransactionLockedReader<'a> {
127    fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
128        Self {
129            reader,
130            _file_transaction_lock: file_transaction_lock,
131        }
132    }
133}
134
135impl AsyncRead for TransactionLockedReader<'_> {
136    fn poll_read(
137        mut self: Pin<&mut Self>,
138        cx: &mut core::task::Context<'_>,
139        buf: &mut [u8],
140    ) -> Poll<futures_io::Result<usize>> {
141        Pin::new(&mut self.reader).poll_read(cx, buf)
142    }
143}
144
145impl AsyncSeekForward for TransactionLockedReader<'_> {
146    fn poll_seek_forward(
147        mut self: Pin<&mut Self>,
148        cx: &mut core::task::Context<'_>,
149        offset: u64,
150    ) -> Poll<std::io::Result<u64>> {
151        Pin::new(&mut self.reader).poll_seek_forward(cx, offset)
152    }
153}
154
155impl Reader for TransactionLockedReader<'_> {
156    fn read_to_end<'a>(
157        &'a mut self,
158        buf: &'a mut Vec<u8>,
159    ) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
160        self.reader.read_to_end(buf)
161    }
162}