bevy_asset/processor/
log.rs

1use crate::AssetPath;
2use async_fs::File;
3use bevy_utils::{tracing::error, HashSet};
4use derive_more::derive::{Display, Error, From};
5use futures_lite::{AsyncReadExt, AsyncWriteExt};
6use std::path::PathBuf;
7
8/// An in-memory representation of a single [`ProcessorTransactionLog`] entry.
9#[derive(Debug)]
10pub(crate) enum LogEntry {
11    BeginProcessing(AssetPath<'static>),
12    EndProcessing(AssetPath<'static>),
13    UnrecoverableError,
14}
15
16/// A "write ahead" logger that helps ensure asset importing is transactional.
17///
18/// Prior to processing an asset, we write to the log to indicate it has started
19/// After processing an asset, we write to the log to indicate it has finished.
20/// On startup, the log can be read to determine if any transactions were incomplete.
21// TODO: this should be a trait
22pub struct ProcessorTransactionLog {
23    log_file: File,
24}
25
26/// An error that occurs when reading from the [`ProcessorTransactionLog`] fails.
27#[derive(Error, Display, Debug, From)]
28pub enum ReadLogError {
29    #[display("Encountered an invalid log line: '{_0}'")]
30    #[error(ignore)]
31    InvalidLine(String),
32    #[display("Failed to read log file: {_0}")]
33    Io(futures_io::Error),
34}
35
36/// An error that occurs when writing to the [`ProcessorTransactionLog`] fails.
37#[derive(Error, Display, Debug)]
38#[display(
39    "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
40)]
41pub struct WriteLogError {
42    log_entry: LogEntry,
43    error: futures_io::Error,
44}
45
46/// An error that occurs when validating the [`ProcessorTransactionLog`] fails.
47#[derive(Error, Display, Debug, From)]
48pub enum ValidateLogError {
49    #[display("Encountered an unrecoverable error. All assets will be reprocessed.")]
50    UnrecoverableError,
51    ReadLogError(ReadLogError),
52    #[display("Encountered a duplicate process asset transaction: {_0:?}")]
53    #[error(ignore)]
54    EntryErrors(Vec<LogEntryError>),
55}
56
57/// An error that occurs when validating individual [`ProcessorTransactionLog`] entries.
58#[derive(Error, Display, Debug)]
59pub enum LogEntryError {
60    #[display("Encountered a duplicate process asset transaction: {_0}")]
61    #[error(ignore)]
62    DuplicateTransaction(AssetPath<'static>),
63    #[display("A transaction was ended that never started {_0}")]
64    #[error(ignore)]
65    EndedMissingTransaction(AssetPath<'static>),
66    #[display("An asset started processing but never finished: {_0}")]
67    #[error(ignore)]
68    UnfinishedTransaction(AssetPath<'static>),
69}
70
71const LOG_PATH: &str = "imported_assets/log";
72const ENTRY_BEGIN: &str = "Begin ";
73const ENTRY_END: &str = "End ";
74const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
75
76impl ProcessorTransactionLog {
77    fn full_log_path() -> PathBuf {
78        #[cfg(not(target_arch = "wasm32"))]
79        let base_path = crate::io::file::get_base_path();
80        #[cfg(target_arch = "wasm32")]
81        let base_path = PathBuf::new();
82        base_path.join(LOG_PATH)
83    }
84    /// Create a new, fresh log file. This will delete the previous log file if it exists.
85    pub(crate) async fn new() -> Result<Self, futures_io::Error> {
86        let path = Self::full_log_path();
87        match async_fs::remove_file(&path).await {
88            Ok(_) => { /* successfully removed file */ }
89            Err(err) => {
90                // if the log file is not found, we assume we are starting in a fresh (or good) state
91                if err.kind() != futures_io::ErrorKind::NotFound {
92                    error!("Failed to remove previous log file {}", err);
93                }
94            }
95        }
96
97        if let Some(parent_folder) = path.parent() {
98            async_fs::create_dir_all(parent_folder).await?;
99        }
100
101        Ok(Self {
102            log_file: File::create(path).await?,
103        })
104    }
105
106    pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
107        let mut log_lines = Vec::new();
108        let mut file = match File::open(Self::full_log_path()).await {
109            Ok(file) => file,
110            Err(err) => {
111                if err.kind() == futures_io::ErrorKind::NotFound {
112                    // if the log file doesn't exist, this is equivalent to an empty file
113                    return Ok(log_lines);
114                }
115                return Err(err.into());
116            }
117        };
118        let mut string = String::new();
119        file.read_to_string(&mut string).await?;
120        for line in string.lines() {
121            if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
122                log_lines.push(LogEntry::BeginProcessing(
123                    AssetPath::parse(path_str).into_owned(),
124                ));
125            } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
126                log_lines.push(LogEntry::EndProcessing(
127                    AssetPath::parse(path_str).into_owned(),
128                ));
129            } else if line.is_empty() {
130                continue;
131            } else {
132                return Err(ReadLogError::InvalidLine(line.to_string()));
133            }
134        }
135        Ok(log_lines)
136    }
137
138    pub(crate) async fn validate() -> Result<(), ValidateLogError> {
139        let mut transactions: HashSet<AssetPath<'static>> = Default::default();
140        let mut errors: Vec<LogEntryError> = Vec::new();
141        let entries = Self::read().await?;
142        for entry in entries {
143            match entry {
144                LogEntry::BeginProcessing(path) => {
145                    // There should never be duplicate "start transactions" in a log
146                    // Every start should be followed by:
147                    //    * nothing (if there was an abrupt stop)
148                    //    * an End (if the transaction was completed)
149                    if !transactions.insert(path.clone()) {
150                        errors.push(LogEntryError::DuplicateTransaction(path));
151                    }
152                }
153                LogEntry::EndProcessing(path) => {
154                    if !transactions.remove(&path) {
155                        errors.push(LogEntryError::EndedMissingTransaction(path));
156                    }
157                }
158                LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
159            }
160        }
161        for transaction in transactions {
162            errors.push(LogEntryError::UnfinishedTransaction(transaction));
163        }
164        if !errors.is_empty() {
165            return Err(ValidateLogError::EntryErrors(errors));
166        }
167        Ok(())
168    }
169
170    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`ProcessorTransactionLog::end_processing`],
171    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
172    pub(crate) async fn begin_processing(
173        &mut self,
174        path: &AssetPath<'_>,
175    ) -> Result<(), WriteLogError> {
176        self.write(&format!("{ENTRY_BEGIN}{path}\n"))
177            .await
178            .map_err(|e| WriteLogError {
179                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
180                error: e,
181            })
182    }
183
184    /// Logs the end of an asset being successfully processed. See [`ProcessorTransactionLog::begin_processing`].
185    pub(crate) async fn end_processing(
186        &mut self,
187        path: &AssetPath<'_>,
188    ) -> Result<(), WriteLogError> {
189        self.write(&format!("{ENTRY_END}{path}\n"))
190            .await
191            .map_err(|e| WriteLogError {
192                log_entry: LogEntry::EndProcessing(path.clone_owned()),
193                error: e,
194            })
195    }
196
197    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
198    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
199    pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
200        self.write(UNRECOVERABLE_ERROR)
201            .await
202            .map_err(|e| WriteLogError {
203                log_entry: LogEntry::UnrecoverableError,
204                error: e,
205            })
206    }
207
208    async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
209        self.log_file.write_all(line.as_bytes()).await?;
210        self.log_file.flush().await?;
211        Ok(())
212    }
213}