// bevy_asset/processor/log.rs
use crate::AssetPath;
use async_fs::File;
use bevy_utils::{tracing::error, HashSet};
use derive_more::derive::{Display, Error, From};
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use std::path::PathBuf;
/// A single entry of the asset processor transaction log.
#[derive(Debug)]
pub(crate) enum LogEntry {
    /// Processing of the asset at the given path has begun.
    BeginProcessing(AssetPath<'static>),
    /// Processing of the asset at the given path has ended.
    EndProcessing(AssetPath<'static>),
    /// An unrecoverable error was recorded.
    UnrecoverableError,
}
/// A file-backed log of asset processing transactions.
///
/// "Begin" and "End" entries are written for individual asset paths; a previously
/// written log can later be read back and checked for inconsistencies such as
/// transactions that began but never ended.
pub struct ProcessorTransactionLog {
    /// Handle to the log file that entries are written to.
    log_file: File,
}
/// An error that occurs when reading the asset processor log.
#[derive(Error, Display, Debug, From)]
pub enum ReadLogError {
    /// A non-empty line of the log could not be interpreted as a log entry.
    /// The payload is the offending line.
    #[display("Encountered an invalid log line: '{_0}'")]
    #[error(ignore)]
    InvalidLine(String),
    /// The log file could not be opened or read.
    #[display("Failed to read log file: {_0}")]
    Io(futures_io::Error),
}
/// An error that occurs when an entry could not be written to the asset processor log.
#[derive(Error, Display, Debug)]
#[display(
    "Failed to write {log_entry:?} to the asset processor log. This is not recoverable. {error}"
)]
pub struct WriteLogError {
    /// The entry that was being written when the failure happened.
    log_entry: LogEntry,
    /// The underlying I/O failure.
    error: futures_io::Error,
}
/// An error that occurs when validating the asset processor log.
#[derive(Error, Display, Debug, From)]
pub enum ValidateLogError {
    /// The log contains an entry recording an unrecoverable error.
    #[display("Encountered an unrecoverable error. All assets will be reprocessed.")]
    UnrecoverableError,
    /// The log could not be read; see [`ReadLogError`].
    ReadLogError(ReadLogError),
    /// One or more log entries were inconsistent. The list may contain any mix of
    /// [`LogEntryError`] variants (duplicate, ended-without-start, unfinished), so the
    /// message must not single out duplicates.
    #[display("Encountered one or more invalid log entries: {_0:?}")]
    #[error(ignore)]
    EntryErrors(Vec<LogEntryError>),
}
/// An inconsistency found among the entries of the asset processor log.
#[derive(Error, Display, Debug)]
pub enum LogEntryError {
    /// A transaction for this path was begun while one was already open.
    #[display("Encountered a duplicate process asset transaction: {_0}")]
    #[error(ignore)]
    DuplicateTransaction(AssetPath<'static>),
    /// A transaction for this path was ended although none was open.
    #[display("A transaction was ended that never started {_0}")]
    #[error(ignore)]
    EndedMissingTransaction(AssetPath<'static>),
    /// A transaction for this path was begun but no matching end was logged.
    #[display("An asset started processing but never finished: {_0}")]
    #[error(ignore)]
    UnfinishedTransaction(AssetPath<'static>),
}
/// Location of the log file, joined onto the base path when the full path is built.
const LOG_PATH: &str = "imported_assets/log";

/// Prefix of a log line recording that processing of an asset path has begun.
const ENTRY_BEGIN: &str = "Begin ";

/// Prefix of a log line recording that processing of an asset path has ended.
const ENTRY_END: &str = "End ";

/// Marker text written to the log when an unrecoverable error is recorded.
const UNRECOVERABLE_ERROR: &str = "UnrecoverableError";
impl ProcessorTransactionLog {
    /// Returns the path of the log file: [`LOG_PATH`] joined onto the base path.
    ///
    /// On `wasm32` targets the base path is empty, so the result is just [`LOG_PATH`].
    fn full_log_path() -> PathBuf {
        #[cfg(not(target_arch = "wasm32"))]
        let base_path = crate::io::file::get_base_path();
        #[cfg(target_arch = "wasm32")]
        let base_path = PathBuf::new();
        base_path.join(LOG_PATH)
    }

    /// Starts a new log, discarding any log file left at the same location.
    ///
    /// Removal of the previous file is best-effort: a failure other than "not found"
    /// is reported through `error!` and creation of the new file is still attempted.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the parent directory or the log file cannot be created.
    pub(crate) async fn new() -> Result<Self, futures_io::Error> {
        let path = Self::full_log_path();
        if let Err(err) = async_fs::remove_file(&path).await {
            // A missing file just means there is nothing to clean up.
            if err.kind() != futures_io::ErrorKind::NotFound {
                error!("Failed to remove previous log file {}", err);
            }
        }

        if let Some(parent_folder) = path.parent() {
            async_fs::create_dir_all(parent_folder).await?;
        }

        Ok(Self {
            log_file: File::create(path).await?,
        })
    }

    /// Reads the log file and parses it into a list of [`LogEntry`] values, in file order.
    ///
    /// Empty lines are skipped. A missing log file yields an empty list.
    ///
    /// # Errors
    ///
    /// * [`ReadLogError::Io`] if the file exists but cannot be opened or read.
    /// * [`ReadLogError::InvalidLine`] for the first non-empty line that is neither a
    ///   "Begin"/"End" entry nor the unrecoverable-error marker.
    pub(crate) async fn read() -> Result<Vec<LogEntry>, ReadLogError> {
        let mut file = match File::open(Self::full_log_path()).await {
            Ok(file) => file,
            // No log file: there are no entries to report.
            Err(err) if err.kind() == futures_io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(err) => return Err(err.into()),
        };

        let mut contents = String::new();
        file.read_to_string(&mut contents).await?;

        let mut entries = Vec::new();
        for line in contents.lines() {
            // NOTE(review): `AssetPath::parse` presumably panics on a malformed path;
            // consider a fallible parse mapped to `InvalidLine` - confirm against its API.
            if let Some(path_str) = line.strip_prefix(ENTRY_BEGIN) {
                entries.push(LogEntry::BeginProcessing(
                    AssetPath::parse(path_str).into_owned(),
                ));
            } else if let Some(path_str) = line.strip_prefix(ENTRY_END) {
                entries.push(LogEntry::EndProcessing(
                    AssetPath::parse(path_str).into_owned(),
                ));
            } else if line == UNRECOVERABLE_ERROR {
                // Without this arm the marker written by `unrecoverable` would be
                // rejected as an invalid line and `validate` could never report
                // `ValidateLogError::UnrecoverableError`.
                entries.push(LogEntry::UnrecoverableError);
            } else if line.is_empty() {
                continue;
            } else {
                return Err(ReadLogError::InvalidLine(line.to_string()));
            }
        }
        Ok(entries)
    }

    /// Reads the log and checks that its transactions are consistent.
    ///
    /// Every "Begin" must be matched by exactly one later "End" for the same path.
    /// All inconsistencies found are collected and reported together.
    ///
    /// # Errors
    ///
    /// * [`ValidateLogError::ReadLogError`] if the log cannot be read or parsed.
    /// * [`ValidateLogError::UnrecoverableError`] as soon as an unrecoverable-error
    ///   entry is encountered.
    /// * [`ValidateLogError::EntryErrors`] listing every duplicate begin, every end
    ///   without a matching begin, and every begin that was never ended.
    pub(crate) async fn validate() -> Result<(), ValidateLogError> {
        // Paths whose transaction has begun but not yet ended.
        let mut transactions: HashSet<AssetPath<'static>> = Default::default();
        let mut errors: Vec<LogEntryError> = Vec::new();

        for entry in Self::read().await? {
            match entry {
                LogEntry::BeginProcessing(path) => {
                    // `insert` returns false when the path is already open.
                    if !transactions.insert(path.clone()) {
                        errors.push(LogEntryError::DuplicateTransaction(path));
                    }
                }
                LogEntry::EndProcessing(path) => {
                    // `remove` returns false when no transaction was open for the path.
                    if !transactions.remove(&path) {
                        errors.push(LogEntryError::EndedMissingTransaction(path));
                    }
                }
                LogEntry::UnrecoverableError => return Err(ValidateLogError::UnrecoverableError),
            }
        }

        // Whatever is still open at the end of the log never finished.
        errors.extend(
            transactions
                .into_iter()
                .map(LogEntryError::UnfinishedTransaction),
        );

        if errors.is_empty() {
            Ok(())
        } else {
            Err(ValidateLogError::EntryErrors(errors))
        }
    }

    /// Writes a "Begin" entry for `path` to the log.
    ///
    /// # Errors
    ///
    /// Returns a [`WriteLogError`] carrying the entry and the I/O error if the write fails.
    pub(crate) async fn begin_processing(
        &mut self,
        path: &AssetPath<'_>,
    ) -> Result<(), WriteLogError> {
        self.write(&format!("{ENTRY_BEGIN}{path}\n"))
            .await
            .map_err(|e| WriteLogError {
                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
                error: e,
            })
    }

    /// Writes an "End" entry for `path` to the log.
    ///
    /// # Errors
    ///
    /// Returns a [`WriteLogError`] carrying the entry and the I/O error if the write fails.
    pub(crate) async fn end_processing(
        &mut self,
        path: &AssetPath<'_>,
    ) -> Result<(), WriteLogError> {
        self.write(&format!("{ENTRY_END}{path}\n"))
            .await
            .map_err(|e| WriteLogError {
                log_entry: LogEntry::EndProcessing(path.clone_owned()),
                error: e,
            })
    }

    /// Writes the unrecoverable-error marker to the log as its own line.
    ///
    /// The trailing newline matters: without it, any entry written afterwards would be
    /// appended to the marker's line and the log could no longer be parsed line by line.
    ///
    /// # Errors
    ///
    /// Returns a [`WriteLogError`] carrying the entry and the I/O error if the write fails.
    pub(crate) async fn unrecoverable(&mut self) -> Result<(), WriteLogError> {
        self.write(&format!("{UNRECOVERABLE_ERROR}\n"))
            .await
            .map_err(|e| WriteLogError {
                log_entry: LogEntry::UnrecoverableError,
                error: e,
            })
    }

    /// Writes `line` verbatim to the log file and flushes it immediately.
    async fn write(&mut self, line: &str) -> Result<(), futures_io::Error> {
        self.log_file.write_all(line.as_bytes()).await?;
        self.log_file.flush().await?;
        Ok(())
    }
}