mod log;
mod process;

pub use log::*;
pub use process::*;

use crate::{
    io::{
        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
        AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
        MissingAssetSourceError,
    },
    meta::{
        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
    },
    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
    MissingAssetLoaderForExtensionError,
};
use alloc::{collections::VecDeque, sync::Arc};
use bevy_ecs::prelude::*;
use bevy_tasks::IoTaskPool;
use bevy_utils::{
    tracing::{debug, error, trace, warn},
    HashMap, HashSet,
};
#[cfg(feature = "trace")]
use bevy_utils::{
    tracing::{info_span, instrument::Instrument},
    ConditionalSendFuture,
};
use derive_more::derive::{Display, Error};
use futures_io::ErrorKind;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use parking_lot::RwLock;
use std::path::{Path, PathBuf};

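/// A "background" asset processor: it reads assets and their `.meta` files from each processed
/// [`AssetSource`], runs the configured [`Process`] implementations on them, and writes the results
/// (plus updated processed metadata) to that source's processed output.
///
/// The embedded [`AssetServer`] runs in [`AssetServerMode::Processed`] and its sources are gated on
/// this processor's shared data, so reads of a given path are held until that asset has been processed.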
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    server: AssetServer,
    pub(crate) data: Arc<AssetProcessorData>,
}

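/// Internal, shared state for the [`AssetProcessor`]: per-asset bookkeeping, the transaction log,
/// registered processors, and broadcast channels used to signal when initialization and processing
/// have finished.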
pub struct AssetProcessorData {
    pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
    log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
    processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
    default_processors: RwLock<HashMap<Box<str>, &'static str>>,
    state: async_lock::RwLock<ProcessorState>,
    sources: AssetSources,
    initialized_sender: async_broadcast::Sender<()>,
    initialized_receiver: async_broadcast::Receiver<()>,
    finished_sender: async_broadcast::Sender<()>,
    finished_receiver: async_broadcast::Receiver<()>,
}

impl AssetProcessor {
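    /// Creates a new [`AssetProcessor`] from the given source builders. The processor keeps one set of
    /// [`AssetSources`] for itself (used to watch and process assets) and builds a second,
    /// processor-gated set for its internal [`AssetServer`], which runs in
    /// [`AssetServerMode::Processed`] with [`AssetMetaCheck::Always`].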
    pub fn new(source: &mut AssetSourceBuilders) -> Self {
        let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
        let mut sources = source.build_sources(false, false);
        sources.gate_on_processor(data.clone());
        let server = AssetServer::new_with_meta_check(
            sources,
            AssetServerMode::Processed,
            AssetMetaCheck::Always,
            false,
        );
        Self { server, data }
    }

    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }

    pub fn server(&self) -> &AssetServer {
        &self.server
    }

    async fn set_state(&self, state: ProcessorState) {
        let mut state_guard = self.data.state.write().await;
        let last_state = *state_guard;
        *state_guard = state;
        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
            self.data.finished_sender.broadcast(()).await.unwrap();
        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
            self.data.initialized_sender.broadcast(()).await.unwrap();
        }
    }

    pub async fn get_state(&self) -> ProcessorState {
        *self.data.state.read().await
    }

    #[inline]
    pub fn get_source<'a>(
        &self,
        id: impl Into<AssetSourceId<'a>>,
    ) -> Result<&AssetSource, MissingAssetSourceError> {
        self.data.sources.get(id.into())
    }

    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }

    async fn log_unrecoverable(&self) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.unrecoverable().await.unwrap();
    }

    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.begin_processing(path).await.unwrap();
    }

    async fn log_end_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.end_processing(path).await.unwrap();
    }

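    /// ECS system that starts the asset processor on a dedicated OS thread: it first processes all
    /// existing assets, then listens for source change events. On Wasm, or without the
    /// `multi_threaded` feature, this currently only logs an error.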
    pub fn start(_processor: Res<Self>) {
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        {
            let processor = _processor.clone();
            std::thread::spawn(move || {
                processor.process_assets();
                bevy_tasks::block_on(processor.listen_for_source_change_events());
            });
        }
    }

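    /// Processes all assets in the configured sources: initializes processor state (validating the
    /// transaction log and scanning existing processed output), walks every processed source, and
    /// spawns a processing task per asset on the [`IoTaskPool`], blocking until processing has finished.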
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    pub fn process_assets(&self) {
        let start_time = std::time::Instant::now();
        debug!("Processing Assets");
        IoTaskPool::get().scope(|scope| {
            scope.spawn(async move {
                self.initialize().await.unwrap();
                for source in self.sources().iter_processed() {
                    self.process_assets_internal(scope, source, PathBuf::from(""))
                        .await
                        .unwrap();
                }
            });
        });
        bevy_tasks::block_on(self.finish_processing_assets());
        let end_time = std::time::Instant::now();
        debug!("Processing finished in {:?}", end_time - start_time);
    }

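    /// Runs forever, draining [`AssetSourceEvent`]s from each source's event receiver and reprocessing
    /// affected assets. The processor state is flipped to `Processing` while events are being handled
    /// and back to `Finished` once the queue has been drained.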
    pub async fn listen_for_source_change_events(&self) {
        debug!("Listening for changes to source assets");
        loop {
            let mut started_processing = false;

            for source in self.data.sources.iter_processed() {
                if let Some(receiver) = source.event_receiver() {
                    for event in receiver.try_iter() {
                        if !started_processing {
                            self.set_state(ProcessorState::Processing).await;
                            started_processing = true;
                        }

                        self.handle_asset_source_event(source, event).await;
                    }
                }
            }

            if started_processing {
                self.finish_processing_assets().await;
            }
        }
    }

    async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
        trace!("{event:?}");
        match event {
            AssetSourceEvent::AddedAsset(path)
            | AssetSourceEvent::AddedMeta(path)
            | AssetSourceEvent::ModifiedAsset(path)
            | AssetSourceEvent::ModifiedMeta(path) => {
                self.process_asset(source, path).await;
            }
            AssetSourceEvent::RemovedAsset(path) => {
                self.handle_removed_asset(source, path).await;
            }
            AssetSourceEvent::RemovedMeta(path) => {
                self.handle_removed_meta(source, path).await;
            }
            AssetSourceEvent::AddedFolder(path) => {
                self.handle_added_folder(source, path).await;
            }
            AssetSourceEvent::RemovedFolder(path) => {
                self.handle_removed_folder(source, &path).await;
            }
            AssetSourceEvent::RenamedAsset { old, new } => {
                if old == new {
                    self.process_asset(source, new).await;
                } else {
                    self.handle_renamed_asset(source, old, new).await;
                }
            }
            AssetSourceEvent::RenamedMeta { old, new } => {
                if old == new {
                    self.process_asset(source, new).await;
                } else {
                    debug!("Meta renamed from {old:?} to {new:?}");
                    let mut infos = self.data.asset_infos.write().await;
                    let new_asset_path = AssetPath::from(new).with_source(source.id());
                    let old_asset_path = AssetPath::from(old).with_source(source.id());
                    infos.check_reprocess_queue.push_back(old_asset_path);
                    infos.check_reprocess_queue.push_back(new_asset_path);
                }
            }
            AssetSourceEvent::RenamedFolder { old, new } => {
                if old == new {
                    self.handle_added_folder(source, new).await;
                } else {
                    self.handle_removed_folder(source, &old).await;
                    self.handle_added_folder(source, new).await;
                }
            }
            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
                let processed_reader = source.processed_reader().unwrap();
                match processed_reader.is_directory(&path).await {
                    Ok(is_directory) => {
                        if is_directory {
                            self.handle_removed_folder(source, &path).await;
                        } else if is_meta {
                            self.handle_removed_meta(source, path).await;
                        } else {
                            self.handle_removed_asset(source, path).await;
                        }
                    }
                    Err(err) => {
                        match err {
                            AssetReaderError::NotFound(_) => {}
                            AssetReaderError::Io(err) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to the following error: {err}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                            AssetReaderError::HttpError(status) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                        }
                    }
                }
            }
        }
    }

    async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
        debug!(
            "Folder {} was added. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        IoTaskPool::get().scope(|scope| {
            scope.spawn(async move {
                self.process_assets_internal(scope, source, path)
                    .await
                    .unwrap();
            });
        });
    }

    async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
        debug!(
            "Meta for asset {:?} was removed. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        self.process_asset(source, path).await;
    }

    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
        debug!("Removing folder {:?} because source was removed", path);
        let processed_reader = source.processed_reader().unwrap();
        match processed_reader.read_directory(path).await {
            Ok(mut path_stream) => {
                while let Some(child_path) = path_stream.next().await {
                    self.handle_removed_asset(source, child_path).await;
                }
            }
            Err(err) => match err {
                AssetReaderError::NotFound(_err) => {}
                AssetReaderError::HttpError(status) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
                    );
                }
                AssetReaderError::Io(err) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
                    );
                }
            },
        }
        let processed_writer = source.processed_writer().unwrap();
        if let Err(err) = processed_writer.remove_directory(path).await {
            match err {
                AssetWriterError::Io(err) => {
                    if err.kind() != ErrorKind::NotFound {
                        let asset_path = AssetPath::from_path(path).with_source(source.id());
                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
                    }
                }
            }
        }
    }

    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        debug!("Removing processed {asset_path} because source was removed");
        let mut infos = self.data.asset_infos.write().await;
        if let Some(info) = infos.get(&asset_path) {
            let _write_lock = info.file_transaction_lock.write();
            self.remove_processed_asset_and_meta(source, asset_path.path())
                .await;
        }
        infos.remove(&asset_path).await;
    }

    async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
        let mut infos = self.data.asset_infos.write().await;
        let old = AssetPath::from(old).with_source(source.id());
        let new = AssetPath::from(new).with_source(source.id());
        let processed_writer = source.processed_writer().unwrap();
        if let Some(info) = infos.get(&old) {
            let _write_lock = info.file_transaction_lock.write();
            processed_writer
                .rename(old.path(), new.path())
                .await
                .unwrap();
            processed_writer
                .rename_meta(old.path(), new.path())
                .await
                .unwrap();
        }
        infos.rename(&old, &new).await;
    }

    async fn finish_processing_assets(&self) {
        self.try_reprocessing_queued().await;
        self.server.data.infos.write().consume_handle_drop_events();
        self.set_state(ProcessorState::Finished).await;
    }

    #[allow(unused)]
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    async fn process_assets_internal<'scope>(
        &'scope self,
        scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
        source: &'scope AssetSource,
        path: PathBuf,
    ) -> Result<(), AssetReaderError> {
        if source.reader().is_directory(&path).await? {
            let mut path_stream = source.reader().read_directory(&path).await?;
            while let Some(path) = path_stream.next().await {
                Box::pin(self.process_assets_internal(scope, source, path)).await?;
            }
        } else {
            let processor = self.clone();
            scope.spawn(async move {
                processor.process_asset(source, path).await;
            });
        }
        Ok(())
    }

    async fn try_reprocessing_queued(&self) {
        loop {
            let mut check_reprocess_queue =
                core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
            IoTaskPool::get().scope(|scope| {
                for path in check_reprocess_queue.drain(..) {
                    let processor = self.clone();
                    let source = self.get_source(path.source()).unwrap();
                    scope.spawn(async move {
                        processor.process_asset(source, path.into()).await;
                    });
                }
            });
            let infos = self.data.asset_infos.read().await;
            if infos.check_reprocess_queue.is_empty() {
                break;
            }
        }
    }

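    /// Registers a new [`Process`] implementation under its [`core::any::type_name`], making it
    /// available to asset meta files and to [`AssetProcessor::set_default_processor`].
    ///
    /// A minimal usage sketch; `MyProcessor` is a hypothetical type implementing [`Process`], not part
    /// of this module:
    ///
    /// ```ignore
    /// // Register the processor so meta files can reference it by type name...
    /// processor.register_processor(MyProcessor::default());
    /// // ...and make it the default for `.png` files that have no meta file yet.
    /// processor.set_default_processor::<MyProcessor>("png");
    /// ```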
    pub fn register_processor<P: Process>(&self, processor: P) {
        let mut process_plans = self.data.processors.write();
        #[cfg(feature = "trace")]
        let processor = InstrumentedAssetProcessor(processor);
        process_plans.insert(core::any::type_name::<P>(), Arc::new(processor));
    }

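    /// Sets the default [`Process`] implementation to use for files with the given `extension` when no
    /// meta file specifies a processor. See the sketch on [`AssetProcessor::register_processor`] above.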
    pub fn set_default_processor<P: Process>(&self, extension: &str) {
        let mut default_processors = self.data.default_processors.write();
        default_processors.insert(extension.into(), core::any::type_name::<P>());
    }

    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let default_processors = self.data.default_processors.read();
        let key = default_processors.get(extension)?;
        self.data.processors.read().get(key).cloned()
    }

    pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let processors = self.data.processors.read();
        processors.get(processor_type_name).cloned()
    }

    #[cfg_attr(
        any(target_arch = "wasm32", not(feature = "multi_threaded")),
        expect(
            dead_code,
            reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
        )
    )]
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        let mut asset_infos = self.data.asset_infos.write().await;

        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            clean_empty_folders_writer: Option<&dyn ErasedAssetWriter>,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        clean_empty_folders_writer,
                        child_path,
                        paths,
                    ))
                    .await?;
                }
                if !contains_files && path.parent().is_some() {
                    if let Some(writer) = clean_empty_folders_writer {
                        let _ = writer.remove_empty_directory(&path).await;
                    }
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            let Ok(processed_reader) = source.processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                None,
                PathBuf::from(""),
                &mut unprocessed_paths,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            get_asset_paths(
                processed_reader,
                Some(processed_writer),
                PathBuf::from(""),
                &mut processed_paths,
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.set_state(ProcessorState::Processing).await;

        Ok(())
    }

    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
            warn!("Failed to remove non-existent asset {path:?}: {err}");
        }

        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
            warn!("Failed to remove non-existent meta {path:?}: {err}");
        }

        self.clean_empty_processed_ancestor_folders(source, path)
            .await;
    }

    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
        if path.is_absolute() {
            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
            return;
        }
        while let Some(parent) = path.parent() {
            if parent == Path::new("") {
                break;
            }
            if source
                .processed_writer()
                .unwrap()
                .remove_empty_directory(parent)
                .await
                .is_err()
            {
                break;
            }
        }
    }

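    /// Processes the asset at `path` within `source` and hands the result to
    /// [`ProcessorAssetInfos::finish_processing`], which updates the asset's status and queues its
    /// dependents for reprocessing.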
    async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        let result = self.process_asset_internal(source, &asset_path).await;
        let mut infos = self.data.asset_infos.write().await;
        infos.finish_processing(asset_path, result).await;
    }

    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        debug!("Processing {:?}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        let mut byte_reader = reader.read(path).await.map_err(reader_err)?;

        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self
                            .get_processor(&processor)
                            .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                source
                    .writer()?
                    .write_meta_bytes(path, &meta_bytes)
                    .await
                    .map_err(writer_err)?;
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let mut asset_bytes = Vec::new();
        byte_reader
            .read_to_end(&mut asset_bytes)
            .await
            .map_err(|e| ProcessError::AssetReaderError {
                path: asset_path.clone(),
                err: AssetReaderError::Io(e.into()),
            })?;

        let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            let infos = self.data.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
            {
                if current_processed_info.hash == new_hash {
                    let mut dependency_changed = false;
                    for current_dep_info in &current_processed_info.process_dependencies {
                        let live_hash = infos
                            .get(&current_dep_info.path)
                            .and_then(|i| i.processed_info.as_ref())
                            .map(|i| i.full_hash);
                        if live_hash != Some(current_dep_info.full_hash) {
                            dependency_changed = true;
                            break;
                        }
                    }
                    if !dependency_changed {
                        return Ok(ProcessResult::SkippedNotChanged);
                    }
                }
            }
        }
        let _transaction_lock = {
            let mut infos = self.data.asset_infos.write().await;
            let info = infos.get_or_insert(asset_path.clone());
            info.file_transaction_lock.write_arc().await
        };

        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context =
                    ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
                processor
                    .process(&mut context, source_meta, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            processed_writer
                .write_bytes(path, &asset_bytes)
                .await
                .map_err(writer_err)?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }

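    /// Validates the on-disk transaction log from the previous run. Unfinished transactions have their
    /// processed artifacts removed; duplicate or unrecoverable log states cause all processed assets to
    /// be deleted so they can be regenerated from scratch. Finally, a fresh transaction log is opened.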
    async fn validate_transaction_log_and_recover(&self) {
        if let Err(err) = ProcessorTransactionLog::validate().await {
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        let mut log = self.data.log.write().await;
        *log = match ProcessorTransactionLog::new().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
}

impl AssetProcessorData {
    pub fn new(source: AssetSources) -> Self {
        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
        finished_sender.set_overflow(true);
        initialized_sender.set_overflow(true);

        AssetProcessorData {
            sources: source,
            finished_sender,
            finished_receiver,
            initialized_sender,
            initialized_receiver,
            state: async_lock::RwLock::new(ProcessorState::Initializing),
            log: Default::default(),
            processors: Default::default(),
            asset_infos: Default::default(),
            default_processors: Default::default(),
        }
    }

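    /// Waits until the asset at `path` has been processed (or determined to be missing) and returns its
    /// final [`ProcessStatus`]. This first waits for processor initialization to complete.
    ///
    /// An illustrative sketch from an async task holding this data; the asset path is hypothetical:
    ///
    /// ```ignore
    /// let status = data.wait_until_processed(AssetPath::from("images/player.png")).await;
    /// assert_ne!(status, ProcessStatus::Failed);
    /// ```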
    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.wait_until_initialized().await;
        let mut receiver = {
            let infos = self.asset_infos.write().await;
            let info = infos.get(&path);
            match info {
                Some(info) => match info.status {
                    Some(result) => return result,
                    None => info.status_receiver.clone(),
                },
                None => return ProcessStatus::NonExistent,
            }
        };
        receiver.recv().await.unwrap()
    }

    pub async fn wait_until_initialized(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing => Some(self.initialized_receiver.clone()),
                _ => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }

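    /// Waits until the current round of processing has finished (i.e. the processor state reaches
    /// [`ProcessorState::Finished`]). Returns immediately if processing has already finished.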
    pub async fn wait_until_finished(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing | ProcessorState::Processing => {
                    Some(self.finished_receiver.clone())
                }
                ProcessorState::Finished => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }
}

#[cfg(feature = "trace")]
struct InstrumentedAssetProcessor<T>(T);

#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    fn process(
        &self,
        context: &mut ProcessContext,
        meta: AssetMeta<(), Self>,
        writer: &mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        // Rebuild the meta with the wrapped processor's type; this is valid because the `Settings`
        // type is shared between `Self` and `T`.
        let meta = AssetMeta {
            meta_format_version: meta.meta_format_version,
            processed_info: meta.processed_info,
            asset: meta.asset,
        };
        let span = info_span!(
            "asset processing",
            processor = core::any::type_name::<T>(),
            asset = context.path().to_string(),
        );
        self.0.process(context, meta, writer).instrument(span)
    }
}

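/// The result of processing a single asset: the asset was processed (with its new [`ProcessedInfo`]),
/// skipped because neither it nor its process dependencies changed, or ignored per its meta file.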
#[derive(Debug, Clone)]
pub enum ProcessResult {
    Processed(ProcessedInfo),
    SkippedNotChanged,
    Ignored,
}

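/// The externally observable processing status of an asset, as reported by
/// [`AssetProcessorData::wait_until_processed`].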
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    Processed,
    Failed,
    NonExistent,
}

#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    processed_info: Option<ProcessedInfo>,
    dependents: HashSet<AssetPath<'static>>,
    status: Option<ProcessStatus>,
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    status_sender: async_broadcast::Sender<ProcessStatus>,
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}

impl Default for ProcessorAssetInfo {
    fn default() -> Self {
        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
        status_sender.set_overflow(true);
        Self {
            processed_info: Default::default(),
            dependents: Default::default(),
            file_transaction_lock: Default::default(),
            status: None,
            status_sender,
            status_receiver,
        }
    }
}

impl ProcessorAssetInfo {
    async fn update_status(&mut self, status: ProcessStatus) {
        if self.status != Some(status) {
            self.status = Some(status);
            self.status_sender.broadcast(status).await.unwrap();
        }
    }
}

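/// The in-memory view of all processed assets: per-asset [`ProcessorAssetInfo`] entries, reverse
/// dependency edges for assets that do not exist (yet), and a queue of paths that should be checked
/// for reprocessing after the current round completes.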
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
    check_reprocess_queue: VecDeque<AssetPath<'static>>,
}

impl ProcessorAssetInfos {
    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
        self.infos.entry(asset_path.clone()).or_insert_with(|| {
            let mut info = ProcessorAssetInfo::default();
            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
                info.dependents = dependents;
            }
            info
        })
    }

    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
        self.infos.get(asset_path)
    }

    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
        self.infos.get_mut(asset_path)
    }

    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
        if let Some(info) = self.get_mut(asset_path) {
            info.dependents.insert(dependent);
        } else {
            let dependents = self
                .non_existent_dependents
                .entry(asset_path.clone())
                .or_default();
            dependents.insert(dependent);
        }
    }

    async fn finish_processing(
        &mut self,
        asset_path: AssetPath<'static>,
        result: Result<ProcessResult, ProcessError>,
    ) {
        match result {
            Ok(ProcessResult::Processed(processed_info)) => {
                debug!("Finished processing \"{:?}\"", asset_path);
                let old_processed_info = self
                    .infos
                    .get_mut(&asset_path)
                    .and_then(|i| i.processed_info.take());
                if let Some(old_processed_info) = old_processed_info {
                    self.clear_dependencies(&asset_path, old_processed_info);
                }

                for process_dependency_info in &processed_info.process_dependencies {
                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
                }
                let info = self.get_or_insert(asset_path);
                info.processed_info = Some(processed_info);
                info.update_status(ProcessStatus::Processed).await;
                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
                for path in dependents {
                    self.check_reprocess_queue.push_back(path);
                }
            }
            Ok(ProcessResult::SkippedNotChanged) => {
                debug!("Skipping processing (unchanged) \"{:?}\"", asset_path);
                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Processed).await;
            }
            Ok(ProcessResult::Ignored) => {
                debug!("Skipping processing (ignored) \"{:?}\"", asset_path);
            }
            Err(ProcessError::ExtensionRequired) => {}
            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
                trace!("No loader found for {asset_path}");
            }
            Err(ProcessError::AssetReaderError {
                err: AssetReaderError::NotFound(_),
                ..
            }) => {
                trace!("No need to process asset {asset_path} because it does not exist");
            }
            Err(err) => {
                error!("Failed to process asset {asset_path}: {err}");
                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
                    err
                {
                    let info = self.get_mut(&asset_path).expect("info should exist");
                    info.processed_info = Some(ProcessedInfo {
                        hash: AssetHash::default(),
                        full_hash: AssetHash::default(),
                        process_dependencies: vec![],
                    });
                    self.add_dependent(dependency.path(), asset_path.to_owned());
                }

                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Failed).await;
            }
        }
    }

    async fn remove(&mut self, asset_path: &AssetPath<'static>) {
        let info = self.infos.remove(asset_path);
        if let Some(info) = info {
            if let Some(processed_info) = info.processed_info {
                self.clear_dependencies(asset_path, processed_info);
            }
            info.status_sender
                .broadcast(ProcessStatus::NonExistent)
                .await
                .unwrap();
            if !info.dependents.is_empty() {
                error!(
                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                    info.dependents
                );
                self.non_existent_dependents
                    .insert(asset_path.clone(), info.dependents);
            }
        }
    }

    async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
        let info = self.infos.remove(old);
        if let Some(mut info) = info {
            if !info.dependents.is_empty() {
                error!(
                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                    info.dependents
                );
                self.non_existent_dependents
                    .insert(old.clone(), core::mem::take(&mut info.dependents));
            }
            if let Some(processed_info) = &info.processed_info {
                for dep in &processed_info.process_dependencies {
                    if let Some(info) = self.infos.get_mut(&dep.path) {
                        info.dependents.remove(old);
                        info.dependents.insert(new.clone());
                    } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
                    {
                        dependents.remove(old);
                        dependents.insert(new.clone());
                    }
                }
            }
            info.status_sender
                .broadcast(ProcessStatus::NonExistent)
                .await
                .unwrap();
            let dependents: Vec<AssetPath<'static>> = {
                let new_info = self.get_or_insert(new.clone());
                new_info.processed_info = info.processed_info;
                new_info.status = info.status;
                if let Some(status) = new_info.status {
                    new_info.status_sender.broadcast(status).await.unwrap();
                }
                new_info.dependents.iter().cloned().collect()
            };
            self.check_reprocess_queue.push_back(new.clone());
            for dependent in dependents {
                self.check_reprocess_queue.push_back(dependent);
            }
        }
    }

    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
        for old_load_dep in removed_info.process_dependencies {
            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
                info.dependents.remove(asset_path);
            } else if let Some(dependents) =
                self.non_existent_dependents.get_mut(&old_load_dep.path)
            {
                dependents.remove(asset_path);
            }
        }
    }
}

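/// The current lifecycle phase of the [`AssetProcessor`]: scanning existing state (`Initializing`),
/// actively processing assets (`Processing`), or idle with all assets processed (`Finished`).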
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    Initializing,
    Processing,
    Finished,
}

#[derive(Error, Display, Debug)]
pub enum InitializeError {
    FailedToReadSourcePaths(AssetReaderError),
    FailedToReadDestinationPaths(AssetReaderError),
    #[display("Failed to validate asset log: {_0}")]
    ValidateLogError(ValidateLogError),
}