mod log;
mod process;

use async_lock::RwLockReadGuardArc;
pub use log::*;
pub use process::*;

use crate::{
    io::{
        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
        AssetSources, AssetWriterError, ErasedAssetReader, MissingAssetSourceError,
    },
    meta::{
        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
    },
    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
};
use alloc::{borrow::ToOwned, boxed::Box, string::String, sync::Arc, vec, vec::Vec};
use bevy_ecs::prelude::*;
use bevy_platform::{
    collections::{hash_map::Entry, HashMap, HashSet},
    sync::{PoisonError, RwLock},
};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncWriteExt, StreamExt};
use futures_util::{select_biased, FutureExt};
use std::{
    path::{Path, PathBuf},
    sync::Mutex,
};
use thiserror::Error;
use tracing::{debug, error, trace, warn};

#[cfg(feature = "trace")]
use {
    alloc::string::ToString,
    bevy_reflect::TypePath,
    bevy_tasks::ConditionalSendFuture,
    tracing::{info_span, instrument::Instrument},
};

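/// Drives asset processing: it reads assets from their source [`AssetSource`]s, runs any
/// configured [`Process`] implementations on them, and writes the results (plus processed meta
/// and a transaction log) to each source's processed destination. Cloning is cheap; all state is
/// shared through [`AssetProcessorData`].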
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    server: AssetServer,
    pub(crate) data: Arc<AssetProcessorData>,
}

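/// State shared by all clones of an [`AssetProcessor`]: the current [`ProcessingState`], the
/// transaction log (and the factory used to create it), the registered processors, and the
/// configured [`AssetSources`].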
pub struct AssetProcessorData {
    pub(crate) processing_state: Arc<ProcessingState>,
    log_factory: Mutex<Option<Box<dyn ProcessorTransactionLogFactory>>>,
    log: async_lock::RwLock<Option<Box<dyn ProcessorTransactionLog>>>,
    processors: RwLock<Processors>,
    sources: Arc<AssetSources>,
}

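/// Tracks the processor's [`ProcessorState`] and per-asset [`ProcessorAssetInfos`], and
/// broadcasts "initialized" and "finished" events so callers can await those transitions.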
pub(crate) struct ProcessingState {
    state: async_lock::RwLock<ProcessorState>,
    initialized_sender: async_broadcast::Sender<()>,
    initialized_receiver: async_broadcast::Receiver<()>,
    finished_sender: async_broadcast::Sender<()>,
    finished_receiver: async_broadcast::Receiver<()>,
    asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
}

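/// Registry of [`ErasedProcessor`]s, indexed by full type path, by short type path (which may be
/// ambiguous), and by the file extensions they act as the default processor for.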
#[derive(Default)]
struct Processors {
    type_path_to_processor: HashMap<&'static str, Arc<dyn ErasedProcessor>>,
    short_type_path_to_processor: HashMap<&'static str, ShortTypeProcessorEntry>,
    file_extension_to_default_processor: HashMap<Box<str>, &'static str>,
}

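/// A short type path either resolves to a single processor or is ambiguous between several full
/// type paths.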
enum ShortTypeProcessorEntry {
    Unique {
        type_path: &'static str,
        processor: Arc<dyn ErasedProcessor>,
    },
    Ambiguous(Vec<&'static str>),
}

impl AssetProcessor {
    pub fn new(
        sources: &mut AssetSourceBuilders,
        watch_processed: bool,
    ) -> (Self, Arc<AssetSources>) {
        let state = Arc::new(ProcessingState::new());
        let mut sources = sources.build_sources(true, watch_processed);
        sources.gate_on_processor(state.clone());
        let sources = Arc::new(sources);

        let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
        let server = AssetServer::new_with_meta_check(
            sources.clone(),
            AssetServerMode::Processed,
            AssetMetaCheck::Always,
            false,
            UnapprovedPathMode::default(),
        );
        (Self { server, data }, sources)
    }

    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }

    pub fn server(&self) -> &AssetServer {
        &self.server
    }

    pub async fn get_state(&self) -> ProcessorState {
        self.data.processing_state.get_state().await
    }

    #[inline]
    pub fn get_source<'a>(
        &self,
        id: impl Into<AssetSourceId<'a>>,
    ) -> Result<&AssetSource, MissingAssetSourceError> {
        self.data.sources.get(id.into())
    }

    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }

    async fn log_unrecoverable(&self) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.unrecoverable()
            .await
            .map_err(|error| WriteLogError {
                log_entry: LogEntry::UnrecoverableError,
                error,
            })
            .unwrap();
    }

    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.begin_processing(path)
            .await
            .map_err(|error| WriteLogError {
                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
                error,
            })
            .unwrap();
    }

    async fn log_end_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.end_processing(path)
            .await
            .map_err(|error| WriteLogError {
                log_entry: LogEntry::EndProcessing(path.clone_owned()),
                error,
            })
            .unwrap();
    }

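    /// System that starts the processor on the [`IoTaskPool`]: it initializes processor state,
    /// queues an initial processing task for every asset in every processed source, spawns the
    /// task executor, waits for the initial pass to finish, and then listens for source change
    /// events.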
    pub fn start(processor: Res<Self>) {
        let processor = processor.clone();
        IoTaskPool::get()
            .spawn(async move {
                let start_time = std::time::Instant::now();
                debug!("Processing Assets");

                processor.initialize().await.unwrap();

                let (new_task_sender, new_task_receiver) = async_channel::unbounded();
                processor
                    .queue_initial_processing_tasks(&new_task_sender)
                    .await;

                {
                    let processor = processor.clone();
                    let new_task_sender = new_task_sender.clone();
                    IoTaskPool::get()
                        .spawn(async move {
                            processor
                                .execute_processing_tasks(new_task_sender, new_task_receiver)
                                .await;
                        })
                        .detach();
                }

                processor.data.wait_until_finished().await;

                let end_time = std::time::Instant::now();
                debug!("Processing finished in {:?}", end_time - start_time);

                debug!("Listening for changes to source assets");
                processor.spawn_source_change_event_listeners(&new_task_sender);
            })
            .detach();
    }

    async fn queue_initial_processing_tasks(
        &self,
        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        for source in self.sources().iter_processed() {
            self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
                .await
                .unwrap();
        }
    }

    fn spawn_source_change_event_listeners(
        &self,
        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        for source in self.data.sources.iter_processed() {
            let Some(receiver) = source.event_receiver().cloned() else {
                continue;
            };
            let source_id = source.id();
            let processor = self.clone();
            let sender = sender.clone();
            IoTaskPool::get()
                .spawn(async move {
                    while let Ok(event) = receiver.recv().await {
                        let Ok(source) = processor.get_source(source_id.clone()) else {
                            return;
                        };
                        processor
                            .handle_asset_source_event(source, event, &sender)
                            .await;
                    }
                })
                .detach();
        }
    }

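    /// Core task loop: listens for new processing tasks and task completions via
    /// `select_biased!`, spawns one [`Self::process_asset`] task per incoming path, and flips the
    /// [`ProcessorState`] between `Processing` and `Finished` based on the number of pending
    /// tasks.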
    async fn execute_processing_tasks(
        &self,
        new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
        new_task_receiver: async_channel::Receiver<(AssetSourceId<'static>, PathBuf)>,
    ) {
        let new_task_sender = {
            let weak_sender = new_task_sender.downgrade();
            drop(new_task_sender);
            weak_sender
        };

        if new_task_receiver.is_empty() {
            self.data
                .processing_state
                .set_state(ProcessorState::Finished)
                .await;
        }
        enum ProcessorTaskEvent {
            Start(AssetSourceId<'static>, PathBuf),
            Finished,
        }
        let (task_finished_sender, task_finished_receiver) = async_channel::unbounded::<()>();

        let mut pending_tasks = 0;
        while let Ok(event) = {
            select_biased! {
                result = new_task_receiver.recv().fuse() => {
                    result.map(|(source_id, path)| ProcessorTaskEvent::Start(source_id, path))
                },
                result = task_finished_receiver.recv().fuse() => {
                    result.map(|()| ProcessorTaskEvent::Finished)
                }
            }
        } {
            match event {
                ProcessorTaskEvent::Start(source_id, path) => {
                    let Some(new_task_sender) = new_task_sender.upgrade() else {
                        continue;
                    };
                    let processor = self.clone();
                    let task_finished_sender = task_finished_sender.clone();
                    pending_tasks += 1;
                    IoTaskPool::get()
                        .spawn(async move {
                            let Ok(source) = processor.get_source(source_id) else {
                                return;
                            };
                            processor.process_asset(source, path, new_task_sender).await;
                            let _ = task_finished_sender.send(()).await;
                        })
                        .detach();
                    self.data
                        .processing_state
                        .set_state(ProcessorState::Processing)
                        .await;
                }
                ProcessorTaskEvent::Finished => {
                    pending_tasks -= 1;
                    if pending_tasks == 0 {
                        self.server.write_infos().consume_handle_drop_events();
                        self.data
                            .processing_state
                            .set_state(ProcessorState::Finished)
                            .await;
                    }
                }
            }
        }
    }

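    /// Writes a default meta file for `path`, using the default processor registered for its
    /// extension if one exists and otherwise falling back to the default loader meta. Fails if a
    /// meta file already exists for the path.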
    pub async fn write_default_meta_file_for_path(
        &self,
        path: impl Into<AssetPath<'_>>,
    ) -> Result<(), WriteDefaultMetaError> {
        let path = path.into();
        let Some(processor) = path
            .get_full_extension()
            .and_then(|extension| self.get_default_processor(&extension))
        else {
            return self
                .server
                .write_default_loader_meta_file_for_path(path)
                .await;
        };

        let meta = processor.default_meta();
        let serialized_meta = meta.serialize();

        let source = self.get_source(path.source())?;

        let reader = source.reader();
        match reader.read_meta_bytes(path.path()).await {
            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
            Err(AssetReaderError::NotFound(_)) => {}
            Err(AssetReaderError::Io(err)) => {
                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
            }
            Err(AssetReaderError::HttpError(err)) => {
                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
            }
        }

        let writer = source.writer()?;
        writer
            .write_meta_bytes(path.path(), &serialized_meta)
            .await?;

        Ok(())
    }

    async fn handle_asset_source_event(
        &self,
        source: &AssetSource,
        event: AssetSourceEvent,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        trace!("{event:?}");
        match event {
            AssetSourceEvent::AddedAsset(path)
            | AssetSourceEvent::AddedMeta(path)
            | AssetSourceEvent::ModifiedAsset(path)
            | AssetSourceEvent::ModifiedMeta(path) => {
                let _ = new_task_sender.send((source.id(), path)).await;
            }
            AssetSourceEvent::RemovedAsset(path) => {
                self.handle_removed_asset(source, path).await;
            }
            AssetSourceEvent::RemovedMeta(path) => {
                self.handle_removed_meta(source, path, new_task_sender)
                    .await;
            }
            AssetSourceEvent::AddedFolder(path) => {
                self.handle_added_folder(source, path, new_task_sender)
                    .await;
            }
            AssetSourceEvent::RemovedFolder(path) => {
                self.handle_removed_folder(source, &path).await;
            }
            AssetSourceEvent::RenamedAsset { old, new } => {
                if old == new {
                    let _ = new_task_sender.send((source.id(), new)).await;
                } else {
                    self.handle_renamed_asset(source, old, new, new_task_sender)
                        .await;
                }
            }
            AssetSourceEvent::RenamedMeta { old, new } => {
                if old == new {
                    let _ = new_task_sender.send((source.id(), new)).await;
                } else {
                    debug!("Meta renamed from {old:?} to {new:?}");
                    let _ = new_task_sender.send((source.id(), old)).await;
                    let _ = new_task_sender.send((source.id(), new)).await;
                }
            }
            AssetSourceEvent::RenamedFolder { old, new } => {
                if old == new {
                    self.handle_added_folder(source, new, new_task_sender).await;
                } else {
                    self.handle_removed_folder(source, &old).await;
                    self.handle_added_folder(source, new, new_task_sender).await;
                }
            }
            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
                let processed_reader = source.ungated_processed_reader().unwrap();
                match processed_reader.is_directory(&path).await {
                    Ok(is_directory) => {
                        if is_directory {
                            self.handle_removed_folder(source, &path).await;
                        } else if is_meta {
                            self.handle_removed_meta(source, path, new_task_sender)
                                .await;
                        } else {
                            self.handle_removed_asset(source, path).await;
                        }
                    }
                    Err(err) => {
                        match err {
                            AssetReaderError::NotFound(_) => {}
                            AssetReaderError::Io(err) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to the following error: {err}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                            AssetReaderError::HttpError(status) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                        }
                    }
                }
            }
        }
    }

    async fn handle_added_folder(
        &self,
        source: &AssetSource,
        path: PathBuf,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        debug!(
            "Folder {} was added. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        self.queue_processing_tasks_for_folder(source, path, new_task_sender)
            .await
            .unwrap();
    }

    async fn handle_removed_meta(
        &self,
        source: &AssetSource,
        path: PathBuf,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        debug!(
            "Meta for asset {} was removed. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        let _ = new_task_sender.send((source.id(), path)).await;
    }

    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
        debug!(
            "Removing folder {} because source was removed",
            path.display()
        );
        let processed_reader = source.ungated_processed_reader().unwrap();
        match processed_reader.read_directory(path).await {
            Ok(mut path_stream) => {
                while let Some(child_path) = path_stream.next().await {
                    self.handle_removed_asset(source, child_path).await;
                }
            }
            Err(err) => match err {
                AssetReaderError::NotFound(_err) => {}
                AssetReaderError::HttpError(status) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
                    );
                }
                AssetReaderError::Io(err) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
                    );
                }
            },
        }
        let processed_writer = source.processed_writer().unwrap();
        if let Err(err) = processed_writer.remove_directory(path).await {
            match err {
                AssetWriterError::Io(err) => {
                    if err.kind() != ErrorKind::NotFound {
                        let asset_path = AssetPath::from_path(path).with_source(source.id());
                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
                    }
                }
            }
        }
    }

    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        debug!("Removing processed {asset_path} because source was removed");
        let lock = {
            let mut infos = self.data.processing_state.asset_infos.write().await;
            infos.remove(&asset_path).await
        };
        let Some(lock) = lock else {
            return;
        };

        let _write_lock = lock.write();
        self.remove_processed_asset_and_meta(source, asset_path.path())
            .await;
    }

    async fn handle_renamed_asset(
        &self,
        source: &AssetSource,
        old: PathBuf,
        new: PathBuf,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        let old = AssetPath::from(old).with_source(source.id());
        let new = AssetPath::from(new).with_source(source.id());
        let processed_writer = source.processed_writer().unwrap();
        let result = {
            let mut infos = self.data.processing_state.asset_infos.write().await;
            infos.rename(&old, &new, new_task_sender).await
        };
        let Some((old_lock, new_lock)) = result else {
            return;
        };
        let _old_write_lock = old_lock.write();
        let _new_write_lock = new_lock.write();
        processed_writer
            .rename(old.path(), new.path())
            .await
            .unwrap();
        processed_writer
            .rename_meta(old.path(), new.path())
            .await
            .unwrap();
    }

    async fn queue_processing_tasks_for_folder(
        &self,
        source: &AssetSource,
        path: PathBuf,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) -> Result<(), AssetReaderError> {
        if source.reader().is_directory(&path).await? {
            let mut path_stream = source.reader().read_directory(&path).await?;
            while let Some(path) = path_stream.next().await {
                Box::pin(self.queue_processing_tasks_for_folder(source, path, new_task_sender))
                    .await?;
            }
        } else {
            let _ = new_task_sender.send((source.id(), path)).await;
        }
        Ok(())
    }

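    /// Registers `processor`, making it addressable by its full type path and by its short type
    /// path (flagged as ambiguous if another registered processor shares the short path).
    ///
    /// A minimal usage sketch, where `MyProcessor` is a hypothetical type implementing
    /// [`Process`]:
    ///
    /// ```ignore
    /// // Register the processor, then make it the default for `.png` files without meta.
    /// processor.register_processor(MyProcessor::default());
    /// processor.set_default_processor::<MyProcessor>("png");
    /// ```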
    pub fn register_processor<P: Process>(&self, processor: P) {
        let mut processors = self
            .data
            .processors
            .write()
            .unwrap_or_else(PoisonError::into_inner);
        #[cfg(feature = "trace")]
        let processor = InstrumentedAssetProcessor(processor);
        let processor = Arc::new(processor);
        processors
            .type_path_to_processor
            .insert(P::type_path(), processor.clone());
        match processors
            .short_type_path_to_processor
            .entry(P::short_type_path())
        {
            Entry::Vacant(entry) => {
                entry.insert(ShortTypeProcessorEntry::Unique {
                    type_path: P::type_path(),
                    processor,
                });
            }
            Entry::Occupied(mut entry) => match entry.get_mut() {
                ShortTypeProcessorEntry::Unique { type_path, .. } => {
                    let type_path = *type_path;
                    *entry.get_mut() =
                        ShortTypeProcessorEntry::Ambiguous(vec![type_path, P::type_path()]);
                }
                ShortTypeProcessorEntry::Ambiguous(type_paths) => {
                    type_paths.push(P::type_path());
                }
            },
        }
    }

    pub fn set_default_processor<P: Process>(&self, extension: &str) {
        let mut processors = self
            .data
            .processors
            .write()
            .unwrap_or_else(PoisonError::into_inner);
        processors
            .file_extension_to_default_processor
            .insert(extension.into(), P::type_path());
    }

    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let processors = self
            .data
            .processors
            .read()
            .unwrap_or_else(PoisonError::into_inner);
        let key = processors
            .file_extension_to_default_processor
            .get(extension)?;
        processors.type_path_to_processor.get(key).cloned()
    }

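    /// Looks up a processor by name: the short type path is checked first (returning an
    /// `Ambiguous` error if several processors share it), then the full type path.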
    pub fn get_processor(
        &self,
        processor_type_name: &str,
    ) -> Result<Arc<dyn ErasedProcessor>, GetProcessorError> {
        let processors = self
            .data
            .processors
            .read()
            .unwrap_or_else(PoisonError::into_inner);
        if let Some(short_type_processor) = processors
            .short_type_path_to_processor
            .get(processor_type_name)
        {
            return match short_type_processor {
                ShortTypeProcessorEntry::Unique { processor, .. } => Ok(processor.clone()),
                ShortTypeProcessorEntry::Ambiguous(examples) => Err(GetProcessorError::Ambiguous {
                    processor_short_name: processor_type_name.to_owned(),
                    ambiguous_processor_names: examples.clone(),
                }),
            };
        }
        processors
            .type_path_to_processor
            .get(processor_type_name)
            .cloned()
            .ok_or_else(|| GetProcessorError::Missing(processor_type_name.to_owned()))
    }

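    /// Prepares processor state before the first pass: validates (and recovers from) the
    /// transaction log, scans each source's unprocessed and processed file trees, removes empty
    /// processed directories and processed assets whose meta is missing, unparseable, or whose
    /// source asset no longer exists, records process dependencies, and finally transitions the
    /// state to [`ProcessorState::Processing`].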
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        let mut asset_infos = self.data.processing_state.asset_infos.write().await;

        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
            mut empty_dirs: Option<&mut Vec<PathBuf>>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        child_path,
                        paths,
                        empty_dirs.as_deref_mut(),
                    ))
                    .await?;
                }
                if !contains_files
                    && path.parent().is_some()
                    && let Some(empty_dirs) = empty_dirs
                {
                    empty_dirs.push(path);
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            let Some(processed_reader) = source.ungated_processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                PathBuf::from(""),
                &mut unprocessed_paths,
                None,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            let mut empty_dirs = Vec::new();
            get_asset_paths(
                processed_reader,
                PathBuf::from(""),
                &mut processed_paths,
                Some(&mut empty_dirs),
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            for empty_dir in empty_dirs {
                let _ = processed_writer.remove_empty_directory(&empty_dir).await;
            }

            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.data
            .processing_state
            .set_state(ProcessorState::Processing)
            .await;

        Ok(())
    }

    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
            warn!("Failed to remove non-existent asset {path:?}: {err}");
        }

        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
            warn!("Failed to remove non-existent meta {path:?}: {err}");
        }

        self.clean_empty_processed_ancestor_folders(source, path)
            .await;
    }

    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
        if path.is_absolute() {
            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
            return;
        }
        while let Some(parent) = path.parent() {
            if parent == Path::new("") {
                break;
            }
            if source
                .processed_writer()
                .unwrap()
                .remove_empty_directory(parent)
                .await
                .is_err()
            {
                break;
            }
        }
    }

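    /// Processes the asset at `path` and records the outcome in [`ProcessorAssetInfos`], which
    /// may queue re-processing tasks for dependent assets via `processor_task_event`.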
    async fn process_asset(
        &self,
        source: &AssetSource,
        path: PathBuf,
        processor_task_event: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        let result = self.process_asset_internal(source, &asset_path).await;
        let mut infos = self.data.processing_state.asset_infos.write().await;
        infos
            .finish_processing(asset_path, result, processor_task_event)
            .await;
    }

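    /// The full pipeline for a single asset: read (or synthesize) its meta, hash the source, skip
    /// it if neither the asset nor its process dependencies changed, then, under the file
    /// transaction lock and bracketed by transaction log entries, either run the configured
    /// processor or copy the asset verbatim, and finally write the processed meta.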
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        debug!("Processing {}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self.get_processor(&processor)?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let new_hash = {
            let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;

            get_asset_hash(&meta_bytes, &mut reader_for_hash)
                .await
                .map_err(reader_err)?
        };
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            let infos = self.data.processing_state.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
                && current_processed_info.hash == new_hash
            {
                let mut dependency_changed = false;
                for current_dep_info in &current_processed_info.process_dependencies {
                    let live_hash = infos
                        .get(&current_dep_info.path)
                        .and_then(|i| i.processed_info.as_ref())
                        .map(|i| i.full_hash);
                    if live_hash != Some(current_dep_info.full_hash) {
                        dependency_changed = true;
                        break;
                    }
                }
                if !dependency_changed {
                    return Ok(ProcessResult::SkippedNotChanged);
                }
            }
        }

        let _transaction_lock = {
            let lock = {
                let mut infos = self.data.processing_state.asset_infos.write().await;
                let info = infos.get_or_insert(asset_path.clone());
                info.file_transaction_lock.clone()
            };
            lock.write_arc().await
        };

        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            let settings = source_meta.process_settings().unwrap();

            let reader_for_process = reader.read(path).await.map_err(reader_err)?;

            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context = ProcessContext::new(
                    self,
                    asset_path,
                    reader_for_process,
                    &mut new_processed_info,
                );
                processor
                    .process(&mut context, settings, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();

            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            let mut reader_for_copy = reader.read(path).await.map_err(reader_err)?;
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            futures_lite::io::copy(&mut reader_for_copy, &mut writer)
                .await
                .map_err(|err| ProcessError::AssetWriterError {
                    path: asset_path.clone_owned(),
                    err: err.into(),
                })?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }

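    /// Validates the transaction log left by the previous run. Unfinished transactions have their
    /// processed artifacts removed; unrecoverable log states cause all processed assets to be
    /// deleted so they are regenerated from scratch. A fresh log is then created for this run.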
    async fn validate_transaction_log_and_recover(&self) {
        let log_factory = self
            .data
            .log_factory
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .take()
            .expect("the asset processor only starts once");
        if let Err(err) = validate_transaction_log(log_factory.as_ref()).await {
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        let mut log = self.data.log.write().await;
        *log = match log_factory.create_new_log().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
}

impl AssetProcessorData {
    pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
        AssetProcessorData {
            processing_state,
            sources,
            log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
            log: Default::default(),
            processors: Default::default(),
        }
    }

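    /// Overrides the factory used to create the processor's transaction log. This only works
    /// before the processor has started; once the log is in use the factory can no longer be
    /// replaced.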
    pub fn set_log_factory(
        &self,
        factory: Box<dyn ProcessorTransactionLogFactory>,
    ) -> Result<(), SetTransactionLogFactoryError> {
        let mut log_factory = self
            .log_factory
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        if log_factory.is_none() {
            return Err(SetTransactionLogFactoryError::AlreadyInUse);
        }

        *log_factory = Some(factory);
        Ok(())
    }

    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.processing_state.wait_until_processed(path).await
    }

    pub async fn wait_until_initialized(&self) {
        self.processing_state.wait_until_initialized().await;
    }

    pub async fn wait_until_finished(&self) {
        self.processing_state.wait_until_finished().await;
    }
}

impl ProcessingState {
    fn new() -> Self {
        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
        initialized_sender.set_overflow(true);
        finished_sender.set_overflow(true);

        Self {
            state: async_lock::RwLock::new(ProcessorState::Initializing),
            initialized_sender,
            initialized_receiver,
            finished_sender,
            finished_receiver,
            asset_infos: Default::default(),
        }
    }

    async fn set_state(&self, state: ProcessorState) {
        let mut state_guard = self.state.write().await;
        let last_state = *state_guard;
        *state_guard = state;
        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
            self.finished_sender.broadcast(()).await.unwrap();
        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
            self.initialized_sender.broadcast(()).await.unwrap();
        }
    }

    pub(crate) async fn get_state(&self) -> ProcessorState {
        *self.state.read().await
    }

    pub(crate) async fn get_transaction_lock(
        &self,
        path: &AssetPath<'static>,
    ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
        let lock = {
            let infos = self.asset_infos.read().await;
            let info = infos
                .get(path)
                .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
            info.file_transaction_lock.clone()
        };
        Ok(lock.read_arc().await)
    }

    pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.wait_until_initialized().await;
        let mut receiver = {
            let infos = self.asset_infos.write().await;
            let info = infos.get(&path);
            match info {
                Some(info) => match info.status {
                    Some(result) => return result,
                    None => info.status_receiver.clone(),
                },
                None => return ProcessStatus::NonExistent,
            }
        };
        receiver.recv().await.unwrap()
    }

    pub(crate) async fn wait_until_initialized(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing => Some(self.initialized_receiver.clone()),
                _ => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }

    pub(crate) async fn wait_until_finished(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing | ProcessorState::Processing => {
                    Some(self.finished_receiver.clone())
                }
                ProcessorState::Finished => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }
}

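/// Wraps a [`Process`] implementation so that each `process` call runs inside an
/// "asset processing" tracing span recording the processor type and asset path.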
#[cfg(feature = "trace")]
#[derive(TypePath)]
struct InstrumentedAssetProcessor<T>(T);

#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    fn process(
        &self,
        context: &mut ProcessContext,
        settings: &Self::Settings,
        writer: &mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        let span = info_span!(
            "asset processing",
            processor = T::type_path(),
            asset = context.path().to_string(),
        );
        self.0.process(context, settings, writer).instrument(span)
    }
}

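/// The outcome of processing a single asset: it was processed (producing new [`ProcessedInfo`]),
/// skipped because nothing changed, or ignored per its meta.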
#[derive(Debug, Clone)]
pub enum ProcessResult {
    Processed(ProcessedInfo),
    SkippedNotChanged,
    Ignored,
}

#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    Processed,
    Failed,
    NonExistent,
}

#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    processed_info: Option<ProcessedInfo>,
    dependents: HashSet<AssetPath<'static>>,
    status: Option<ProcessStatus>,
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    status_sender: async_broadcast::Sender<ProcessStatus>,
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}

impl Default for ProcessorAssetInfo {
    fn default() -> Self {
        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
        status_sender.set_overflow(true);
        Self {
            processed_info: Default::default(),
            dependents: Default::default(),
            file_transaction_lock: Default::default(),
            status: None,
            status_sender,
            status_receiver,
        }
    }
}

impl ProcessorAssetInfo {
    async fn update_status(&mut self, status: ProcessStatus) {
        if self.status != Some(status) {
            self.status = Some(status);
            self.status_sender.broadcast(status).await.unwrap();
        }
    }
}

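/// The set of [`ProcessorAssetInfo`]s for all known assets, plus dependents recorded against
/// paths that do not (yet) have an info entry.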
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
}

impl ProcessorAssetInfos {
    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
        self.infos.entry(asset_path.clone()).or_insert_with(|| {
            let mut info = ProcessorAssetInfo::default();
            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
                info.dependents = dependents;
            }
            info
        })
    }

    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
        self.infos.get(asset_path)
    }

    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
        self.infos.get_mut(asset_path)
    }

    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
        if let Some(info) = self.get_mut(asset_path) {
            info.dependents.insert(dependent);
        } else {
            let dependents = self
                .non_existent_dependents
                .entry(asset_path.clone())
                .or_default();
            dependents.insert(dependent);
        }
    }

    async fn finish_processing(
        &mut self,
        asset_path: AssetPath<'static>,
        result: Result<ProcessResult, ProcessError>,
        reprocess_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        match result {
            Ok(ProcessResult::Processed(processed_info)) => {
                debug!("Finished processing \"{}\"", asset_path);
                let old_processed_info = self
                    .infos
                    .get_mut(&asset_path)
                    .and_then(|i| i.processed_info.take());
                if let Some(old_processed_info) = old_processed_info {
                    self.clear_dependencies(&asset_path, old_processed_info);
                }

                for process_dependency_info in &processed_info.process_dependencies {
                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
                }
                let info = self.get_or_insert(asset_path);
                info.processed_info = Some(processed_info);
                info.update_status(ProcessStatus::Processed).await;
                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
                for path in dependents {
                    let _ = reprocess_sender
                        .send((path.source().clone_owned(), path.path().to_owned()))
                        .await;
                }
            }
            Ok(ProcessResult::SkippedNotChanged) => {
                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Processed).await;
            }
            Ok(ProcessResult::Ignored) => {
                debug!("Skipping processing (ignored) \"{}\"", asset_path);
            }
            Err(ProcessError::ExtensionRequired) => {}
            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
                trace!("No loader found for {asset_path}");
            }
            Err(ProcessError::AssetReaderError {
                err: AssetReaderError::NotFound(_),
                ..
            }) => {
                trace!("No need to process asset {asset_path} because it does not exist");
            }
            Err(err) => {
                error!("Failed to process asset {asset_path}: {err}");
                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
                    err
                {
                    let info = self.get_mut(&asset_path).expect("info should exist");
                    info.processed_info = Some(ProcessedInfo {
                        hash: AssetHash::default(),
                        full_hash: AssetHash::default(),
                        process_dependencies: vec![],
                    });
                    self.add_dependent(dependency.path(), asset_path.to_owned());
                }

                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Failed).await;
            }
        }
    }

    async fn remove(
        &mut self,
        asset_path: &AssetPath<'static>,
    ) -> Option<Arc<async_lock::RwLock<()>>> {
        let info = self.infos.remove(asset_path)?;
        if let Some(processed_info) = info.processed_info {
            self.clear_dependencies(asset_path, processed_info);
        }
        info.status_sender
            .broadcast(ProcessStatus::NonExistent)
            .await
            .unwrap();
        if !info.dependents.is_empty() {
            error!(
                "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                info.dependents
            );
            self.non_existent_dependents
                .insert(asset_path.clone(), info.dependents);
        }

        Some(info.file_transaction_lock)
    }

    async fn rename(
        &mut self,
        old: &AssetPath<'static>,
        new: &AssetPath<'static>,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
        let mut info = self.infos.remove(old)?;
        if !info.dependents.is_empty() {
            error!(
                "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
                info.dependents
            );
            self.non_existent_dependents
                .insert(old.clone(), core::mem::take(&mut info.dependents));
        }
        if let Some(processed_info) = &info.processed_info {
            for dep in &processed_info.process_dependencies {
                if let Some(info) = self.infos.get_mut(&dep.path) {
                    info.dependents.remove(old);
                    info.dependents.insert(new.clone());
                } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
                    dependents.remove(old);
                    dependents.insert(new.clone());
                }
            }
        }
        info.status_sender
            .broadcast(ProcessStatus::NonExistent)
            .await
            .unwrap();
        let new_info = self.get_or_insert(new.clone());
        new_info.processed_info = info.processed_info;
        new_info.status = info.status;
        if let Some(status) = new_info.status {
            new_info.status_sender.broadcast(status).await.unwrap();
        }
        let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
        let _ = new_task_sender
            .send((new.source().clone_owned(), new.path().to_owned()))
            .await;
        for dependent in dependents {
            let _ = new_task_sender
                .send((
                    dependent.source().clone_owned(),
                    dependent.path().to_owned(),
                ))
                .await;
        }

        Some((
            info.file_transaction_lock,
            new_info.file_transaction_lock.clone(),
        ))
    }

    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
        for old_load_dep in removed_info.process_dependencies {
            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
                info.dependents.remove(asset_path);
            } else if let Some(dependents) =
                self.non_existent_dependents.get_mut(&old_load_dep.path)
            {
                dependents.remove(asset_path);
            }
        }
    }
}

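/// The current stage of the asset processor: initializing, actively processing assets, or
/// finished with the current pass.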
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    Initializing,
    Processing,
    Finished,
}

#[derive(Error, Debug)]
pub enum InitializeError {
    #[error(transparent)]
    FailedToReadSourcePaths(AssetReaderError),
    #[error(transparent)]
    FailedToReadDestinationPaths(AssetReaderError),
    #[error("Failed to validate asset log: {0}")]
    ValidateLogError(#[from] ValidateLogError),
}

#[derive(Error, Debug)]
pub enum SetTransactionLogFactoryError {
    #[error("Transaction log is already in use so setting the factory does nothing")]
    AlreadyInUse,
}

#[derive(Error, Debug, PartialEq, Eq)]
pub enum GetProcessorError {
    #[error("The processor '{0}' does not exist")]
    Missing(String),
    #[error("The processor '{processor_short_name}' is ambiguous between several processors: {ambiguous_processor_names:?}")]
    Ambiguous {
        processor_short_name: String,
        ambiguous_processor_names: Vec<&'static str>,
    },
}

impl From<GetProcessorError> for ProcessError {
    fn from(err: GetProcessorError) -> Self {
        match err {
            GetProcessorError::Missing(name) => Self::MissingProcessor(name),
            GetProcessorError::Ambiguous {
                processor_short_name,
                ambiguous_processor_names,
            } => Self::AmbiguousProcessor {
                processor_short_name,
                ambiguous_processor_names,
            },
        }
    }
}

#[cfg(test)]
mod tests;