bevy_asset/processor/
log.rs

1use crate::AssetPath;
2use alloc::{
3    boxed::Box,
4    format,
5    string::{String, ToString},
6    vec::Vec,
7};
8use async_fs::File;
9use bevy_ecs::error::BevyError;
10use bevy_platform::collections::HashSet;
11use bevy_tasks::BoxedFuture;
12use futures_lite::{AsyncReadExt, AsyncWriteExt};
13use std::path::PathBuf;
14use thiserror::Error;
15use tracing::error;
16
17/// An in-memory representation of a single [`ProcessorTransactionLog`] entry.
18#[derive(Debug)]
19pub enum LogEntry {
20    BeginProcessing(AssetPath<'static>),
21    EndProcessing(AssetPath<'static>),
22    UnrecoverableError,
23}
24
25/// A factory of [`ProcessorTransactionLog`] that handles the state before the log has been started.
26///
27/// This trait also assists in recovering from partial processing by fetching the previous state of
28/// the transaction log.
29pub trait ProcessorTransactionLogFactory: Send + Sync + 'static {
30    /// Reads all entries in a previous transaction log if present.
31    ///
32    /// If there is no previous transaction log, this method should return an empty Vec of entries.
33    fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>>;
34
35    /// Creates a new transaction log to write to.
36    ///
37    /// This should remove any previous entries if they exist.
38    fn create_new_log(
39        &self,
40    ) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>>;
41}
42
43/// A "write ahead" logger that helps ensure asset importing is transactional.
44///
45/// Prior to processing an asset, we write to the log to indicate it has started. After processing
46/// an asset, we write to the log to indicate it has finished. On startup, the log can be read
47/// through [`ProcessorTransactionLogFactory`] to determine if any transactions were incomplete.
48pub trait ProcessorTransactionLog: Send + Sync + 'static {
49    /// Logs the start of an asset being processed.
50    ///
51    /// If this is not followed at some point in the log by a closing
52    /// [`ProcessorTransactionLog::end_processing`], in the next run of the processor the asset
53    /// processing will be considered "incomplete" and it will be reprocessed.
54    fn begin_processing<'a>(
55        &'a mut self,
56        asset: &'a AssetPath<'_>,
57    ) -> BoxedFuture<'a, Result<(), BevyError>>;
58
59    /// Logs the end of an asset being successfully processed. See
60    /// [`ProcessorTransactionLog::begin_processing`].
61    fn end_processing<'a>(
62        &'a mut self,
63        asset: &'a AssetPath<'_>,
64    ) -> BoxedFuture<'a, Result<(), BevyError>>;
65
66    /// Logs an unrecoverable error.
67    ///
68    /// On the next run of the processor, all assets will be regenerated. This should only be used
69    /// as a last resort. Every call to this should be considered with scrutiny and ideally replaced
70    /// with something more granular.
71    fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>>;
72}
73
74/// Validate the previous state of the transaction log and determine any assets that need to be
75/// reprocessed.
76pub(crate) async fn validate_transaction_log(
77    log_factory: &dyn ProcessorTransactionLogFactory,
78) -> Result<(), ValidateLogError> {
79    let mut transactions: HashSet<AssetPath<'static>> = Default::default();
80    let mut errors: Vec<LogEntryError> = Vec::new();
81    let entries = log_factory
82        .read()
83        .await
84        .map_err(ValidateLogError::ReadLogError)?;
85    for entry in entries {
86        match entry {
87            LogEntry::BeginProcessing(path) => {
88                // There should never be duplicate "start transactions" in a log
89                // Every start should be followed by:
90                //    * nothing (if there was an abrupt stop)
91                //    * an End (if the transaction was completed)
92                if !transactions.insert(path.clone()) {
93                    errors.push(LogEntryError::DuplicateTransaction(path));
94                }
95            }
96            LogEntry::EndProcessing(path) => {
97                if !transactions.remove(&path) {
98                    errors.push(LogEntryError::EndedMissingTransaction(path));
99                }
100            }
101            LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
102        }
103    }
104    for transaction in transactions {
105        errors.push(LogEntryError::UnfinishedTransaction(transaction));
106    }
107    if !errors.is_empty() {
108        return Err(ValidateLogError::EntryErrors(errors));
109    }
110    Ok(())
111}
112
/// A [`ProcessorTransactionLogFactory`] that keeps its transaction log in a file.
pub struct FileTransactionLogFactory {
    /// Path of the file the transaction log is written to.
    pub file_path: PathBuf,
}
118
119const LOG_PATH: &str = "imported_assets/log";
120
121impl Default for FileTransactionLogFactory {
122    fn default() -> Self {
123        #[cfg(not(target_arch = "wasm32"))]
124        let base_path = crate::io::file::get_base_path();
125        #[cfg(target_arch = "wasm32")]
126        let base_path = PathBuf::new();
127        let file_path = base_path.join(LOG_PATH);
128        Self { file_path }
129    }
130}
131
132impl ProcessorTransactionLogFactory for FileTransactionLogFactory {
133    fn read(&self) -> BoxedFuture<'_, Result<Vec<LogEntry>, BevyError>> {
134        let path = self.file_path.clone();
135        Box::pin(async move {
136            let mut log_lines = Vec::new();
137            let mut file = match File::open(path).await {
138                Ok(file) => file,
139                Err(err) => {
140                    if err.kind() == futures_io::ErrorKind::NotFound {
141                        // if the log file doesn't exist, this is equivalent to an empty file
142                        return Ok(log_lines);
143                    }
144                    return Err(err.into());
145                }
146            };
147            let mut string = String::new();
148            file.read_to_string(&mut string).await?;
149            for line in string.lines() {
150                if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
151                    log_lines.push(LogEntry::BeginProcessing(
152                        AssetPath::parse(path_str).into_owned(),
153                    ));
154                } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
155                    log_lines.push(LogEntry::EndProcessing(
156                        AssetPath::parse(path_str).into_owned(),
157                    ));
158                } else if line.is_empty() {
159                    continue;
160                } else {
161                    return Err(ReadLogError::InvalidLine(line.to_string()).into());
162                }
163            }
164            Ok(log_lines)
165        })
166    }
167
168    fn create_new_log(
169        &self,
170    ) -> BoxedFuture<'_, Result<Box<dyn ProcessorTransactionLog>, BevyError>> {
171        let path = self.file_path.clone();
172        Box::pin(async move {
173            match async_fs::remove_file(&path).await {
174                Ok(_) => { /* successfully removed file */ }
175                Err(err) => {
176                    // if the log file is not found, we assume we are starting in a fresh (or good) state
177                    if err.kind() != futures_io::ErrorKind::NotFound {
178                        error!("Failed to remove previous log file {}", err);
179                    }
180                }
181            }
182
183            if let Some(parent_folder) = path.parent() {
184                async_fs::create_dir_all(parent_folder).await?;
185            }
186
187            Ok(Box::new(FileProcessorTransactionLog {
188                log_file: File::create(path).await?,
189            }) as _)
190        })
191    }
192}
193
194/// A "write ahead" logger that helps ensure asset importing is transactional.
195///
196/// Prior to processing an asset, we write to the log to indicate it has started
197/// After processing an asset, we write to the log to indicate it has finished.
198/// On startup, the log can be read to determine if any transactions were incomplete.
199struct FileProcessorTransactionLog {
200    /// The file to write logs to.
201    log_file: File,
202}
203
204impl FileProcessorTransactionLog {
205    /// Write `line` to the file and flush it.
206    async fn write(&mut self, line: &str) -> Result<(), BevyError> {
207        self.log_file.write_all(line.as_bytes()).await?;
208        self.log_file.flush().await?;
209        Ok(())
210    }
211}
212
/// Prefix of a log line recording that processing of an asset has begun. The asset path follows
/// the trailing space.
const ENTRY_BEGIN: &str = "Begin ";
/// Prefix of a log line recording that processing of an asset has ended. The asset path follows
/// the trailing space.
const ENTRY_END: &str = "End ";
/// Marker written to the log when an unrecoverable error is recorded.
const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
216
217impl ProcessorTransactionLog for FileProcessorTransactionLog {
218    fn begin_processing<'a>(
219        &'a mut self,
220        asset: &'a AssetPath<'_>,
221    ) -> BoxedFuture<'a, Result<(), BevyError>> {
222        Box::pin(async move { self.write(&format!("{ENTRY_BEGIN}{asset}\n")).await })
223    }
224
225    fn end_processing<'a>(
226        &'a mut self,
227        asset: &'a AssetPath<'_>,
228    ) -> BoxedFuture<'a, Result<(), BevyError>> {
229        Box::pin(async move { self.write(&format!("{ENTRY_END}{asset}\n")).await })
230    }
231
232    fn unrecoverable(&mut self) -> BoxedFuture<'_, Result<(), BevyError>> {
233        Box::pin(async move { self.write(UNRECOVERABLE_ERROR).await })
234    }
235}
236
237/// An error that occurs when reading from the [`ProcessorTransactionLog`] fails.
238#[derive(Error, Debug)]
239pub enum ReadLogError {
240    /// An invalid log line was encountered, consisting of the contained string.
241    #[error("Encountered an invalid log line: '{0}'")]
242    InvalidLine(String),
243    /// A file-system-based error occurred while reading the log file.
244    #[error("Failed to read log file: {0}")]
245    Io(#[from] futures_io::Error),
246}
247
248/// An error that occurs when writing to the [`ProcessorTransactionLog`] fails.
249#[derive(Error, Debug)]
250#[error(
251    "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
252)]
253pub(crate) struct WriteLogError {
254    pub(crate) log_entry: LogEntry,
255    pub(crate) error: BevyError,
256}
257
258/// An error that occurs when validating the [`ProcessorTransactionLog`] fails.
259#[derive(Error, Debug)]
260pub enum ValidateLogError {
261    /// An error that could not be recovered from. All assets will be reprocessed.
262    #[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
263    UnrecoverableError,
264    /// A [`ReadLogError`].
265    #[error("Failed to read log entries: {0}")]
266    ReadLogError(BevyError),
267    /// Duplicated process asset transactions occurred.
268    #[error("Encountered a duplicate process asset transaction: {0:?}")]
269    EntryErrors(Vec<LogEntryError>),
270}
271
272/// An error that occurs when validating individual [`ProcessorTransactionLog`] entries.
273#[derive(Error, Debug)]
274pub enum LogEntryError {
275    /// A duplicate process asset transaction occurred for the given asset path.
276    #[error("Encountered a duplicate process asset transaction: {0}")]
277    DuplicateTransaction(AssetPath<'static>),
278    /// A transaction was ended that never started for the given asset path.
279    #[error("A transaction was ended that never started {0}")]
280    EndedMissingTransaction(AssetPath<'static>),
281    /// An asset started processing but never finished at the given asset path.
282    #[error("An asset started processing but never finished: {0}")]
283    UnfinishedTransaction(AssetPath<'static>),
284}