bevy_asset/processor/
log.rs

1use crate::AssetPath;
2use alloc::{
3    format,
4    string::{String, ToString},
5    vec::Vec,
6};
7use async_fs::File;
8use bevy_platform::collections::HashSet;
9use futures_lite::{AsyncReadExt, AsyncWriteExt};
10use std::path::PathBuf;
11use thiserror::Error;
12use tracing::error;
13
14/// An in-memory representation of a single [`ProcessorTransactionLog`] entry.
15#[derive(Debug)]
16pub(crate) enum LogEntry {
17    BeginProcessing(AssetPath<'static>),
18    EndProcessing(AssetPath<'static>),
19    UnrecoverableError,
20}
21
22/// A "write ahead" logger that helps ensure asset importing is transactional.
23///
24/// Prior to processing an asset, we write to the log to indicate it has started
25/// After processing an asset, we write to the log to indicate it has finished.
26/// On startup, the log can be read to determine if any transactions were incomplete.
27// TODO: this should be a trait
28pub struct ProcessorTransactionLog {
29    log_file: File,
30}
31
32/// An error that occurs when reading from the [`ProcessorTransactionLog`] fails.
33#[derive(Error, Debug)]
34pub enum ReadLogError {
35    /// An invalid log line was encountered, consisting of the contained string.
36    #[error("Encountered an invalid log line: '{0}'")]
37    InvalidLine(String),
38    /// A file-system-based error occurred while reading the log file.
39    #[error("Failed to read log file: {0}")]
40    Io(#[from] futures_io::Error),
41}
42
43/// An error that occurs when writing to the [`ProcessorTransactionLog`] fails.
44#[derive(Error, Debug)]
45#[error(
46    "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
47)]
48pub struct WriteLogError {
49    log_entry: LogEntry,
50    error: futures_io::Error,
51}
52
53/// An error that occurs when validating the [`ProcessorTransactionLog`] fails.
54#[derive(Error, Debug)]
55pub enum ValidateLogError {
56    /// An error that could not be recovered from. All assets will be reprocessed.
57    #[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
58    UnrecoverableError,
59    /// A [`ReadLogError`].
60    #[error(transparent)]
61    ReadLogError(#[from] ReadLogError),
62    /// Duplicated process asset transactions occurred.
63    #[error("Encountered a duplicate process asset transaction: {0:?}")]
64    EntryErrors(Vec<LogEntryError>),
65}
66
67/// An error that occurs when validating individual [`ProcessorTransactionLog`] entries.
68#[derive(Error, Debug)]
69pub enum LogEntryError {
70    /// A duplicate process asset transaction occurred for the given asset path.
71    #[error("Encountered a duplicate process asset transaction: {0}")]
72    DuplicateTransaction(AssetPath<'static>),
73    /// A transaction was ended that never started for the given asset path.
74    #[error("A transaction was ended that never started {0}")]
75    EndedMissingTransaction(AssetPath<'static>),
76    /// An asset started processing but never finished at the given asset path.
77    #[error("An asset started processing but never finished: {0}")]
78    UnfinishedTransaction(AssetPath<'static>),
79}
80
81const LOG_PATH: &str = "imported_assets/log";
82const ENTRY_BEGIN: &str = "Begin ";
83const ENTRY_END: &str = "End ";
84const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
85
86impl ProcessorTransactionLog {
87    fn full_log_path() -> PathBuf {
88        #[cfg(not(target_arch = "wasm32"))]
89        let base_path = crate::io::file::get_base_path();
90        #[cfg(target_arch = "wasm32")]
91        let base_path = PathBuf::new();
92        base_path.join(LOG_PATH)
93    }
94    /// Create a new, fresh log file. This will delete the previous log file if it exists.
95    pub(crate) async fn new() -> Result<Self, futures_io::Error> {
96        let path = Self::full_log_path();
97        match async_fs::remove_file(&path).await {
98            Ok(_) => { /* successfully removed file */ }
99            Err(err) => {
100                // if the log file is not found, we assume we are starting in a fresh (or good) state
101                if err.kind() != futures_io::ErrorKind::NotFound {
102                    error!("Failed to remove previous log file {}", err);
103                }
104            }
105        }
106
107        if let Some(parent_folder) = path.parent() {
108            async_fs::create_dir_all(parent_folder).await?;
109        }
110
111        Ok(Self {
112            log_file: File::create(path).await?,
113        })
114    }
115
116    pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
117        let mut log_lines = Vec::new();
118        let mut file = match File::open(Self::full_log_path()).await {
119            Ok(file) => file,
120            Err(err) => {
121                if err.kind() == futures_io::ErrorKind::NotFound {
122                    // if the log file doesn't exist, this is equivalent to an empty file
123                    return Ok(log_lines);
124                }
125                return Err(err.into());
126            }
127        };
128        let mut string = String::new();
129        file.read_to_string(&mut string).await?;
130        for line in string.lines() {
131            if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
132                log_lines.push(LogEntry::BeginProcessing(
133                    AssetPath::parse(path_str).into_owned(),
134                ));
135            } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
136                log_lines.push(LogEntry::EndProcessing(
137                    AssetPath::parse(path_str).into_owned(),
138                ));
139            } else if line.is_empty() {
140                continue;
141            } else {
142                return Err(ReadLogError::InvalidLine(line.to_string()));
143            }
144        }
145        Ok(log_lines)
146    }
147
148    pub(crate) async fn validate() -> Result<(), ValidateLogError> {
149        let mut transactions: HashSet<AssetPath<'static>> = Default::default();
150        let mut errors: Vec<LogEntryError> = Vec::new();
151        let entries = Self::read().await?;
152        for entry in entries {
153            match entry {
154                LogEntry::BeginProcessing(path) => {
155                    // There should never be duplicate "start transactions" in a log
156                    // Every start should be followed by:
157                    //    * nothing (if there was an abrupt stop)
158                    //    * an End (if the transaction was completed)
159                    if !transactions.insert(path.clone()) {
160                        errors.push(LogEntryError::DuplicateTransaction(path));
161                    }
162                }
163                LogEntry::EndProcessing(path) => {
164                    if !transactions.remove(&path) {
165                        errors.push(LogEntryError::EndedMissingTransaction(path));
166                    }
167                }
168                LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
169            }
170        }
171        for transaction in transactions {
172            errors.push(LogEntryError::UnfinishedTransaction(transaction));
173        }
174        if !errors.is_empty() {
175            return Err(ValidateLogError::EntryErrors(errors));
176        }
177        Ok(())
178    }
179
180    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`ProcessorTransactionLog::end_processing`],
181    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
182    pub(crate) async fn begin_processing(
183        &mut self,
184        path: &AssetPath<'_>,
185    ) -> Result<(), WriteLogError> {
186        self.write(&format!("{ENTRY_BEGIN}{path}\n"))
187            .await
188            .map_err(|e| WriteLogError {
189                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
190                error: e,
191            })
192    }
193
194    /// Logs the end of an asset being successfully processed. See [`ProcessorTransactionLog::begin_processing`].
195    pub(crate) async fn end_processing(
196        &mut self,
197        path: &AssetPath<'_>,
198    ) -> Result<(), WriteLogError> {
199        self.write(&format!("{ENTRY_END}{path}\n"))
200            .await
201            .map_err(|e| WriteLogError {
202                log_entry: LogEntry::EndProcessing(path.clone_owned()),
203                error: e,
204            })
205    }
206
207    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
208    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
209    pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
210        self.write(UNRECOVERABLE_ERROR)
211            .await
212            .map_err(|e| WriteLogError {
213                log_entry: LogEntry::UnrecoverableError,
214                error: e,
215            })
216    }
217
218    async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
219        self.log_file.write_all(line.as_bytes()).await?;
220        self.log_file.flush().await?;
221        Ok(())
222    }
223}