bevy_asset/io/
processor_gated.rs1use crate::{
2 io::{
3 AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader, ReaderNotSeekableError,
4 SeekableReader,
5 },
6 processor::{ProcessStatus, ProcessingState},
7 AssetPath,
8};
9use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec};
10use async_lock::RwLockReadGuardArc;
11use core::{pin::Pin, task::Poll};
12use futures_io::AsyncRead;
13use std::path::Path;
14use tracing::trace;
15
16use super::ErasedAssetReader;
17
18pub(crate) struct ProcessorGatedReader {
23 reader: Arc<dyn ErasedAssetReader>,
24 source: AssetSourceId<'static>,
25 processing_state: Arc<ProcessingState>,
26}
27
28impl ProcessorGatedReader {
29 pub(crate) fn new(
31 source: AssetSourceId<'static>,
32 reader: Arc<dyn ErasedAssetReader>,
33 processing_state: Arc<ProcessingState>,
34 ) -> Self {
35 Self {
36 source,
37 reader,
38 processing_state,
39 }
40 }
41}
42
43impl AssetReader for ProcessorGatedReader {
44 async fn read<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
45 let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
46 trace!("Waiting for processing to finish before reading {asset_path}");
47 let process_result = self
48 .processing_state
49 .wait_until_processed(asset_path.clone())
50 .await;
51 match process_result {
52 ProcessStatus::Processed => {}
53 ProcessStatus::Failed | ProcessStatus::NonExistent => {
54 return Err(AssetReaderError::NotFound(path.to_owned()));
55 }
56 }
57 trace!("Processing finished with {asset_path}, reading {process_result:?}",);
58 let lock = self
59 .processing_state
60 .get_transaction_lock(&asset_path)
61 .await?;
62 let asset_reader = self.reader.read(path).await?;
63 let reader = TransactionLockedReader::new(asset_reader, lock);
64 Ok(reader)
65 }
66
67 async fn read_meta<'a>(&'a self, path: &'a Path) -> Result<impl Reader + 'a, AssetReaderError> {
68 let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone());
69 trace!("Waiting for processing to finish before reading meta for {asset_path}",);
70 let process_result = self
71 .processing_state
72 .wait_until_processed(asset_path.clone())
73 .await;
74 match process_result {
75 ProcessStatus::Processed => {}
76 ProcessStatus::Failed | ProcessStatus::NonExistent => {
77 return Err(AssetReaderError::NotFound(path.to_owned()));
78 }
79 }
80 trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",);
81 let lock = self
82 .processing_state
83 .get_transaction_lock(&asset_path)
84 .await?;
85 let meta_reader = self.reader.read_meta(path).await?;
86 let reader = TransactionLockedReader::new(meta_reader, lock);
87 Ok(reader)
88 }
89
90 async fn read_directory<'a>(
91 &'a self,
92 path: &'a Path,
93 ) -> Result<Box<PathStream>, AssetReaderError> {
94 trace!(
95 "Waiting for processing to finish before reading directory {:?}",
96 path
97 );
98 self.processing_state.wait_until_finished().await;
99 trace!("Processing finished, reading directory {:?}", path);
100 let result = self.reader.read_directory(path).await?;
101 Ok(result)
102 }
103
104 async fn is_directory<'a>(&'a self, path: &'a Path) -> Result<bool, AssetReaderError> {
105 trace!(
106 "Waiting for processing to finish before reading directory {:?}",
107 path
108 );
109 self.processing_state.wait_until_finished().await;
110 trace!("Processing finished, getting directory status {:?}", path);
111 let result = self.reader.is_directory(path).await?;
112 Ok(result)
113 }
114}
115
116pub struct TransactionLockedReader<'a> {
118 reader: Box<dyn Reader + 'a>,
119 _file_transaction_lock: RwLockReadGuardArc<()>,
120}
121
122impl<'a> TransactionLockedReader<'a> {
123 fn new(reader: Box<dyn Reader + 'a>, file_transaction_lock: RwLockReadGuardArc<()>) -> Self {
124 Self {
125 reader,
126 _file_transaction_lock: file_transaction_lock,
127 }
128 }
129}
130
131impl AsyncRead for TransactionLockedReader<'_> {
132 fn poll_read(
133 mut self: Pin<&mut Self>,
134 cx: &mut core::task::Context<'_>,
135 buf: &mut [u8],
136 ) -> Poll<futures_io::Result<usize>> {
137 Pin::new(&mut self.reader).poll_read(cx, buf)
138 }
139}
140
141impl Reader for TransactionLockedReader<'_> {
142 fn read_to_end<'a>(
143 &'a mut self,
144 buf: &'a mut Vec<u8>,
145 ) -> stackfuture::StackFuture<'a, std::io::Result<usize>, { super::STACK_FUTURE_SIZE }> {
146 self.reader.read_to_end(buf)
147 }
148
149 fn seekable(&mut self) -> Result<&mut dyn SeekableReader, ReaderNotSeekableError> {
150 self.reader.seekable()
151 }
152}