bevy_asset/processor/
mod.rs

1//! Asset processing in Bevy is a framework for automatically transforming artist-authored assets into the format that best suits the needs of your particular game.
2//!
3//! You can think of the asset processing system as a "build system" for assets.
4//! When an artist adds a new asset to the project or an asset is changed (assuming asset hot reloading is enabled), the asset processing system will automatically perform the specified processing steps on the asset.
5//! This can include things like creating lightmaps for baked lighting, compressing a `.wav` file to an `.ogg`, or generating mipmaps for a texture.
6//!
7//! Its core values are:
8//!
9//! 1. Automatic: new and changed assets should be ready to use in-game without requiring any manual conversion or cleanup steps.
10//! 2. Configurable: every game has its own needs, and a high level of transparency and control is required.
11//! 3. Lossless: the original asset should always be preserved, ensuring artists can make changes later.
12//! 4. Deterministic: performing the same processing steps on the same asset should (generally) produce the exact same result. In cases where this doesn't make sense (steps that involve a degree of randomness or uncertainty), the results across runs should be "acceptably similar", as they will be generated once for a given set of inputs and cached.
13//!
14//! Taken together, this means that the original asset plus the processing steps should be enough to regenerate the final asset.
15//! While it may be possible to manually edit the final asset, this should be discouraged.
16//! Final post-processed assets should generally not be version-controlled, except to save developer time when recomputing heavy asset processing steps.
17//!
18//! # Usage
19//!
20//! Asset processing can be enabled or disabled in [`AssetPlugin`](crate::AssetPlugin) by setting the [`AssetMode`](crate::AssetMode).\
21//! Enable Bevy's `file_watcher` feature to automatically watch for changes to assets and reprocess them.
22//!
23//! To register a new asset processor, use [`AssetProcessor::register_processor`].
24//! To set the default asset processor for a given extension, use [`AssetProcessor::set_default_processor`].
25//! In most cases, these methods will be called directly on [`App`](bevy_app::App) using the [`AssetApp`](crate::AssetApp) extension trait.
26//!
27//! If a default asset processor is set, assets with a matching extension will be processed using that processor before loading.
28//!
29//! For an end-to-end example, check out the examples in the [`examples/asset/processing`](https://github.com/bevyengine/bevy/tree/latest/examples/asset/processing) directory of the Bevy repository.
30//!
31//!  # Defining asset processors
32//!
33//! Bevy provides two different ways to define new asset processors:
34//!
35//! - [`LoadTransformAndSave`] + [`AssetTransformer`](crate::transformer::AssetTransformer): a high-level API for loading, transforming, and saving assets.
36//! - [`Process`]: a flexible low-level API for processing assets in arbitrary ways.
37//!
38//! In most cases, [`LoadTransformAndSave`] should be sufficient.
39
40mod log;
41mod process;
42
43pub use log::*;
44pub use process::*;
45
46use crate::{
47    io::{
48        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
49        AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
50        MissingAssetSourceError,
51    },
52    meta::{
53        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
54        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
55    },
56    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
57    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
58};
59use alloc::{borrow::ToOwned, boxed::Box, collections::VecDeque, sync::Arc, vec, vec::Vec};
60use bevy_ecs::prelude::*;
61use bevy_platform::collections::{HashMap, HashSet};
62use bevy_tasks::IoTaskPool;
63use futures_io::ErrorKind;
64use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
65use parking_lot::RwLock;
66use std::path::{Path, PathBuf};
67use thiserror::Error;
68use tracing::{debug, error, trace, warn};
69
70#[cfg(feature = "trace")]
71use {
72    alloc::string::ToString,
73    bevy_tasks::ConditionalSendFuture,
74    tracing::{info_span, instrument::Instrument},
75};
76
77/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`](crate::io::AssetReader) / [`AssetWriter`](crate::io::AssetWriter) pair),
78/// processes them in some way, and writes them to a destination [`AssetSource`].
79///
80/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets that
81/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
82///
83/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
84///
85/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
86/// asset, which is used to determine if the asset source has actually changed.
87///
88/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
89/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
90///
91/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
92#[derive(Resource, Clone)]
93pub struct AssetProcessor {
94    server: AssetServer,
95    pub(crate) data: Arc<AssetProcessorData>,
96}
97
98/// Internal data stored inside an [`AssetProcessor`].
99pub struct AssetProcessorData {
100    pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
101    log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
102    processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
103    /// Default processors for file extensions
104    default_processors: RwLock<HashMap<Box<str>, &'static str>>,
105    state: async_lock::RwLock<ProcessorState>,
106    sources: AssetSources,
107    initialized_sender: async_broadcast::Sender<()>,
108    initialized_receiver: async_broadcast::Receiver<()>,
109    finished_sender: async_broadcast::Sender<()>,
110    finished_receiver: async_broadcast::Receiver<()>,
111}
112
113impl AssetProcessor {
114    /// Creates a new [`AssetProcessor`] instance.
115    pub fn new(source: &mut AssetSourceBuilders) -> Self {
116        let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
117        // The asset processor uses its own asset server with its own id space
118        let mut sources = source.build_sources(false, false);
119        sources.gate_on_processor(data.clone());
120        let server = AssetServer::new_with_meta_check(
121            sources,
122            AssetServerMode::Processed,
123            AssetMetaCheck::Always,
124            false,
125            UnapprovedPathMode::default(),
126        );
127        Self { server, data }
128    }
129
130    /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`].
131    pub fn data(&self) -> &Arc<AssetProcessorData> {
132        &self.data
133    }
134
135    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset processor used by
136    /// the main App. It has different processor-specific configuration and a different ID space.
137    pub fn server(&self) -> &AssetServer {
138        &self.server
139    }
140
141    async fn set_state(&self, state: ProcessorState) {
142        let mut state_guard = self.data.state.write().await;
143        let last_state = *state_guard;
144        *state_guard = state;
145        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
146            self.data.finished_sender.broadcast(()).await.unwrap();
147        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
148            self.data.initialized_sender.broadcast(()).await.unwrap();
149        }
150    }
151
152    /// Retrieves the current [`ProcessorState`]
153    pub async fn get_state(&self) -> ProcessorState {
154        *self.data.state.read().await
155    }
156
157    /// Retrieves the [`AssetSource`] for this processor
158    #[inline]
159    pub fn get_source<'a>(
160        &self,
161        id: impl Into<AssetSourceId<'a>>,
162    ) -> Result<&AssetSource, MissingAssetSourceError> {
163        self.data.sources.get(id.into())
164    }
165
166    #[inline]
167    pub fn sources(&self) -> &AssetSources {
168        &self.data.sources
169    }
170
171    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
172    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
173    async fn log_unrecoverable(&self) {
174        let mut log = self.data.log.write().await;
175        let log = log.as_mut().unwrap();
176        log.unrecoverable().await.unwrap();
177    }
178
179    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
180    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
181    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
182        let mut log = self.data.log.write().await;
183        let log = log.as_mut().unwrap();
184        log.begin_processing(path).await.unwrap();
185    }
186
187    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
188    async fn log_end_processing(&self, path: &AssetPath<'_>) {
189        let mut log = self.data.log.write().await;
190        let log = log.as_mut().unwrap();
191        log.end_processing(path).await.unwrap();
192    }
193
194    /// Starts the processor in a background thread.
195    pub fn start(_processor: Res<Self>) {
196        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
197        error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
198        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
199        {
200            let processor = _processor.clone();
201            std::thread::spawn(move || {
202                processor.process_assets();
203                bevy_tasks::block_on(processor.listen_for_source_change_events());
204            });
205        }
206    }
207
208    /// Processes all assets. This will:
209    /// * For each "processed [`AssetSource`]:
210    /// * Scan the [`ProcessorTransactionLog`] and recover from any failures detected
211    /// * Scan the processed [`AssetReader`](crate::io::AssetReader) to build the current view of
212    ///   already processed assets.
213    /// * Scan the unprocessed [`AssetReader`](crate::io::AssetReader) and remove any final
214    ///   processed assets that are invalid or no longer exist.
215    /// * For each asset in the unprocessed [`AssetReader`](crate::io::AssetReader), kick off a new
216    ///   "process job", which will process the asset
217    ///   (if the latest version of the asset has not been processed).
218    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
219    pub fn process_assets(&self) {
220        let start_time = std::time::Instant::now();
221        debug!("Processing Assets");
222        IoTaskPool::get().scope(|scope| {
223            scope.spawn(async move {
224                self.initialize().await.unwrap();
225                for source in self.sources().iter_processed() {
226                    self.process_assets_internal(scope, source, PathBuf::from(""))
227                        .await
228                        .unwrap();
229                }
230            });
231        });
232        // This must happen _after_ the scope resolves or it will happen "too early"
233        // Don't move this into the async scope above! process_assets is a blocking/sync function this is fine
234        bevy_tasks::block_on(self.finish_processing_assets());
235        let end_time = std::time::Instant::now();
236        debug!("Processing finished in {:?}", end_time - start_time);
237    }
238
239    /// Listens for changes to assets in the source [`AssetSource`] and update state accordingly.
240    // PERF: parallelize change event processing
241    pub async fn listen_for_source_change_events(&self) {
242        debug!("Listening for changes to source assets");
243        loop {
244            let mut started_processing = false;
245
246            for source in self.data.sources.iter_processed() {
247                if let Some(receiver) = source.event_receiver() {
248                    for event in receiver.try_iter() {
249                        if !started_processing {
250                            self.set_state(ProcessorState::Processing).await;
251                            started_processing = true;
252                        }
253
254                        self.handle_asset_source_event(source, event).await;
255                    }
256                }
257            }
258
259            if started_processing {
260                self.finish_processing_assets().await;
261            }
262        }
263    }
264
265    /// Writes the default meta file for the provided `path`.
266    ///
267    /// This function generates the appropriate meta file to process `path` with the default
268    /// processor. If there is no default processor, it falls back to the default loader.
269    ///
270    /// Note if there is already a meta file for `path`, this function returns
271    /// `Err(WriteDefaultMetaError::MetaAlreadyExists)`.
272    pub async fn write_default_meta_file_for_path(
273        &self,
274        path: impl Into<AssetPath<'_>>,
275    ) -> Result<(), WriteDefaultMetaError> {
276        let path = path.into();
277        let Some(processor) = path
278            .get_full_extension()
279            .and_then(|extension| self.get_default_processor(&extension))
280        else {
281            return self
282                .server
283                .write_default_loader_meta_file_for_path(path)
284                .await;
285        };
286
287        let meta = processor.default_meta();
288        let serialized_meta = meta.serialize();
289
290        let source = self.get_source(path.source())?;
291
292        // Note: we get the reader rather than the processed reader, since we want to write the meta
293        // file for the unprocessed version of that asset (so it will be processed by the default
294        // processor).
295        let reader = source.reader();
296        match reader.read_meta_bytes(path.path()).await {
297            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
298            Err(AssetReaderError::NotFound(_)) => {
299                // The meta file couldn't be found so just fall through.
300            }
301            Err(AssetReaderError::Io(err)) => {
302                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
303            }
304            Err(AssetReaderError::HttpError(err)) => {
305                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
306            }
307        }
308
309        let writer = source.writer()?;
310        writer
311            .write_meta_bytes(path.path(), &serialized_meta)
312            .await?;
313
314        Ok(())
315    }
316
317    async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
318        trace!("{event:?}");
319        match event {
320            AssetSourceEvent::AddedAsset(path)
321            | AssetSourceEvent::AddedMeta(path)
322            | AssetSourceEvent::ModifiedAsset(path)
323            | AssetSourceEvent::ModifiedMeta(path) => {
324                self.process_asset(source, path).await;
325            }
326            AssetSourceEvent::RemovedAsset(path) => {
327                self.handle_removed_asset(source, path).await;
328            }
329            AssetSourceEvent::RemovedMeta(path) => {
330                self.handle_removed_meta(source, path).await;
331            }
332            AssetSourceEvent::AddedFolder(path) => {
333                self.handle_added_folder(source, path).await;
334            }
335            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
336            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
337            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
338            AssetSourceEvent::RemovedFolder(path) => {
339                self.handle_removed_folder(source, &path).await;
340            }
341            AssetSourceEvent::RenamedAsset { old, new } => {
342                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
343                // Sometimes this event is returned when an asset is moved "back" into the asset folder
344                if old == new {
345                    self.process_asset(source, new).await;
346                } else {
347                    self.handle_renamed_asset(source, old, new).await;
348                }
349            }
350            AssetSourceEvent::RenamedMeta { old, new } => {
351                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
352                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
353                if old == new {
354                    self.process_asset(source, new).await;
355                } else {
356                    debug!("Meta renamed from {old:?} to {new:?}");
357                    let mut infos = self.data.asset_infos.write().await;
358                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
359                    // if they should be re-imported (and/or have new meta generated)
360                    let new_asset_path = AssetPath::from(new).with_source(source.id());
361                    let old_asset_path = AssetPath::from(old).with_source(source.id());
362                    infos.check_reprocess_queue.push_back(old_asset_path);
363                    infos.check_reprocess_queue.push_back(new_asset_path);
364                }
365            }
366            AssetSourceEvent::RenamedFolder { old, new } => {
367                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
368                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
369                if old == new {
370                    self.handle_added_folder(source, new).await;
371                } else {
372                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
373                    // requires some nuance when it comes to path handling.
374                    self.handle_removed_folder(source, &old).await;
375                    self.handle_added_folder(source, new).await;
376                }
377            }
378            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
379                let processed_reader = source.processed_reader().unwrap();
380                match processed_reader.is_directory(&path).await {
381                    Ok(is_directory) => {
382                        if is_directory {
383                            self.handle_removed_folder(source, &path).await;
384                        } else if is_meta {
385                            self.handle_removed_meta(source, path).await;
386                        } else {
387                            self.handle_removed_asset(source, path).await;
388                        }
389                    }
390                    Err(err) => {
391                        match err {
392                            AssetReaderError::NotFound(_) => {
393                                // if the path is not found, a processed version does not exist
394                            }
395                            AssetReaderError::Io(err) => {
396                                error!(
397                                    "Path '{}' was removed, but the destination reader could not determine if it \
398                                    was a folder or a file due to the following error: {err}",
399                                    AssetPath::from_path(&path).with_source(source.id())
400                                );
401                            }
402                            AssetReaderError::HttpError(status) => {
403                                error!(
404                                    "Path '{}' was removed, but the destination reader could not determine if it \
405                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
406                                    AssetPath::from_path(&path).with_source(source.id())
407                                );
408                            }
409                        }
410                    }
411                }
412            }
413        }
414    }
415
416    async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
417        debug!(
418            "Folder {} was added. Attempting to re-process",
419            AssetPath::from_path(&path).with_source(source.id())
420        );
421        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
422        error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
423        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
424        IoTaskPool::get().scope(|scope| {
425            scope.spawn(async move {
426                self.process_assets_internal(scope, source, path)
427                    .await
428                    .unwrap();
429            });
430        });
431    }
432
433    /// Responds to a removed meta event by reprocessing the asset at the given path.
434    async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
435        // If meta was removed, we might need to regenerate it.
436        // Likewise, the user might be manually re-adding the asset.
437        // Therefore, we shouldn't automatically delete the asset ... that is a
438        // user-initiated action.
439        debug!(
440            "Meta for asset {} was removed. Attempting to re-process",
441            AssetPath::from_path(&path).with_source(source.id())
442        );
443        self.process_asset(source, path).await;
444    }
445
446    /// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
447    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
448        debug!(
449            "Removing folder {} because source was removed",
450            path.display()
451        );
452        let processed_reader = source.processed_reader().unwrap();
453        match processed_reader.read_directory(path).await {
454            Ok(mut path_stream) => {
455                while let Some(child_path) = path_stream.next().await {
456                    self.handle_removed_asset(source, child_path).await;
457                }
458            }
459            Err(err) => match err {
460                AssetReaderError::NotFound(_err) => {
461                    // The processed folder does not exist. No need to update anything
462                }
463                AssetReaderError::HttpError(status) => {
464                    self.log_unrecoverable().await;
465                    error!(
466                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
467                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
468                    );
469                }
470                AssetReaderError::Io(err) => {
471                    self.log_unrecoverable().await;
472                    error!(
473                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
474                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
475                    );
476                }
477            },
478        }
479        let processed_writer = source.processed_writer().unwrap();
480        if let Err(err) = processed_writer.remove_directory(path).await {
481            match err {
482                AssetWriterError::Io(err) => {
483                    // we can ignore NotFound because if the "final" file in a folder was removed
484                    // then we automatically clean up this folder
485                    if err.kind() != ErrorKind::NotFound {
486                        let asset_path = AssetPath::from_path(path).with_source(source.id());
487                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
488                    }
489                }
490            }
491        }
492    }
493
494    /// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
495    /// asset have finished, thanks to the `file_transaction_lock`.
496    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
497        let asset_path = AssetPath::from(path).with_source(source.id());
498        debug!("Removing processed {asset_path} because source was removed");
499        let mut infos = self.data.asset_infos.write().await;
500        if let Some(info) = infos.get(&asset_path) {
501            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
502            // can finish their operations
503            let _write_lock = info.file_transaction_lock.write();
504            self.remove_processed_asset_and_meta(source, asset_path.path())
505                .await;
506        }
507        infos.remove(&asset_path).await;
508    }
509
510    /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
511    /// This will cause direct path dependencies to break.
512    async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
513        let mut infos = self.data.asset_infos.write().await;
514        let old = AssetPath::from(old).with_source(source.id());
515        let new = AssetPath::from(new).with_source(source.id());
516        let processed_writer = source.processed_writer().unwrap();
517        if let Some(info) = infos.get(&old) {
518            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
519            // can finish their operations
520            let _write_lock = info.file_transaction_lock.write();
521            processed_writer
522                .rename(old.path(), new.path())
523                .await
524                .unwrap();
525            processed_writer
526                .rename_meta(old.path(), new.path())
527                .await
528                .unwrap();
529        }
530        infos.rename(&old, &new).await;
531    }
532
533    async fn finish_processing_assets(&self) {
534        self.try_reprocessing_queued().await;
535        // clean up metadata in asset server
536        self.server.data.infos.write().consume_handle_drop_events();
537        self.set_state(ProcessorState::Finished).await;
538    }
539
540    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
541    async fn process_assets_internal<'scope>(
542        &'scope self,
543        scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
544        source: &'scope AssetSource,
545        path: PathBuf,
546    ) -> Result<(), AssetReaderError> {
547        if source.reader().is_directory(&path).await? {
548            let mut path_stream = source.reader().read_directory(&path).await?;
549            while let Some(path) = path_stream.next().await {
550                Box::pin(self.process_assets_internal(scope, source, path)).await?;
551            }
552        } else {
553            // Files without extensions are skipped
554            let processor = self.clone();
555            scope.spawn(async move {
556                processor.process_asset(source, path).await;
557            });
558        }
559        Ok(())
560    }
561
562    async fn try_reprocessing_queued(&self) {
563        loop {
564            let mut check_reprocess_queue =
565                core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
566            IoTaskPool::get().scope(|scope| {
567                for path in check_reprocess_queue.drain(..) {
568                    let processor = self.clone();
569                    let source = self.get_source(path.source()).unwrap();
570                    scope.spawn(async move {
571                        processor.process_asset(source, path.into()).await;
572                    });
573                }
574            });
575            let infos = self.data.asset_infos.read().await;
576            if infos.check_reprocess_queue.is_empty() {
577                break;
578            }
579        }
580    }
581
582    /// Register a new asset processor.
583    pub fn register_processor<P: Process>(&self, processor: P) {
584        let mut process_plans = self.data.processors.write();
585        #[cfg(feature = "trace")]
586        let processor = InstrumentedAssetProcessor(processor);
587        process_plans.insert(core::any::type_name::<P>(), Arc::new(processor));
588    }
589
590    /// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
591    pub fn set_default_processor<P: Process>(&self, extension: &str) {
592        let mut default_processors = self.data.default_processors.write();
593        default_processors.insert(extension.into(), core::any::type_name::<P>());
594    }
595
596    /// Returns the default processor for the given `extension`, if it exists.
597    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
598        let default_processors = self.data.default_processors.read();
599        let key = default_processors.get(extension)?;
600        self.data.processors.read().get(key).cloned()
601    }
602
603    /// Returns the processor with the given `processor_type_name`, if it exists.
604    pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
605        let processors = self.data.processors.read();
606        processors.get(processor_type_name).cloned()
607    }
608
609    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
610    /// This info will later be used to determine whether or not to re-process an asset
611    ///
612    /// This will validate transactions and recover failed transactions when necessary.
613    #[cfg_attr(
614        any(target_arch = "wasm32", not(feature = "multi_threaded")),
615        expect(
616            dead_code,
617            reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
618        )
619    )]
620    async fn initialize(&self) -> Result<(), InitializeError> {
621        self.validate_transaction_log_and_recover().await;
622        let mut asset_infos = self.data.asset_infos.write().await;
623
624        /// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty
625        /// folders when they are discovered.
626        async fn get_asset_paths(
627            reader: &dyn ErasedAssetReader,
628            clean_empty_folders_writer: Option<&dyn ErasedAssetWriter>,
629            path: PathBuf,
630            paths: &mut Vec<PathBuf>,
631        ) -> Result<bool, AssetReaderError> {
632            if reader.is_directory(&path).await? {
633                let mut path_stream = reader.read_directory(&path).await?;
634                let mut contains_files = false;
635
636                while let Some(child_path) = path_stream.next().await {
637                    contains_files |= Box::pin(get_asset_paths(
638                        reader,
639                        clean_empty_folders_writer,
640                        child_path,
641                        paths,
642                    ))
643                    .await?;
644                }
645                if !contains_files && path.parent().is_some() {
646                    if let Some(writer) = clean_empty_folders_writer {
647                        // it is ok for this to fail as it is just a cleanup job.
648                        let _ = writer.remove_empty_directory(&path).await;
649                    }
650                }
651                Ok(contains_files)
652            } else {
653                paths.push(path);
654                Ok(true)
655            }
656        }
657
658        for source in self.sources().iter_processed() {
659            let Ok(processed_reader) = source.processed_reader() else {
660                continue;
661            };
662            let Ok(processed_writer) = source.processed_writer() else {
663                continue;
664            };
665            let mut unprocessed_paths = Vec::new();
666            get_asset_paths(
667                source.reader(),
668                None,
669                PathBuf::from(""),
670                &mut unprocessed_paths,
671            )
672            .await
673            .map_err(InitializeError::FailedToReadSourcePaths)?;
674
675            let mut processed_paths = Vec::new();
676            get_asset_paths(
677                processed_reader,
678                Some(processed_writer),
679                PathBuf::from(""),
680                &mut processed_paths,
681            )
682            .await
683            .map_err(InitializeError::FailedToReadDestinationPaths)?;
684
685            for path in unprocessed_paths {
686                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
687            }
688
689            for path in processed_paths {
690                let mut dependencies = Vec::new();
691                let asset_path = AssetPath::from(path).with_source(source.id());
692                if let Some(info) = asset_infos.get_mut(&asset_path) {
693                    match processed_reader.read_meta_bytes(asset_path.path()).await {
694                        Ok(meta_bytes) => {
695                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
696                                Ok(minimal) => {
697                                    trace!(
698                                        "Populated processed info for asset {asset_path} {:?}",
699                                        minimal.processed_info
700                                    );
701
702                                    if let Some(processed_info) = &minimal.processed_info {
703                                        for process_dependency_info in
704                                            &processed_info.process_dependencies
705                                        {
706                                            dependencies.push(process_dependency_info.path.clone());
707                                        }
708                                    }
709                                    info.processed_info = minimal.processed_info;
710                                }
711                                Err(err) => {
712                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
713                                    self.remove_processed_asset_and_meta(source, asset_path.path())
714                                        .await;
715                                }
716                            }
717                        }
718                        Err(err) => {
719                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
720                            self.remove_processed_asset_and_meta(source, asset_path.path())
721                                .await;
722                        }
723                    }
724                } else {
725                    trace!("Removing processed data for non-existent asset {asset_path}");
726                    self.remove_processed_asset_and_meta(source, asset_path.path())
727                        .await;
728                }
729
730                for dependency in dependencies {
731                    asset_infos.add_dependent(&dependency, asset_path.clone());
732                }
733            }
734        }
735
736        self.set_state(ProcessorState::Processing).await;
737
738        Ok(())
739    }
740
741    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
742    /// does it remove existing in-memory metadata.
743    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
744        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
745            warn!("Failed to remove non-existent asset {path:?}: {err}");
746        }
747
748        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
749            warn!("Failed to remove non-existent meta {path:?}: {err}");
750        }
751
752        self.clean_empty_processed_ancestor_folders(source, path)
753            .await;
754    }
755
756    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
757        // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
758        if path.is_absolute() {
759            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
760            return;
761        }
762        while let Some(parent) = path.parent() {
763            if parent == Path::new("") {
764                break;
765            }
766            if source
767                .processed_writer()
768                .unwrap()
769                .remove_empty_directory(parent)
770                .await
771                .is_err()
772            {
773                // if we fail to delete a folder, stop walking up the tree
774                break;
775            }
776        }
777    }
778
779    /// Processes the asset (if it has not already been processed or the asset source has changed).
780    /// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
781    /// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
782    /// to block reads until the asset is processed).
783    ///
784    /// [`LoadContext`]: crate::loader::LoadContext
785    /// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
786    async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
787        let asset_path = AssetPath::from(path).with_source(source.id());
788        let result = self.process_asset_internal(source, &asset_path).await;
789        let mut infos = self.data.asset_infos.write().await;
790        infos.finish_processing(asset_path, result).await;
791    }
792
793    async fn process_asset_internal(
794        &self,
795        source: &AssetSource,
796        asset_path: &AssetPath<'static>,
797    ) -> Result<ProcessResult, ProcessError> {
798        // TODO: The extension check was removed now that AssetPath is the input. is that ok?
799        // TODO: check if already processing to protect against duplicate hot-reload events
800        debug!("Processing {}", asset_path);
801        let server = &self.server;
802        let path = asset_path.path();
803        let reader = source.reader();
804
805        let reader_err = |err| ProcessError::AssetReaderError {
806            path: asset_path.clone(),
807            err,
808        };
809        let writer_err = |err| ProcessError::AssetWriterError {
810            path: asset_path.clone(),
811            err,
812        };
813
814        // Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files
815        let mut byte_reader = reader.read(path).await.map_err(reader_err)?;
816
817        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
818            Ok(meta_bytes) => {
819                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
820                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
821                })?;
822                let (meta, processor) = match minimal.asset {
823                    AssetActionMinimal::Load { loader } => {
824                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
825                        let meta = loader.deserialize_meta(&meta_bytes)?;
826                        (meta, None)
827                    }
828                    AssetActionMinimal::Process { processor } => {
829                        let processor = self
830                            .get_processor(&processor)
831                            .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
832                        let meta = processor.deserialize_meta(&meta_bytes)?;
833                        (meta, Some(processor))
834                    }
835                    AssetActionMinimal::Ignore => {
836                        return Ok(ProcessResult::Ignored);
837                    }
838                };
839                (meta, meta_bytes, processor)
840            }
841            Err(AssetReaderError::NotFound(_path)) => {
842                let (meta, processor) = if let Some(processor) = asset_path
843                    .get_full_extension()
844                    .and_then(|ext| self.get_default_processor(&ext))
845                {
846                    let meta = processor.default_meta();
847                    (meta, Some(processor))
848                } else {
849                    match server.get_path_asset_loader(asset_path.clone()).await {
850                        Ok(loader) => (loader.default_meta(), None),
851                        Err(MissingAssetLoaderForExtensionError { .. }) => {
852                            let meta: Box<dyn AssetMetaDyn> =
853                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
854                            (meta, None)
855                        }
856                    }
857                };
858                let meta_bytes = meta.serialize();
859                (meta, meta_bytes, processor)
860            }
861            Err(err) => {
862                return Err(ProcessError::ReadAssetMetaError {
863                    path: asset_path.clone(),
864                    err,
865                })
866            }
867        };
868
869        let processed_writer = source.processed_writer()?;
870
871        let mut asset_bytes = Vec::new();
872        byte_reader
873            .read_to_end(&mut asset_bytes)
874            .await
875            .map_err(|e| ProcessError::AssetReaderError {
876                path: asset_path.clone(),
877                err: AssetReaderError::Io(e.into()),
878            })?;
879
880        // PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset.
881        // The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader)
882        // Hard to say which is worse
883        let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
884        let mut new_processed_info = ProcessedInfo {
885            hash: new_hash,
886            full_hash: new_hash,
887            process_dependencies: Vec::new(),
888        };
889
890        {
891            let infos = self.data.asset_infos.read().await;
892            if let Some(current_processed_info) = infos
893                .get(asset_path)
894                .and_then(|i| i.processed_info.as_ref())
895            {
896                if current_processed_info.hash == new_hash {
897                    let mut dependency_changed = false;
898                    for current_dep_info in &current_processed_info.process_dependencies {
899                        let live_hash = infos
900                            .get(&current_dep_info.path)
901                            .and_then(|i| i.processed_info.as_ref())
902                            .map(|i| i.full_hash);
903                        if live_hash != Some(current_dep_info.full_hash) {
904                            dependency_changed = true;
905                            break;
906                        }
907                    }
908                    if !dependency_changed {
909                        return Ok(ProcessResult::SkippedNotChanged);
910                    }
911                }
912            }
913        }
914        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
915        // See ProcessedAssetInfo::file_transaction_lock docs for more info
916        let _transaction_lock = {
917            let mut infos = self.data.asset_infos.write().await;
918            let info = infos.get_or_insert(asset_path.clone());
919            info.file_transaction_lock.write_arc().await
920        };
921
922        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
923        // Directly writing to the asset destination in the processor necessitates this behavior
924        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
925        self.log_begin_processing(asset_path).await;
926        if let Some(processor) = processor {
927            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
928            let mut processed_meta = {
929                let mut context =
930                    ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
931                processor
932                    .process(&mut context, source_meta, &mut *writer)
933                    .await?
934            };
935
936            writer
937                .flush()
938                .await
939                .map_err(|e| ProcessError::AssetWriterError {
940                    path: asset_path.clone(),
941                    err: AssetWriterError::Io(e),
942                })?;
943
944            let full_hash = get_full_asset_hash(
945                new_hash,
946                new_processed_info
947                    .process_dependencies
948                    .iter()
949                    .map(|i| i.full_hash),
950            );
951            new_processed_info.full_hash = full_hash;
952            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
953            let meta_bytes = processed_meta.serialize();
954            processed_writer
955                .write_meta_bytes(path, &meta_bytes)
956                .await
957                .map_err(writer_err)?;
958        } else {
959            processed_writer
960                .write_bytes(path, &asset_bytes)
961                .await
962                .map_err(writer_err)?;
963            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
964            let meta_bytes = source_meta.serialize();
965            processed_writer
966                .write_meta_bytes(path, &meta_bytes)
967                .await
968                .map_err(writer_err)?;
969        }
970        self.log_end_processing(asset_path).await;
971
972        Ok(ProcessResult::Processed(new_processed_info))
973    }
974
975    async fn validate_transaction_log_and_recover(&self) {
976        if let Err(err) = ProcessorTransactionLog::validate().await {
977            let state_is_valid = match err {
978                ValidateLogError::ReadLogError(err) => {
979                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
980                    false
981                }
982                ValidateLogError::UnrecoverableError => {
983                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
984                    false
985                }
986                ValidateLogError::EntryErrors(entry_errors) => {
987                    let mut state_is_valid = true;
988                    for entry_error in entry_errors {
989                        match entry_error {
990                            LogEntryError::DuplicateTransaction(_)
991                            | LogEntryError::EndedMissingTransaction(_) => {
992                                error!("{}", entry_error);
993                                state_is_valid = false;
994                                break;
995                            }
996                            LogEntryError::UnfinishedTransaction(path) => {
997                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
998                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
999                                    error!("Failed to remove asset {path:?}: {message}");
1000                                    state_is_valid = false;
1001                                };
1002                                let Ok(source) = self.get_source(path.source()) else {
1003                                    unrecoverable_err(&"AssetSource does not exist");
1004                                    continue;
1005                                };
1006                                let Ok(processed_writer) = source.processed_writer() else {
1007                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
1008                                    continue;
1009                                };
1010
1011                                if let Err(err) = processed_writer.remove(path.path()).await {
1012                                    match err {
1013                                        AssetWriterError::Io(err) => {
1014                                            // any error but NotFound means we could be in a bad state
1015                                            if err.kind() != ErrorKind::NotFound {
1016                                                unrecoverable_err(&err);
1017                                            }
1018                                        }
1019                                    }
1020                                }
1021                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
1022                                    match err {
1023                                        AssetWriterError::Io(err) => {
1024                                            // any error but NotFound means we could be in a bad state
1025                                            if err.kind() != ErrorKind::NotFound {
1026                                                unrecoverable_err(&err);
1027                                            }
1028                                        }
1029                                    }
1030                                }
1031                            }
1032                        }
1033                    }
1034                    state_is_valid
1035                }
1036            };
1037
1038            if !state_is_valid {
1039                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
1040                for source in self.sources().iter_processed() {
1041                    let Ok(processed_writer) = source.processed_writer() else {
1042                        continue;
1043                    };
1044                    if let Err(err) = processed_writer
1045                        .remove_assets_in_directory(Path::new(""))
1046                        .await
1047                    {
1048                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
1049                    }
1050                }
1051            }
1052        }
1053        let mut log = self.data.log.write().await;
1054        *log = match ProcessorTransactionLog::new().await {
1055            Ok(log) => Some(log),
1056            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
1057        };
1058    }
1059}
1060
1061impl AssetProcessorData {
1062    /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
1063    pub fn new(source: AssetSources) -> Self {
1064        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1065        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1066        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1067        // not block if there was older state present.
1068        finished_sender.set_overflow(true);
1069        initialized_sender.set_overflow(true);
1070
1071        AssetProcessorData {
1072            sources: source,
1073            finished_sender,
1074            finished_receiver,
1075            initialized_sender,
1076            initialized_receiver,
1077            state: async_lock::RwLock::new(ProcessorState::Initializing),
1078            log: Default::default(),
1079            processors: Default::default(),
1080            asset_infos: Default::default(),
1081            default_processors: Default::default(),
1082        }
1083    }
1084
1085    /// Returns a future that will not finish until the path has been processed.
1086    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1087        self.wait_until_initialized().await;
1088        let mut receiver = {
1089            let infos = self.asset_infos.write().await;
1090            let info = infos.get(&path);
1091            match info {
1092                Some(info) => match info.status {
1093                    Some(result) => return result,
1094                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1095                    None => info.status_receiver.clone(),
1096                },
1097                None => return ProcessStatus::NonExistent,
1098            }
1099        };
1100        receiver.recv().await.unwrap()
1101    }
1102
1103    /// Returns a future that will not finish until the processor has been initialized.
1104    pub async fn wait_until_initialized(&self) {
1105        let receiver = {
1106            let state = self.state.read().await;
1107            match *state {
1108                ProcessorState::Initializing => {
1109                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1110                    Some(self.initialized_receiver.clone())
1111                }
1112                _ => None,
1113            }
1114        };
1115
1116        if let Some(mut receiver) = receiver {
1117            receiver.recv().await.unwrap();
1118        }
1119    }
1120
1121    /// Returns a future that will not finish until processing has finished.
1122    pub async fn wait_until_finished(&self) {
1123        let receiver = {
1124            let state = self.state.read().await;
1125            match *state {
1126                ProcessorState::Initializing | ProcessorState::Processing => {
1127                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1128                    Some(self.finished_receiver.clone())
1129                }
1130                ProcessorState::Finished => None,
1131            }
1132        };
1133
1134        if let Some(mut receiver) = receiver {
1135            receiver.recv().await.unwrap();
1136        }
1137    }
1138}
1139
1140#[cfg(feature = "trace")]
1141struct InstrumentedAssetProcessor<T>(T);
1142
1143#[cfg(feature = "trace")]
1144impl<T: Process> Process for InstrumentedAssetProcessor<T> {
1145    type Settings = T::Settings;
1146    type OutputLoader = T::OutputLoader;
1147
1148    fn process(
1149        &self,
1150        context: &mut ProcessContext,
1151        meta: AssetMeta<(), Self>,
1152        writer: &mut crate::io::Writer,
1153    ) -> impl ConditionalSendFuture<
1154        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
1155    > {
1156        // Change the processor type for the `AssetMeta`, which works because we share the `Settings` type.
1157        let meta = AssetMeta {
1158            meta_format_version: meta.meta_format_version,
1159            processed_info: meta.processed_info,
1160            asset: meta.asset,
1161        };
1162        let span = info_span!(
1163            "asset processing",
1164            processor = core::any::type_name::<T>(),
1165            asset = context.path().to_string(),
1166        );
1167        self.0.process(context, meta, writer).instrument(span)
1168    }
1169}
1170
1171/// The (successful) result of processing an asset
1172#[derive(Debug, Clone)]
1173pub enum ProcessResult {
1174    Processed(ProcessedInfo),
1175    SkippedNotChanged,
1176    Ignored,
1177}
1178
1179/// The final status of processing an asset
1180#[derive(Debug, PartialEq, Eq, Copy, Clone)]
1181pub enum ProcessStatus {
1182    Processed,
1183    Failed,
1184    NonExistent,
1185}
1186
1187// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
1188#[derive(Debug)]
1189pub(crate) struct ProcessorAssetInfo {
1190    processed_info: Option<ProcessedInfo>,
1191    /// Paths of assets that depend on this asset when they are being processed.
1192    dependents: HashSet<AssetPath<'static>>,
1193    status: Option<ProcessStatus>,
1194    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
1195    /// _This lock must be locked whenever a read or write to processed assets occurs_
1196    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
1197    /// * when the processor is running in parallel with an app
1198    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
1199    ///     * this second scenario almost certainly isn't possible with the current implementation, but its worth protecting against
1200    ///
1201    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written. And it ensures
1202    /// Because this lock is shared across meta and asset bytes, readers can ensure they don't read "old" versions of metadata with "new" asset data.
1203    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
1204    status_sender: async_broadcast::Sender<ProcessStatus>,
1205    status_receiver: async_broadcast::Receiver<ProcessStatus>,
1206}
1207
1208impl Default for ProcessorAssetInfo {
1209    fn default() -> Self {
1210        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1211        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1212        // not block if there was older state present.
1213        status_sender.set_overflow(true);
1214        Self {
1215            processed_info: Default::default(),
1216            dependents: Default::default(),
1217            file_transaction_lock: Default::default(),
1218            status: None,
1219            status_sender,
1220            status_receiver,
1221        }
1222    }
1223}
1224
1225impl ProcessorAssetInfo {
1226    async fn update_status(&mut self, status: ProcessStatus) {
1227        if self.status != Some(status) {
1228            self.status = Some(status);
1229            self.status_sender.broadcast(status).await.unwrap();
1230        }
1231    }
1232}
1233
1234/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
1235/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
1236/// consistent as events are processed.
1237#[derive(Default, Debug)]
1238pub struct ProcessorAssetInfos {
1239    /// The "current" in memory view of the asset space. During processing, if path does not exist in this, it should
1240    /// be considered non-existent.
1241    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
1242    /// `non_existent_dependents` DATA IS CONSUMED
1243    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
1244    /// Dependents for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
1245    /// If the dependent asset is added, it can "resolve" these dependencies and re-compute those assets.
1246    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
1247    /// check this maps for dependencies and add them. If an asset is removed, it should update the dependents here.
1248    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
1249    check_reprocess_queue: VecDeque<AssetPath<'static>>,
1250}
1251
1252impl ProcessorAssetInfos {
1253    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1254        self.infos.entry(asset_path.clone()).or_insert_with(|| {
1255            let mut info = ProcessorAssetInfo::default();
1256            // track existing dependents by resolving existing "hanging" dependents.
1257            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
1258                info.dependents = dependents;
1259            }
1260            info
1261        })
1262    }
1263
1264    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1265        self.infos.get(asset_path)
1266    }
1267
1268    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1269        self.infos.get_mut(asset_path)
1270    }
1271
1272    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
1273        if let Some(info) = self.get_mut(asset_path) {
1274            info.dependents.insert(dependent);
1275        } else {
1276            let dependents = self
1277                .non_existent_dependents
1278                .entry(asset_path.clone())
1279                .or_default();
1280            dependents.insert(dependent);
1281        }
1282    }
1283
1284    /// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view the processed assets.
1285    async fn finish_processing(
1286        &mut self,
1287        asset_path: AssetPath<'static>,
1288        result: Result<ProcessResult, ProcessError>,
1289    ) {
1290        match result {
1291            Ok(ProcessResult::Processed(processed_info)) => {
1292                debug!("Finished processing \"{}\"", asset_path);
1293                // clean up old dependents
1294                let old_processed_info = self
1295                    .infos
1296                    .get_mut(&asset_path)
1297                    .and_then(|i| i.processed_info.take());
1298                if let Some(old_processed_info) = old_processed_info {
1299                    self.clear_dependencies(&asset_path, old_processed_info);
1300                }
1301
1302                // populate new dependents
1303                for process_dependency_info in &processed_info.process_dependencies {
1304                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
1305                }
1306                let info = self.get_or_insert(asset_path);
1307                info.processed_info = Some(processed_info);
1308                info.update_status(ProcessStatus::Processed).await;
1309                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
1310                for path in dependents {
1311                    self.check_reprocess_queue.push_back(path);
1312                }
1313            }
1314            Ok(ProcessResult::SkippedNotChanged) => {
1315                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
1316                let info = self.get_mut(&asset_path).expect("info should exist");
1317                // NOTE: skipping an asset on a given pass doesn't mean it won't change in the future as a result
1318                // of a dependency being re-processed. This means apps might receive an "old" (but valid) asset first.
1319                // This is in the interest of fast startup times that don't block for all assets being checked + reprocessed
1320                // Therefore this relies on hot-reloading in the app to pickup the "latest" version of the asset
1321                // If "block until latest state is reflected" is required, we can easily add a less granular
1322                // "block until first pass finished" mode
1323                info.update_status(ProcessStatus::Processed).await;
1324            }
1325            Ok(ProcessResult::Ignored) => {
1326                debug!("Skipping processing (ignored) \"{}\"", asset_path);
1327            }
1328            Err(ProcessError::ExtensionRequired) => {
1329                // Skip assets without extensions
1330            }
1331            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1332                trace!("No loader found for {asset_path}");
1333            }
1334            Err(ProcessError::AssetReaderError {
1335                err: AssetReaderError::NotFound(_),
1336                ..
1337            }) => {
1338                // if there is no asset source, no processing can be done
1339                trace!("No need to process asset {asset_path} because it does not exist");
1340            }
1341            Err(err) => {
1342                error!("Failed to process asset {asset_path}: {err}");
1343                // if this failed because a dependency could not be loaded, make sure it is reprocessed if that dependency is reprocessed
1344                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1345                    err
1346                {
1347                    let info = self.get_mut(&asset_path).expect("info should exist");
1348                    info.processed_info = Some(ProcessedInfo {
1349                        hash: AssetHash::default(),
1350                        full_hash: AssetHash::default(),
1351                        process_dependencies: vec![],
1352                    });
1353                    self.add_dependent(dependency.path(), asset_path.to_owned());
1354                }
1355
1356                let info = self.get_mut(&asset_path).expect("info should exist");
1357                info.update_status(ProcessStatus::Failed).await;
1358            }
1359        }
1360    }
1361
1362    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1363    async fn remove(&mut self, asset_path: &AssetPath<'static>) {
1364        let info = self.infos.remove(asset_path);
1365        if let Some(info) = info {
1366            if let Some(processed_info) = info.processed_info {
1367                self.clear_dependencies(asset_path, processed_info);
1368            }
1369            // Tell all listeners this asset does not exist
1370            info.status_sender
1371                .broadcast(ProcessStatus::NonExistent)
1372                .await
1373                .unwrap();
1374            if !info.dependents.is_empty() {
1375                error!(
1376                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1377                    info.dependents
1378                );
1379                self.non_existent_dependents
1380                    .insert(asset_path.clone(), info.dependents);
1381            }
1382        }
1383    }
1384
1385    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1386    async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
1387        let info = self.infos.remove(old);
1388        if let Some(mut info) = info {
1389            if !info.dependents.is_empty() {
1390                // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1391                // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1392                // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1393                // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1394                // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1395                // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
1396                // (see the remove impl).
1397                error!(
1398                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1399                    info.dependents
1400                );
1401                self.non_existent_dependents
1402                    .insert(old.clone(), core::mem::take(&mut info.dependents));
1403            }
1404            if let Some(processed_info) = &info.processed_info {
1405                // Update "dependent" lists for this asset's "process dependencies" to use new path.
1406                for dep in &processed_info.process_dependencies {
1407                    if let Some(info) = self.infos.get_mut(&dep.path) {
1408                        info.dependents.remove(old);
1409                        info.dependents.insert(new.clone());
1410                    } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
1411                    {
1412                        dependents.remove(old);
1413                        dependents.insert(new.clone());
1414                    }
1415                }
1416            }
1417            // Tell all listeners this asset no longer exists
1418            info.status_sender
1419                .broadcast(ProcessStatus::NonExistent)
1420                .await
1421                .unwrap();
1422            let dependents: Vec<AssetPath<'static>> = {
1423                let new_info = self.get_or_insert(new.clone());
1424                new_info.processed_info = info.processed_info;
1425                new_info.status = info.status;
1426                // Ensure things waiting on the new path are informed of the status of this asset
1427                if let Some(status) = new_info.status {
1428                    new_info.status_sender.broadcast(status).await.unwrap();
1429                }
1430                new_info.dependents.iter().cloned().collect()
1431            };
1432            // Queue the asset for a reprocess check, in case it needs new meta.
1433            self.check_reprocess_queue.push_back(new.clone());
1434            for dependent in dependents {
1435                // Queue dependents for reprocessing because they might have been waiting for this asset.
1436                self.check_reprocess_queue.push_back(dependent);
1437            }
1438        }
1439    }
1440
1441    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1442        for old_load_dep in removed_info.process_dependencies {
1443            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1444                info.dependents.remove(asset_path);
1445            } else if let Some(dependents) =
1446                self.non_existent_dependents.get_mut(&old_load_dep.path)
1447            {
1448                dependents.remove(asset_path);
1449            }
1450        }
1451    }
1452}
1453
1454/// The current state of the [`AssetProcessor`].
1455#[derive(Copy, Clone, PartialEq, Eq)]
1456pub enum ProcessorState {
1457    /// The processor is still initializing, which involves scanning the current asset folders,
1458    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
1459    /// and cleaning up old / unused assets.
1460    Initializing,
1461    /// The processor is currently processing assets.
1462    Processing,
1463    /// The processor has finished processing all valid assets and reporting invalid assets.
1464    Finished,
1465}
1466
1467/// An error that occurs when initializing the [`AssetProcessor`].
1468#[derive(Error, Debug)]
1469pub enum InitializeError {
1470    #[error(transparent)]
1471    FailedToReadSourcePaths(AssetReaderError),
1472    #[error(transparent)]
1473    FailedToReadDestinationPaths(AssetReaderError),
1474    #[error("Failed to validate asset log: {0}")]
1475    ValidateLogError(#[from] ValidateLogError),
1476}