bevy_asset/processor/
log.rs1use crate::AssetPath;
2use alloc::{
3 format,
4 string::{String, ToString},
5 vec::Vec,
6};
7use async_fs::File;
8use bevy_platform::collections::HashSet;
9use futures_lite::{AsyncReadExt, AsyncWriteExt};
10use std::path::PathBuf;
11use thiserror::Error;
12use tracing::error;
13
14#[derive(Debug)]
16pub(crate) enum LogEntry {
17 BeginProcessing(AssetPath<'static>),
18 EndProcessing(AssetPath<'static>),
19 UnrecoverableError,
20}
21
22pub struct ProcessorTransactionLog {
29 log_file: File,
30}
31
32#[derive(Error, Debug)]
34pub enum ReadLogError {
35 #[error("Encountered an invalid log line: '{0}'")]
37 InvalidLine(String),
38 #[error("Failed to read log file: {0}")]
40 Io(#[from] futures_io::Error),
41}
42
43#[derive(Error, Debug)]
45#[error(
46 "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
47)]
48pub struct WriteLogError {
49 log_entry: LogEntry,
50 error: futures_io::Error,
51}
52
53#[derive(Error, Debug)]
55pub enum ValidateLogError {
56 #[error("Encountered an unrecoverable error. All assets will be reprocessed.")]
58 UnrecoverableError,
59 #[error(transparent)]
61 ReadLogError(#[from] ReadLogError),
62 #[error("Encountered a duplicate process asset transaction: {0:?}")]
64 EntryErrors(Vec<LogEntryError>),
65}
66
67#[derive(Error, Debug)]
69pub enum LogEntryError {
70 #[error("Encountered a duplicate process asset transaction: {0}")]
72 DuplicateTransaction(AssetPath<'static>),
73 #[error("A transaction was ended that never started {0}")]
75 EndedMissingTransaction(AssetPath<'static>),
76 #[error("An asset started processing but never finished: {0}")]
78 UnfinishedTransaction(AssetPath<'static>),
79}
80
81const LOG_PATH: &str = "imported_assets/log";
82const ENTRY_BEGIN: &str = "Begin ";
83const ENTRY_END: &str = "End ";
84const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
85
86impl ProcessorTransactionLog {
87 fn full_log_path() -> PathBuf {
88 #[cfg(not(target_arch = "wasm32"))]
89 let base_path = crate::io::file::get_base_path();
90 #[cfg(target_arch = "wasm32")]
91 let base_path = PathBuf::new();
92 base_path.join(LOG_PATH)
93 }
94 pub(crate) async fn new() -> Result<Self, futures_io::Error> {
96 let path = Self::full_log_path();
97 match async_fs::remove_file(&path).await {
98 Ok(_) => { }
99 Err(err) => {
100 if err.kind() != futures_io::ErrorKind::NotFound {
102 error!("Failed to remove previous log file {}", err);
103 }
104 }
105 }
106
107 if let Some(parent_folder) = path.parent() {
108 async_fs::create_dir_all(parent_folder).await?;
109 }
110
111 Ok(Self {
112 log_file: File::create(path).await?,
113 })
114 }
115
116 pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
117 let mut log_lines = Vec::new();
118 let mut file = match File::open(Self::full_log_path()).await {
119 Ok(file) => file,
120 Err(err) => {
121 if err.kind() == futures_io::ErrorKind::NotFound {
122 return Ok(log_lines);
124 }
125 return Err(err.into());
126 }
127 };
128 let mut string = String::new();
129 file.read_to_string(&mut string).await?;
130 for line in string.lines() {
131 if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
132 log_lines.push(LogEntry::BeginProcessing(
133 AssetPath::parse(path_str).into_owned(),
134 ));
135 } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
136 log_lines.push(LogEntry::EndProcessing(
137 AssetPath::parse(path_str).into_owned(),
138 ));
139 } else if line.is_empty() {
140 continue;
141 } else {
142 return Err(ReadLogError::InvalidLine(line.to_string()));
143 }
144 }
145 Ok(log_lines)
146 }
147
148 pub(crate) async fn validate() -> Result<(), ValidateLogError> {
149 let mut transactions: HashSet<AssetPath<'static>> = Default::default();
150 let mut errors: Vec<LogEntryError> = Vec::new();
151 let entries = Self::read().await?;
152 for entry in entries {
153 match entry {
154 LogEntry::BeginProcessing(path) => {
155 if !transactions.insert(path.clone()) {
160 errors.push(LogEntryError::DuplicateTransaction(path));
161 }
162 }
163 LogEntry::EndProcessing(path) => {
164 if !transactions.remove(&path) {
165 errors.push(LogEntryError::EndedMissingTransaction(path));
166 }
167 }
168 LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
169 }
170 }
171 for transaction in transactions {
172 errors.push(LogEntryError::UnfinishedTransaction(transaction));
173 }
174 if !errors.is_empty() {
175 return Err(ValidateLogError::EntryErrors(errors));
176 }
177 Ok(())
178 }
179
180 pub(crate) async fn begin_processing(
183 &mut self,
184 path: &AssetPath<'_>,
185 ) -> Result<(), WriteLogError> {
186 self.write(&format!("{ENTRY_BEGIN}{path}\n"))
187 .await
188 .map_err(|e| WriteLogError {
189 log_entry: LogEntry::BeginProcessing(path.clone_owned()),
190 error: e,
191 })
192 }
193
194 pub(crate) async fn end_processing(
196 &mut self,
197 path: &AssetPath<'_>,
198 ) -> Result<(), WriteLogError> {
199 self.write(&format!("{ENTRY_END}{path}\n"))
200 .await
201 .map_err(|e| WriteLogError {
202 log_entry: LogEntry::EndProcessing(path.clone_owned()),
203 error: e,
204 })
205 }
206
207 pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
210 self.write(UNRECOVERABLE_ERROR)
211 .await
212 .map_err(|e| WriteLogError {
213 log_entry: LogEntry::UnrecoverableError,
214 error: e,
215 })
216 }
217
218 async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
219 self.log_file.write_all(line.as_bytes()).await?;
220 self.log_file.flush().await?;
221 Ok(())
222 }
223}