bevy_asset/processor/
log.rs1use crate::AssetPath;
2use async_fs::File;
3use bevy_utils::{tracing::error, HashSet};
4use derive_more::derive::{Display, Error, From};
5use futures_lite::{AsyncReadExt, AsyncWriteExt};
6use std::path::PathBuf;
7
8#[derive(Debug)]
10pub(crate) enum LogEntry {
11 BeginProcessing(AssetPath<'static>),
12 EndProcessing(AssetPath<'static>),
13 UnrecoverableError,
14}
15
16pub struct ProcessorTransactionLog {
23 log_file: File,
24}
25
26#[derive(Error, Display, Debug, From)]
28pub enum ReadLogError {
29 #[display("Encountered an invalid log line: '{_0}'")]
30 #[error(ignore)]
31 InvalidLine(String),
32 #[display("Failed to read log file: {_0}")]
33 Io(futures_io::Error),
34}
35
36#[derive(Error, Display, Debug)]
38#[display(
39 "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
40)]
41pub struct WriteLogError {
42 log_entry: LogEntry,
43 error: futures_io::Error,
44}
45
46#[derive(Error, Display, Debug, From)]
48pub enum ValidateLogError {
49 #[display("Encountered an unrecoverable error. All assets will be reprocessed.")]
50 UnrecoverableError,
51 ReadLogError(ReadLogError),
52 #[display("Encountered a duplicate process asset transaction: {_0:?}")]
53 #[error(ignore)]
54 EntryErrors(Vec<LogEntryError>),
55}
56
57#[derive(Error, Display, Debug)]
59pub enum LogEntryError {
60 #[display("Encountered a duplicate process asset transaction: {_0}")]
61 #[error(ignore)]
62 DuplicateTransaction(AssetPath<'static>),
63 #[display("A transaction was ended that never started {_0}")]
64 #[error(ignore)]
65 EndedMissingTransaction(AssetPath<'static>),
66 #[display("An asset started processing but never finished: {_0}")]
67 #[error(ignore)]
68 UnfinishedTransaction(AssetPath<'static>),
69}
70
71const LOG_PATH: &str = "imported_assets/log";
72const ENTRY_BEGIN: &str = "Begin ";
73const ENTRY_END: &str = "End ";
74const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
75
76impl ProcessorTransactionLog {
77 fn full_log_path() -> PathBuf {
78 #[cfg(not(target_arch = "wasm32"))]
79 let base_path = crate::io::file::get_base_path();
80 #[cfg(target_arch = "wasm32")]
81 let base_path = PathBuf::new();
82 base_path.join(LOG_PATH)
83 }
84 pub(crate) async fn new() -> Result<Self, futures_io::Error> {
86 let path = Self::full_log_path();
87 match async_fs::remove_file(&path).await {
88 Ok(_) => { }
89 Err(err) => {
90 if err.kind() != futures_io::ErrorKind::NotFound {
92 error!("Failed to remove previous log file {}", err);
93 }
94 }
95 }
96
97 if let Some(parent_folder) = path.parent() {
98 async_fs::create_dir_all(parent_folder).await?;
99 }
100
101 Ok(Self {
102 log_file: File::create(path).await?,
103 })
104 }
105
106 pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
107 let mut log_lines = Vec::new();
108 let mut file = match File::open(Self::full_log_path()).await {
109 Ok(file) => file,
110 Err(err) => {
111 if err.kind() == futures_io::ErrorKind::NotFound {
112 return Ok(log_lines);
114 }
115 return Err(err.into());
116 }
117 };
118 let mut string = String::new();
119 file.read_to_string(&mut string).await?;
120 for line in string.lines() {
121 if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
122 log_lines.push(LogEntry::BeginProcessing(
123 AssetPath::parse(path_str).into_owned(),
124 ));
125 } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
126 log_lines.push(LogEntry::EndProcessing(
127 AssetPath::parse(path_str).into_owned(),
128 ));
129 } else if line.is_empty() {
130 continue;
131 } else {
132 return Err(ReadLogError::InvalidLine(line.to_string()));
133 }
134 }
135 Ok(log_lines)
136 }
137
138 pub(crate) async fn validate() -> Result<(), ValidateLogError> {
139 let mut transactions: HashSet<AssetPath<'static>> = Default::default();
140 let mut errors: Vec<LogEntryError> = Vec::new();
141 let entries = Self::read().await?;
142 for entry in entries {
143 match entry {
144 LogEntry::BeginProcessing(path) => {
145 if !transactions.insert(path.clone()) {
150 errors.push(LogEntryError::DuplicateTransaction(path));
151 }
152 }
153 LogEntry::EndProcessing(path) => {
154 if !transactions.remove(&path) {
155 errors.push(LogEntryError::EndedMissingTransaction(path));
156 }
157 }
158 LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
159 }
160 }
161 for transaction in transactions {
162 errors.push(LogEntryError::UnfinishedTransaction(transaction));
163 }
164 if !errors.is_empty() {
165 return Err(ValidateLogError::EntryErrors(errors));
166 }
167 Ok(())
168 }
169
170 pub(crate) async fn begin_processing(
173 &mut self,
174 path: &AssetPath<'_>,
175 ) -> Result<(), WriteLogError> {
176 self.write(&format!("{ENTRY_BEGIN}{path}\n"))
177 .await
178 .map_err(|e| WriteLogError {
179 log_entry: LogEntry::BeginProcessing(path.clone_owned()),
180 error: e,
181 })
182 }
183
184 pub(crate) async fn end_processing(
186 &mut self,
187 path: &AssetPath<'_>,
188 ) -> Result<(), WriteLogError> {
189 self.write(&format!("{ENTRY_END}{path}\n"))
190 .await
191 .map_err(|e| WriteLogError {
192 log_entry: LogEntry::EndProcessing(path.clone_owned()),
193 error: e,
194 })
195 }
196
197 pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
200 self.write(UNRECOVERABLE_ERROR)
201 .await
202 .map_err(|e| WriteLogError {
203 log_entry: LogEntry::UnrecoverableError,
204 error: e,
205 })
206 }
207
208 async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
209 self.log_file.write_all(line.as_bytes()).await?;
210 self.log_file.flush().await?;
211 Ok(())
212 }
213}