bevy_asset/processor/
log.rs1use crate::AssetPath;
2use alloc::{
3 boxed::Box,
4 format,
5 string::{String, ToString},
6 vec::Vec,
7};
8use async_fs::File;
9use bevy_ecs::error::BevyError;
10use bevy_platform::collections::HashSet;
11use bevy_tasks::BoxedFuture;
12use futures_lite::{AsyncReadExt, AsyncWriteExt};
13use std::path::PathBuf;
14use thiserror::Error;
15use tracing::error;
16
17#[derive(Debug)]
19pub enum LogEntry {
20 BeginProcessing(AssetPath<'static>),
21 EndProcessing(AssetPath<'static>),
22 UnrecoverableError,
23}
24
25pub trait ProcessorTransactionLogFactory: Send + Sync + 'static {
30 fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>>;
34
35 fn create_new_log(
39 &self,
40 ) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>>;
41}
42
43pub trait ProcessorTransactionLog: Send + Sync + 'static {
49 fn begin_processing<'a>(
55 &'a mut self,
56 asset: &'a AssetPath<'_>,
57 ) -> BoxedFuture<'a, Result<(), BevyError>>;
58
59 fn end_processing<'a>(
62 &'a mut self,
63 asset: &'a AssetPath<'_>,
64 ) -> BoxedFuture<'a, Result<(), BevyError>>;
65
66 fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>>;
72}
73
74pub(crate) async fn validate_transaction_log(
77 log_factory: &dyn ProcessorTransactionLogFactory,
78) -> Result<(), ValidateLogError> {
79 let mut transactions: HashSet<AssetPath<'static>> = Default::default();
80 let mut errors: Vec<LogEntryError> = Vec::new();
81 let entries = log_factory
82 .read()
83 .await
84 .map_err(ValidateLogError::ReadLogError)?;
85 for entry in entries {
86 match entry {
87 LogEntry::BeginProcessing(path) => {
88 if !transactions.insert(path.clone()) {
93 errors.push(LogEntryError::DuplicateTransaction(path));
94 }
95 }
96 LogEntry::EndProcessing(path) => {
97 if !transactions.remove(&path) {
98 errors.push(LogEntryError::EndedMissingTransaction(path));
99 }
100 }
101 LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
102 }
103 }
104 for transaction in transactions {
105 errors.push(LogEntryError::UnfinishedTransaction(transaction));
106 }
107 if !errors.is_empty() {
108 return Err(ValidateLogError::EntryErrors(errors));
109 }
110 Ok(())
111}
112
113pub struct FileTransactionLogFactory {
115 pub file_path: PathBuf,
117}
118
119const LOG_PATH: &str = "imported_assets/log";
120
121impl Default for FileTransactionLogFactory {
122 fn default() -> Self {
123 #[cfg(not(target_arch = "wasm32"))]
124 let base_path = crate::io::file::get_base_path();
125 #[cfg(target_arch = "wasm32")]
126 let base_path = PathBuf::new();
127 let file_path = base_path.join(LOG_PATH);
128 Self { file_path }
129 }
130}
131
132impl ProcessorTransactionLogFactory for FileTransactionLogFactory {
133 fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>> {
134 let path = self.file_path.clone();
135 Box::pin(async move {
136 let mut log_lines = Vec::new();
137 let mut file = match File::open(path).await {
138 Ok(file) => file,
139 Err(err) => {
140 if err.kind() == futures_io::ErrorKind::NotFound {
141 return Ok(log_lines);
143 }
144 return Err(err.into());
145 }
146 };
147 let mut string = String::new();
148 file.read_to_string(&mut string).await?;
149 for line in string.lines() {
150 if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
151 log_lines.push(LogEntry::BeginProcessing(
152 AssetPath::parse(path_str).into_owned(),
153 ));
154 } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
155 log_lines.push(LogEntry::EndProcessing(
156 AssetPath::parse(path_str).into_owned(),
157 ));
158 } else if line.is_empty() {
159 continue;
160 } else {
161 return Err(ReadLogError::InvalidLine(line.to_string()).into());
162 }
163 }
164 Ok(log_lines)
165 })
166 }
167
168 fn create_new_log(
169 &self,
170 ) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>> {
171 let path = self.file_path.clone();
172 Box::pin(async move {
173 match async_fs::remove_file(&path).await {
174 Ok(_) => { }
175 Err(err) => {
176 if err.kind() != futures_io::ErrorKind::NotFound {
178 error!("Failed to remove previous log file {}", err);
179 }
180 }
181 }
182
183 if let Some(parent_folder) = path.parent() {
184 async_fs::create_dir_all(parent_folder).await?;
185 }
186
187 Ok(Box::new(FileProcessorTransactionLog {
188 log_file: File::create(path).await?,
189 }) as _)
190 })
191 }
192}
193
194struct FileProcessorTransactionLog {
200 log_file: File,
202}
203
204impl FileProcessorTransactionLog {
205 async fn write(&mut self, line: &str) -> Result<(), BevyError> {
207 self.log_file.write_all(line.as_bytes()).await?;
208 self.log_file.flush().await?;
209 Ok(())
210 }
211}
212
213const ENTRY_BEGIN: &str = "Begin ";
214const ENTRY_END: &str = "End ";
215const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
216
217impl ProcessorTransactionLog for FileProcessorTransactionLog {
218 fn begin_processing<'a>(
219 &'a mut self,
220 asset: &'a AssetPath<'_>,
221 ) -> BoxedFuture<'a, Result<(), BevyError>> {
222 Box::pin(async move { self.write(&format!("{ENTRY_BEGIN}{asset}\n")).await })
223 }
224
225 fn end_processing<'a>(
226 &'a mut self,
227 asset: &'a AssetPath<'_>,
228 ) -> BoxedFuture<'a, Result<(), BevyError>> {
229 Box::pin(async move { self.write(&format!("{ENTRY_END}{asset}\n")).await })
230 }
231
232 fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>> {
233 Box::pin(async move { self.write(UNRECOVERABLE_ERROR).await })
234 }
235}
236
237#[derive(Error, Debug)]
239pub enum ReadLogError {
240 #[error("Encountered an invalid log line: '{0}'")]
242 InvalidLine(String),
243 #[error("Failed to read log file: {0}")]
245 Io(#[from] futures_io::Error),
246}
247
248#[derive(Error, Debug)]
250#[error(
251 "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
252)]
253pub(crate) struct WriteLogError {
254 pub(crate) log_entry: LogEntry,
255 pub(crate) error: BevyError,
256}
257
258#[derive(Error, Debug)]
260pub enum ValidateLogError {
261 #[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
263 UnrecoverableError,
264 #[error("Failed to read log entries: {0}")]
266 ReadLogError(BevyError),
267 #[error("Encountered a duplicate process asset transaction: {0:?}")]
269 EntryErrors(Vec<LogEntryError>),
270}
271
272#[derive(Error, Debug)]
274pub enum LogEntryError {
275 #[error("Encountered a duplicate process asset transaction: {0}")]
277 DuplicateTransaction(AssetPath<'static>),
278 #[error("A transaction was ended that never started {0}")]
280 EndedMissingTransaction(AssetPath<'static>),
281 #[error("An asset started processing but never finished: {0}")]
283 UnfinishedTransaction(AssetPath<'static>),
284}