mod log;
mod process;

pub use log::*;
pub use process::*;

use crate::{
    io::{
        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
        AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
        MissingAssetSourceError,
    },
    meta::{
        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
    },
    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
};
use alloc::{borrow::ToOwned, boxed::Box, collections::VecDeque, sync::Arc, vec, vec::Vec};
use bevy_ecs::prelude::*;
use bevy_platform::collections::{HashMap, HashSet};
use bevy_tasks::IoTaskPool;
use futures_io::ErrorKind;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use parking_lot::RwLock;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tracing::{debug, error, trace, warn};
#[cfg(feature = "trace")]
use {
    alloc::string::ToString,
    bevy_tasks::ConditionalSendFuture,
    tracing::{info_span, instrument::Instrument},
};

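/// A "background" asset processor that reads assets from their unprocessed [`AssetSource`]s,
/// runs them through their configured [`Process`] implementations (or copies them unchanged),
/// and writes the results and their [`ProcessedInfo`] metadata to the processed outputs.
///
/// Cloning an [`AssetProcessor`] is cheap: all state lives in [`AssetProcessorData`] behind an
/// [`Arc`], so clones share the same underlying processor.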
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    server: AssetServer,
    pub(crate) data: Arc<AssetProcessorData>,
}

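/// Shared state backing an [`AssetProcessor`]: per-asset processing info, the transaction log,
/// the registered processors, the current [`ProcessorState`], and the broadcast channels used to
/// signal when the processor has initialized and when a processing pass has finished.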
pub struct AssetProcessorData {
    pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
    log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
    processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
    default_processors: RwLock<HashMap<Box<str>, &'static str>>,
    state: async_lock::RwLock<ProcessorState>,
    sources: AssetSources,
    initialized_sender: async_broadcast::Sender<()>,
    initialized_receiver: async_broadcast::Receiver<()>,
    finished_sender: async_broadcast::Sender<()>,
    finished_receiver: async_broadcast::Receiver<()>,
}

impl AssetProcessor {
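    /// Creates a new [`AssetProcessor`] from the given [`AssetSourceBuilders`], along with an
    /// internal [`AssetServer`] (in [`AssetServerMode::Processed`]) whose reads are gated on this
    /// processor so that assets are not loaded before they have been processed.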
    pub fn new(source: &mut AssetSourceBuilders) -> Self {
        let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
        let mut sources = source.build_sources(false, false);
        sources.gate_on_processor(data.clone());
        let server = AssetServer::new_with_meta_check(
            sources,
            AssetServerMode::Processed,
            AssetMetaCheck::Always,
            false,
            UnapprovedPathMode::default(),
        );
        Self { server, data }
    }

    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }

    pub fn server(&self) -> &AssetServer {
        &self.server
    }

    async fn set_state(&self, state: ProcessorState) {
        let mut state_guard = self.data.state.write().await;
        let last_state = *state_guard;
        *state_guard = state;
        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
            self.data.finished_sender.broadcast(()).await.unwrap();
        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
            self.data.initialized_sender.broadcast(()).await.unwrap();
        }
    }

    pub async fn get_state(&self) -> ProcessorState {
        *self.data.state.read().await
    }

    #[inline]
    pub fn get_source<'a>(
        &self,
        id: impl Into<AssetSourceId<'a>>,
    ) -> Result<&AssetSource, MissingAssetSourceError> {
        self.data.sources.get(id.into())
    }

    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }

    async fn log_unrecoverable(&self) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.unrecoverable().await.unwrap();
    }

    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.begin_processing(path).await.unwrap();
    }

    async fn log_end_processing(&self, path: &AssetPath<'_>) {
        let mut log = self.data.log.write().await;
        let log = log.as_mut().unwrap();
        log.end_processing(path).await.unwrap();
    }

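    /// Starts the processor in a background thread: it runs a full processing pass over all
    /// sources and then listens for source change events. Only supported with the
    /// `multi_threaded` feature on non-Wasm targets; otherwise this logs an error.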
    pub fn start(_processor: Res<Self>) {
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        {
            let processor = _processor.clone();
            std::thread::spawn(move || {
                processor.process_assets();
                bevy_tasks::block_on(processor.listen_for_source_change_events());
            });
        }
    }

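    /// Processes all assets in all processed [`AssetSources`]: initializes processor state,
    /// walks every source directory, processes each asset, and then finishes the pass
    /// (re-checking queued assets and setting the state to [`ProcessorState::Finished`]).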
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    pub fn process_assets(&self) {
        let start_time = std::time::Instant::now();
        debug!("Processing Assets");
        IoTaskPool::get().scope(|scope| {
            scope.spawn(async move {
                self.initialize().await.unwrap();
                for source in self.sources().iter_processed() {
                    self.process_assets_internal(scope, source, PathBuf::from(""))
                        .await
                        .unwrap();
                }
            });
        });
        bevy_tasks::block_on(self.finish_processing_assets());
        let end_time = std::time::Instant::now();
        debug!("Processing finished in {:?}", end_time - start_time);
    }

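    /// Listens indefinitely for [`AssetSourceEvent`]s from every processed source and reacts to
    /// them (reprocessing, removing, or renaming processed assets as needed). Whenever a batch of
    /// events starts, the state switches to [`ProcessorState::Processing`] until the batch is
    /// finished.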
    pub async fn listen_for_source_change_events(&self) {
        debug!("Listening for changes to source assets");
        loop {
            let mut started_processing = false;

            for source in self.data.sources.iter_processed() {
                if let Some(receiver) = source.event_receiver() {
                    for event in receiver.try_iter() {
                        if !started_processing {
                            self.set_state(ProcessorState::Processing).await;
                            started_processing = true;
                        }

                        self.handle_asset_source_event(source, event).await;
                    }
                }
            }

            if started_processing {
                self.finish_processing_assets().await;
            }
        }
    }

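    /// Writes a default meta file for the asset at `path`, using the default [`Process`]
    /// implementation registered for its extension if one exists, and falling back to the default
    /// loader meta otherwise. Fails with [`WriteDefaultMetaError::MetaAlreadyExists`] if a meta
    /// file is already present.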
    pub async fn write_default_meta_file_for_path(
        &self,
        path: impl Into<AssetPath<'_>>,
    ) -> Result<(), WriteDefaultMetaError> {
        let path = path.into();
        let Some(processor) = path
            .get_full_extension()
            .and_then(|extension| self.get_default_processor(&extension))
        else {
            return self
                .server
                .write_default_loader_meta_file_for_path(path)
                .await;
        };

        let meta = processor.default_meta();
        let serialized_meta = meta.serialize();

        let source = self.get_source(path.source())?;

        let reader = source.reader();
        match reader.read_meta_bytes(path.path()).await {
            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
            Err(AssetReaderError::NotFound(_)) => {
                // The meta file does not exist yet, so it is safe to write the default one.
            }
            Err(AssetReaderError::Io(err)) => {
                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
            }
            Err(AssetReaderError::HttpError(err)) => {
                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
            }
        }

        let writer = source.writer()?;
        writer
            .write_meta_bytes(path.path(), &serialized_meta)
            .await?;

        Ok(())
    }

    async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
        trace!("{event:?}");
        match event {
            AssetSourceEvent::AddedAsset(path)
            | AssetSourceEvent::AddedMeta(path)
            | AssetSourceEvent::ModifiedAsset(path)
            | AssetSourceEvent::ModifiedMeta(path) => {
                self.process_asset(source, path).await;
            }
            AssetSourceEvent::RemovedAsset(path) => {
                self.handle_removed_asset(source, path).await;
            }
            AssetSourceEvent::RemovedMeta(path) => {
                self.handle_removed_meta(source, path).await;
            }
            AssetSourceEvent::AddedFolder(path) => {
                self.handle_added_folder(source, path).await;
            }
            AssetSourceEvent::RemovedFolder(path) => {
                self.handle_removed_folder(source, &path).await;
            }
            AssetSourceEvent::RenamedAsset { old, new } => {
                // A rename to the same path is treated as a modification: just reprocess it.
                if old == new {
                    self.process_asset(source, new).await;
                } else {
                    self.handle_renamed_asset(source, old, new).await;
                }
            }
            AssetSourceEvent::RenamedMeta { old, new } => {
                // A rename to the same path is treated as a modification: just reprocess it.
                if old == new {
                    self.process_asset(source, new).await;
                } else {
                    debug!("Meta renamed from {old:?} to {new:?}");
                    let mut infos = self.data.asset_infos.write().await;
                    let new_asset_path = AssetPath::from(new).with_source(source.id());
                    let old_asset_path = AssetPath::from(old).with_source(source.id());
                    infos.check_reprocess_queue.push_back(old_asset_path);
                    infos.check_reprocess_queue.push_back(new_asset_path);
                }
            }
            AssetSourceEvent::RenamedFolder { old, new } => {
                // A rename to the same path is treated as a modification of the folder.
                if old == new {
                    self.handle_added_folder(source, new).await;
                } else {
                    // Remove all processed output for the old folder, then reprocess the new one.
                    self.handle_removed_folder(source, &old).await;
                    self.handle_added_folder(source, new).await;
                }
            }
            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
                let processed_reader = source.processed_reader().unwrap();
                match processed_reader.is_directory(&path).await {
                    Ok(is_directory) => {
                        if is_directory {
                            self.handle_removed_folder(source, &path).await;
                        } else if is_meta {
                            self.handle_removed_meta(source, path).await;
                        } else {
                            self.handle_removed_asset(source, path).await;
                        }
                    }
                    Err(err) => {
                        match err {
                            AssetReaderError::NotFound(_) => {
                                // The path does not exist in the processed output, so there is
                                // nothing to clean up.
                            }
                            AssetReaderError::Io(err) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to the following error: {err}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                            AssetReaderError::HttpError(status) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                        }
                    }
                }
            }
        }
    }

    async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
        debug!(
            "Folder {} was added. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        IoTaskPool::get().scope(|scope| {
            scope.spawn(async move {
                self.process_assets_internal(scope, source, path)
                    .await
                    .unwrap();
            });
        });
    }

    async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
        debug!(
            "Meta for asset {} was removed. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        self.process_asset(source, path).await;
    }

    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
        debug!(
            "Removing folder {} because source was removed",
            path.display()
        );
        let processed_reader = source.processed_reader().unwrap();
        match processed_reader.read_directory(path).await {
            Ok(mut path_stream) => {
                while let Some(child_path) = path_stream.next().await {
                    self.handle_removed_asset(source, child_path).await;
                }
            }
            Err(err) => match err {
                AssetReaderError::NotFound(_err) => {
                    // The processed folder does not exist, so there is nothing to remove.
                }
                AssetReaderError::HttpError(status) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
                    );
                }
                AssetReaderError::Io(err) => {
                    self.log_unrecoverable().await;
                    error!(
                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
                    );
                }
            },
        }
        let processed_writer = source.processed_writer().unwrap();
        if let Err(err) = processed_writer.remove_directory(path).await {
            match err {
                AssetWriterError::Io(err) => {
                    // NotFound is fine: it means the destination folder was already gone.
                    if err.kind() != ErrorKind::NotFound {
                        let asset_path = AssetPath::from_path(path).with_source(source.id());
                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
                    }
                }
            }
        }
    }

    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        debug!("Removing processed {asset_path} because source was removed");
        let mut infos = self.data.asset_infos.write().await;
        if let Some(info) = infos.get(&asset_path) {
            let _write_lock = info.file_transaction_lock.write();
            self.remove_processed_asset_and_meta(source, asset_path.path())
                .await;
        }
        infos.remove(&asset_path).await;
    }

    async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
        let mut infos = self.data.asset_infos.write().await;
        let old = AssetPath::from(old).with_source(source.id());
        let new = AssetPath::from(new).with_source(source.id());
        let processed_writer = source.processed_writer().unwrap();
        if let Some(info) = infos.get(&old) {
            let _write_lock = info.file_transaction_lock.write();
            processed_writer
                .rename(old.path(), new.path())
                .await
                .unwrap();
            processed_writer
                .rename_meta(old.path(), new.path())
                .await
                .unwrap();
        }
        infos.rename(&old, &new).await;
    }

    async fn finish_processing_assets(&self) {
        self.try_reprocessing_queued().await;
        self.server.data.infos.write().consume_handle_drop_events();
        self.set_state(ProcessorState::Finished).await;
    }

    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    async fn process_assets_internal<'scope>(
        &'scope self,
        scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
        source: &'scope AssetSource,
        path: PathBuf,
    ) -> Result<(), AssetReaderError> {
        if source.reader().is_directory(&path).await? {
            let mut path_stream = source.reader().read_directory(&path).await?;
            while let Some(path) = path_stream.next().await {
                Box::pin(self.process_assets_internal(scope, source, path)).await?;
            }
        } else {
            let processor = self.clone();
            scope.spawn(async move {
                processor.process_asset(source, path).await;
            });
        }
        Ok(())
    }

    async fn try_reprocessing_queued(&self) {
        loop {
            let mut check_reprocess_queue =
                core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
            IoTaskPool::get().scope(|scope| {
                for path in check_reprocess_queue.drain(..) {
                    let processor = self.clone();
                    let source = self.get_source(path.source()).unwrap();
                    scope.spawn(async move {
                        processor.process_asset(source, path.into()).await;
                    });
                }
            });
            let infos = self.data.asset_infos.read().await;
            if infos.check_reprocess_queue.is_empty() {
                break;
            }
        }
    }

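    /// Registers the given [`Process`] implementation under its type name. Registered processors
    /// can be referenced by meta files and made the default for an extension via
    /// [`Self::set_default_processor`].
    ///
    /// A minimal sketch (the `MyAssetProcessor` type is hypothetical and stands in for any
    /// [`Process`] implementation you have defined):
    ///
    /// ```ignore
    /// // Register the processor and make it the default for `.custom` files.
    /// processor.register_processor(MyAssetProcessor::default());
    /// processor.set_default_processor::<MyAssetProcessor>("custom");
    /// ```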
    pub fn register_processor<P: Process>(&self, processor: P) {
        let mut process_plans = self.data.processors.write();
        #[cfg(feature = "trace")]
        let processor = InstrumentedAssetProcessor(processor);
        process_plans.insert(core::any::type_name::<P>(), Arc::new(processor));
    }

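    /// Sets the default [`Process`] implementation to use for source files with the given
    /// `extension` when they have no meta file specifying a processor.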
    pub fn set_default_processor<P: Process>(&self, extension: &str) {
        let mut default_processors = self.data.default_processors.write();
        default_processors.insert(extension.into(), core::any::type_name::<P>());
    }

    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let default_processors = self.data.default_processors.read();
        let key = default_processors.get(extension)?;
        self.data.processors.read().get(key).cloned()
    }

    pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
        let processors = self.data.processors.read();
        processors.get(processor_type_name).cloned()
    }

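    /// Initializes the processor: validates (and if necessary recovers from) the transaction log,
    /// scans both the unprocessed and processed paths of every source, populates
    /// [`ProcessorAssetInfos`] with the existing processed metadata, removes processed output for
    /// assets that no longer exist, and finally switches to [`ProcessorState::Processing`].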
    #[cfg_attr(
        any(target_arch = "wasm32", not(feature = "multi_threaded")),
        expect(
            dead_code,
            reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
        )
    )]
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        let mut asset_infos = self.data.asset_infos.write().await;

        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            clean_empty_folders_writer: Option<&dyn ErasedAssetWriter>,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        clean_empty_folders_writer,
                        child_path,
                        paths,
                    ))
                    .await?;
                }
                if !contains_files && path.parent().is_some() {
                    if let Some(writer) = clean_empty_folders_writer {
                        let _ = writer.remove_empty_directory(&path).await;
                    }
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            let Ok(processed_reader) = source.processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                None,
                PathBuf::from(""),
                &mut unprocessed_paths,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            get_asset_paths(
                processed_reader,
                Some(processed_writer),
                PathBuf::from(""),
                &mut processed_paths,
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.set_state(ProcessorState::Processing).await;

        Ok(())
    }

    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
            warn!("Failed to remove non-existent asset {path:?}: {err}");
        }

        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
            warn!("Failed to remove non-existent meta {path:?}: {err}");
        }

        self.clean_empty_processed_ancestor_folders(source, path)
            .await;
    }

    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
        if path.is_absolute() {
            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
            return;
        }
        while let Some(parent) = path.parent() {
            if parent == Path::new("") {
                break;
            }
            if source
                .processed_writer()
                .unwrap()
                .remove_empty_directory(parent)
                .await
                .is_err()
            {
                break;
            }
        }
    }

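    /// Processes the asset at `path` (relative to the given `source`) and records the result in
    /// [`ProcessorAssetInfos`], which also notifies anything waiting on its status and queues
    /// dependent assets for re-checking.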
    async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
        let asset_path = AssetPath::from(path).with_source(source.id());
        let result = self.process_asset_internal(source, &asset_path).await;
        let mut infos = self.data.asset_infos.write().await;
        infos.finish_processing(asset_path, result).await;
    }

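    /// The core per-asset processing flow: read the source asset and its meta (falling back to a
    /// default processor or loader meta when no meta file exists), skip the asset if its hash and
    /// the hashes of its process dependencies are unchanged, then run the configured processor
    /// (or copy the bytes unchanged) and write the processed asset plus its updated meta, all
    /// under that asset's file transaction lock and bracketed by transaction log entries.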
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        debug!("Processing {}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        let mut byte_reader = reader.read(path).await.map_err(reader_err)?;

        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self
                            .get_processor(&processor)
                            .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let mut asset_bytes = Vec::new();
        byte_reader
            .read_to_end(&mut asset_bytes)
            .await
            .map_err(|e| ProcessError::AssetReaderError {
                path: asset_path.clone(),
                err: AssetReaderError::Io(e.into()),
            })?;

        let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            let infos = self.data.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
            {
                if current_processed_info.hash == new_hash {
                    let mut dependency_changed = false;
                    for current_dep_info in &current_processed_info.process_dependencies {
                        let live_hash = infos
                            .get(&current_dep_info.path)
                            .and_then(|i| i.processed_info.as_ref())
                            .map(|i| i.full_hash);
                        if live_hash != Some(current_dep_info.full_hash) {
                            dependency_changed = true;
                            break;
                        }
                    }
                    if !dependency_changed {
                        return Ok(ProcessResult::SkippedNotChanged);
                    }
                }
            }
        }
        let _transaction_lock = {
            let mut infos = self.data.asset_infos.write().await;
            let info = infos.get_or_insert(asset_path.clone());
            info.file_transaction_lock.write_arc().await
        };

        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context =
                    ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
                processor
                    .process(&mut context, source_meta, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            processed_writer
                .write_bytes(path, &asset_bytes)
                .await
                .map_err(writer_err)?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }

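    /// Validates the [`ProcessorTransactionLog`] from the previous run and recovers from any
    /// failures it records: unfinished transactions have their processed output removed, and if
    /// the log is unreadable or in an unrecoverable state, all processed assets are removed so
    /// they will be regenerated from scratch. A fresh log is then opened for this run.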
    async fn validate_transaction_log_and_recover(&self) {
        if let Err(err) = ProcessorTransactionLog::validate().await {
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        let mut log = self.data.log.write().await;
        *log = match ProcessorTransactionLog::new().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
}

impl AssetProcessorData {
    pub fn new(source: AssetSources) -> Self {
        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
        // Allow the single buffered message to be overwritten: only the latest signal matters.
        finished_sender.set_overflow(true);
        initialized_sender.set_overflow(true);

        AssetProcessorData {
            sources: source,
            finished_sender,
            finished_receiver,
            initialized_sender,
            initialized_receiver,
            state: async_lock::RwLock::new(ProcessorState::Initializing),
            log: Default::default(),
            processors: Default::default(),
            asset_infos: Default::default(),
            default_processors: Default::default(),
        }
    }

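    /// Returns the [`ProcessStatus`] of the asset at `path`, waiting until the processor is
    /// initialized and, if the asset has not been processed yet, until its status is broadcast.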
    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
        self.wait_until_initialized().await;
        let mut receiver = {
            let infos = self.asset_infos.write().await;
            let info = infos.get(&path);
            match info {
                Some(info) => match info.status {
                    Some(result) => return result,
                    None => info.status_receiver.clone(),
                },
                None => return ProcessStatus::NonExistent,
            }
        };
        receiver.recv().await.unwrap()
    }

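    /// Waits until the processor has left [`ProcessorState::Initializing`]. Returns immediately
    /// if initialization has already happened.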
    pub async fn wait_until_initialized(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing => {
                    // Clone the receiver while still holding the read lock so that a state change
                    // cannot slip in between checking the state and subscribing.
                    Some(self.initialized_receiver.clone())
                }
                _ => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }

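    /// Waits until the current processing pass has finished (state is
    /// [`ProcessorState::Finished`]). Returns immediately if processing is already finished.
    ///
    /// A minimal sketch of waiting for processing from outside the processor, assuming you hold a
    /// handle to the [`AssetProcessor`] resource (for example, cloned out of the ECS `World`):
    ///
    /// ```ignore
    /// // Block until the processor reports that the current pass has completed.
    /// bevy_tasks::block_on(processor.data().wait_until_finished());
    /// ```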
    pub async fn wait_until_finished(&self) {
        let receiver = {
            let state = self.state.read().await;
            match *state {
                ProcessorState::Initializing | ProcessorState::Processing => {
                    // Clone the receiver while still holding the read lock so that a state change
                    // cannot slip in between checking the state and subscribing.
                    Some(self.finished_receiver.clone())
                }
                ProcessorState::Finished => None,
            }
        };

        if let Some(mut receiver) = receiver {
            receiver.recv().await.unwrap();
        }
    }
}

#[cfg(feature = "trace")]
struct InstrumentedAssetProcessor<T>(T);

#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    fn process(
        &self,
        context: &mut ProcessContext,
        meta: AssetMeta<(), Self>,
        writer: &mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        // Rebuild the meta with the inner processor's type, which works because the wrapper
        // shares the inner processor's `Settings` type.
        let meta = AssetMeta {
            meta_format_version: meta.meta_format_version,
            processed_info: meta.processed_info,
            asset: meta.asset,
        };
        let span = info_span!(
            "asset processing",
            processor = core::any::type_name::<T>(),
            asset = context.path().to_string(),
        );
        self.0.process(context, meta, writer).instrument(span)
    }
}

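/// The successful outcome of processing a single asset: either it was processed (producing new
/// [`ProcessedInfo`]), skipped because nothing changed, or ignored entirely.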
#[derive(Debug, Clone)]
pub enum ProcessResult {
    Processed(ProcessedInfo),
    SkippedNotChanged,
    Ignored,
}

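/// The final status of processing an asset, as broadcast to anything waiting on it.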
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    Processed,
    Failed,
    NonExistent,
}

#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    processed_info: Option<ProcessedInfo>,
    dependents: HashSet<AssetPath<'static>>,
    status: Option<ProcessStatus>,
    /// A lock that keeps processed-asset reads from overlapping writes: the processor holds the
    /// write lock while it writes the processed asset and its meta, and readers take the read
    /// lock.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    status_sender: async_broadcast::Sender<ProcessStatus>,
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}

impl Default for ProcessorAssetInfo {
    fn default() -> Self {
        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
        // Allow the single buffered status to be overwritten: only the latest status matters.
        status_sender.set_overflow(true);
        Self {
            processed_info: Default::default(),
            dependents: Default::default(),
            file_transaction_lock: Default::default(),
            status: None,
            status_sender,
            status_receiver,
        }
    }
}

impl ProcessorAssetInfo {
    async fn update_status(&mut self, status: ProcessStatus) {
        if self.status != Some(status) {
            self.status = Some(status);
            self.status_sender.broadcast(status).await.unwrap();
        }
    }
}

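/// The processor's current view of its assets: per-asset [`ProcessorAssetInfo`] entries,
/// reverse-dependency tracking for assets that don't exist (yet), and the queue of assets that
/// should be re-checked for reprocessing.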
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependents of assets that don't exist (or are not yet known). If such an asset shows up
    /// later, these are moved into its [`ProcessorAssetInfo::dependents`] set.
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
    check_reprocess_queue: VecDeque<AssetPath<'static>>,
}

impl ProcessorAssetInfos {
    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
        self.infos.entry(asset_path.clone()).or_insert_with(|| {
            let mut info = ProcessorAssetInfo::default();
            // If dependents were waiting for this asset to exist, adopt them now.
            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
                info.dependents = dependents;
            }
            info
        })
    }

    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
        self.infos.get(asset_path)
    }

    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
        self.infos.get_mut(asset_path)
    }

    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
        if let Some(info) = self.get_mut(asset_path) {
            info.dependents.insert(dependent);
        } else {
            let dependents = self
                .non_existent_dependents
                .entry(asset_path.clone())
                .or_default();
            dependents.insert(dependent);
        }
    }

    async fn finish_processing(
        &mut self,
        asset_path: AssetPath<'static>,
        result: Result<ProcessResult, ProcessError>,
    ) {
        match result {
            Ok(ProcessResult::Processed(processed_info)) => {
                debug!("Finished processing \"{}\"", asset_path);
                // Clear the dependent lists built from the previous processed info.
                let old_processed_info = self
                    .infos
                    .get_mut(&asset_path)
                    .and_then(|i| i.processed_info.take());
                if let Some(old_processed_info) = old_processed_info {
                    self.clear_dependencies(&asset_path, old_processed_info);
                }

                // Register this asset as a dependent of each of its process dependencies.
                for process_dependency_info in &processed_info.process_dependencies {
                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
                }
                let info = self.get_or_insert(asset_path);
                info.processed_info = Some(processed_info);
                info.update_status(ProcessStatus::Processed).await;
                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
                for path in dependents {
                    self.check_reprocess_queue.push_back(path);
                }
            }
            Ok(ProcessResult::SkippedNotChanged) => {
                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Processed).await;
            }
            Ok(ProcessResult::Ignored) => {
                debug!("Skipping processing (ignored) \"{}\"", asset_path);
            }
            Err(ProcessError::ExtensionRequired) => {
                // Skip assets without extensions.
            }
            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
                trace!("No loader found for {asset_path}");
            }
            Err(ProcessError::AssetReaderError {
                err: AssetReaderError::NotFound(_),
                ..
            }) => {
                trace!("No need to process asset {asset_path} because it does not exist");
            }
            Err(err) => {
                error!("Failed to process asset {asset_path}: {err}");
                // If the failure came from a failed dependency load, record the dependency so
                // this asset is reprocessed if that dependency is processed later.
                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
                    err
                {
                    let info = self.get_mut(&asset_path).expect("info should exist");
                    info.processed_info = Some(ProcessedInfo {
                        hash: AssetHash::default(),
                        full_hash: AssetHash::default(),
                        process_dependencies: vec![],
                    });
                    self.add_dependent(dependency.path(), asset_path.to_owned());
                }

                let info = self.get_mut(&asset_path).expect("info should exist");
                info.update_status(ProcessStatus::Failed).await;
            }
        }
    }

1361
1362 async fn remove(&mut self, asset_path: &AssetPath<'static>) {
1364 let info = self.infos.remove(asset_path);
1365 if let Some(info) = info {
1366 if let Some(processed_info) = info.processed_info {
1367 self.clear_dependencies(asset_path, processed_info);
1368 }
1369 info.status_sender
1371 .broadcast(ProcessStatus::NonExistent)
1372 .await
1373 .unwrap();
1374 if !info.dependents.is_empty() {
1375 error!(
1376 "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1377 info.dependents
1378 );
1379 self.non_existent_dependents
1380 .insert(asset_path.clone(), info.dependents);
1381 }
1382 }
1383 }
1384
1385 async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
1387 let info = self.infos.remove(old);
1388 if let Some(mut info) = info {
1389 if !info.dependents.is_empty() {
1390 error!(
1398 "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1399 info.dependents
1400 );
1401 self.non_existent_dependents
1402 .insert(old.clone(), core::mem::take(&mut info.dependents));
1403 }
1404 if let Some(processed_info) = &info.processed_info {
1405 for dep in &processed_info.process_dependencies {
1407 if let Some(info) = self.infos.get_mut(&dep.path) {
1408 info.dependents.remove(old);
1409 info.dependents.insert(new.clone());
1410 } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
1411 {
1412 dependents.remove(old);
1413 dependents.insert(new.clone());
1414 }
1415 }
1416 }
1417 info.status_sender
1419 .broadcast(ProcessStatus::NonExistent)
1420 .await
1421 .unwrap();
1422 let dependents: Vec<AssetPath<'static>> = {
1423 let new_info = self.get_or_insert(new.clone());
1424 new_info.processed_info = info.processed_info;
1425 new_info.status = info.status;
1426 if let Some(status) = new_info.status {
1428 new_info.status_sender.broadcast(status).await.unwrap();
1429 }
1430 new_info.dependents.iter().cloned().collect()
1431 };
1432 self.check_reprocess_queue.push_back(new.clone());
1434 for dependent in dependents {
1435 self.check_reprocess_queue.push_back(dependent);
1437 }
1438 }
1439 }
1440
1441 fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1442 for old_load_dep in removed_info.process_dependencies {
1443 if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1444 info.dependents.remove(asset_path);
1445 } else if let Some(dependents) =
1446 self.non_existent_dependents.get_mut(&old_load_dep.path)
1447 {
1448 dependents.remove(asset_path);
1449 }
1450 }
1451 }
1452}
1453
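/// The overall state of the [`AssetProcessor`].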
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is scanning sources and validating its transaction log.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished its current processing pass.
    Finished,
}

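/// An error that can occur while the [`AssetProcessor`] initializes.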
#[derive(Error, Debug)]
pub enum InitializeError {
    #[error(transparent)]
    FailedToReadSourcePaths(AssetReaderError),
    #[error(transparent)]
    FailedToReadDestinationPaths(AssetReaderError),
    #[error("Failed to validate asset log: {0}")]
    ValidateLogError(#[from] ValidateLogError),
}