bevy_asset/processor/
mod.rs

1//! Asset processing in Bevy is a framework for automatically transforming artist-authored assets into the format that best suits the needs of your particular game.
2//!
3//! You can think of the asset processing system as a "build system" for assets.
4//! When an artist adds a new asset to the project or an asset is changed (assuming asset hot reloading is enabled), the asset processing system will automatically perform the specified processing steps on the asset.
5//! This can include things like creating lightmaps for baked lighting, compressing a `.wav` file to an `.ogg`, or generating mipmaps for a texture.
6//!
7//! Its core values are:
8//!
9//! 1. Automatic: new and changed assets should be ready to use in-game without requiring any manual conversion or cleanup steps.
10//! 2. Configurable: every game has its own needs, and a high level of transparency and control is required.
11//! 3. Lossless: the original asset should always be preserved, ensuring artists can make changes later.
12//! 4. Deterministic: performing the same processing steps on the same asset should (generally) produce the exact same result. In cases where this doesn't make sense (steps that involve a degree of randomness or uncertainty), the results across runs should be "acceptably similar", as they will be generated once for a given set of inputs and cached.
13//!
14//! Taken together, this means that the original asset plus the processing steps should be enough to regenerate the final asset.
15//! While it may be possible to manually edit the final asset, this should be discouraged.
16//! Final post-processed assets should generally not be version-controlled, except to save developer time when recomputing heavy asset processing steps.
17//!
18//! # Usage
19//!
20//! Asset processing can be enabled or disabled in [`AssetPlugin`](crate::AssetPlugin) by setting the [`AssetMode`](crate::AssetMode).\
21//! Enable Bevy's `file_watcher` feature to automatically watch for changes to assets and reprocess them.
22//!
23//! To register a new asset processor, use [`AssetProcessor::register_processor`].
24//! To set the default asset processor for a given extension, use [`AssetProcessor::set_default_processor`].
25//! In most cases, these methods will be called directly on [`App`](bevy_app::App) using the [`AssetApp`](crate::AssetApp) extension trait.
26//!
27//! If a default asset processor is set, assets with a matching extension will be processed using that processor before loading.
28//!
29//! For an end-to-end example, check out the examples in the [`examples/asset/processing`](https://github.com/bevyengine/bevy/tree/latest/examples/asset/processing) directory of the Bevy repository.
30//!
31//!  # Defining asset processors
32//!
33//! Bevy provides two different ways to define new asset processors:
34//!
35//! - [`LoadTransformAndSave`] + [`AssetTransformer`](crate::transformer::AssetTransformer): a high-level API for loading, transforming, and saving assets.
36//! - [`Process`]: a flexible low-level API for processing assets in arbitrary ways.
37//!
38//! In most cases, [`LoadTransformAndSave`] should be sufficient.
39
40mod log;
41mod process;
42
43use async_lock::RwLockReadGuardArc;
44pub use log::*;
45pub use process::*;
46
47use crate::{
48    io::{
49        AssetReaderError, AssetSource, AssetSourceBuilders, AssetSourceEvent, AssetSourceId,
50        AssetSources, AssetWriterError, ErasedAssetReader, MissingAssetSourceError,
51    },
52    meta::{
53        get_asset_hash, get_full_asset_hash, AssetAction, AssetActionMinimal, AssetHash, AssetMeta,
54        AssetMetaDyn, AssetMetaMinimal, ProcessedInfo, ProcessedInfoMinimal,
55    },
56    AssetLoadError, AssetMetaCheck, AssetPath, AssetServer, AssetServerMode, DeserializeMetaError,
57    MissingAssetLoaderForExtensionError, UnapprovedPathMode, WriteDefaultMetaError,
58};
59use alloc::{borrow::ToOwned, boxed::Box, string::String, sync::Arc, vec, vec::Vec};
60use bevy_ecs::prelude::*;
61use bevy_platform::{
62    collections::{hash_map::Entry, HashMap, HashSet},
63    sync::{PoisonError, RwLock},
64};
65use bevy_tasks::IoTaskPool;
66use futures_io::ErrorKind;
67use futures_lite::{AsyncWriteExt, StreamExt};
68use futures_util::{select_biased, FutureExt};
69use std::{
70    path::{Path, PathBuf},
71    sync::Mutex,
72};
73use thiserror::Error;
74use tracing::{debug, error, trace, warn};
75
76#[cfg(feature = "trace")]
77use {
78    alloc::string::ToString,
79    bevy_reflect::TypePath,
80    bevy_tasks::ConditionalSendFuture,
81    tracing::{info_span, instrument::Instrument},
82};
83
84/// A "background" asset processor that reads asset values from a source [`AssetSource`] (which corresponds to an [`AssetReader`](crate::io::AssetReader) / [`AssetWriter`](crate::io::AssetWriter) pair),
85/// processes them in some way, and writes them to a destination [`AssetSource`].
86///
87/// This will create .meta files (a human-editable serialized form of [`AssetMeta`]) in the source [`AssetSource`] for assets
88/// that can be loaded and/or processed. This enables developers to configure how each asset should be loaded and/or processed.
89///
90/// [`AssetProcessor`] can be run in the background while a Bevy App is running. Changes to assets will be automatically detected and hot-reloaded.
91///
92/// Assets will only be re-processed if they have been changed. A hash of each asset source is stored in the metadata of the processed version of the
93/// asset, which is used to determine if the asset source has actually changed.
94///
95/// A [`ProcessorTransactionLog`] is produced, which uses "write-ahead logging" to make the [`AssetProcessor`] crash and failure resistant. If a failed/unfinished
96/// transaction from a previous run is detected, the affected asset(s) will be re-processed.
97///
98/// [`AssetProcessor`] can be cloned. It is backed by an [`Arc`] so clones will share state. Clones can be freely used in parallel.
#[derive(Resource, Clone)]
pub struct AssetProcessor {
    /// The processor's own internal [`AssetServer`], exposed through [`AssetProcessor::server`].
    server: AssetServer,
    /// The shared processor data, exposed through [`AssetProcessor::data`]. Because it lives
    /// behind an [`Arc`], every clone of the processor refers to the same data.
    pub(crate) data: Arc<AssetProcessorData>,
}
104
/// Internal data stored inside an [`AssetProcessor`].
pub struct AssetProcessorData {
    /// The state of processing.
    pub(crate) processing_state: Arc<ProcessingState>,
    /// The factory that creates the transaction log.
    ///
    /// Note: we use a regular Mutex instead of an async mutex since we expect users to only set
    /// this once, and before the asset processor starts - there is no reason to await (and it
    /// avoids needing to use [`block_on`](bevy_tasks::block_on) to set the factory).
    log_factory: Mutex<Option<Box<dyn ProcessorTransactionLogFactory>>>,
    /// The active transaction log that processing steps are recorded in.
    ///
    /// The logging helpers on [`AssetProcessor`] unwrap this option, so it must be populated
    /// before any of them run.
    /// NOTE(review): presumably created from `log_factory` during initialization - confirm.
    log: async_lock::RwLock<Option<Box<dyn ProcessorTransactionLog>>>,
    /// The processors that will be used to process assets.
    processors: RwLock<Processors>,
    /// The asset sources this processor operates on.
    sources: Arc<AssetSources>,
}
120
/// The current state of processing, including the overall state and the state of all assets.
pub(crate) struct ProcessingState {
    /// The overall state of processing.
    state: async_lock::RwLock<ProcessorState>,
    /// The channel to broadcast when the processor has completed initialization.
    initialized_sender: async_broadcast::Sender<()>,
    /// The receiving end of the initialization broadcast channel.
    initialized_receiver: async_broadcast::Receiver<()>,
    /// The channel to broadcast when the processor has completed processing.
    finished_sender: async_broadcast::Sender<()>,
    /// The receiving end of the finished-processing broadcast channel.
    finished_receiver: async_broadcast::Receiver<()>,
    /// The current state of the assets.
    asset_infos: async_lock::RwLock<ProcessorAssetInfos>,
}
134
/// The set of registered asset processors, indexed by full type path, by short type path, and by
/// the file extension they are the default processor for.
#[derive(Default)]
struct Processors {
    /// Maps the type path of the processor to its instance.
    type_path_to_processor: HashMap<&'static str, Arc<dyn ErasedProcessor>>,
    /// Maps the short type path of the processor to its instance.
    short_type_path_to_processor: HashMap<&'static str, ShortTypeProcessorEntry>,
    /// Maps the file extension of an asset to the type path of the processor we should use to
    /// process it by default.
    file_extension_to_default_processor: HashMap<Box<str>, &'static str>,
}
145
/// The result of looking up a processor by its short type path: either exactly one processor, or
/// a list of the full type paths that collide on that short path.
enum ShortTypeProcessorEntry {
    /// There is a unique processor with the given short type path.
    Unique {
        /// The full type path of the processor.
        type_path: &'static str,
        /// The processor itself.
        processor: Arc<dyn ErasedProcessor>,
    },
    /// There are (at least) two processors with the same short type path (storing the full type
    /// paths of all conflicting processors). Users must fully specify the type path in order to
    /// disambiguate.
    Ambiguous(Vec<&'static str>),
}
159
160impl AssetProcessor {
161    /// Creates a new [`AssetProcessor`] instance.
162    pub fn new(
163        sources: &mut AssetSourceBuilders,
164        watch_processed: bool,
165    ) -> (Self, Arc<AssetSources>) {
166        let state = Arc::new(ProcessingState::new());
167        let mut sources = sources.build_sources(true, watch_processed);
168        sources.gate_on_processor(state.clone());
169        let sources = Arc::new(sources);
170
171        let data = Arc::new(AssetProcessorData::new(sources.clone(), state));
172        // The asset processor uses its own asset server with its own id space
173        let server = AssetServer::new_with_meta_check(
174            sources.clone(),
175            AssetServerMode::Processed,
176            AssetMetaCheck::Always,
177            false,
178            UnapprovedPathMode::default(),
179        );
180        (Self { server, data }, sources)
181    }
182
    /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`].
    ///
    /// Clones of this [`AssetProcessor`] hand out the same [`Arc`], so the data is shared between
    /// them.
    pub fn data(&self) -> &Arc<AssetProcessorData> {
        &self.data
    }
187
    /// The "internal" [`AssetServer`] used by the [`AssetProcessor`]. This is _separate_ from the asset server used by
    /// the main App. It has different processor-specific configuration and a different ID space.
    pub fn server(&self) -> &AssetServer {
        &self.server
    }
193
194    /// Retrieves the current [`ProcessorState`]
195    pub async fn get_state(&self) -> ProcessorState {
196        self.data.processing_state.get_state().await
197    }
198
199    /// Retrieves the [`AssetSource`] for this processor
200    #[inline]
201    pub fn get_source<'a>(
202        &self,
203        id: impl Into<AssetSourceId<'a>>,
204    ) -> Result<&AssetSource, MissingAssetSourceError> {
205        self.data.sources.get(id.into())
206    }
207
    /// Returns the [`AssetSources`] this processor operates on.
    #[inline]
    pub fn sources(&self) -> &AssetSources {
        &self.data.sources
    }
212
213    /// Logs an unrecoverable error. On the next run of the processor, all assets will be regenerated. This should only be used as a last resort.
214    /// Every call to this should be considered with scrutiny and ideally replaced with something more granular.
215    async fn log_unrecoverable(&self) {
216        let mut log = self.data.log.write().await;
217        let log = log.as_mut().unwrap();
218        log.unrecoverable()
219            .await
220            .map_err(|error| WriteLogError {
221                log_entry: LogEntry::UnrecoverableError,
222                error,
223            })
224            .unwrap();
225    }
226
227    /// Logs the start of an asset being processed. If this is not followed at some point in the log by a closing [`AssetProcessor::log_end_processing`],
228    /// in the next run of the processor the asset processing will be considered "incomplete" and it will be reprocessed.
229    async fn log_begin_processing(&self, path: &AssetPath<'_>) {
230        let mut log = self.data.log.write().await;
231        let log = log.as_mut().unwrap();
232        log.begin_processing(path)
233            .await
234            .map_err(|error| WriteLogError {
235                log_entry: LogEntry::BeginProcessing(path.clone_owned()),
236                error,
237            })
238            .unwrap();
239    }
240
241    /// Logs the end of an asset being successfully processed. See [`AssetProcessor::log_begin_processing`].
242    async fn log_end_processing(&self, path: &AssetPath<'_>) {
243        let mut log = self.data.log.write().await;
244        let log = log.as_mut().unwrap();
245        log.end_processing(path)
246            .await
247            .map_err(|error| WriteLogError {
248                log_entry: LogEntry::EndProcessing(path.clone_owned()),
249                error,
250            })
251            .unwrap();
252    }
253
254    /// Starts the processor in a background thread.
255    pub fn start(processor: Res<Self>) {
256        let processor = processor.clone();
257        IoTaskPool::get()
258            .spawn(async move {
259                let start_time = std::time::Instant::now();
260                debug!("Processing Assets");
261
262                processor.initialize().await.unwrap();
263
264                let (new_task_sender, new_task_receiver) = async_channel::unbounded();
265                processor
266                    .queue_initial_processing_tasks(&new_task_sender)
267                    .await;
268
269                // Once all the tasks are queued for the initial processing, start actually
270                // executing the tasks.
271                {
272                    let processor = processor.clone();
273                    let new_task_sender = new_task_sender.clone();
274                    IoTaskPool::get()
275                        .spawn(async move {
276                            processor
277                                .execute_processing_tasks(new_task_sender, new_task_receiver)
278                                .await;
279                        })
280                        .detach();
281                }
282
283                processor.data.wait_until_finished().await;
284
285                let end_time = std::time::Instant::now();
286                debug!("Processing finished in {:?}", end_time - start_time);
287
288                debug!("Listening for changes to source assets");
289                processor.spawn_source_change_event_listeners(&new_task_sender);
290            })
291            .detach();
292    }
293
294    /// Sends start task events for all assets in all processed sources into `sender`.
295    async fn queue_initial_processing_tasks(
296        &self,
297        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
298    ) {
299        for source in self.sources().iter_processed() {
300            self.queue_processing_tasks_for_folder(source, PathBuf::from(""), sender)
301                .await
302                .unwrap();
303        }
304    }
305
306    /// Spawns listeners of change events for all asset sources which will start processor tasks in
307    /// response.
308    fn spawn_source_change_event_listeners(
309        &self,
310        sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
311    ) {
312        for source in self.data.sources.iter_processed() {
313            let Some(receiver) = source.event_receiver().cloned() else {
314                continue;
315            };
316            let source_id = source.id();
317            let processor = self.clone();
318            let sender = sender.clone();
319            IoTaskPool::get()
320                .spawn(async move {
321                    while let Ok(event) = receiver.recv().await {
322                        let Ok(source) = processor.get_source(source_id.clone()) else {
323                            return;
324                        };
325                        processor
326                            .handle_asset_source_event(source, event, &sender)
327                            .await;
328                    }
329                })
330                .detach();
331        }
332    }
333
334    /// Executes all tasks that come through `receiver`, and updates the processor's overall state
335    /// based on task starts and ends.
336    ///
337    /// This future does not terminate until the channel is closed (not when the channel is empty).
338    /// This means that in [`AssetProcessor::start`], this execution will continue even after all
339    /// the initial tasks are processed.
340    async fn execute_processing_tasks(
341        &self,
342        new_task_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
343        new_task_receiver: async_channel::Receiver<(AssetSourceId<'static>, PathBuf)>,
344    ) {
345        // Convert the Sender into a WeakSender so that once all task producers terminate (and drop
346        // their sender), this task doesn't keep itself alive. We still however need a way to get
347        // the sender since processing tasks can start the tasks of dependent assets.
348        let new_task_sender = {
349            let weak_sender = new_task_sender.downgrade();
350            drop(new_task_sender);
351            weak_sender
352        };
353
354        // If there aren't any tasks in the channel the first time around, we should immediately go
355        // to the finished state (otherwise we'd be sitting around stuck in the `Initialized`
356        // state).
357        if new_task_receiver.is_empty() {
358            self.data
359                .processing_state
360                .set_state(ProcessorState::Finished)
361                .await;
362        }
363        enum ProcessorTaskEvent {
364            Start(AssetSourceId<'static>, PathBuf),
365            Finished,
366        }
367        let (task_finished_sender, task_finished_receiver) = async_channel::unbounded::<()>();
368
369        let mut pending_tasks = 0;
370        while let Ok(event) = {
371            // It's ok to use `select_biased` since we prefer to start task rather than finish tasks
372            // anyway - since otherwise we might mark the processor as finished before all queued
373            // tasks are done. `select_biased` also doesn't depend on `std` which is nice!
374            select_biased! {
375                result = new_task_receiver.recv().fuse() => {
376                    result.map(|(source_id, path)| ProcessorTaskEvent::Start(source_id, path))
377                },
378                result = task_finished_receiver.recv().fuse() => {
379                    result.map(|()| ProcessorTaskEvent::Finished)
380                }
381            }
382        } {
383            match event {
384                ProcessorTaskEvent::Start(source_id, path) => {
385                    let Some(new_task_sender) = new_task_sender.upgrade() else {
386                        // If we can't upgrade the task sender, that means all sources of tasks
387                        // (like the source event listeners) have been dropped. That means that the
388                        // sources are no longer in the app, so reading/writing to them will
389                        // probably not work, so ignoring the task is fine. This also likely means
390                        // that the whole app is being dropped, so we can recover on the next
391                        // initialization.
392                        continue;
393                    };
394                    let processor = self.clone();
395                    let task_finished_sender = task_finished_sender.clone();
396                    pending_tasks += 1;
397                    IoTaskPool::get()
398                        .spawn(async move {
399                            let Ok(source) = processor.get_source(source_id) else {
400                                return;
401                            };
402                            processor.process_asset(source, path, new_task_sender).await;
403                            // If the channel gets closed, that's ok. Just ignore it.
404                            let _ = task_finished_sender.send(()).await;
405                        })
406                        .detach();
407                    self.data
408                        .processing_state
409                        .set_state(ProcessorState::Processing)
410                        .await;
411                }
412                ProcessorTaskEvent::Finished => {
413                    pending_tasks -= 1;
414                    if pending_tasks == 0 {
415                        // clean up metadata in asset server
416                        self.server.write_infos().consume_handle_drop_events();
417                        self.data
418                            .processing_state
419                            .set_state(ProcessorState::Finished)
420                            .await;
421                    }
422                }
423            }
424        }
425    }
426
427    /// Writes the default meta file for the provided `path`.
428    ///
429    /// This function generates the appropriate meta file to process `path` with the default
430    /// processor. If there is no default processor, it falls back to the default loader.
431    ///
432    /// Note if there is already a meta file for `path`, this function returns
433    /// `Err(WriteDefaultMetaError::MetaAlreadyExists)`.
434    pub async fn write_default_meta_file_for_path(
435        &self,
436        path: impl Into<AssetPath<'_>>,
437    ) -> Result<(), WriteDefaultMetaError> {
438        let path = path.into();
439        let Some(processor) = path
440            .get_full_extension()
441            .and_then(|extension| self.get_default_processor(&extension))
442        else {
443            return self
444                .server
445                .write_default_loader_meta_file_for_path(path)
446                .await;
447        };
448
449        let meta = processor.default_meta();
450        let serialized_meta = meta.serialize();
451
452        let source = self.get_source(path.source())?;
453
454        // Note: we get the reader rather than the processed reader, since we want to write the meta
455        // file for the unprocessed version of that asset (so it will be processed by the default
456        // processor).
457        let reader = source.reader();
458        match reader.read_meta_bytes(path.path()).await {
459            Ok(_) => return Err(WriteDefaultMetaError::MetaAlreadyExists),
460            Err(AssetReaderError::NotFound(_)) => {
461                // The meta file couldn't be found so just fall through.
462            }
463            Err(AssetReaderError::Io(err)) => {
464                return Err(WriteDefaultMetaError::IoErrorFromExistingMetaCheck(err))
465            }
466            Err(AssetReaderError::HttpError(err)) => {
467                return Err(WriteDefaultMetaError::HttpErrorFromExistingMetaCheck(err))
468            }
469        }
470
471        let writer = source.writer()?;
472        writer
473            .write_meta_bytes(path.path(), &serialized_meta)
474            .await?;
475
476        Ok(())
477    }
478
    /// Reacts to a single [`AssetSourceEvent`] emitted by `source`.
    ///
    /// Added/modified assets and metas are queued for (re)processing on `new_task_sender`;
    /// removals and folder events are delegated to the matching `handle_*` helper. Failures to
    /// send on `new_task_sender` are deliberately ignored.
    async fn handle_asset_source_event(
        &self,
        source: &AssetSource,
        event: AssetSourceEvent,
        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
    ) {
        trace!("{event:?}");
        match event {
            AssetSourceEvent::AddedAsset(path)
            | AssetSourceEvent::AddedMeta(path)
            | AssetSourceEvent::ModifiedAsset(path)
            | AssetSourceEvent::ModifiedMeta(path) => {
                let _ = new_task_sender.send((source.id(), path)).await;
            }
            AssetSourceEvent::RemovedAsset(path) => {
                self.handle_removed_asset(source, path).await;
            }
            AssetSourceEvent::RemovedMeta(path) => {
                self.handle_removed_meta(source, path, new_task_sender)
                    .await;
            }
            AssetSourceEvent::AddedFolder(path) => {
                self.handle_added_folder(source, path, new_task_sender)
                    .await;
            }
            // NOTE: As a heads up for future devs: this event shouldn't be run in parallel with other events that might
            // touch this folder (ex: the folder might be re-created with new assets). Clean up the old state first.
            // Currently this event handler is not parallel, but it could be (and likely should be) in the future.
            AssetSourceEvent::RemovedFolder(path) => {
                self.handle_removed_folder(source, &path).await;
            }
            AssetSourceEvent::RenamedAsset { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset might need reprocessing.
                // Sometimes this event is returned when an asset is moved "back" into the asset folder
                if old == new {
                    let _ = new_task_sender.send((source.id(), new)).await;
                } else {
                    self.handle_renamed_asset(source, old, new, new_task_sender)
                        .await;
                }
            }
            AssetSourceEvent::RenamedMeta { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset meta might need reprocessing.
                // Sometimes this event is returned when an asset meta is moved "back" into the asset folder
                if old == new {
                    let _ = new_task_sender.send((source.id(), new)).await;
                } else {
                    debug!("Meta renamed from {old:?} to {new:?}");
                    // Renaming meta should not assume that an asset has also been renamed. Check both old and new assets to see
                    // if they should be re-imported (and/or have new meta generated)
                    let _ = new_task_sender.send((source.id(), old)).await;
                    let _ = new_task_sender.send((source.id(), new)).await;
                }
            }
            AssetSourceEvent::RenamedFolder { old, new } => {
                // If there was a rename event, but the path hasn't changed, this asset folder might need reprocessing.
                // Sometimes this event is returned when an asset folder is moved "back" into the asset folder
                if old == new {
                    self.handle_added_folder(source, new, new_task_sender).await;
                } else {
                    // PERF: this reprocesses everything in the moved folder. this is not necessary in most cases, but
                    // requires some nuance when it comes to path handling.
                    self.handle_removed_folder(source, &old).await;
                    self.handle_added_folder(source, new, new_task_sender).await;
                }
            }
            AssetSourceEvent::RemovedUnknown { path, is_meta } => {
                // The event does not say what kind of path was removed, so ask the processed
                // (destination) reader whether a directory exists at that path.
                let processed_reader = source.ungated_processed_reader().unwrap();
                match processed_reader.is_directory(&path).await {
                    Ok(is_directory) => {
                        if is_directory {
                            self.handle_removed_folder(source, &path).await;
                        } else if is_meta {
                            self.handle_removed_meta(source, path, new_task_sender)
                                .await;
                        } else {
                            self.handle_removed_asset(source, path).await;
                        }
                    }
                    Err(err) => {
                        match err {
                            AssetReaderError::NotFound(_) => {
                                // if the path is not found, a processed version does not exist
                            }
                            AssetReaderError::Io(err) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to the following error: {err}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                            AssetReaderError::HttpError(status) => {
                                error!(
                                    "Path '{}' was removed, but the destination reader could not determine if it \
                                    was a folder or a file due to receiving an unexpected HTTP Status {status}",
                                    AssetPath::from_path(&path).with_source(source.id())
                                );
                            }
                        }
                    }
                }
            }
        }
    }
583
584    async fn handle_added_folder(
585        &self,
586        source: &AssetSource,
587        path: PathBuf,
588        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
589    ) {
590        debug!(
591            "Folder {} was added. Attempting to re-process",
592            AssetPath::from_path(&path).with_source(source.id())
593        );
594        self.queue_processing_tasks_for_folder(source, path, new_task_sender)
595            .await
596            .unwrap();
597    }
598
599    /// Responds to a removed meta event by reprocessing the asset at the given path.
600    async fn handle_removed_meta(
601        &self,
602        source: &AssetSource,
603        path: PathBuf,
604        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
605    ) {
606        // If meta was removed, we might need to regenerate it.
607        // Likewise, the user might be manually re-adding the asset.
608        // Therefore, we shouldn't automatically delete the asset ... that is a
609        // user-initiated action.
610        debug!(
611            "Meta for asset {} was removed. Attempting to re-process",
612            AssetPath::from_path(&path).with_source(source.id())
613        );
614        let _ = new_task_sender.send((source.id(), path)).await;
615    }
616
617    /// Removes all processed assets stored at the given path (respecting transactionality), then removes the folder itself.
618    async fn handle_removed_folder(&self, source: &AssetSource, path: &Path) {
619        debug!(
620            "Removing folder {} because source was removed",
621            path.display()
622        );
623        let processed_reader = source.ungated_processed_reader().unwrap();
624        match processed_reader.read_directory(path).await {
625            Ok(mut path_stream) => {
626                while let Some(child_path) = path_stream.next().await {
627                    self.handle_removed_asset(source, child_path).await;
628                }
629            }
630            Err(err) => match err {
631                AssetReaderError::NotFound(_err) => {
632                    // The processed folder does not exist. No need to update anything
633                }
634                AssetReaderError::HttpError(status) => {
635                    self.log_unrecoverable().await;
636                    error!(
637                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
638                        in the source directory. Restart the asset processor to fully reprocess assets. HTTP Status Code {status}"
639                    );
640                }
641                AssetReaderError::Io(err) => {
642                    self.log_unrecoverable().await;
643                    error!(
644                        "Unrecoverable Error: Failed to read the processed assets at {path:?} in order to remove assets that no longer exist \
645                        in the source directory. Restart the asset processor to fully reprocess assets. Error: {err}"
646                    );
647                }
648            },
649        }
650        let processed_writer = source.processed_writer().unwrap();
651        if let Err(err) = processed_writer.remove_directory(path).await {
652            match err {
653                AssetWriterError::Io(err) => {
654                    // we can ignore NotFound because if the "final" file in a folder was removed
655                    // then we automatically clean up this folder
656                    if err.kind() != ErrorKind::NotFound {
657                        let asset_path = AssetPath::from_path(path).with_source(source.id());
658                        error!("Failed to remove destination folder that no longer exists in {asset_path}: {err}");
659                    }
660                }
661            }
662        }
663    }
664
665    /// Removes the processed version of an asset and associated in-memory metadata. This will block until all existing reads/writes to the
666    /// asset have finished, thanks to the `file_transaction_lock`.
667    async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
668        let asset_path = AssetPath::from(path).with_source(source.id());
669        debug!("Removing processed {asset_path} because source was removed");
670        let lock = {
671            // Scope the infos lock so we don't hold up other processing for too long.
672            let mut infos = self.data.processing_state.asset_infos.write().await;
673            infos.remove(&asset_path).await
674        };
675        let Some(lock) = lock else {
676            return;
677        };
678
679        // we must wait for uncontested write access to the asset source to ensure existing
680        // readers/writers can finish their operations
681        let _write_lock = lock.write();
682        self.remove_processed_asset_and_meta(source, asset_path.path())
683            .await;
684    }
685
686    /// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
687    /// This will cause direct path dependencies to break.
688    async fn handle_renamed_asset(
689        &self,
690        source: &AssetSource,
691        old: PathBuf,
692        new: PathBuf,
693        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
694    ) {
695        let old = AssetPath::from(old).with_source(source.id());
696        let new = AssetPath::from(new).with_source(source.id());
697        let processed_writer = source.processed_writer().unwrap();
698        let result = {
699            // Scope the infos lock so we don't hold up other processing for too long.
700            let mut infos = self.data.processing_state.asset_infos.write().await;
701            infos.rename(&old, &new, new_task_sender).await
702        };
703        let Some((old_lock, new_lock)) = result else {
704            return;
705        };
706        // we must wait for uncontested write access to both assets to ensure existing
707        // readers/writers can finish their operations
708        let _old_write_lock = old_lock.write();
709        let _new_write_lock = new_lock.write();
710        processed_writer
711            .rename(old.path(), new.path())
712            .await
713            .unwrap();
714        processed_writer
715            .rename_meta(old.path(), new.path())
716            .await
717            .unwrap();
718    }
719
720    async fn queue_processing_tasks_for_folder(
721        &self,
722        source: &AssetSource,
723        path: PathBuf,
724        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
725    ) -> Result<(), AssetReaderError> {
726        if source.reader().is_directory(&path).await? {
727            let mut path_stream = source.reader().read_directory(&path).await?;
728            while let Some(path) = path_stream.next().await {
729                Box::pin(self.queue_processing_tasks_for_folder(source, path, new_task_sender))
730                    .await?;
731            }
732        } else {
733            let _ = new_task_sender.send((source.id(), path)).await;
734        }
735        Ok(())
736    }
737
738    /// Register a new asset processor.
739    pub fn register_processor<P: Process>(&self, processor: P) {
740        let mut processors = self
741            .data
742            .processors
743            .write()
744            .unwrap_or_else(PoisonError::into_inner);
745        #[cfg(feature = "trace")]
746        let processor = InstrumentedAssetProcessor(processor);
747        let processor = Arc::new(processor);
748        processors
749            .type_path_to_processor
750            .insert(P::type_path(), processor.clone());
751        match processors
752            .short_type_path_to_processor
753            .entry(P::short_type_path())
754        {
755            Entry::Vacant(entry) => {
756                entry.insert(ShortTypeProcessorEntry::Unique {
757                    type_path: P::type_path(),
758                    processor,
759                });
760            }
761            Entry::Occupied(mut entry) => match entry.get_mut() {
762                ShortTypeProcessorEntry::Unique { type_path, .. } => {
763                    let type_path = *type_path;
764                    *entry.get_mut() =
765                        ShortTypeProcessorEntry::Ambiguous(vec![type_path, P::type_path()]);
766                }
767                ShortTypeProcessorEntry::Ambiguous(type_paths) => {
768                    type_paths.push(P::type_path());
769                }
770            },
771        }
772    }
773
774    /// Set the default processor for the given `extension`. Make sure `P` is registered with [`AssetProcessor::register_processor`].
775    pub fn set_default_processor<P: Process>(&self, extension: &str) {
776        let mut processors = self
777            .data
778            .processors
779            .write()
780            .unwrap_or_else(PoisonError::into_inner);
781        processors
782            .file_extension_to_default_processor
783            .insert(extension.into(), P::type_path());
784    }
785
786    /// Returns the default processor for the given `extension`, if it exists.
787    pub fn get_default_processor(&self, extension: &str) -> Option<Arc<dyn ErasedProcessor>> {
788        let processors = self
789            .data
790            .processors
791            .read()
792            .unwrap_or_else(PoisonError::into_inner);
793        let key = processors
794            .file_extension_to_default_processor
795            .get(extension)?;
796        processors.type_path_to_processor.get(key).cloned()
797    }
798
799    /// Returns the processor with the given `processor_type_name`, if it exists.
800    pub fn get_processor(
801        &self,
802        processor_type_name: &str,
803    ) -> Result<Arc<dyn ErasedProcessor>, GetProcessorError> {
804        let processors = self
805            .data
806            .processors
807            .read()
808            .unwrap_or_else(PoisonError::into_inner);
809        if let Some(short_type_processor) = processors
810            .short_type_path_to_processor
811            .get(processor_type_name)
812        {
813            return match short_type_processor {
814                ShortTypeProcessorEntry::Unique { processor, .. } => Ok(processor.clone()),
815                ShortTypeProcessorEntry::Ambiguous(examples) => Err(GetProcessorError::Ambiguous {
816                    processor_short_name: processor_type_name.to_owned(),
817                    ambiguous_processor_names: examples.clone(),
818                }),
819            };
820        }
821        processors
822            .type_path_to_processor
823            .get(processor_type_name)
824            .cloned()
825            .ok_or_else(|| GetProcessorError::Missing(processor_type_name.to_owned()))
826    }
827
    /// Populates the initial view of each asset by scanning the unprocessed and processed asset folders.
    /// This info will later be used to determine whether or not to re-process an asset
    ///
    /// This will validate transactions and recover failed transactions when necessary.
    ///
    /// For every processed source that has both a processed reader and a processed writer, this:
    /// 1. collects all source (unprocessed) paths and all processed paths,
    /// 2. removes empty directories from the processed destination (best-effort),
    /// 3. creates an in-memory info entry for every source path,
    /// 4. loads the processed info stored in each processed asset's meta, or removes the processed
    ///    asset + meta when the meta cannot be read/parsed or the source asset no longer exists,
    /// 5. registers the recorded process dependencies of each processed asset.
    ///
    /// Finally the processor state is set to [`ProcessorState::Processing`].
    ///
    /// # Errors
    ///
    /// Returns [`InitializeError::FailedToReadSourcePaths`] or
    /// [`InitializeError::FailedToReadDestinationPaths`] if walking the source or processed
    /// folders fails.
    async fn initialize(&self) -> Result<(), InitializeError> {
        self.validate_transaction_log_and_recover().await;
        // Note: this write lock is held for the remainder of the function.
        let mut asset_infos = self.data.processing_state.asset_infos.write().await;

        /// Retrieves asset paths recursively, appending every non-directory path to `paths`.
        ///
        /// If `empty_dirs` is `Some`, every directory (other than a path without a parent, such as
        /// the root `""`) that contains no files at any depth is appended to it, children before
        /// parents, so the caller can delete them afterwards.
        ///
        /// Returns `true` if `path` is a file or contains at least one file (recursively).
        async fn get_asset_paths(
            reader: &dyn ErasedAssetReader,
            path: PathBuf,
            paths: &mut Vec<PathBuf>,
            mut empty_dirs: Option<&mut Vec<PathBuf>>,
        ) -> Result<bool, AssetReaderError> {
            if reader.is_directory(&path).await? {
                let mut path_stream = reader.read_directory(&path).await?;
                let mut contains_files = false;

                while let Some(child_path) = path_stream.next().await {
                    // The recursive call is boxed so that the async fn can call itself.
                    // `as_deref_mut` reborrows the optional list so it can be reused per child.
                    contains_files |= Box::pin(get_asset_paths(
                        reader,
                        child_path,
                        paths,
                        empty_dirs.as_deref_mut(),
                    ))
                    .await?;
                }
                // Add the current directory after all its subdirectories so we delete any empty
                // subdirectories before the current directory.
                if !contains_files
                    && path.parent().is_some()
                    && let Some(empty_dirs) = empty_dirs
                {
                    empty_dirs.push(path);
                }
                Ok(contains_files)
            } else {
                paths.push(path);
                Ok(true)
            }
        }

        for source in self.sources().iter_processed() {
            // Sources without a processed reader or writer are skipped.
            let Some(processed_reader) = source.ungated_processed_reader() else {
                continue;
            };
            let Ok(processed_writer) = source.processed_writer() else {
                continue;
            };
            // No empty-directory tracking for the source folder: only the processed destination
            // is cleaned up.
            let mut unprocessed_paths = Vec::new();
            get_asset_paths(
                source.reader(),
                PathBuf::from(""),
                &mut unprocessed_paths,
                None,
            )
            .await
            .map_err(InitializeError::FailedToReadSourcePaths)?;

            let mut processed_paths = Vec::new();
            let mut empty_dirs = Vec::new();
            get_asset_paths(
                processed_reader,
                PathBuf::from(""),
                &mut processed_paths,
                Some(&mut empty_dirs),
            )
            .await
            .map_err(InitializeError::FailedToReadDestinationPaths)?;

            // Remove any empty directories from the processed path. Note: this has to happen after
            // we fetch all the paths, otherwise the path stream can skip over paths
            // (we're modifying a collection while iterating through it).
            for empty_dir in empty_dirs {
                // We don't care if this succeeds, since it's just a cleanup task. It is best-effort
                let _ = processed_writer.remove_empty_directory(&empty_dir).await;
            }

            // Make sure every source asset has an in-memory info entry.
            for path in unprocessed_paths {
                asset_infos.get_or_insert(AssetPath::from(path).with_source(source.id()));
            }

            for path in processed_paths {
                // Collected first and registered after the `info` borrow of `asset_infos` ends.
                let mut dependencies = Vec::new();
                let asset_path = AssetPath::from(path).with_source(source.id());
                if let Some(info) = asset_infos.get_mut(&asset_path) {
                    match processed_reader.read_meta_bytes(asset_path.path()).await {
                        Ok(meta_bytes) => {
                            match ron::de::from_bytes::<ProcessedInfoMinimal>(&meta_bytes) {
                                Ok(minimal) => {
                                    trace!(
                                        "Populated processed info for asset {asset_path} {:?}",
                                        minimal.processed_info
                                    );

                                    if let Some(processed_info) = &minimal.processed_info {
                                        for process_dependency_info in
                                            &processed_info.process_dependencies
                                        {
                                            dependencies.push(process_dependency_info.path.clone());
                                        }
                                    }
                                    info.processed_info = minimal.processed_info;
                                }
                                Err(err) => {
                                    trace!("Removing processed data for {asset_path} because meta could not be parsed: {err}");
                                    self.remove_processed_asset_and_meta(source, asset_path.path())
                                        .await;
                                }
                            }
                        }
                        Err(err) => {
                            trace!("Removing processed data for {asset_path} because meta failed to load: {err}");
                            self.remove_processed_asset_and_meta(source, asset_path.path())
                                .await;
                        }
                    }
                } else {
                    // There is no source asset for this processed path, so the processed output
                    // is stale.
                    trace!("Removing processed data for non-existent asset {asset_path}");
                    self.remove_processed_asset_and_meta(source, asset_path.path())
                        .await;
                }

                for dependency in dependencies {
                    asset_infos.add_dependent(&dependency, asset_path.clone());
                }
            }
        }

        self.data
            .processing_state
            .set_state(ProcessorState::Processing)
            .await;

        Ok(())
    }
966
967    /// Removes the processed version of an asset and its metadata, if it exists. This _is not_ transactional like `remove_processed_asset_transactional`, nor
968    /// does it remove existing in-memory metadata.
969    async fn remove_processed_asset_and_meta(&self, source: &AssetSource, path: &Path) {
970        if let Err(err) = source.processed_writer().unwrap().remove(path).await {
971            warn!("Failed to remove non-existent asset {path:?}: {err}");
972        }
973
974        if let Err(err) = source.processed_writer().unwrap().remove_meta(path).await {
975            warn!("Failed to remove non-existent meta {path:?}: {err}");
976        }
977
978        self.clean_empty_processed_ancestor_folders(source, path)
979            .await;
980    }
981
982    async fn clean_empty_processed_ancestor_folders(&self, source: &AssetSource, path: &Path) {
983        // As a safety precaution don't delete absolute paths to avoid deleting folders outside of the destination folder
984        if path.is_absolute() {
985            error!("Attempted to clean up ancestor folders of an absolute path. This is unsafe so the operation was skipped.");
986            return;
987        }
988        while let Some(parent) = path.parent() {
989            if parent == Path::new("") {
990                break;
991            }
992            if source
993                .processed_writer()
994                .unwrap()
995                .remove_empty_directory(parent)
996                .await
997                .is_err()
998            {
999                // if we fail to delete a folder, stop walking up the tree
1000                break;
1001            }
1002        }
1003    }
1004
1005    /// Processes the asset (if it has not already been processed or the asset source has changed).
1006    /// If the asset has "process dependencies" (relies on the values of other assets), it will asynchronously await until
1007    /// the dependencies have been processed (See [`ProcessorGatedReader`], which is used in the [`AssetProcessor`]'s [`AssetServer`]
1008    /// to block reads until the asset is processed).
1009    ///
1010    /// [`LoadContext`]: crate::loader::LoadContext
1011    /// [`ProcessorGatedReader`]: crate::io::processor_gated::ProcessorGatedReader
1012    async fn process_asset(
1013        &self,
1014        source: &AssetSource,
1015        path: PathBuf,
1016        processor_task_event: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1017    ) {
1018        let asset_path = AssetPath::from(path).with_source(source.id());
1019        let result = self.process_asset_internal(source, &asset_path).await;
1020        let mut infos = self.data.processing_state.asset_infos.write().await;
1021        infos
1022            .finish_processing(asset_path, result, processor_task_event)
1023            .await;
1024    }
1025
    /// Performs the actual processing of a single asset and reports what happened.
    ///
    /// The steps are:
    /// 1. Read the source meta. If it exists, it selects a loader, a processor, or `Ignore`
    ///    (which returns [`ProcessResult::Ignored`] immediately). If it does not exist, default meta
    ///    is built from the default processor for the asset's full extension, else from the loader
    ///    for the path, else an `Ignore` meta is used.
    /// 2. Hash the meta bytes together with the asset bytes. If the stored processed info has the
    ///    same hash and none of its process dependencies changed, return
    ///    [`ProcessResult::SkippedNotChanged`].
    /// 3. Take the asset's transaction write lock, log the start of processing, then either run the
    ///    processor or (when there is no processor) copy the asset as-is, and write the processed
    ///    meta.
    /// 4. Log the end of processing and return [`ProcessResult::Processed`].
    ///
    /// # Errors
    ///
    /// Returns a [`ProcessError`] if meta cannot be read or deserialized, the loader/processor
    /// cannot be found, reading the source or writing the destination fails, or the processor
    /// itself fails.
    async fn process_asset_internal(
        &self,
        source: &AssetSource,
        asset_path: &AssetPath<'static>,
    ) -> Result<ProcessResult, ProcessError> {
        // TODO: check if already processing to protect against duplicate hot-reload events
        debug!("Processing {}", asset_path);
        let server = &self.server;
        let path = asset_path.path();
        let reader = source.reader();

        // Helpers that attach the asset path to reader/writer errors.
        let reader_err = |err| ProcessError::AssetReaderError {
            path: asset_path.clone(),
            err,
        };
        let writer_err = |err| ProcessError::AssetWriterError {
            path: asset_path.clone(),
            err,
        };

        // `processor` is `Some` only when the asset should be run through a processor; `None`
        // means the asset is copied unchanged further below.
        let (mut source_meta, meta_bytes, processor) = match reader.read_meta_bytes(path).await {
            Ok(meta_bytes) => {
                // Deserialize just enough of the meta to find out which loader/processor to use,
                // then let that loader/processor deserialize the full meta.
                let minimal: AssetMetaMinimal = ron::de::from_bytes(&meta_bytes).map_err(|e| {
                    ProcessError::DeserializeMetaError(DeserializeMetaError::DeserializeMinimal(e))
                })?;
                let (meta, processor) = match minimal.asset {
                    AssetActionMinimal::Load { loader } => {
                        let loader = server.get_asset_loader_with_type_name(&loader).await?;
                        let meta = loader.deserialize_meta(&meta_bytes)?;
                        (meta, None)
                    }
                    AssetActionMinimal::Process { processor } => {
                        let processor = self.get_processor(&processor)?;
                        let meta = processor.deserialize_meta(&meta_bytes)?;
                        (meta, Some(processor))
                    }
                    AssetActionMinimal::Ignore => {
                        return Ok(ProcessResult::Ignored);
                    }
                };
                (meta, meta_bytes, processor)
            }
            Err(AssetReaderError::NotFound(_path)) => {
                // No meta file: fall back to default meta. Prefer the default processor for the
                // extension, then the loader for the path, and finally an `Ignore` meta.
                let (meta, processor) = if let Some(processor) = asset_path
                    .get_full_extension()
                    .and_then(|ext| self.get_default_processor(&ext))
                {
                    let meta = processor.default_meta();
                    (meta, Some(processor))
                } else {
                    match server.get_path_asset_loader(asset_path.clone()).await {
                        Ok(loader) => (loader.default_meta(), None),
                        Err(MissingAssetLoaderForExtensionError { .. }) => {
                            // Note: unlike an explicit `Ignore` meta above, this does not return
                            // early; the asset still goes through the copy path below.
                            let meta: Box<dyn AssetMetaDyn> =
                                Box::new(AssetMeta::<(), ()>::new(AssetAction::Ignore));
                            (meta, None)
                        }
                    }
                };
                let meta_bytes = meta.serialize();
                (meta, meta_bytes, processor)
            }
            Err(err) => {
                return Err(ProcessError::ReadAssetMetaError {
                    path: asset_path.clone(),
                    err,
                })
            }
        };

        let processed_writer = source.processed_writer()?;

        let new_hash = {
            // Create a reader just for computing the hash. Keep this scoped here so that we drop it
            // as soon as the hash is computed.
            let mut reader_for_hash = reader.read(path).await.map_err(reader_err)?;

            get_asset_hash(&meta_bytes, &mut reader_for_hash)
                .await
                .map_err(reader_err)?
        };
        // `full_hash` starts out equal to `hash`; it is recomputed below once the process
        // dependencies are known.
        let mut new_processed_info = ProcessedInfo {
            hash: new_hash,
            full_hash: new_hash,
            process_dependencies: Vec::new(),
        };

        {
            // Skip processing when neither the asset (+ meta) nor any of its recorded process
            // dependencies changed since the last run.
            let infos = self.data.processing_state.asset_infos.read().await;
            if let Some(current_processed_info) = infos
                .get(asset_path)
                .and_then(|i| i.processed_info.as_ref())
                && current_processed_info.hash == new_hash
            {
                let mut dependency_changed = false;
                for current_dep_info in &current_processed_info.process_dependencies {
                    // A dependency with no processed info (`None`) also counts as changed.
                    let live_hash = infos
                        .get(&current_dep_info.path)
                        .and_then(|i| i.processed_info.as_ref())
                        .map(|i| i.full_hash);
                    if live_hash != Some(current_dep_info.full_hash) {
                        dependency_changed = true;
                        break;
                    }
                }
                if !dependency_changed {
                    return Ok(ProcessResult::SkippedNotChanged);
                }
            }
        }

        // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
        // See ProcessedAssetInfo::file_transaction_lock docs for more info
        let _transaction_lock = {
            let lock = {
                let mut infos = self.data.processing_state.asset_infos.write().await;
                let info = infos.get_or_insert(asset_path.clone());
                // Clone out the transaction lock first and then lock after we've dropped the
                // asset_infos. Otherwise, trying to lock a single path can block all other paths too
                // (leading to deadlocks).
                info.file_transaction_lock.clone()
            };
            lock.write_arc().await
        };

        // NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
        // Directly writing to the asset destination in the processor necessitates this behavior
        // TODO: this class of failure can be recovered via re-processing + smarter log validation that allows for duplicate transactions in the event of failures
        self.log_begin_processing(asset_path).await;
        if let Some(processor) = processor {
            // Unwrap is ok since we have a processor, so the `AssetAction` must have been
            // `AssetAction::Process` (which includes its settings).
            let settings = source_meta.process_settings().unwrap();

            // Create a reader just for the actual process. Note: this means that we're performing
            // two reads for the same file (but we avoid having to load the whole file into memory).
            // For some sources (like local file systems), this is not a big deal, but for other
            // sources like an HTTP asset sources, this could be an entire additional download (if
            // the asset source doesn't do any caching). In practice, most sources being processed
            // are likely to be local, and processing in general is a publish-time operation, so
            // it's not likely to be too big a deal. If in the future, we decide we want to avoid
            // this repeated read, we could "ask" the asset source if it prefers avoiding repeated
            // reads or not.
            let reader_for_process = reader.read(path).await.map_err(reader_err)?;

            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            let mut processed_meta = {
                // The context borrows `new_processed_info` mutably; scoping it here ends that
                // borrow before the info is read again below.
                let mut context = ProcessContext::new(
                    self,
                    asset_path,
                    reader_for_process,
                    &mut new_processed_info,
                );
                processor
                    .process(&mut context, settings, &mut *writer)
                    .await?
            };

            writer
                .flush()
                .await
                .map_err(|e| ProcessError::AssetWriterError {
                    path: asset_path.clone(),
                    err: AssetWriterError::Io(e),
                })?;

            // Combine the asset's own hash with the full hashes of its process dependencies.
            let full_hash = get_full_asset_hash(
                new_hash,
                new_processed_info
                    .process_dependencies
                    .iter()
                    .map(|i| i.full_hash),
            );
            new_processed_info.full_hash = full_hash;
            *processed_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = processed_meta.serialize();

            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        } else {
            // No processor: copy the source asset to the destination unchanged and write the
            // source meta (with processed info attached) next to it.
            // See the reasoning for processing why it's ok to do a second read here.
            let mut reader_for_copy = reader.read(path).await.map_err(reader_err)?;
            let mut writer = processed_writer.write(path).await.map_err(writer_err)?;
            futures_lite::io::copy(&mut reader_for_copy, &mut writer)
                .await
                .map_err(|err| ProcessError::AssetWriterError {
                    path: asset_path.clone_owned(),
                    err: err.into(),
                })?;
            *source_meta.processed_info_mut() = Some(new_processed_info.clone());
            let meta_bytes = source_meta.serialize();
            processed_writer
                .write_meta_bytes(path, &meta_bytes)
                .await
                .map_err(writer_err)?;
        }
        // Only reached when every write above succeeded; early returns leave the log entry
        // unfinished (see the NOTE above `log_begin_processing`).
        self.log_end_processing(asset_path).await;

        Ok(ProcessResult::Processed(new_processed_info))
    }
1228
    /// Validates the transaction log left by the previous run, repairs what can be repaired, and
    /// then creates a fresh log for this run.
    ///
    /// - Assets whose transaction never finished have their processed asset and meta removed.
    /// - If the log cannot be read, the last run hit an unrecoverable error, the log contains
    ///   duplicate or unmatched entries, or removing an unfinished asset fails, all processed
    ///   assets of every processed source are removed so processing starts from scratch.
    ///
    /// # Panics
    ///
    /// Panics if this is called more than once (the log factory has already been taken), if the
    /// full reset of the processed assets fails, or if the new log cannot be created.
    async fn validate_transaction_log_and_recover(&self) {
        let log_factory = self
            .data
            .log_factory
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            // Take the log factory to indicate we've started and this should disable setting a new
            // log factory.
            .take()
            .expect("the asset processor only starts once");
        if let Err(err) = validate_transaction_log(log_factory.as_ref()).await {
            // `false` means the processed assets cannot be trusted and must be wiped below.
            let state_is_valid = match err {
                ValidateLogError::ReadLogError(err) => {
                    error!("Failed to read processor log file. Processed assets cannot be validated so they must be re-generated {err}");
                    false
                }
                ValidateLogError::UnrecoverableError => {
                    error!("Encountered an unrecoverable error in the last run. Processed assets cannot be validated so they must be re-generated");
                    false
                }
                ValidateLogError::EntryErrors(entry_errors) => {
                    let mut state_is_valid = true;
                    for entry_error in entry_errors {
                        match entry_error {
                            // These cannot be repaired per-asset, so stop looking at further
                            // entries and fall through to the full reset.
                            LogEntryError::DuplicateTransaction(_)
                            | LogEntryError::EndedMissingTransaction(_) => {
                                error!("{}", entry_error);
                                state_is_valid = false;
                                break;
                            }
                            LogEntryError::UnfinishedTransaction(path) => {
                                debug!("Asset {path:?} did not finish processing. Clearing state for that asset");
                                // Logs the failure and marks the whole state as invalid.
                                let mut unrecoverable_err = |message: &dyn core::fmt::Display| {
                                    error!("Failed to remove asset {path:?}: {message}");
                                    state_is_valid = false;
                                };
                                let Ok(source) = self.get_source(path.source()) else {
                                    unrecoverable_err(&"AssetSource does not exist");
                                    continue;
                                };
                                let Ok(processed_writer) = source.processed_writer() else {
                                    unrecoverable_err(&"AssetSource does not have a processed AssetWriter registered");
                                    continue;
                                };

                                if let Err(err) = processed_writer.remove(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                                if let Err(err) = processed_writer.remove_meta(path.path()).await {
                                    match err {
                                        AssetWriterError::Io(err) => {
                                            // any error but NotFound means we could be in a bad state
                                            if err.kind() != ErrorKind::NotFound {
                                                unrecoverable_err(&err);
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                    state_is_valid
                }
            };

            if !state_is_valid {
                error!("Processed asset transaction log state was invalid and unrecoverable for some reason (see previous logs). Removing processed assets and starting fresh.");
                for source in self.sources().iter_processed() {
                    // Sources without a processed writer have nothing to reset.
                    let Ok(processed_writer) = source.processed_writer() else {
                        continue;
                    };
                    if let Err(err) = processed_writer
                        .remove_assets_in_directory(Path::new(""))
                        .await
                    {
                        panic!("Processed assets were in a bad state. To correct this, the asset processor attempted to remove all processed assets and start from scratch. This failed. There is no way to continue. Try restarting, or deleting imported asset folder manually. {err}");
                    }
                }
            }
        }
        // Whether or not validation succeeded, start this run with a new log.
        let mut log = self.data.log.write().await;
        *log = match log_factory.create_new_log().await {
            Ok(log) => Some(log),
            Err(err) => panic!("Failed to initialize asset processor log. This cannot be recovered. Try restarting. If that doesn't work, try deleting processed asset folder. {}", err),
        };
    }
1322}
1323
1324impl AssetProcessorData {
1325    /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`].
1326    pub(crate) fn new(sources: Arc<AssetSources>, processing_state: Arc<ProcessingState>) -> Self {
1327        AssetProcessorData {
1328            processing_state,
1329            sources,
1330            log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))),
1331            log: Default::default(),
1332            processors: Default::default(),
1333        }
1334    }
1335
1336    /// Sets the transaction log factory for the processor.
1337    ///
1338    /// If this is called after asset processing has begun (in the `Startup` schedule), it will
1339    /// return an error. If not called, the default transaction log will be used.
1340    pub fn set_log_factory(
1341        &self,
1342        factory: Box<dyn ProcessorTransactionLogFactory>,
1343    ) -> Result<(), SetTransactionLogFactoryError> {
1344        let mut log_factory = self
1345            .log_factory
1346            .lock()
1347            .unwrap_or_else(PoisonError::into_inner);
1348        if log_factory.is_none() {
1349            // This indicates the asset processor has already started, so setting the factory does
1350            // nothing here.
1351            return Err(SetTransactionLogFactoryError::AlreadyInUse);
1352        }
1353
1354        *log_factory = Some(factory);
1355        Ok(())
1356    }
1357
1358    /// Returns a future that will not finish until the path has been processed.
1359    pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1360        self.processing_state.wait_until_processed(path).await
1361    }
1362
1363    /// Returns a future that will not finish until the processor has been initialized.
1364    pub async fn wait_until_initialized(&self) {
1365        self.processing_state.wait_until_initialized().await;
1366    }
1367
1368    /// Returns a future that will not finish until processing has finished.
1369    pub async fn wait_until_finished(&self) {
1370        self.processing_state.wait_until_finished().await;
1371    }
1372}
1373
1374impl ProcessingState {
1375    /// Creates a new empty processing state.
1376    fn new() -> Self {
1377        let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1);
1378        let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1);
1379        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1380        // not block if there was older state present.
1381        initialized_sender.set_overflow(true);
1382        finished_sender.set_overflow(true);
1383
1384        Self {
1385            state: async_lock::RwLock::new(ProcessorState::Initializing),
1386            initialized_sender,
1387            initialized_receiver,
1388            finished_sender,
1389            finished_receiver,
1390            asset_infos: Default::default(),
1391        }
1392    }
1393
1394    /// Sets the overall state of processing and broadcasts appropriate events.
1395    async fn set_state(&self, state: ProcessorState) {
1396        let mut state_guard = self.state.write().await;
1397        let last_state = *state_guard;
1398        *state_guard = state;
1399        if last_state != ProcessorState::Finished && state == ProcessorState::Finished {
1400            self.finished_sender.broadcast(()).await.unwrap();
1401        } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing {
1402            self.initialized_sender.broadcast(()).await.unwrap();
1403        }
1404    }
1405
1406    /// Retrieves the current [`ProcessorState`]
1407    pub(crate) async fn get_state(&self) -> ProcessorState {
1408        *self.state.read().await
1409    }
1410
1411    /// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur
1412    /// while it is held.
1413    pub(crate) async fn get_transaction_lock(
1414        &self,
1415        path: &AssetPath<'static>,
1416    ) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
1417        let lock = {
1418            let infos = self.asset_infos.read().await;
1419            let info = infos
1420                .get(path)
1421                .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
1422            // Clone out the transaction lock first and then lock after we've dropped the
1423            // asset_infos. Otherwise, trying to lock a single path can block all other paths to
1424            // (leading to deadlocks).
1425            info.file_transaction_lock.clone()
1426        };
1427        Ok(lock.read_arc().await)
1428    }
1429
1430    /// Returns a future that will not finish until the path has been processed.
1431    pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus {
1432        self.wait_until_initialized().await;
1433        let mut receiver = {
1434            let infos = self.asset_infos.write().await;
1435            let info = infos.get(&path);
1436            match info {
1437                Some(info) => match info.status {
1438                    Some(result) => return result,
1439                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1440                    None => info.status_receiver.clone(),
1441                },
1442                None => return ProcessStatus::NonExistent,
1443            }
1444        };
1445        receiver.recv().await.unwrap()
1446    }
1447
1448    /// Returns a future that will not finish until the processor has been initialized.
1449    pub(crate) async fn wait_until_initialized(&self) {
1450        let receiver = {
1451            let state = self.state.read().await;
1452            match *state {
1453                ProcessorState::Initializing => {
1454                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1455                    Some(self.initialized_receiver.clone())
1456                }
1457                _ => None,
1458            }
1459        };
1460
1461        if let Some(mut receiver) = receiver {
1462            receiver.recv().await.unwrap();
1463        }
1464    }
1465
1466    /// Returns a future that will not finish until processing has finished.
1467    pub(crate) async fn wait_until_finished(&self) {
1468        let receiver = {
1469            let state = self.state.read().await;
1470            match *state {
1471                ProcessorState::Initializing | ProcessorState::Processing => {
1472                    // This receiver must be created prior to losing the read lock to ensure this is transactional
1473                    Some(self.finished_receiver.clone())
1474                }
1475                ProcessorState::Finished => None,
1476            }
1477        };
1478
1479        if let Some(mut receiver) = receiver {
1480            receiver.recv().await.unwrap();
1481        }
1482    }
1483}
1484
/// Wrapper around a processor `T`, only compiled when the `trace` feature is enabled.
///
/// Its [`Process`] implementation forwards to the wrapped value and attaches an
/// "asset processing" tracing span to the returned future.
#[cfg(feature = "trace")]
#[derive(TypePath)]
struct InstrumentedAssetProcessor<T>(T);
1488
#[cfg(feature = "trace")]
impl<T> Process for InstrumentedAssetProcessor<T>
where
    T: Process,
{
    type Settings = T::Settings;
    type OutputLoader = T::OutputLoader;

    /// Delegates to the wrapped processor, running its future inside an "asset processing" span
    /// that records the processor's type path and the path of the asset being processed.
    fn process(
        &self,
        context: &mut ProcessContext,
        settings: &Self::Settings,
        writer: &mut crate::io::Writer,
    ) -> impl ConditionalSendFuture<
        Output = Result<<Self::OutputLoader as crate::AssetLoader>::Settings, ProcessError>,
    > {
        let Self(inner) = self;
        // Build the span before handing `context` to the inner processor.
        let span = info_span!(
            "asset processing",
            processor = T::type_path(),
            asset = context.path().to_string(),
        );
        let processing = inner.process(context, settings, writer);
        processing.instrument(span)
    }
}
1510
/// The (successful) result of processing an asset
#[derive(Debug, Clone)]
pub enum ProcessResult {
    /// The asset was processed; carries the resulting [`ProcessedInfo`].
    Processed(ProcessedInfo),
    /// Processing was skipped because the asset was unchanged.
    SkippedNotChanged,
    /// Processing was skipped because the asset is ignored.
    Ignored,
}
1518
/// The final status of processing an asset
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ProcessStatus {
    /// Recorded when the asset was processed, or when processing was skipped because it was
    /// unchanged.
    Processed,
    /// Recorded when processing the asset produced an error that was reported as a failure.
    Failed,
    /// Reported when the processor has no info for the asset path, or when the info for the
    /// path is removed or renamed away.
    NonExistent,
}
1526
// NOTE: if you add new fields to this struct, make sure they are propagated (when relevant) in ProcessorAssetInfos::rename
/// The processor's in-memory bookkeeping for a single asset path.
#[derive(Debug)]
pub(crate) struct ProcessorAssetInfo {
    /// The processing info currently recorded for this asset, if any.
    processed_info: Option<ProcessedInfo>,
    /// Paths of assets that depend on this asset when they are being processed.
    dependents: HashSet<AssetPath<'static>>,
    /// The most recently recorded [`ProcessStatus`]; `None` until a status has been recorded.
    status: Option<ProcessStatus>,
    /// A lock that controls read/write access to processed asset files. The lock is shared for both the asset bytes and the meta bytes.
    /// _This lock must be locked whenever a read or write to processed assets occurs_
    /// There are scenarios where processed assets (and their metadata) are being read and written in multiple places at once:
    /// * when the processor is running in parallel with an app
    /// * when processing assets in parallel, the processor might read an asset's `process_dependencies` when processing new versions of those dependencies
    ///     * this second scenario almost certainly isn't possible with the current implementation, but it's worth protecting against
    ///
    /// This lock defends against those scenarios by ensuring readers don't read while processed files are being written.
    /// Because this lock is shared across meta and asset bytes, readers can ensure they don't read "old" versions of metadata with "new" asset data.
    pub(crate) file_transaction_lock: Arc<async_lock::RwLock<()>>,
    /// Broadcasts status changes for this asset to anything waiting on it.
    status_sender: async_broadcast::Sender<ProcessStatus>,
    /// Receiver paired with `status_sender`; waiters clone it to listen for status broadcasts.
    status_receiver: async_broadcast::Receiver<ProcessStatus>,
}
1547
1548impl Default for ProcessorAssetInfo {
1549    fn default() -> Self {
1550        let (mut status_sender, status_receiver) = async_broadcast::broadcast(1);
1551        // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to
1552        // not block if there was older state present.
1553        status_sender.set_overflow(true);
1554        Self {
1555            processed_info: Default::default(),
1556            dependents: Default::default(),
1557            file_transaction_lock: Default::default(),
1558            status: None,
1559            status_sender,
1560            status_receiver,
1561        }
1562    }
1563}
1564
1565impl ProcessorAssetInfo {
1566    async fn update_status(&mut self, status: ProcessStatus) {
1567        if self.status != Some(status) {
1568            self.status = Some(status);
1569            self.status_sender.broadcast(status).await.unwrap();
1570        }
1571    }
1572}
1573
/// The "current" in memory view of the asset space. This is "eventually consistent". It does not directly
/// represent the state of assets in storage, but rather a valid historical view that will gradually become more
/// consistent as events are processed.
#[derive(Default, Debug)]
pub struct ProcessorAssetInfos {
    /// The "current" in memory view of the asset space. During processing, if path does not exist in this, it should
    /// be considered non-existent.
    /// NOTE: YOU MUST USE `Self::get_or_insert` or `Self::insert` TO ADD ITEMS TO THIS COLLECTION TO ENSURE
    /// `non_existent_dependents` DATA IS CONSUMED
    infos: HashMap<AssetPath<'static>, ProcessorAssetInfo>,
    /// Dependents for assets that don't exist. This exists to track "dangling" asset references due to deleted / missing files.
    /// If the dependent asset is added, it can "resolve" these dependencies and re-compute those assets.
    /// Therefore this _must_ always be consistent with the `infos` data. If a new asset is added to `infos`, it should
    /// check this map for dependencies and add them. If an asset is removed, it should update the dependents here.
    non_existent_dependents: HashMap<AssetPath<'static>, HashSet<AssetPath<'static>>>,
}
1590
1591impl ProcessorAssetInfos {
1592    fn get_or_insert(&mut self, asset_path: AssetPath<'static>) -> &mut ProcessorAssetInfo {
1593        self.infos.entry(asset_path.clone()).or_insert_with(|| {
1594            let mut info = ProcessorAssetInfo::default();
1595            // track existing dependents by resolving existing "hanging" dependents.
1596            if let Some(dependents) = self.non_existent_dependents.remove(&asset_path) {
1597                info.dependents = dependents;
1598            }
1599            info
1600        })
1601    }
1602
1603    pub(crate) fn get(&self, asset_path: &AssetPath<'static>) -> Option<&ProcessorAssetInfo> {
1604        self.infos.get(asset_path)
1605    }
1606
1607    fn get_mut(&mut self, asset_path: &AssetPath<'static>) -> Option<&mut ProcessorAssetInfo> {
1608        self.infos.get_mut(asset_path)
1609    }
1610
1611    fn add_dependent(&mut self, asset_path: &AssetPath<'static>, dependent: AssetPath<'static>) {
1612        if let Some(info) = self.get_mut(asset_path) {
1613            info.dependents.insert(dependent);
1614        } else {
1615            let dependents = self
1616                .non_existent_dependents
1617                .entry(asset_path.clone())
1618                .or_default();
1619            dependents.insert(dependent);
1620        }
1621    }
1622
1623    /// Finalize processing the asset, which will incorporate the result of the processed asset into the in-memory view the processed assets.
1624    async fn finish_processing(
1625        &mut self,
1626        asset_path: AssetPath<'static>,
1627        result: Result<ProcessResult, ProcessError>,
1628        reprocess_sender: async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1629    ) {
1630        match result {
1631            Ok(ProcessResult::Processed(processed_info)) => {
1632                debug!("Finished processing \"{}\"", asset_path);
1633                // clean up old dependents
1634                let old_processed_info = self
1635                    .infos
1636                    .get_mut(&asset_path)
1637                    .and_then(|i| i.processed_info.take());
1638                if let Some(old_processed_info) = old_processed_info {
1639                    self.clear_dependencies(&asset_path, old_processed_info);
1640                }
1641
1642                // populate new dependents
1643                for process_dependency_info in &processed_info.process_dependencies {
1644                    self.add_dependent(&process_dependency_info.path, asset_path.to_owned());
1645                }
1646                let info = self.get_or_insert(asset_path);
1647                info.processed_info = Some(processed_info);
1648                info.update_status(ProcessStatus::Processed).await;
1649                let dependents = info.dependents.iter().cloned().collect::<Vec<_>>();
1650                for path in dependents {
1651                    let _ = reprocess_sender
1652                        .send((path.source().clone_owned(), path.path().to_owned()))
1653                        .await;
1654                }
1655            }
1656            Ok(ProcessResult::SkippedNotChanged) => {
1657                debug!("Skipping processing (unchanged) \"{}\"", asset_path);
1658                let info = self.get_mut(&asset_path).expect("info should exist");
1659                // NOTE: skipping an asset on a given pass doesn't mean it won't change in the future as a result
1660                // of a dependency being re-processed. This means apps might receive an "old" (but valid) asset first.
1661                // This is in the interest of fast startup times that don't block for all assets being checked + reprocessed
1662                // Therefore this relies on hot-reloading in the app to pickup the "latest" version of the asset
1663                // If "block until latest state is reflected" is required, we can easily add a less granular
1664                // "block until first pass finished" mode
1665                info.update_status(ProcessStatus::Processed).await;
1666            }
1667            Ok(ProcessResult::Ignored) => {
1668                debug!("Skipping processing (ignored) \"{}\"", asset_path);
1669            }
1670            Err(ProcessError::ExtensionRequired) => {
1671                // Skip assets without extensions
1672            }
1673            Err(ProcessError::MissingAssetLoaderForExtension(_)) => {
1674                trace!("No loader found for {asset_path}");
1675            }
1676            Err(ProcessError::AssetReaderError {
1677                err: AssetReaderError::NotFound(_),
1678                ..
1679            }) => {
1680                // if there is no asset source, no processing can be done
1681                trace!("No need to process asset {asset_path} because it does not exist");
1682            }
1683            Err(err) => {
1684                error!("Failed to process asset {asset_path}: {err}");
1685                // if this failed because a dependency could not be loaded, make sure it is reprocessed if that dependency is reprocessed
1686                if let ProcessError::AssetLoadError(AssetLoadError::AssetLoaderError(dependency)) =
1687                    err
1688                {
1689                    let info = self.get_mut(&asset_path).expect("info should exist");
1690                    info.processed_info = Some(ProcessedInfo {
1691                        hash: AssetHash::default(),
1692                        full_hash: AssetHash::default(),
1693                        process_dependencies: vec![],
1694                    });
1695                    self.add_dependent(dependency.path(), asset_path.to_owned());
1696                }
1697
1698                let info = self.get_mut(&asset_path).expect("info should exist");
1699                info.update_status(ProcessStatus::Failed).await;
1700            }
1701        }
1702    }
1703
1704    /// Remove the info for the given path. This should only happen if an asset's source is
1705    /// removed/non-existent. Returns the transaction lock for the asset, or [`None`] if the asset
1706    /// path does not exist.
1707    async fn remove(
1708        &mut self,
1709        asset_path: &AssetPath<'static>,
1710    ) -> Option<Arc<async_lock::RwLock<()>>> {
1711        let info = self.infos.remove(asset_path)?;
1712        if let Some(processed_info) = info.processed_info {
1713            self.clear_dependencies(asset_path, processed_info);
1714        }
1715        // Tell all listeners this asset does not exist
1716        info.status_sender
1717            .broadcast(ProcessStatus::NonExistent)
1718            .await
1719            .unwrap();
1720        if !info.dependents.is_empty() {
1721            error!(
1722                    "The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1723                    info.dependents
1724                );
1725            self.non_existent_dependents
1726                .insert(asset_path.clone(), info.dependents);
1727        }
1728
1729        Some(info.file_transaction_lock)
1730    }
1731
1732    /// Remove the info for the old path, and move over its info to the new path. This should only
1733    /// happen if an asset's source is removed/non-existent. Returns the transaction locks for the
1734    /// old and new assets respectively, or [`None`] if the old path does not exist.
1735    async fn rename(
1736        &mut self,
1737        old: &AssetPath<'static>,
1738        new: &AssetPath<'static>,
1739        new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
1740    ) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
1741        let mut info = self.infos.remove(old)?;
1742        if !info.dependents.is_empty() {
1743            // TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
1744            // doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
1745            // we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
1746            // If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
1747            // If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
1748            // TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
1749            // (see the remove impl).
1750            error!(
1751                    "The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
1752                    info.dependents
1753                );
1754            self.non_existent_dependents
1755                .insert(old.clone(), core::mem::take(&mut info.dependents));
1756        }
1757        if let Some(processed_info) = &info.processed_info {
1758            // Update "dependent" lists for this asset's "process dependencies" to use new path.
1759            for dep in &processed_info.process_dependencies {
1760                if let Some(info) = self.infos.get_mut(&dep.path) {
1761                    info.dependents.remove(old);
1762                    info.dependents.insert(new.clone());
1763                } else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
1764                    dependents.remove(old);
1765                    dependents.insert(new.clone());
1766                }
1767            }
1768        }
1769        // Tell all listeners this asset no longer exists
1770        info.status_sender
1771            .broadcast(ProcessStatus::NonExistent)
1772            .await
1773            .unwrap();
1774        let new_info = self.get_or_insert(new.clone());
1775        new_info.processed_info = info.processed_info;
1776        new_info.status = info.status;
1777        // Ensure things waiting on the new path are informed of the status of this asset
1778        if let Some(status) = new_info.status {
1779            new_info.status_sender.broadcast(status).await.unwrap();
1780        }
1781        let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
1782        // Queue the asset for a reprocess check, in case it needs new meta.
1783        let _ = new_task_sender
1784            .send((new.source().clone_owned(), new.path().to_owned()))
1785            .await;
1786        for dependent in dependents {
1787            // Queue dependents for reprocessing because they might have been waiting for this asset.
1788            let _ = new_task_sender
1789                .send((
1790                    dependent.source().clone_owned(),
1791                    dependent.path().to_owned(),
1792                ))
1793                .await;
1794        }
1795
1796        Some((
1797            info.file_transaction_lock,
1798            new_info.file_transaction_lock.clone(),
1799        ))
1800    }
1801
1802    fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
1803        for old_load_dep in removed_info.process_dependencies {
1804            if let Some(info) = self.infos.get_mut(&old_load_dep.path) {
1805                info.dependents.remove(asset_path);
1806            } else if let Some(dependents) =
1807                self.non_existent_dependents.get_mut(&old_load_dep.path)
1808            {
1809                dependents.remove(asset_path);
1810            }
1811        }
1812    }
1813}
1814
/// The current state of the [`AssetProcessor`].
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProcessorState {
    /// The processor is still initializing, which involves scanning the current asset folders,
    /// constructing an in-memory view of the asset space, recovering from previous errors / crashes,
    /// and cleaning up old / unused assets.
    ///
    /// This is the state a new `ProcessingState` starts in.
    Initializing,
    /// The processor is currently processing assets.
    Processing,
    /// The processor has finished processing all valid assets and reporting invalid assets.
    Finished,
}
1827
/// An error that occurs when initializing the [`AssetProcessor`].
#[derive(Error, Debug)]
pub enum InitializeError {
    /// Wraps the underlying [`AssetReaderError`] and displays it transparently.
    /// NOTE(review): presumably raised when reading the source asset paths fails; confirm
    /// against the initialization code.
    #[error(transparent)]
    FailedToReadSourcePaths(AssetReaderError),
    /// Wraps the underlying [`AssetReaderError`] and displays it transparently.
    /// NOTE(review): presumably raised when reading the destination (processed) asset paths
    /// fails; confirm against the initialization code.
    #[error(transparent)]
    FailedToReadDestinationPaths(AssetReaderError),
    /// Validating the asset log failed; converts from [`ValidateLogError`].
    #[error("Failed to validate asset log: {0}")]
    ValidateLogError(#[from] ValidateLogError),
}
1838
/// An error when attempting to set the transaction log factory.
#[derive(Error, Debug)]
pub enum SetTransactionLogFactoryError {
    /// The factory has already been taken for use (asset processing has started), so a new
    /// factory can no longer be set.
    #[error("Transaction log is already in use so setting the factory does nothing")]
    AlreadyInUse,
}
1845
/// An error when retrieving an asset processor.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum GetProcessorError {
    /// No processor exists for the requested name (carried in the variant).
    #[error("The processor '{0}' does not exist")]
    Missing(String),
    /// The requested short name matches more than one processor.
    #[error("The processor '{processor_short_name}' is ambiguous between several processors: {ambiguous_processor_names:?}")]
    Ambiguous {
        /// The short name that was requested.
        processor_short_name: String,
        /// The names of the processors the short name is ambiguous between.
        ambiguous_processor_names: Vec<&'static str>,
    },
}
1857
1858impl From<GetProcessorError> for ProcessError {
1859    fn from(err: GetProcessorError) -> Self {
1860        match err {
1861            GetProcessorError::Missing(name) => Self::MissingProcessor(name),
1862            GetProcessorError::Ambiguous {
1863                processor_short_name,
1864                ambiguous_processor_names,
1865            } => Self::AmbiguousProcessor {
1866                processor_short_name,
1867                ambiguous_processor_names,
1868            },
1869        }
1870    }
1871}
1872
1873#[cfg(test)]
1874mod tests;