bevy_asset/io/
processor_gated.rs

1use crate::{
2    io::{
3        AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader, ReaderNotSeekableError,
4        SeekableReader,
5    },
6    processor::{ProcessStatus, ProcessingState},
7    AssetPath,
8};
9use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};
10use async_lock::RwLockReadGuardArc;
11use core::{pin::Pin, task::Poll};
12use futures_io::AsyncRead;
13use std::path::Path;
14use tracing::trace;
15
16use super::ErasedAssetReader;
17
18/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
19/// given path until that path has been processed by [`AssetProcessor`].
20///
21/// [`AssetProcessor`]: crate::processor::AssetProcessor
22pub(crate) struct ProcessorGatedReader {
23    reader: Arc<dyn ErasedAssetReader>,
24    source: AssetSourceId<'static>,
25    processing_state: Arc<ProcessingState>,
26}
27
28impl ProcessorGatedReader {
29    /// Creates a new [`ProcessorGatedReader`].
30    pub(crate) fn new(
31        source: AssetSourceId<'static>,
32        reader: Arc<dyn ErasedAssetReader>,
33        processing_state: Arc<ProcessingState>,
34    ) -> Self {
35        Self {
36            source,
37            reader,
38            processing_state,
39        }
40    }
41}
42
43impl AssetReader for ProcessorGatedReader {
44    async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
45        let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
46        trace!("Waiting for processing to finish before reading {asset_path}");
47        let process_result = self
48            .processing_state
49            .wait_until_processed(asset_path.clone())
50            .await;
51        match process_result {
52            ProcessStatus::Processed => {}
53            ProcessStatus::Failed | ProcessStatus::NonExistent => {
54                return Err(AssetReaderError::NotFound(path.to_owned()));
55            }
56        }
57        trace!("Processing finished with {asset_path}, reading {process_result:?}",);
58        let lock = self
59            .processing_state
60            .get_transaction_lock(&asset_path)
61            .await?;
62        let asset_reader = self.reader.read(path).await?;
63        let reader = TransactionLockedReader::new(asset_reader, lock);
64        Ok(reader)
65    }
66
67    async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
68        let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
69        trace!("Waiting for processing to finish before reading meta for {asset_path}",);
70        let process_result = self
71            .processing_state
72            .wait_until_processed(asset_path.clone())
73            .await;
74        match process_result {
75            ProcessStatus::Processed => {}
76            ProcessStatus::Failed | ProcessStatus::NonExistent => {
77                return Err(AssetReaderError::NotFound(path.to_owned()));
78            }
79        }
80        trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
81        let lock = self
82            .processing_state
83            .get_transaction_lock(&asset_path)
84            .await?;
85        let meta_reader = self.reader.read_meta(path).await?;
86        let reader = TransactionLockedReader::new(meta_reader, lock);
87        Ok(reader)
88    }
89
90    async fn read_directory<'a>(
91        &'a self,
92        path: &'a Path,
93    ) -> Result<Box<PathStream>, AssetReaderError> {
94        trace!(
95            "Waiting for processing to finish before reading directory {:?}",
96            path
97        );
98        self.processing_state.wait_until_finished().await;
99        trace!("Processing finished, reading directory {:?}", path);
100        let result = self.reader.read_directory(path).await?;
101        Ok(result)
102    }
103
104    async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
105        trace!(
106            "Waiting for processing to finish before reading directory {:?}",
107            path
108        );
109        self.processing_state.wait_until_finished().await;
110        trace!("Processing finished, getting directory status {:?}", path);
111        let result = self.reader.is_directory(path).await?;
112        Ok(result)
113    }
114}
115
116/// An [`AsyncRead`] impl that will hold its asset's transaction lock until [`TransactionLockedReader`] is dropped.
117pub struct TransactionLockedReader<'a> {
118    reader: Box<dyn Reader + 'a>,
119    _file_transaction_lock: RwLockReadGuardArc<()>,
120}
121
122impl<'a> TransactionLockedReader<'a> {
123    fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
124        Self {
125            reader,
126            _file_transaction_lock: file_transaction_lock,
127        }
128    }
129}
130
131impl AsyncRead for TransactionLockedReader<'_> {
132    fn poll_read(
133        mut self: Pin<&mut Self>,
134        cx: &mut core::task::Context<'_>,
135        buf: &mut [u8],
136    ) -> Poll<futures_io::Result<usize>> {
137        Pin::new(&mut self.reader).poll_read(cx, buf)
138    }
139}
140
141impl Reader for TransactionLockedReader<'_> {
142    fn read_to_end<'a>(
143        &'a mut self,
144        buf: &'a mut Vec<u8>,
145    ) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
146        self.reader.read_to_end(buf)
147    }
148
149    fn seekable(&mut self) -> Result<&mut dyn SeekableReader, ReaderNotSeekableError> {
150        self.reader.seekable()
151    }
152}