bevy_asset/processor/
mod.rs

1//! Asset processing in Bevy is a framework for automatically transforming artist-authored assets into the format that best suits the needs of your particular game.
2//!
3//! You can think of the asset processing system as a "build system" for assets.
4//! When an artist adds a new asset to the project or an asset is changed (assuming asset hot reloading is enabled), the asset processing system will automatically perform the specified processing steps on the asset.
5//! This can include things like creating lightmaps for baked lighting, compressing a `.wav` file to an `.ogg`, or generating mipmaps for a texture.
6//!
7//! Its core values are:
8//!
9//! 1. Automatic: new and changed assets should be ready to use in-game without requiring any manual conversion or cleanup steps.
10//! 2. Configurable: every game has its own needs, and a high level of transparency and control is required.
11//! 3. Lossless: the original asset should always be preserved, ensuring artists can make changes later.
12//! 4. Deterministic: performing the same processing steps on the same asset should (generally) produce the exact same result. In cases where this doesn't make sense (steps that involve a degree of randomness or uncertainty), the results across runs should be "acceptably similar", as they will be generated once for a given set of inputs and cached.
13//!
14//! Taken together, this means that the original asset plus the processing steps should be enough to regenerate the final asset.
15//! While it may be possible to manually edit the final asset, this should be discouraged.
16//! Final post-processed assets should generally not be version-controlled, except to save developer time when recomputing heavy asset processing steps.
17//!
18//! # Usage
19//!
20//! Asset processing can be enabled or disabled in [`AssetPlugin`](crate::AssetPlugin) by setting the [`AssetMode`](crate::AssetMode).\
21//! Enable Bevy's `file_watcher` feature to automatically watch for changes to assets and reprocess them.
22//!
23//! To register a new asset processor, use [`AssetProcessor::register_processor`].
24//! To set the default asset processor for a given extension, use [`AssetProcessor::set_default_processor`].
25//! In most cases, these methods will be called directly on [`App`](bevy_app::App) using the [`AssetApp`](crate::AssetApp) extension trait.
26//!
27//! If a default asset processor is set, assets with a matching extension will be processed using that processor before loading.
28//!
29//! For an end-to-end example, check out the examples in the [`examples/asset/processing`](https://github.com/bevyengine/bevy/tree/latest/examples/asset/processing) directory of the Bevy repository.
30//!
//! # Defining asset processors
32//!
33//! Bevy provides two different ways to define new asset processors:
34//!
35//! - [`LoadTransformAndSave`] + [`AssetTransformer`](crate::transformer::AssetTransformer): a high-level API for loading, transforming, and saving assets.
36//! - [`Process`]: a flexible low-level API for processing assets in arbitrary ways.
37//!
38//! In most cases, [`LoadTransformAndSave`] should be sufficient.
39
40mod log;
41mod process;
42
43pub use log::*;
44pub use process::*;
45
46use crate::{
47    io::{
48        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
49        AssetSources, AssetWriterError, ErasedAssetReader, ErasedAssetWriter,
50        MissingAssetSourceError,
51    },
52    meta::{
53        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
54        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
55    },
56    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
57    MissingAssetLoaderForExtensionError,
58};
59use alloc::{collections::VecDeque, sync::Arc};
60use bevy_ecs::prelude::*;
61use bevy_tasks::IoTaskPool;
62use bevy_utils::{
63    tracing::{debug, error, trace, warn},
64    HashMap, HashSet,
65};
66#[cfg(feature = "trace")]
67use bevy_utils::{
68    tracing::{info_span, instrument::Instrument},
69    ConditionalSendFuture,
70};
71use derive_more::derive::{Display, Error};
72use futures_io::ErrorKind;
73use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
74use parking_lot::RwLock;
75use std::path::{Path, PathBuf};
76
/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`](crate::io::AssetReader) / [`AssetWriter`](crate::io::AssetWriter) pair),
/// processes them in some way, and writes them to a destination [`AssetSource`].
///
/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets
/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
///
/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
///
/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
/// asset, which is used to determine if the asset source has actually changed.
///
/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
///
/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    /// The processor's own "internal" [`AssetServer`], exposed through [`AssetProcessor::server`].
    server: AssetServer,
    /// State shared between all clones of this processor, exposed through [`AssetProcessor::data`].
    pub(crate) data: Arc<AssetProcessorData>,
}
97
/// Internal data stored inside an [`AssetProcessor`].
///
/// An [`AssetProcessor`] holds this behind an [`Arc`], so every clone of the processor sees the same data.
pub struct AssetProcessorData {
    /// Per-asset bookkeeping, including the queue of asset paths waiting to be checked for reprocessing.
    pub(crate) asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
    /// The transaction log, when one is available. The logging helpers on [`AssetProcessor`] panic if this is `None`.
    log: async_lock::RwLock<Option<ProcessorTransactionLog>>,
    /// Registered processors, keyed by the type name of the processor.
    processors: RwLock<HashMap<&'static str, Arc<dyn ErasedProcessor>>>,
    /// Default processors for file extensions (maps an extension to the type name of a registered processor)
    default_processors: RwLock<HashMap<Box<str>, &'static str>>,
    /// The current [`ProcessorState`].
    state: async_lock::RwLock<ProcessorState>,
    /// The asset sources this processor operates on.
    sources: AssetSources,
    /// Broadcasts when the state changes to [`ProcessorState::Processing`].
    initialized_sender: async_broadcast::Sender<()>,
    /// Receiving side of `initialized_sender`.
    initialized_receiver: async_broadcast::Receiver<()>,
    /// Broadcasts when the state changes to [`ProcessorState::Finished`].
    finished_sender: async_broadcast::Sender<()>,
    /// Receiving side of `finished_sender`.
    finished_receiver: async_broadcast::Receiver<()>,
}
112
113impl AssetProcessor {
114    /// Creates a new [`AssetProcessor`] instance.
115    pub fn new(source: &mut AssetSourceBuilders) -> Self {
116        let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false)));
117        // The asset processor uses its own asset server with its own id space
118        let mut sources = source.build_sources(false, false);
119        sources.gate_on_processor(data.clone());
120        let server = AssetServer::new_with_meta_check(
121            sources,
122            AssetServerMode::Processed,
123            AssetMetaCheck::Always,
124            false,
125        );
126        Self { server, data }
127    }
128
129    /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`].
130    pub fn data(&self) -> &Arc<AssetProcessorData> {
131        &self.data
132    }
133
    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset server used by
    /// the main App. It has different processor-specific configuration and a different ID space.
    pub fn server(&self) -> &AssetServer {
        &self.server
    }
139
140    async fn set_state(&self, state: ProcessorState) {
141        let mut state_guard = self.data.state.write().await;
142        let last_state = *state_guard;
143        *state_guard = state;
144        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
145            self.data.finished_sender.broadcast(()).await.unwrap();
146        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
147            self.data.initialized_sender.broadcast(()).await.unwrap();
148        }
149    }
150
151    /// Retrieves the current [`ProcessorState`]
152    pub async fn get_state(&self) -> ProcessorState {
153        *self.data.state.read().await
154    }
155
156    /// Retrieves the [`AssetSource`] for this processor
157    #[inline]
158    pub fn get_source<'a>(
159        &self,
160        id: impl Into<AssetSourceId<'a>>,
161    ) -> Result<&AssetSource, MissingAssetSourceError> {
162        self.data.sources.get(id.into())
163    }
164
    /// Returns the [`AssetSources`] stored in this processor's [`AssetProcessorData`].
    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }
169
170    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
171    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
172    async fn log_unrecoverable(&self) {
173        let mut log = self.data.log.write().await;
174        let log = log.as_mut().unwrap();
175        log.unrecoverable().await.unwrap();
176    }
177
178    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
179    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
180    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
181        let mut log = self.data.log.write().await;
182        let log = log.as_mut().unwrap();
183        log.begin_processing(path).await.unwrap();
184    }
185
186    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
187    async fn log_end_processing(&self, path: &AssetPath<'_>) {
188        let mut log = self.data.log.write().await;
189        let log = log.as_mut().unwrap();
190        log.end_processing(path).await.unwrap();
191    }
192
193    /// Starts the processor in a background thread.
194    pub fn start(_processor: Res<Self>) {
195        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
196        error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
197        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
198        {
199            let processor = _processor.clone();
200            std::thread::spawn(move || {
201                processor.process_assets();
202                bevy_tasks::block_on(processor.listen_for_source_change_events());
203            });
204        }
205    }
206
207    /// Processes all assets. This will:
208    /// * For each "processed [`AssetSource`]:
209    /// * Scan the [`ProcessorTransactionLog`] and recover from any failures detected
210    /// * Scan the processed [`AssetReader`](crate::io::AssetReader) to build the current view of already processed assets.
211    /// * Scan the unprocessed [`AssetReader`](crate::io::AssetReader) and remove any final processed assets that are invalid or no longer exist.
212    /// * For each asset in the unprocessed [`AssetReader`](crate::io::AssetReader), kick off a new "process job", which will process the asset
213    ///     (if the latest version of the asset has not been processed).
214    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
215    pub fn process_assets(&self) {
216        let start_time = std::time::Instant::now();
217        debug!("Processing Assets");
218        IoTaskPool::get().scope(|scope| {
219            scope.spawn(async move {
220                self.initialize().await.unwrap();
221                for source in self.sources().iter_processed() {
222                    self.process_assets_internal(scope, source, PathBuf::from(""))
223                        .await
224                        .unwrap();
225                }
226            });
227        });
228        // This must happen _after_ the scope resolves or it will happen "too early"
229        // Don't move this into the async scope above! process_assets is a blocking/sync function this is fine
230        bevy_tasks::block_on(self.finish_processing_assets());
231        let end_time = std::time::Instant::now();
232        debug!("Processing finished in {:?}", end_time - start_time);
233    }
234
235    /// Listens for changes to assets in the source [`AssetSource`] and update state accordingly.
236    // PERF: parallelize change event processing
237    pub async fn listen_for_source_change_events(&self) {
238        debug!("Listening for changes to source assets");
239        loop {
240            let mut started_processing = false;
241
242            for source in self.data.sources.iter_processed() {
243                if let Some(receiver) = source.event_receiver() {
244                    for event in receiver.try_iter() {
245                        if !started_processing {
246                            self.set_state(ProcessorState::Processing).await;
247                            started_processing = true;
248                        }
249
250                        self.handle_asset_source_event(source, event).await;
251                    }
252                }
253            }
254
255            if started_processing {
256                self.finish_processing_assets().await;
257            }
258        }
259    }
260
261    async fn handle_asset_source_event(&self, source: &AssetSource, event: AssetSourceEvent) {
262        trace!("{event:?}");
263        match event {
264            AssetSourceEvent::AddedAsset(path)
265            | AssetSourceEvent::AddedMeta(path)
266            | AssetSourceEvent::ModifiedAsset(path)
267            | AssetSourceEvent::ModifiedMeta(path) => {
268                self.process_asset(source, path).await;
269            }
270            AssetSourceEvent::RemovedAsset(path) => {
271                self.handle_removed_asset(source, path).await;
272            }
273            AssetSourceEvent::RemovedMeta(path) => {
274                self.handle_removed_meta(source, path).await;
275            }
276            AssetSourceEvent::AddedFolder(path) => {
277                self.handle_added_folder(source, path).await;
278            }
279            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
280            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
281            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
282            AssetSourceEvent::RemovedFolder(path) => {
283                self.handle_removed_folder(source, &path).await;
284            }
285            AssetSourceEvent::RenamedAsset { old, new } => {
286                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
287                // Sometimes this event is returned when an asset is moved "back" into the asset folder
288                if old == new {
289                    self.process_asset(source, new).await;
290                } else {
291                    self.handle_renamed_asset(source, old, new).await;
292                }
293            }
294            AssetSourceEvent::RenamedMeta { old, new } => {
295                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
296                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
297                if old == new {
298                    self.process_asset(source, new).await;
299                } else {
300                    debug!("Meta renamed from {old:?} to {new:?}");
301                    let mut infos = self.data.asset_infos.write().await;
302                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
303                    // if they should be re-imported (and/or have new meta generated)
304                    let new_asset_path = AssetPath::from(new).with_source(source.id());
305                    let old_asset_path = AssetPath::from(old).with_source(source.id());
306                    infos.check_reprocess_queue.push_back(old_asset_path);
307                    infos.check_reprocess_queue.push_back(new_asset_path);
308                }
309            }
310            AssetSourceEvent::RenamedFolder { old, new } => {
311                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
312                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
313                if old == new {
314                    self.handle_added_folder(source, new).await;
315                } else {
316                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
317                    // requires some nuance when it comes to path handling.
318                    self.handle_removed_folder(source, &old).await;
319                    self.handle_added_folder(source, new).await;
320                }
321            }
322            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
323                let processed_reader = source.processed_reader().unwrap();
324                match processed_reader.is_directory(&path).await {
325                    Ok(is_directory) => {
326                        if is_directory {
327                            self.handle_removed_folder(source, &path).await;
328                        } else if is_meta {
329                            self.handle_removed_meta(source, path).await;
330                        } else {
331                            self.handle_removed_asset(source, path).await;
332                        }
333                    }
334                    Err(err) => {
335                        match err {
336                            AssetReaderError::NotFound(_) => {
337                                // if the path is not found, a processed version does not exist
338                            }
339                            AssetReaderError::Io(err) => {
340                                error!(
341                                    "Path '{}' was removed, but the destination reader could not determine if it \
342                                    was a folder or a file due to the following error: {err}",
343                                    AssetPath::from_path(&path).with_source(source.id())
344                                );
345                            }
346                            AssetReaderError::HttpError(status) => {
347                                error!(
348                                    "Path '{}' was removed, but the destination reader could not determine if it \
349                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
350                                    AssetPath::from_path(&path).with_source(source.id())
351                                );
352                            }
353                        }
354                    }
355                }
356            }
357        }
358    }
359
360    async fn handle_added_folder(&self, source: &AssetSource, path: PathBuf) {
361        debug!(
362            "Folder {} was added. Attempting to re-process",
363            AssetPath::from_path(&path).with_source(source.id())
364        );
365        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
366        error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
367        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
368        IoTaskPool::get().scope(|scope| {
369            scope.spawn(async move {
370                self.process_assets_internal(scope, source, path)
371                    .await
372                    .unwrap();
373            });
374        });
375    }
376
    /// Responds to a removed meta event by reprocessing the asset at the given path.
    ///
    /// The processed asset is deliberately left in place: only the reprocess is triggered here.
    async fn handle_removed_meta(&self, source: &AssetSource, path: PathBuf) {
        // If meta was removed, we might need to regenerate it.
        // Likewise, the user might be manually re-adding the asset.
        // Therefore, we shouldn't automatically delete the asset ... that is a
        // user-initiated action.
        debug!(
            "Meta for asset {:?} was removed. Attempting to re-process",
            AssetPath::from_path(&path).with_source(source.id())
        );
        self.process_asset(source, path).await;
    }
389
390    /// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
391    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
392        debug!("Removing folder {:?} because source was removed", path);
393        let processed_reader = source.processed_reader().unwrap();
394        match processed_reader.read_directory(path).await {
395            Ok(mut path_stream) => {
396                while let Some(child_path) = path_stream.next().await {
397                    self.handle_removed_asset(source, child_path).await;
398                }
399            }
400            Err(err) => match err {
401                AssetReaderError::NotFound(_err) => {
402                    // The processed folder does not exist. No need to update anything
403                }
404                AssetReaderError::HttpError(status) => {
405                    self.log_unrecoverable().await;
406                    error!(
407                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
408                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
409                    );
410                }
411                AssetReaderError::Io(err) => {
412                    self.log_unrecoverable().await;
413                    error!(
414                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
415                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
416                    );
417                }
418            },
419        }
420        let processed_writer = source.processed_writer().unwrap();
421        if let Err(err) = processed_writer.remove_directory(path).await {
422            match err {
423                AssetWriterError::Io(err) => {
424                    // we can ignore NotFound because if the "final" file in a folder was removed
425                    // then we automatically clean up this folder
426                    if err.kind() != ErrorKind::NotFound {
427                        let asset_path = AssetPath::from_path(path).with_source(source.id());
428                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
429                    }
430                }
431            }
432        }
433    }
434
435    /// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
436    /// asset have finished, thanks to the `file_transaction_lock`.
437    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
438        let asset_path = AssetPath::from(path).with_source(source.id());
439        debug!("Removing processed {asset_path} because source was removed");
440        let mut infos = self.data.asset_infos.write().await;
441        if let Some(info) = infos.get(&asset_path) {
442            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
443            // can finish their operations
444            let _write_lock = info.file_transaction_lock.write();
445            self.remove_processed_asset_and_meta(source, asset_path.path())
446                .await;
447        }
448        infos.remove(&asset_path).await;
449    }
450
451    /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
452    /// This will cause direct path dependencies to break.
453    async fn handle_renamed_asset(&self, source: &AssetSource, old: PathBuf, new: PathBuf) {
454        let mut infos = self.data.asset_infos.write().await;
455        let old = AssetPath::from(old).with_source(source.id());
456        let new = AssetPath::from(new).with_source(source.id());
457        let processed_writer = source.processed_writer().unwrap();
458        if let Some(info) = infos.get(&old) {
459            // we must wait for uncontested write access to the asset source to ensure existing readers / writers
460            // can finish their operations
461            let _write_lock = info.file_transaction_lock.write();
462            processed_writer
463                .rename(old.path(), new.path())
464                .await
465                .unwrap();
466            processed_writer
467                .rename_meta(old.path(), new.path())
468                .await
469                .unwrap();
470        }
471        infos.rename(&old, &new).await;
472    }
473
    /// Wraps up a round of processing: works through the reprocess queue, lets the internal asset server
    /// consume its handle drop events, and finally switches the state to [`ProcessorState::Finished`].
    async fn finish_processing_assets(&self) {
        self.try_reprocessing_queued().await;
        // clean up metadata in asset server
        self.server.data.infos.write().consume_handle_drop_events();
        self.set_state(ProcessorState::Finished).await;
    }
480
481    #[allow(unused)]
482    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
483    async fn process_assets_internal<'scope>(
484        &'scope self,
485        scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
486        source: &'scope AssetSource,
487        path: PathBuf,
488    ) -> Result<(), AssetReaderError> {
489        if source.reader().is_directory(&path).await? {
490            let mut path_stream = source.reader().read_directory(&path).await?;
491            while let Some(path) = path_stream.next().await {
492                Box::pin(self.process_assets_internal(scope, source, path)).await?;
493            }
494        } else {
495            // Files without extensions are skipped
496            let processor = self.clone();
497            scope.spawn(async move {
498                processor.process_asset(source, path).await;
499            });
500        }
501        Ok(())
502    }
503
504    async fn try_reprocessing_queued(&self) {
505        loop {
506            let mut check_reprocess_queue =
507                core::mem::take(&mut self.data.asset_infos.write().await.check_reprocess_queue);
508            IoTaskPool::get().scope(|scope| {
509                for path in check_reprocess_queue.drain(..) {
510                    let processor = self.clone();
511                    let source = self.get_source(path.source()).unwrap();
512                    scope.spawn(async move {
513                        processor.process_asset(source, path.into()).await;
514                    });
515                }
516            });
517            let infos = self.data.asset_infos.read().await;
518            if infos.check_reprocess_queue.is_empty() {
519                break;
520            }
521        }
522    }
523
524    /// Register a new asset processor.
525    pub fn register_processor<P: Process>(&self, processor: P) {
526        let mut process_plans = self.data.processors.write();
527        #[cfg(feature = "trace")]
528        let processor = InstrumentedAssetProcessor(processor);
529        process_plans.insert(core::any::type_name::<P>(), Arc::new(processor));
530    }
531
532    /// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
533    pub fn set_default_processor<P: Process>(&self, extension: &str) {
534        let mut default_processors = self.data.default_processors.write();
535        default_processors.insert(extension.into(), core::any::type_name::<P>());
536    }
537
538    /// Returns the default processor for the given `extension`, if it exists.
539    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
540        let default_processors = self.data.default_processors.read();
541        let key = default_processors.get(extension)?;
542        self.data.processors.read().get(key).cloned()
543    }
544
545    /// Returns the processor with the given `processor_type_name`, if it exists.
546    pub fn get_processor(&self, processor_type_name: &str) -> Option<Arc<dyn ErasedProcessor>> {
547        let processors = self.data.processors.read();
548        processors.get(processor_type_name).cloned()
549    }
550
551    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
552    /// This info will later be used to determine whether or not to re-process an asset
553    ///
554    /// This will validate transactions and recover failed transactions when necessary.
555    #[cfg_attr(
556        any(target_arch = "wasm32", not(feature = "multi_threaded")),
557        expect(
558            dead_code,
559            reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
560        )
561    )]
562    async fn initialize(&self) -> Result<(), InitializeError> {
563        self.validate_transaction_log_and_recover().await;
564        let mut asset_infos = self.data.asset_infos.write().await;
565
566        /// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty
567        /// folders when they are discovered.
568        async fn get_asset_paths(
569            reader: &dyn ErasedAssetReader,
570            clean_empty_folders_writer: Option<&dyn ErasedAssetWriter>,
571            path: PathBuf,
572            paths: &mut Vec<PathBuf>,
573        ) -> Result<bool, AssetReaderError> {
574            if reader.is_directory(&path).await? {
575                let mut path_stream = reader.read_directory(&path).await?;
576                let mut contains_files = false;
577
578                while let Some(child_path) = path_stream.next().await {
579                    contains_files |= Box::pin(get_asset_paths(
580                        reader,
581                        clean_empty_folders_writer,
582                        child_path,
583                        paths,
584                    ))
585                    .await?;
586                }
587                if !contains_files && path.parent().is_some() {
588                    if let Some(writer) = clean_empty_folders_writer {
589                        // it is ok for this to fail as it is just a cleanup job.
590                        let _ = writer.remove_empty_directory(&path).await;
591                    }
592                }
593                Ok(contains_files)
594            } else {
595                paths.push(path);
596                Ok(true)
597            }
598        }
599
600        for source in self.sources().iter_processed() {
601            let Ok(processed_reader) = source.processed_reader() else {
602                continue;
603            };
604            let Ok(processed_writer) = source.processed_writer() else {
605                continue;
606            };
607            let mut unprocessed_paths = Vec::new();
608            get_asset_paths(
609                source.reader(),
610                None,
611                PathBuf::from(""),
612                &mut unprocessed_paths,
613            )
614            .await
615            .map_err(InitializeError::FailedToReadSourcePaths)?;
616
617            let mut processed_paths = Vec::new();
618            get_asset_paths(
619                processed_reader,
620                Some(processed_writer),
621                PathBuf::from(""),
622                &mut processed_paths,
623            )
624            .await
625            .map_err(InitializeError::FailedToReadDestinationPaths)?;
626
627            for path in unprocessed_paths {
628                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
629            }
630
631            for path in processed_paths {
632                let mut dependencies = Vec::new();
633                let asset_path = AssetPath::from(path).with_source(source.id());
634                if let Some(info) = asset_infos.get_mut(&asset_path) {
635                    match processed_reader.read_meta_bytes(asset_path.path()).await {
636                        Ok(meta_bytes) => {
637                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
638                                Ok(minimal) => {
639                                    trace!(
640                                        "Populated processed info for asset {asset_path} {:?}",
641                                        minimal.processed_info
642                                    );
643
644                                    if let Some(processed_info) = &minimal.processed_info {
645                                        for process_dependency_info in
646                                            &processed_info.process_dependencies
647                                        {
648                                            dependencies.push(process_dependency_info.path.clone());
649                                        }
650                                    }
651                                    info.processed_info = minimal.processed_info;
652                                }
653                                Err(err) => {
654                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
655                                    self.remove_processed_asset_and_meta(source, asset_path.path())
656                                        .await;
657                                }
658                            }
659                        }
660                        Err(err) => {
661                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
662                            self.remove_processed_asset_and_meta(source, asset_path.path())
663                                .await;
664                        }
665                    }
666                } else {
667                    trace!("Removing processed data for non-existent asset {asset_path}");
668                    self.remove_processed_asset_and_meta(source, asset_path.path())
669                        .await;
670                }
671
672                for dependency in dependencies {
673                    asset_infos.add_dependent(&dependency, asset_path.clone());
674                }
675            }
676        }
677
678        self.set_state(ProcessorState::Processing).await;
679
680        Ok(())
681    }
682
683    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
684    /// does it remove existing in-memory metadata.
685    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
686        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
687            warn!("Failed to remove non-existent asset {path:?}: {err}");
688        }
689
690        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
691            warn!("Failed to remove non-existent meta {path:?}: {err}");
692        }
693
694        self.clean_empty_processed_ancestor_folders(source, path)
695            .await;
696    }
697
698    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
699        // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
700        if path.is_absolute() {
701            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
702            return;
703        }
704        while let Some(parent) = path.parent() {
705            if parent == Path::new("") {
706                break;
707            }
708            if source
709                .processed_writer()
710                .unwrap()
711                .remove_empty_directory(parent)
712                .await
713                .is_err()
714            {
715                // if we fail to delete a folder, stop walking up the tree
716                break;
717            }
718        }
719    }
720
721    /// Processes the asset (if it has not already been processed or the asset source has changed).
722    /// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
723    /// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
724    /// to block reads until the asset is processed).
725    ///
726    /// [`LoadContext`]: crate::loader::LoadContext
727    /// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
728    async fn process_asset(&self, source: &AssetSource, path: PathBuf) {
729        let asset_path = AssetPath::from(path).with_source(source.id());
730        let result = self.process_asset_internal(source, &asset_path).await;
731        let mut infos = self.data.asset_infos.write().await;
732        infos.finish_processing(asset_path, result).await;
733    }
734
    /// Performs one processing pass for `asset_path`, reading from `source`'s reader and writing to its processed writer.
    ///
    /// The steps are:
    ///
    /// 1. Open the source asset and read its meta. If no meta file exists, a default meta is built (from the default
    ///    processor for the asset's full extension if there is one, otherwise from the asset loader found for the path,
    ///    otherwise an [`AssetAction::Ignore`] meta) and written to the source location through `source.writer()`.
    /// 2. Hash the meta bytes together with the asset bytes and compare against the recorded processed info.
    /// 3. If processing is needed, take the asset's file transaction lock, log the start of processing, write the
    ///    processed asset and its meta, and log the end of processing.
    ///
    /// # Returns
    ///
    /// * [`ProcessResult::Ignored`] if the existing source meta uses the `Ignore` action.
    /// * [`ProcessResult::SkippedNotChanged`] if the new hash equals the recorded hash and every recorded process
    ///   dependency still has the same full hash.
    /// * [`ProcessResult::Processed`] with the new [`ProcessedInfo`] once the processed asset and meta were written.
    ///
    /// # Errors
    ///
    /// Returns a [`ProcessError`] if the source asset or meta cannot be read or deserialized, the requested loader or
    /// processor cannot be found, the processor itself fails, or any write to the source or processed location fails.
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        // TODO: The extension check was removed now that AssetPath is the input. is that ok?
        // TODO: check if already processing to protect against duplicate hot-reload events
        debug!("Processing {:?}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        // Helpers that attach the asset path to reader / writer errors.
        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        // Note: we get the asset source reader first because we don't want to create meta files for assets that don't have source files
        let mut byte_reader = reader.read(path).await.map_err(reader_err)?;

        // Resolve the meta to use, its serialized bytes, and the processor (if the action is `Process`).
        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                // Deserialize just enough of the meta to learn which loader / processor owns the full format.
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self
                            .get_processor(&processor)
                            .ok_or_else(|| ProcessError::MissingProcessor(processor))?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                // No meta file yet: fall back to the default processor for the extension, then to the loader
                // for the path, and finally to an `Ignore` meta when no loader is registered for the extension.
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                // write meta to source location if it doesn't already exist
                source
                    .writer()?
                    .write_meta_bytes(path, &meta_bytes)
                    .await
                    .map_err(writer_err)?;
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let mut asset_bytes = Vec::new();
        byte_reader
            .read_to_end(&mut asset_bytes)
            .await
            .map_err(|e| ProcessError::AssetReaderError {
                path: asset_path.clone(),
                err: AssetReaderError::Io(e.into()),
            })?;

        // PERF: in theory these hashes could be streamed if we want to avoid allocating the whole asset.
        // The downside is that reading assets would need to happen twice (once for the hash and once for the asset loader)
        // Hard to say which is worse
        let new_hash = get_asset_hash(&meta_bytes, &asset_bytes);
        // `full_hash` starts out equal to `hash`; it is recomputed below once process dependencies are known.
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            // Skip processing when the asset's own hash is unchanged and every recorded process dependency
            // still has the full hash that was recorded for it. The read lock is released at the end of this scope.
            let infos = self.data.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
            {
                if current_processed_info.hash == new_hash {
                    let mut dependency_changed = false;
                    for current_dep_info in &current_processed_info.process_dependencies {
                        // A dependency without live processed info yields `None`, which counts as changed.
                        let live_hash = infos
                            .get(&current_dep_info.path)
                            .and_then(|i| i.processed_info.as_ref())
                            .map(|i| i.full_hash);
                        if live_hash != Some(current_dep_info.full_hash) {
                            dependency_changed = true;
                            break;
                        }
                    }
                    if !dependency_changed {
                        return Ok(ProcessResult::SkippedNotChanged);
                    }
                }
            }
        }
        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
        // See ProcessedAssetInfo::file_transaction_lock docs for more info
        let _transaction_lock = {
            let mut infos = self.data.asset_infos.write().await;
            let info = infos.get_or_insert(asset_path.clone());
            info.file_transaction_lock.write_arc().await
        };

        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
        // Directly writing to the asset destination in the processor necessitates this behavior
        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            // The processor writes its output straight into the processed destination.
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                let mut context =
                    ProcessContext::new(self, asset_path, &asset_bytes, &mut new_processed_info);
                processor
                    .process(&mut context, source_meta, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            // Combine the asset's own hash with the full hashes of the process dependencies in `new_processed_info`.
            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            // No processor: the source bytes are written to the processed destination unchanged, together
            // with the source meta extended by the new processed info.
            processed_writer
                .write_bytes(path, &asset_bytes)
                .await
                .map_err(writer_err)?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }
922
    /// Validates the existing processor transaction log, recovers from any problems it reports, and then
    /// installs a new log in `self.data.log`.
    ///
    /// Recovery works as follows:
    ///
    /// * An unfinished transaction is recovered by removing the processed asset and processed meta for that path.
    ///   A `NotFound` I/O error during removal is tolerated; any other failure marks the state as invalid.
    /// * A log that cannot be read, an unrecoverable error, a duplicate transaction, or an end entry without a
    ///   matching transaction all mark the state as invalid.
    /// * If the state is invalid, all processed assets of every processed source are removed.
    ///
    /// # Panics
    ///
    /// Panics if removing all processed assets fails, or if the new transaction log cannot be created.
    async fn validate_transaction_log_and_recover(&self) {
        if let Err(err) = ProcessorTransactionLog::validate().await {
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            // These entries cannot be repaired individually, so stop inspecting further entries.
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                // Logs the failure and marks the whole state as invalid.
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                // Wipe the processed output of every processed source; sources without a processed writer are skipped.
                for source in self.sources().iter_processed() {
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        // A new log is created whether or not validation succeeded.
        let mut log = self.data.log.write().await;
        *log = match ProcessorTransactionLog::new().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
1007}
1008
1009impl AssetProcessorData {
1010    /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
1011    pub fn new(source: AssetSources) -> Self {
1012        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1013        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1014        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1015        // not block if there was older state present.
1016        finished_sender.set_overflow(true);
1017        initialized_sender.set_overflow(true);
1018
1019        AssetProcessorData {
1020            sources: source,
1021            finished_sender,
1022            finished_receiver,
1023            initialized_sender,
1024            initialized_receiver,
1025            state: async_lock::RwLock::new(ProcessorState::Initializing),
1026            log: Default::default(),
1027            processors: Default::default(),
1028            asset_infos: Default::default(),
1029            default_processors: Default::default(),
1030        }
1031    }
1032
1033    /// Returns a future that will not finish until the path has been processed.
1034    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1035        self.wait_until_initialized().await;
1036        let mut receiver = {
1037            let infos = self.asset_infos.write().await;
1038            let info = infos.get(&path);
1039            match info {
1040                Some(info) => match info.status {
1041                    Some(result) => return result,
1042                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1043                    None => info.status_receiver.clone(),
1044                },
1045                None => return ProcessStatus::NonExistent,
1046            }
1047        };
1048        receiver.recv().await.unwrap()
1049    }
1050
1051    /// Returns a future that will not finish until the processor has been initialized.
1052    pub async fn wait_until_initialized(&self) {
1053        let receiver = {
1054            let state = self.state.read().await;
1055            match *state {
1056                ProcessorState::Initializing => {
1057                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1058                    Some(self.initialized_receiver.clone())
1059                }
1060                _ => None,
1061            }
1062        };
1063
1064        if let Some(mut receiver) = receiver {
1065            receiver.recv().await.unwrap();
1066        }
1067    }
1068
1069    /// Returns a future that will not finish until processing has finished.
1070    pub async fn wait_until_finished(&self) {
1071        let receiver = {
1072            let state = self.state.read().await;
1073            match *state {
1074                ProcessorState::Initializing | ProcessorState::Processing => {
1075                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1076                    Some(self.finished_receiver.clone())
1077                }
1078                ProcessorState::Finished => None,
1079            }
1080        };
1081
1082        if let Some(mut receiver) = receiver {
1083            receiver.recv().await.unwrap();
1084        }
1085    }
1086}
1087
/// Wraps a [`Process`] implementation so that each `process` call runs inside an "asset processing" tracing span.
#[cfg(feature = "trace")]
struct InstrumentedAssetProcessor<T>(T);

#[cfg(feature = "trace")]
impl<T: Process> Process for InstrumentedAssetProcessor<T> {
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    /// Forwards to the wrapped processor, instrumenting the returned future with a span that records the
    /// wrapped processor's type name and the path of the asset being processed.
    fn process(
        &self,
        context: &mut ProcessContext,
        meta: AssetMeta<(), Self>,
        writer: &mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        let span = info_span!(
            "asset processing",
            processor = core::any::type_name::<T>(),
            asset = context.path().to_string(),
        );
        // Rebuild the `AssetMeta` for the wrapped processor type. Only the processor type parameter changes,
        // which is possible because the wrapper shares the wrapped processor's `Settings` type.
        let AssetMeta {
            meta_format_version,
            processed_info,
            asset,
        } = meta;
        let inner_meta = AssetMeta {
            meta_format_version,
            processed_info,
            asset,
        };
        self.0.process(context, inner_meta, writer).instrument(span)
    }
}
1118
/// The (successful) result of processing an asset
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// The processed asset and its meta were written; carries the resulting [`ProcessedInfo`].
    Processed(ProcessedInfo),
    /// Processing was skipped because the asset's hash and the full hashes of its recorded
    /// process dependencies were unchanged.
    SkippedNotChanged,
    /// The asset's source meta uses the `Ignore` action, so nothing was processed.
    Ignored,
}
1126
/// The final status of processing an asset
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    /// The processed asset is available. This is set both when the asset was processed
    /// and when processing was skipped because nothing changed.
    Processed,
    /// NOTE(review): presumably set when processing the asset returned an error; the code that
    /// assigns this status is not visible in this part of the file, so confirm.
    Failed,
    /// There is no in-memory info for the requested asset path.
    NonExistent,
}
1134
// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
/// The processor's in-memory bookkeeping for a single asset path.
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    /// The processed info recorded for this asset, or `None` if none is known.
    processed_info: Option<ProcessedInfo>,
    /// Paths of assets that depend on this asset when they are being processed.
    dependents: HashSet<AssetPath<'static>>,
    /// The most recent status set through `update_status`, or `None` if no status has been set yet.
    status: Option<ProcessStatus>,
    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
    /// _This lock must be locked whenever a read or write to processed assets occurs_
    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
    /// * when the processor is running in parallel with an app
    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
    ///     * this second scenario almost certainly isn't possible with the current implementation, but it's worth protecting against
    ///
    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written, and
    /// that writers don't write while processed files are being read.
    /// Because this lock is shared across meta and asset bytes, readers can ensure they don't read "old" versions of metadata with "new" asset data.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    /// Broadcasts every status change made through `update_status`.
    status_sender: async_broadcast::Sender<ProcessStatus>,
    /// Receiving end of `status_sender`; cloned by callers that need to wait for a status.
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}
1155
1156impl Default for ProcessorAssetInfo {
1157    fn default() -> Self {
1158        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1159        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1160        // not block if there was older state present.
1161        status_sender.set_overflow(true);
1162        Self {
1163            processed_info: Default::default(),
1164            dependents: Default::default(),
1165            file_transaction_lock: Default::default(),
1166            status: None,
1167            status_sender,
1168            status_receiver,
1169        }
1170    }
1171}
1172
1173impl ProcessorAssetInfo {
1174    async fn update_status(&mut self, status: ProcessStatus) {
1175        if self.status != Some(status) {
1176            self.status = Some(status);
1177            self.status_sender.broadcast(status).await.unwrap();
1178        }
1179    }
1180}
1181
/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
/// consistent as events are processed.
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    /// The "current" in memory view of the asset space. During processing, if path does not exist in this, it should
    /// be considered non-existent.
    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
    /// `non_existent_dependents` DATA IS CONSUMED
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependents for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
    /// If the dependent asset is added, it can "resolve" these dependencies and re-compute those assets.
    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
    /// check this map for dependencies and add them. If an asset is removed, it should update the dependents here.
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
    /// Paths of dependents that are queued when one of their dependencies finishes processing.
    /// NOTE(review): presumably drained elsewhere to decide whether those dependents must be reprocessed — confirm.
    check_reprocess_queue: VecDeque<AssetPath<'static>>,
}
1199
1200impl ProcessorAssetInfos {
1201    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1202        self.infos.entry(asset_path.clone()).or_insert_with(|| {
1203            let mut info = ProcessorAssetInfo::default();
1204            // track existing dependents by resolving existing "hanging" dependents.
1205            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
1206                info.dependents = dependents;
1207            }
1208            info
1209        })
1210    }
1211
1212    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1213        self.infos.get(asset_path)
1214    }
1215
1216    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1217        self.infos.get_mut(asset_path)
1218    }
1219
1220    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
1221        if let Some(info) = self.get_mut(asset_path) {
1222            info.dependents.insert(dependent);
1223        } else {
1224            let dependents = self
1225                .non_existent_dependents
1226                .entry(asset_path.clone())
1227                .or_default();
1228            dependents.insert(dependent);
1229        }
1230    }
1231
1232    /// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view the processed assets.
1233    async fn finish_processing(
1234        &mut self,
1235        asset_path: AssetPath<'static>,
1236        result: Result<ProcessResult, ProcessError>,
1237    ) {
1238        match result {
1239            Ok(ProcessResult::Processed(processed_info)) => {
1240                debug!("Finished processing \"{:?}\"", asset_path);
1241                // clean up old dependents
1242                let old_processed_info = self
1243                    .infos
1244                    .get_mut(&asset_path)
1245                    .and_then(|i| i.processed_info.take());
1246                if let Some(old_processed_info) = old_processed_info {
1247                    self.clear_dependencies(&asset_path, old_processed_info);
1248                }
1249
1250                // populate new dependents
1251                for process_dependency_info in &processed_info.process_dependencies {
1252                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
1253                }
1254                let info = self.get_or_insert(asset_path);
1255                info.processed_info = Some(processed_info);
1256                info.update_status(ProcessStatus::Processed).await;
1257                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
1258                for path in dependents {
1259                    self.check_reprocess_queue.push_back(path);
1260                }
1261            }
1262            Ok(ProcessResult::SkippedNotChanged) => {
1263                debug!("Skipping processing (unchanged) \"{:?}\"", asset_path);
1264                let info = self.get_mut(&asset_path).expect("info should exist");
1265                // NOTE: skipping an asset on a given pass doesn't mean it won't change in the future as a result
1266                // of a dependency being re-processed. This means apps might receive an "old" (but valid) asset first.
1267                // This is in the interest of fast startup times that don't block for all assets being checked + reprocessed
1268                // Therefore this relies on hot-reloading in the app to pickup the "latest" version of the asset
1269                // If "block until latest state is reflected" is required, we can easily add a less granular
1270                // "block until first pass finished" mode
1271                info.update_status(ProcessStatus::Processed).await;
1272            }
1273            Ok(ProcessResult::Ignored) => {
1274                debug!("Skipping processing (ignored) \"{:?}\"", asset_path);
1275            }
1276            Err(ProcessError::ExtensionRequired) => {
1277                // Skip assets without extensions
1278            }
1279            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1280                trace!("No loader found for {asset_path}");
1281            }
1282            Err(ProcessError::AssetReaderError {
1283                err: AssetReaderError::NotFound(_),
1284                ..
1285            }) => {
1286                // if there is no asset source, no processing can be done
1287                trace!("No need to process asset {asset_path} because it does not exist");
1288            }
1289            Err(err) => {
1290                error!("Failed to process asset {asset_path}: {err}");
1291                // if this failed because a dependency could not be loaded, make sure it is reprocessed if that dependency is reprocessed
1292                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1293                    err
1294                {
1295                    let info = self.get_mut(&asset_path).expect("info should exist");
1296                    info.processed_info = Some(ProcessedInfo {
1297                        hash: AssetHash::default(),
1298                        full_hash: AssetHash::default(),
1299                        process_dependencies: vec![],
1300                    });
1301                    self.add_dependent(dependency.path(), asset_path.to_owned());
1302                }
1303
1304                let info = self.get_mut(&asset_path).expect("info should exist");
1305                info.update_status(ProcessStatus::Failed).await;
1306            }
1307        }
1308    }
1309
1310    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1311    async fn remove(&mut self, asset_path: &AssetPath<'static>) {
1312        let info = self.infos.remove(asset_path);
1313        if let Some(info) = info {
1314            if let Some(processed_info) = info.processed_info {
1315                self.clear_dependencies(asset_path, processed_info);
1316            }
1317            // Tell all listeners this asset does not exist
1318            info.status_sender
1319                .broadcast(ProcessStatus::NonExistent)
1320                .await
1321                .unwrap();
1322            if !info.dependents.is_empty() {
1323                error!(
1324                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1325                    info.dependents
1326                );
1327                self.non_existent_dependents
1328                    .insert(asset_path.clone(), info.dependents);
1329            }
1330        }
1331    }
1332
1333    /// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
1334    async fn rename(&mut self, old: &AssetPath<'static>, new: &AssetPath<'static>) {
1335        let info = self.infos.remove(old);
1336        if let Some(mut info) = info {
1337            if !info.dependents.is_empty() {
1338                // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1339                // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1340                // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1341                // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1342                // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1343                // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
1344                // (see the remove impl).
1345                error!(
1346                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1347                    info.dependents
1348                );
1349                self.non_existent_dependents
1350                    .insert(old.clone(), core::mem::take(&mut info.dependents));
1351            }
1352            if let Some(processed_info) = &info.processed_info {
1353                // Update "dependent" lists for this asset's "process dependencies" to use new path.
1354                for dep in &processed_info.process_dependencies {
1355                    if let Some(info) = self.infos.get_mut(&dep.path) {
1356                        info.dependents.remove(old);
1357                        info.dependents.insert(new.clone());
1358                    } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
1359                    {
1360                        dependents.remove(old);
1361                        dependents.insert(new.clone());
1362                    }
1363                }
1364            }
1365            // Tell all listeners this asset no longer exists
1366            info.status_sender
1367                .broadcast(ProcessStatus::NonExistent)
1368                .await
1369                .unwrap();
1370            let dependents: Vec<AssetPath<'static>> = {
1371                let new_info = self.get_or_insert(new.clone());
1372                new_info.processed_info = info.processed_info;
1373                new_info.status = info.status;
1374                // Ensure things waiting on the new path are informed of the status of this asset
1375                if let Some(status) = new_info.status {
1376                    new_info.status_sender.broadcast(status).await.unwrap();
1377                }
1378                new_info.dependents.iter().cloned().collect()
1379            };
1380            // Queue the asset for a reprocess check, in case it needs new meta.
1381            self.check_reprocess_queue.push_back(new.clone());
1382            for dependent in dependents {
1383                // Queue dependents for reprocessing because they might have been waiting for this asset.
1384                self.check_reprocess_queue.push_back(dependent);
1385            }
1386        }
1387    }
1388
1389    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1390        for old_load_dep in removed_info.process_dependencies {
1391            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1392                info.dependents.remove(asset_path);
1393            } else if let Some(dependents) =
1394                self.non_existent_dependents.get_mut(&old_load_dep.path)
1395            {
1396                dependents.remove(asset_path);
1397            }
1398        }
1399    }
1400}
1401
/// The current state of the [`AssetProcessor`].
///
/// The type is `Copy` and comparable, and implements `Debug` so it can be logged and used
/// in `assert_eq!` or inside other `#[derive(Debug)]` types.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is still initializing, which involves scanning the current asset folders,
    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
    /// and cleaning up old / unused assets.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished processing all valid assets and reporting invalid assets.
    Finished,
}
1414
/// An error that occurs when initializing the [`AssetProcessor`].
///
/// Every variant wraps the underlying error that caused initialization to fail.
#[derive(Error, Display, Debug)]
pub enum InitializeError {
    /// Wraps the [`AssetReaderError`] hit while reading source paths.
    // NOTE(review): meaning taken from the variant name; confirm against the call site.
    FailedToReadSourcePaths(AssetReaderError),
    /// Wraps the [`AssetReaderError`] hit while reading destination paths.
    // NOTE(review): meaning taken from the variant name; confirm against the call site.
    FailedToReadDestinationPaths(AssetReaderError),
    /// Wraps the [`ValidateLogError`] from validating the asset log; displayed with the
    /// "Failed to validate asset log: " prefix.
    #[display("Failed to validate asset log: {_0}")]
    ValidateLogError(ValidateLogError),
}