calloop/
loop_logic.rs

1use std::cell::{Cell, RefCell};
2use std::fmt::Debug;
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use std::{io, slice};
8
9#[cfg(feature = "block_on")]
10use std::future::Future;
11
12#[cfg(unix)]
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};
16
17use log::trace;
18use polling::Poller;
19
20use crate::list::{SourceEntry, SourceList};
21use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
22use crate::sys::{Notifier, PollEvent};
23use crate::token::TokenInner;
24use crate::{
25    AdditionalLifecycleEventsSet, InsertError, Poll, PostAction, Readiness, Token, TokenFactory,
26};
27
/// Shared, interior-mutable handle to a queued idle callback, as stored in `LoopInner::idles`.
type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
29
/// A token representing a registration in the [`EventLoop`].
///
/// This token is given to you by the [`EventLoop`] when an [`EventSource`] is inserted or
/// a [`Dispatcher`] is registered. You can use it to [disable](LoopHandle#method.disable),
/// [enable](LoopHandle#method.enable), [update](LoopHandle#method.update),
/// [remove](LoopHandle#method.remove) or [kill](LoopHandle#method.kill) it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegistrationToken {
    // Raw crate-internal key identifying the registered source.
    inner: TokenInner,
}
40
41impl RegistrationToken {
42    /// Create the RegistrationToken corresponding to the given raw key
43    /// This is needed because some methods use `RegistrationToken`s as
44    /// raw usizes within this crate
45    pub(crate) fn new(inner: TokenInner) -> Self {
46        Self { inner }
47    }
48}
49
/// State shared between an [`EventLoop`] and every [`LoopHandle`] cloned from it.
pub(crate) struct LoopInner<'l, Data> {
    /// The polling backend that sources are registered with.
    pub(crate) poll: RefCell<Poll>,
    // The `Option` is used to keep slots of the slab occupied, to prevent id reuse
    // while in-flight events might still refer to a recently destroyed event source.
    pub(crate) sources: RefCell<SourceList<'l, Data>>,
    /// Sources whose `before_sleep` / `before_handle_events` hooks are run around each poll.
    pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
    /// Idle callbacks queued for the next dispatch; drained by `dispatch_idles`.
    idles: RefCell<Vec<IdleCallback<'l, Data>>>,
    /// Action requested through a handle while a source callback was running;
    /// it is consumed once that callback has returned.
    pending_action: Cell<PostAction>,
}
59
/// A handle to an event loop
///
/// This handle allows you to insert new sources and idles in this event loop,
/// it can be cloned, and it is possible to insert new sources from within a source
/// callback.
pub struct LoopHandle<'l, Data> {
    // Reference-counted state shared with the owning `EventLoop`.
    inner: Rc<LoopInner<'l, Data>>,
}
68
69impl<'l, Data> std::fmt::Debug for LoopHandle<'l, Data> {
70    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.write_str("LoopHandle { ... }")
73    }
74}
75
76impl<'l, Data> Clone for LoopHandle<'l, Data> {
77    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
78    fn clone(&self) -> Self {
79        LoopHandle {
80            inner: self.inner.clone(),
81        }
82    }
83}
84
85impl<'l, Data> LoopHandle<'l, Data> {
86    /// Inserts a new event source in the loop.
87    ///
88    /// The provided callback will be called during the dispatching cycles whenever the
89    /// associated source generates events, see `EventLoop::dispatch(..)` for details.
90    ///
91    /// This function takes ownership of the event source. Use `register_dispatcher`
92    /// if you need access to the event source after this call.
93    pub fn insert_source<S, F>(
94        &self,
95        source: S,
96        callback: F,
97    ) -> Result<RegistrationToken, InsertError<S>>
98    where
99        S: EventSource + 'l,
100        F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
101    {
102        let dispatcher = Dispatcher::new(source, callback);
103        self.register_dispatcher(dispatcher.clone())
104            .map_err(|error| InsertError {
105                error,
106                inserted: dispatcher.into_source_inner(),
107            })
108    }
109
110    /// Registers a `Dispatcher` in the loop.
111    ///
112    /// Use this function if you need access to the event source after its insertion in the loop.
113    ///
114    /// See also `insert_source`.
115    #[cfg_attr(feature = "nightly_coverage", coverage(off))] // Contains a branch we can't hit w/o OOM
116    pub fn register_dispatcher<S>(
117        &self,
118        dispatcher: Dispatcher<'l, S, Data>,
119    ) -> crate::Result<RegistrationToken>
120    where
121        S: EventSource + 'l,
122    {
123        let mut sources = self.inner.sources.borrow_mut();
124        let mut poll = self.inner.poll.borrow_mut();
125
126        // Find an empty slot if any
127        let slot = sources.vacant_entry();
128
129        slot.source = Some(dispatcher.clone_as_event_dispatcher());
130        trace!("[calloop] Inserting new source #{}", slot.token.get_id());
131        let ret = slot.source.as_ref().unwrap().register(
132            &mut poll,
133            &mut self
134                .inner
135                .sources_with_additional_lifecycle_events
136                .borrow_mut(),
137            &mut TokenFactory::new(slot.token),
138        );
139
140        if let Err(error) = ret {
141            slot.source = None;
142            return Err(error);
143        }
144
145        Ok(RegistrationToken { inner: slot.token })
146    }
147
148    /// Inserts an idle callback.
149    ///
150    /// This callback will be called during a dispatching cycle when the event loop has
151    /// finished processing all pending events from the sources and becomes idle.
152    pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
153        let mut opt_cb = Some(callback);
154        let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
155            if let Some(cb) = opt_cb.take() {
156                cb(data);
157            }
158        })));
159        self.inner.idles.borrow_mut().push(callback.clone());
160        Idle { callback }
161    }
162
163    /// Enables this previously disabled event source.
164    ///
165    /// This previously disabled source will start generating events again.
166    ///
167    /// **Note:** this cannot be done from within the source callback.
168    pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
169        if let &SourceEntry {
170            token: entry_token,
171            source: Some(ref source),
172        } = self.inner.sources.borrow().get(token.inner)?
173        {
174            trace!("[calloop] Registering source #{}", entry_token.get_id());
175            source.register(
176                &mut self.inner.poll.borrow_mut(),
177                &mut self
178                    .inner
179                    .sources_with_additional_lifecycle_events
180                    .borrow_mut(),
181                &mut TokenFactory::new(entry_token),
182            )
183        } else {
184            Err(crate::Error::InvalidToken)
185        }
186    }
187
188    /// Makes this source update its registration.
189    ///
190    /// If after accessing the source you changed its parameters in a way that requires
191    /// updating its registration.
192    pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
193        if let &SourceEntry {
194            token: entry_token,
195            source: Some(ref source),
196        } = self.inner.sources.borrow().get(token.inner)?
197        {
198            trace!(
199                "[calloop] Updating registration of source #{}",
200                entry_token.get_id()
201            );
202            if !source.reregister(
203                &mut self.inner.poll.borrow_mut(),
204                &mut self
205                    .inner
206                    .sources_with_additional_lifecycle_events
207                    .borrow_mut(),
208                &mut TokenFactory::new(entry_token),
209            )? {
210                trace!("[calloop] Cannot do it now, storing for later.");
211                // we are in a callback, store for later processing
212                self.inner.pending_action.set(PostAction::Reregister);
213            }
214            Ok(())
215        } else {
216            Err(crate::Error::InvalidToken)
217        }
218    }
219
220    /// Disables this event source.
221    ///
222    /// The source remains in the event loop, but it'll no longer generate events
223    pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
224        if let &SourceEntry {
225            token: entry_token,
226            source: Some(ref source),
227        } = self.inner.sources.borrow().get(token.inner)?
228        {
229            if !token.inner.same_source_as(entry_token) {
230                // The token provided by the user is no longer valid
231                return Err(crate::Error::InvalidToken);
232            }
233            trace!("[calloop] Unregistering source #{}", entry_token.get_id());
234            if !source.unregister(
235                &mut self.inner.poll.borrow_mut(),
236                &mut self
237                    .inner
238                    .sources_with_additional_lifecycle_events
239                    .borrow_mut(),
240                *token,
241            )? {
242                trace!("[calloop] Cannot do it now, storing for later.");
243                // we are in a callback, store for later processing
244                self.inner.pending_action.set(PostAction::Disable);
245            }
246            Ok(())
247        } else {
248            Err(crate::Error::InvalidToken)
249        }
250    }
251
252    /// Removes this source from the event loop.
253    pub fn remove(&self, token: RegistrationToken) {
254        if let Ok(&mut SourceEntry {
255            token: entry_token,
256            ref mut source,
257        }) = self.inner.sources.borrow_mut().get_mut(token.inner)
258        {
259            if let Some(source) = source.take() {
260                trace!("[calloop] Removing source #{}", entry_token.get_id());
261                if let Err(e) = source.unregister(
262                    &mut self.inner.poll.borrow_mut(),
263                    &mut self
264                        .inner
265                        .sources_with_additional_lifecycle_events
266                        .borrow_mut(),
267                    token,
268                ) {
269                    log::warn!(
270                        "[calloop] Failed to unregister source from the polling system: {:?}",
271                        e
272                    );
273                }
274            }
275        }
276    }
277
278    /// Wrap an IO object into an async adapter
279    ///
280    /// This adapter turns the IO object into an async-aware one that can be used in futures.
281    /// The readiness of these futures will be driven by the event loop.
282    ///
283    /// The produced futures can be polled in any executor, and notably the one provided by
284    /// calloop.
285    pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
286        crate::io::Async::new(self.inner.clone(), fd)
287    }
288}
289
/// An event loop
///
/// This loop can host several event sources, that can be dynamically added or removed.
pub struct EventLoop<'l, Data> {
    // Shared reference to the poller, cloned out of `Poll` in `try_new`. Within this
    // file it is only read by the platform-gated `AsRawFd`/`AsFd`/`AsRawHandle`/`AsHandle`
    // impls — presumably the reason for the `dead_code` allowance.
    #[allow(dead_code)]
    poller: Arc<Poller>,
    // Handle to the shared loop state; clones of it are given out by `handle()`.
    handle: LoopHandle<'l, Data>,
    // Flags shared with every `LoopSignal` created by `get_signal()`.
    signals: Arc<Signals>,
    // A caching vector for synthetic poll events
    synthetic_events: Vec<PollEvent>,
}
301
302impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> {
303    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
304    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
305        f.write_str("EventLoop { ... }")
306    }
307}
308
/// Signals related to the event loop.
///
/// Shared through an `Arc` between the `EventLoop` and its `LoopSignal`s.
struct Signals {
    /// Signal to stop the event loop.
    ///
    /// Set by `LoopSignal::stop()`; checked by `run()` and `block_on()` before each iteration.
    stop: AtomicBool,

    /// Signal that the future is ready.
    ///
    /// Set by the waker used in `block_on()` to request that the future be polled again.
    #[cfg(feature = "block_on")]
    future_ready: AtomicBool,
}
318
319impl<'l, Data> EventLoop<'l, Data> {
320    /// Create a new event loop
321    ///
322    /// Fails if the initialization of the polling system failed.
323    pub fn try_new() -> crate::Result<Self> {
324        let poll = Poll::new()?;
325        let poller = poll.poller.clone();
326        let handle = LoopHandle {
327            inner: Rc::new(LoopInner {
328                poll: RefCell::new(poll),
329                sources: RefCell::new(SourceList::new()),
330                idles: RefCell::new(Vec::new()),
331                pending_action: Cell::new(PostAction::Continue),
332                sources_with_additional_lifecycle_events: Default::default(),
333            }),
334        };
335
336        Ok(EventLoop {
337            handle,
338            signals: Arc::new(Signals {
339                stop: AtomicBool::new(false),
340                #[cfg(feature = "block_on")]
341                future_ready: AtomicBool::new(false),
342            }),
343            poller,
344            synthetic_events: vec![],
345        })
346    }
347
348    /// Retrieve a loop handle
349    pub fn handle(&self) -> LoopHandle<'l, Data> {
350        self.handle.clone()
351    }
352
353    fn dispatch_events(
354        &mut self,
355        mut timeout: Option<Duration>,
356        data: &mut Data,
357    ) -> crate::Result<()> {
358        let now = Instant::now();
359        {
360            let mut extra_lifecycle_sources = self
361                .handle
362                .inner
363                .sources_with_additional_lifecycle_events
364                .borrow_mut();
365            let sources = &self.handle.inner.sources.borrow();
366            for source in &mut *extra_lifecycle_sources.values {
367                if let Ok(SourceEntry {
368                    source: Some(disp), ..
369                }) = sources.get(source.inner)
370                {
371                    if let Some((readiness, token)) = disp.before_sleep()? {
372                        // Wake up instantly after polling if we recieved an event
373                        timeout = Some(Duration::ZERO);
374                        self.synthetic_events.push(PollEvent { readiness, token });
375                    }
376                } else {
377                    unreachable!()
378                }
379            }
380        }
381        let events = {
382            let poll = self.handle.inner.poll.borrow();
383            loop {
384                let result = poll.poll(timeout);
385
386                match result {
387                    Ok(events) => break events,
388                    Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
389                        // Interrupted by a signal. Update timeout and retry.
390                        if let Some(to) = timeout {
391                            let elapsed = now.elapsed();
392                            if elapsed >= to {
393                                return Ok(());
394                            } else {
395                                timeout = Some(to - elapsed);
396                            }
397                        }
398                    }
399                    Err(err) => return Err(err),
400                };
401            }
402        };
403        {
404            let mut extra_lifecycle_sources = self
405                .handle
406                .inner
407                .sources_with_additional_lifecycle_events
408                .borrow_mut();
409            if !extra_lifecycle_sources.values.is_empty() {
410                for source in &mut *extra_lifecycle_sources.values {
411                    if let Ok(SourceEntry {
412                        source: Some(disp), ..
413                    }) = self.handle.inner.sources.borrow().get(source.inner)
414                    {
415                        let iter = EventIterator {
416                            inner: events.iter(),
417                            registration_token: *source,
418                        };
419                        disp.before_handle_events(iter);
420                    } else {
421                        unreachable!()
422                    }
423                }
424            }
425        }
426
427        for event in self.synthetic_events.drain(..).chain(events) {
428            // Get the registration token associated with the event.
429            let reg_token = event.token.inner.forget_sub_id();
430
431            let opt_disp = self
432                .handle
433                .inner
434                .sources
435                .borrow()
436                .get(reg_token)
437                .ok()
438                .and_then(|entry| entry.source.clone());
439
440            if let Some(disp) = opt_disp {
441                trace!(
442                    "[calloop] Dispatching events for source #{}",
443                    reg_token.get_id()
444                );
445                let mut ret = disp.process_events(event.readiness, event.token, data)?;
446
447                // if the returned PostAction is Continue, it may be overwritten by an user-specified pending action
448                let pending_action = self
449                    .handle
450                    .inner
451                    .pending_action
452                    .replace(PostAction::Continue);
453                if let PostAction::Continue = ret {
454                    ret = pending_action;
455                }
456
457                match ret {
458                    PostAction::Reregister => {
459                        trace!(
460                            "[calloop] Postaction reregister for source #{}",
461                            reg_token.get_id()
462                        );
463                        disp.reregister(
464                            &mut self.handle.inner.poll.borrow_mut(),
465                            &mut self
466                                .handle
467                                .inner
468                                .sources_with_additional_lifecycle_events
469                                .borrow_mut(),
470                            &mut TokenFactory::new(reg_token),
471                        )?;
472                    }
473                    PostAction::Disable => {
474                        trace!(
475                            "[calloop] Postaction unregister for source #{}",
476                            reg_token.get_id()
477                        );
478                        disp.unregister(
479                            &mut self.handle.inner.poll.borrow_mut(),
480                            &mut self
481                                .handle
482                                .inner
483                                .sources_with_additional_lifecycle_events
484                                .borrow_mut(),
485                            RegistrationToken::new(reg_token),
486                        )?;
487                    }
488                    PostAction::Remove => {
489                        trace!(
490                            "[calloop] Postaction remove for source #{}",
491                            reg_token.get_id()
492                        );
493                        if let Ok(entry) = self.handle.inner.sources.borrow_mut().get_mut(reg_token)
494                        {
495                            entry.source = None;
496                        }
497                    }
498                    PostAction::Continue => {}
499                }
500
501                if self
502                    .handle
503                    .inner
504                    .sources
505                    .borrow()
506                    .get(reg_token)
507                    .ok()
508                    .map(|entry| entry.source.is_none())
509                    .unwrap_or(true)
510                {
511                    // the source has been removed from within its callback, unregister it
512                    let mut poll = self.handle.inner.poll.borrow_mut();
513                    if let Err(e) = disp.unregister(
514                        &mut poll,
515                        &mut self
516                            .handle
517                            .inner
518                            .sources_with_additional_lifecycle_events
519                            .borrow_mut(),
520                        RegistrationToken::new(reg_token),
521                    ) {
522                        log::warn!(
523                            "[calloop] Failed to unregister source from the polling system: {:?}",
524                            e
525                        );
526                    }
527                }
528            } else {
529                log::warn!(
530                    "[calloop] Received an event for non-existence source: {:?}",
531                    reg_token
532                );
533            }
534        }
535
536        Ok(())
537    }
538
539    fn dispatch_idles(&mut self, data: &mut Data) {
540        let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
541        for idle in idles {
542            idle.borrow_mut().dispatch(data);
543        }
544    }
545
546    /// Dispatch pending events to their callbacks
547    ///
548    /// If some sources have events available, their callbacks will be immediatly called.
549    /// Otherwise this will wait until an event is receive or the provided `timeout`
550    /// is reached. If `timeout` is `None`, it will wait without a duration limit.
551    ///
552    /// Once pending events have been processed or the timeout is reached, all pending
553    /// idle callbacks will be fired before this method returns.
554    pub fn dispatch<D: Into<Option<Duration>>>(
555        &mut self,
556        timeout: D,
557        data: &mut Data,
558    ) -> crate::Result<()> {
559        self.dispatch_events(timeout.into(), data)?;
560        self.dispatch_idles(data);
561
562        Ok(())
563    }
564
565    /// Get a signal to stop this event loop from running
566    ///
567    /// To be used in conjunction with the `run()` method.
568    pub fn get_signal(&self) -> LoopSignal {
569        LoopSignal {
570            signal: self.signals.clone(),
571            notifier: self.handle.inner.poll.borrow().notifier(),
572        }
573    }
574
575    /// Run this event loop
576    ///
577    /// This will repeatedly try to dispatch events (see the `dispatch()` method) on
578    /// this event loop, waiting at most `timeout` every time.
579    ///
580    /// Between each dispatch wait, your provided callback will be called.
581    ///
582    /// You can use the `get_signal()` method to retrieve a way to stop or wakeup
583    /// the event loop from anywhere.
584    pub fn run<F, D: Into<Option<Duration>>>(
585        &mut self,
586        timeout: D,
587        data: &mut Data,
588        mut cb: F,
589    ) -> crate::Result<()>
590    where
591        F: FnMut(&mut Data),
592    {
593        let timeout = timeout.into();
594        self.signals.stop.store(false, Ordering::Release);
595        while !self.signals.stop.load(Ordering::Acquire) {
596            self.dispatch(timeout, data)?;
597            cb(data);
598        }
599        Ok(())
600    }
601
602    /// Block a future on this event loop.
603    ///
604    /// This will run the provided future on this event loop, blocking until it is
605    /// resolved.
606    ///
607    /// If [`LoopSignal::stop()`] is called before the future is resolved, this function returns
608    /// `None`.
609    #[cfg(feature = "block_on")]
610    pub fn block_on<R>(
611        &mut self,
612        future: impl Future<Output = R>,
613        data: &mut Data,
614        mut cb: impl FnMut(&mut Data),
615    ) -> crate::Result<Option<R>> {
616        use std::task::{Context, Poll, Wake, Waker};
617
618        /// A waker that will wake up the event loop when it is ready to make progress.
619        struct EventLoopWaker(LoopSignal);
620
621        impl Wake for EventLoopWaker {
622            fn wake(self: Arc<Self>) {
623                // Set the waker.
624                self.0.signal.future_ready.store(true, Ordering::Release);
625                self.0.notifier.notify().ok();
626            }
627
628            fn wake_by_ref(self: &Arc<Self>) {
629                // Set the waker.
630                self.0.signal.future_ready.store(true, Ordering::Release);
631                self.0.notifier.notify().ok();
632            }
633        }
634
635        // Pin the future to the stack.
636        pin_utils::pin_mut!(future);
637
638        // Create a waker that will wake up the event loop when it is ready to make progress.
639        let waker = {
640            let handle = EventLoopWaker(self.get_signal());
641
642            Waker::from(Arc::new(handle))
643        };
644        let mut context = Context::from_waker(&waker);
645
646        // Begin running the loop.
647        let mut output = None;
648
649        self.signals.stop.store(false, Ordering::Release);
650        self.signals.future_ready.store(true, Ordering::Release);
651
652        while !self.signals.stop.load(Ordering::Acquire) {
653            // If the future is ready to be polled, poll it.
654            if self.signals.future_ready.swap(false, Ordering::AcqRel) {
655                // Poll the future and break the loop if it's ready.
656                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
657                    output = Some(result);
658                    break;
659                }
660            }
661
662            // Otherwise, block on the event loop.
663            self.dispatch_events(None, data)?;
664            self.dispatch_idles(data);
665            cb(data);
666        }
667
668        Ok(output)
669    }
670}
671
#[cfg(unix)]
impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
    /// Get the underlying raw fd of the poller.
    ///
    /// This can be used to create a [`Generic`] source out of the current loop
    /// and insert it into some other [`EventLoop`]. It's recommended to clone the
    /// `fd` before doing so.
    ///
    /// [`Generic`]: crate::generic::Generic
    fn as_raw_fd(&self) -> RawFd {
        self.poller.as_raw_fd()
    }
}
685
#[cfg(unix)]
impl<'l, Data> AsFd for EventLoop<'l, Data> {
    /// Get the underlying fd of the poller.
    ///
    /// This can be used to create a [`Generic`] source out of the current loop
    /// and insert it into some other [`EventLoop`].
    ///
    /// [`Generic`]: crate::generic::Generic
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.poller.as_fd()
    }
}
698
#[cfg(windows)]
impl<Data> AsRawHandle for EventLoop<'_, Data> {
    /// Get the underlying raw handle of the poller.
    fn as_raw_handle(&self) -> RawHandle {
        self.poller.as_raw_handle()
    }
}
705
#[cfg(windows)]
impl<Data> AsHandle for EventLoop<'_, Data> {
    /// Get the underlying handle of the poller, borrowed for the lifetime of `self`.
    fn as_handle(&self) -> BorrowedHandle<'_> {
        self.poller.as_handle()
    }
}
712
#[derive(Clone, Debug)]
/// The EventIterator is an `Iterator` over the events relevant to a particular source.
///
/// This type is used in the [`EventSource::before_handle_events`] methods for
/// two main reasons:
/// - To avoid dynamic dispatch overhead
/// - To allow this type to be `Clone`, which is not possible with dynamic dispatch
pub struct EventIterator<'a> {
    // All events returned by the last poll; filtered down to one source in `next()`.
    inner: slice::Iter<'a, PollEvent>,
    // The source whose events this iterator yields.
    registration_token: RegistrationToken,
}
724
725impl<'a> Iterator for EventIterator<'a> {
726    type Item = (Readiness, Token);
727
728    fn next(&mut self) -> Option<Self::Item> {
729        for next in self.inner.by_ref() {
730            if next
731                .token
732                .inner
733                .same_source_as(self.registration_token.inner)
734            {
735                return Some((next.readiness, next.token));
736            }
737        }
738        None
739    }
740}
741
/// A signal that can be shared between threads to stop or wake up a running
/// event loop
#[derive(Clone)]
pub struct LoopSignal {
    // Flags shared with the `EventLoop` this signal was created from.
    signal: Arc<Signals>,
    // Used to interrupt the loop's wait for events.
    notifier: Notifier,
}
749
750impl std::fmt::Debug for LoopSignal {
751    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
752    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
753        f.write_str("LoopSignal { ... }")
754    }
755}
756
757impl LoopSignal {
758    /// Stop the event loop
759    ///
760    /// Once this method is called, the next time the event loop has finished
761    /// waiting for events, it will return rather than starting to wait again.
762    ///
763    /// This is only useful if you are using the `EventLoop::run()` method.
764    pub fn stop(&self) {
765        self.signal.stop.store(true, Ordering::Release);
766    }
767
768    /// Wake up the event loop
769    ///
770    /// This sends a dummy event to the event loop to simulate the reception
771    /// of an event, making the wait return early. Called after `stop()`, this
772    /// ensures the event loop will terminate quickly if you specified a long
773    /// timeout (or no timeout at all) to the `dispatch` or `run` method.
774    pub fn wakeup(&self) {
775        self.notifier.notify().ok();
776    }
777}
778
779#[cfg(test)]
780mod tests {
781    use std::{cell::Cell, rc::Rc, time::Duration};
782
783    use crate::{
784        channel::{channel, Channel},
785        ping::*,
786        EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
787        TokenFactory,
788    };
789
790    #[cfg(unix)]
791    use crate::{generic::Generic, Dispatcher, Interest, Mode};
792
793    use super::EventLoop;
794
795    #[test]
796    fn dispatch_idle() {
797        let mut event_loop = EventLoop::try_new().unwrap();
798
799        let mut dispatched = false;
800
801        event_loop.handle().insert_idle(|d| {
802            *d = true;
803        });
804
805        event_loop
806            .dispatch(Some(Duration::ZERO), &mut dispatched)
807            .unwrap();
808
809        assert!(dispatched);
810    }
811
812    #[test]
813    fn cancel_idle() {
814        let mut event_loop = EventLoop::try_new().unwrap();
815
816        let mut dispatched = false;
817
818        let handle = event_loop.handle();
819        let idle = handle.insert_idle(move |d| {
820            *d = true;
821        });
822
823        idle.cancel();
824
825        event_loop
826            .dispatch(Duration::ZERO, &mut dispatched)
827            .unwrap();
828
829        assert!(!dispatched);
830    }
831
832    #[test]
833    fn wakeup() {
834        let mut event_loop = EventLoop::try_new().unwrap();
835
836        let signal = event_loop.get_signal();
837
838        ::std::thread::spawn(move || {
839            ::std::thread::sleep(Duration::from_millis(500));
840            signal.wakeup();
841        });
842
843        // the test should return
844        event_loop.dispatch(None, &mut ()).unwrap();
845    }
846
847    #[test]
848    fn wakeup_stop() {
849        let mut event_loop = EventLoop::try_new().unwrap();
850
851        let signal = event_loop.get_signal();
852
853        ::std::thread::spawn(move || {
854            ::std::thread::sleep(Duration::from_millis(500));
855            signal.stop();
856            signal.wakeup();
857        });
858
859        // the test should return
860        event_loop.run(None, &mut (), |_| {}).unwrap();
861    }
862
863    #[test]
864    fn additional_events() {
865        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
866        let mut lock = Lock {
867            lock: Rc::new((
868                // Whether the lock is locked
869                Cell::new(false),
870                // The total number of events processed in process_events
871                Cell::new(0),
872                // The total number of events processed in before_handle_events
873                // This is used to ensure that the count seen in before_handle_events is expected
874                Cell::new(0),
875            )),
876        };
877        let (sender, channel) = channel();
878        let token = event_loop
879            .handle()
880            .insert_source(
881                LockingSource {
882                    channel,
883                    lock: lock.clone(),
884                },
885                |_, _, lock| {
886                    lock.lock();
887                    lock.unlock();
888                },
889            )
890            .unwrap();
891        sender.send(()).unwrap();
892
893        event_loop.dispatch(None, &mut lock).unwrap();
894        // We should have been locked twice so far
895        assert_eq!(lock.lock.1.get(), 2);
896        // And we should have received one event
897        assert_eq!(lock.lock.2.get(), 1);
898        event_loop.handle().disable(&token).unwrap();
899        event_loop
900            .dispatch(Some(Duration::ZERO), &mut lock)
901            .unwrap();
902        assert_eq!(lock.lock.1.get(), 2);
903
904        event_loop.handle().enable(&token).unwrap();
905        event_loop
906            .dispatch(Some(Duration::ZERO), &mut lock)
907            .unwrap();
908        assert_eq!(lock.lock.1.get(), 3);
909        event_loop.handle().remove(token);
910        event_loop
911            .dispatch(Some(Duration::ZERO), &mut lock)
912            .unwrap();
913        assert_eq!(lock.lock.1.get(), 3);
914        assert_eq!(lock.lock.2.get(), 1);
915
916        #[derive(Clone)]
917        struct Lock {
918            lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
919        }
920        impl Lock {
921            fn lock(&self) {
922                if self.lock.0.get() {
923                    panic!();
924                }
925                // Increase the count
926                self.lock.1.set(self.lock.1.get() + 1);
927                self.lock.0.set(true)
928            }
929            fn unlock(&self) {
930                if !self.lock.0.get() {
931                    panic!();
932                }
933                self.lock.0.set(false);
934            }
935        }
936        struct LockingSource {
937            channel: Channel<()>,
938            lock: Lock,
939        }
940        impl EventSource for LockingSource {
941            type Event = <Channel<()> as EventSource>::Event;
942
943            type Metadata = <Channel<()> as EventSource>::Metadata;
944
945            type Ret = <Channel<()> as EventSource>::Ret;
946
947            type Error = <Channel<()> as EventSource>::Error;
948
949            fn process_events<F>(
950                &mut self,
951                readiness: Readiness,
952                token: Token,
953                callback: F,
954            ) -> Result<PostAction, Self::Error>
955            where
956                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
957            {
958                self.channel.process_events(readiness, token, callback)
959            }
960
961            fn register(
962                &mut self,
963                poll: &mut Poll,
964                token_factory: &mut TokenFactory,
965            ) -> crate::Result<()> {
966                self.channel.register(poll, token_factory)
967            }
968
969            fn reregister(
970                &mut self,
971                poll: &mut Poll,
972                token_factory: &mut TokenFactory,
973            ) -> crate::Result<()> {
974                self.channel.reregister(poll, token_factory)
975            }
976
977            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
978                self.channel.unregister(poll)
979            }
980
981            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
982
983            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
984                self.lock.lock();
985                Ok(None)
986            }
987
988            fn before_handle_events(&mut self, events: EventIterator) {
989                let events_count = events.count();
990                let lock = &self.lock.lock;
991                lock.2.set(lock.2.get() + events_count as u32);
992                self.lock.unlock();
993            }
994        }
995    }
996    #[test]
997    fn default_additional_events() {
998        let (sender, channel) = channel();
999        let mut test_source = NoopWithDefaultHandlers { channel };
1000        let mut event_loop = EventLoop::try_new().unwrap();
1001        event_loop
1002            .handle()
1003            .insert_source(Box::new(&mut test_source), |_, _, _| {})
1004            .unwrap();
1005        sender.send(()).unwrap();
1006
1007        event_loop.dispatch(None, &mut ()).unwrap();
1008        struct NoopWithDefaultHandlers {
1009            channel: Channel<()>,
1010        }
1011        impl EventSource for NoopWithDefaultHandlers {
1012            type Event = <Channel<()> as EventSource>::Event;
1013
1014            type Metadata = <Channel<()> as EventSource>::Metadata;
1015
1016            type Ret = <Channel<()> as EventSource>::Ret;
1017
1018            type Error = <Channel<()> as EventSource>::Error;
1019
1020            fn process_events<F>(
1021                &mut self,
1022                readiness: Readiness,
1023                token: Token,
1024                callback: F,
1025            ) -> Result<PostAction, Self::Error>
1026            where
1027                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1028            {
1029                self.channel.process_events(readiness, token, callback)
1030            }
1031
1032            fn register(
1033                &mut self,
1034                poll: &mut Poll,
1035                token_factory: &mut TokenFactory,
1036            ) -> crate::Result<()> {
1037                self.channel.register(poll, token_factory)
1038            }
1039
1040            fn reregister(
1041                &mut self,
1042                poll: &mut Poll,
1043                token_factory: &mut TokenFactory,
1044            ) -> crate::Result<()> {
1045                self.channel.reregister(poll, token_factory)
1046            }
1047
1048            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1049                self.channel.unregister(poll)
1050            }
1051
1052            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1053        }
1054    }
1055
1056    #[test]
1057    fn additional_events_synthetic() {
1058        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
1059        let mut lock = Lock {
1060            lock: Rc::new(Cell::new(false)),
1061        };
1062        event_loop
1063            .handle()
1064            .insert_source(
1065                InstantWakeupLockingSource {
1066                    lock: lock.clone(),
1067                    token: None,
1068                },
1069                |_, _, lock| {
1070                    lock.lock();
1071                    lock.unlock();
1072                },
1073            )
1074            .unwrap();
1075
1076        // Loop should finish, as
1077        event_loop.dispatch(None, &mut lock).unwrap();
1078        #[derive(Clone)]
1079        struct Lock {
1080            lock: Rc<Cell<bool>>,
1081        }
1082        impl Lock {
1083            fn lock(&self) {
1084                if self.lock.get() {
1085                    panic!();
1086                }
1087                self.lock.set(true)
1088            }
1089            fn unlock(&self) {
1090                if !self.lock.get() {
1091                    panic!();
1092                }
1093                self.lock.set(false);
1094            }
1095        }
1096        struct InstantWakeupLockingSource {
1097            lock: Lock,
1098            token: Option<Token>,
1099        }
1100        impl EventSource for InstantWakeupLockingSource {
1101            type Event = ();
1102
1103            type Metadata = ();
1104
1105            type Ret = ();
1106
1107            type Error = <Channel<()> as EventSource>::Error;
1108
1109            fn process_events<F>(
1110                &mut self,
1111                _: Readiness,
1112                token: Token,
1113                mut callback: F,
1114            ) -> Result<PostAction, Self::Error>
1115            where
1116                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1117            {
1118                assert_eq!(token, self.token.unwrap());
1119                callback((), &mut ());
1120                Ok(PostAction::Continue)
1121            }
1122
1123            fn register(
1124                &mut self,
1125                _: &mut Poll,
1126                token_factory: &mut TokenFactory,
1127            ) -> crate::Result<()> {
1128                self.token = Some(token_factory.token());
1129                Ok(())
1130            }
1131
1132            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1133                unreachable!()
1134            }
1135
1136            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1137                unreachable!()
1138            }
1139
1140            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1141
1142            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1143                self.lock.lock();
1144                Ok(Some((Readiness::EMPTY, self.token.unwrap())))
1145            }
1146
1147            fn before_handle_events(&mut self, _: EventIterator) {
1148                self.lock.unlock();
1149            }
1150        }
1151    }
1152
1153    #[cfg(unix)]
1154    #[test]
1155    fn insert_bad_source() {
1156        use std::os::unix::io::FromRawFd;
1157
1158        let event_loop = EventLoop::<()>::try_new().unwrap();
1159        let fd = unsafe { std::os::unix::io::OwnedFd::from_raw_fd(420) };
1160        let ret = event_loop.handle().insert_source(
1161            crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level),
1162            |_, _, _| Ok(PostAction::Continue),
1163        );
1164        assert!(ret.is_err());
1165    }
1166
1167    #[test]
1168    fn invalid_token() {
1169        let (_ping, source) = crate::sources::ping::make_ping().unwrap();
1170
1171        let event_loop = EventLoop::<()>::try_new().unwrap();
1172        let handle = event_loop.handle();
1173        let reg_token = handle.insert_source(source, |_, _, _| {}).unwrap();
1174        handle.remove(reg_token);
1175
1176        let ret = handle.enable(&reg_token);
1177        assert!(ret.is_err());
1178    }
1179
1180    #[cfg(unix)]
1181    #[test]
1182    fn insert_source_no_interest() {
1183        use rustix::pipe::pipe;
1184
1185        // Create a pipe to get an arbitrary fd.
1186        let (read, _write) = pipe().unwrap();
1187
1188        let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
1189        let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));
1190
1191        let event_loop = EventLoop::<()>::try_new().unwrap();
1192        let handle = event_loop.handle();
1193        let ret = handle.register_dispatcher(dispatcher.clone());
1194
1195        if let Ok(token) = ret {
1196            // Unwrap the dispatcher+source and close the read end.
1197            handle.remove(token);
1198        } else {
1199            // Fail the test.
1200            panic!();
1201        }
1202    }
1203
1204    #[test]
1205    fn disarm_rearm() {
1206        let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1207        let (ping, ping_source) = make_ping().unwrap();
1208
1209        let ping_token = event_loop
1210            .handle()
1211            .insert_source(ping_source, |(), &mut (), dispatched| {
1212                *dispatched = true;
1213            })
1214            .unwrap();
1215
1216        ping.ping();
1217        let mut dispatched = false;
1218        event_loop
1219            .dispatch(Duration::ZERO, &mut dispatched)
1220            .unwrap();
1221        assert!(dispatched);
1222
1223        // disable the source
1224        ping.ping();
1225        event_loop.handle().disable(&ping_token).unwrap();
1226        let mut dispatched = false;
1227        event_loop
1228            .dispatch(Duration::ZERO, &mut dispatched)
1229            .unwrap();
1230        assert!(!dispatched);
1231
1232        // reenable it, the previous ping now gets dispatched
1233        event_loop.handle().enable(&ping_token).unwrap();
1234        let mut dispatched = false;
1235        event_loop
1236            .dispatch(Duration::ZERO, &mut dispatched)
1237            .unwrap();
1238        assert!(dispatched);
1239    }
1240
1241    #[test]
1242    fn multiple_tokens() {
1243        struct DoubleSource {
1244            ping1: PingSource,
1245            ping2: PingSource,
1246        }
1247
1248        impl crate::EventSource for DoubleSource {
1249            type Event = u32;
1250            type Metadata = ();
1251            type Ret = ();
1252            type Error = PingError;
1253
1254            fn process_events<F>(
1255                &mut self,
1256                readiness: Readiness,
1257                token: Token,
1258                mut callback: F,
1259            ) -> Result<PostAction, Self::Error>
1260            where
1261                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1262            {
1263                self.ping1
1264                    .process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
1265                self.ping2
1266                    .process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
1267                Ok(PostAction::Continue)
1268            }
1269
1270            fn register(
1271                &mut self,
1272                poll: &mut Poll,
1273                token_factory: &mut TokenFactory,
1274            ) -> crate::Result<()> {
1275                self.ping1.register(poll, token_factory)?;
1276                self.ping2.register(poll, token_factory)?;
1277                Ok(())
1278            }
1279
1280            fn reregister(
1281                &mut self,
1282                poll: &mut Poll,
1283                token_factory: &mut TokenFactory,
1284            ) -> crate::Result<()> {
1285                self.ping1.reregister(poll, token_factory)?;
1286                self.ping2.reregister(poll, token_factory)?;
1287                Ok(())
1288            }
1289
1290            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1291                self.ping1.unregister(poll)?;
1292                self.ping2.unregister(poll)?;
1293                Ok(())
1294            }
1295        }
1296
1297        let mut event_loop = EventLoop::<u32>::try_new().unwrap();
1298
1299        let (ping1, source1) = make_ping().unwrap();
1300        let (ping2, source2) = make_ping().unwrap();
1301
1302        let source = DoubleSource {
1303            ping1: source1,
1304            ping2: source2,
1305        };
1306
1307        event_loop
1308            .handle()
1309            .insert_source(source, |i, _, d| {
1310                eprintln!("Dispatching {}", i);
1311                *d += i
1312            })
1313            .unwrap();
1314
1315        let mut dispatched = 0;
1316        ping1.ping();
1317        event_loop
1318            .dispatch(Duration::ZERO, &mut dispatched)
1319            .unwrap();
1320        assert_eq!(dispatched, 1);
1321
1322        dispatched = 0;
1323        ping2.ping();
1324        event_loop
1325            .dispatch(Duration::ZERO, &mut dispatched)
1326            .unwrap();
1327        assert_eq!(dispatched, 2);
1328
1329        dispatched = 0;
1330        ping1.ping();
1331        ping2.ping();
1332        event_loop
1333            .dispatch(Duration::ZERO, &mut dispatched)
1334            .unwrap();
1335        assert_eq!(dispatched, 3);
1336    }
1337
1338    #[cfg(unix)]
1339    #[test]
1340    fn change_interests() {
1341        use rustix::io::write;
1342        use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};
1343        let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1344
1345        let (sock1, sock2) = socketpair(
1346            AddressFamily::UNIX,
1347            SocketType::STREAM,
1348            SocketFlags::empty(),
1349            None, // recv with DONTWAIT will suffice for platforms without SockFlag::SOCK_NONBLOCKING such as macOS
1350        )
1351        .unwrap();
1352
1353        let source = Generic::new(sock1, Interest::READ, Mode::Level);
1354        let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
1355            *dispatched = true;
1356            // read all contents available to drain the socket
1357            let mut buf = [0u8; 32];
1358            loop {
1359                match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
1360                    Ok(0) => break, // closed pipe, we are now inert
1361                    Ok(_) => {}
1362                    Err(e) => {
1363                        let e: std::io::Error = e.into();
1364                        if e.kind() == std::io::ErrorKind::WouldBlock {
1365                            break;
1366                        // nothing more to read
1367                        } else {
1368                            // propagate error
1369                            return Err(e);
1370                        }
1371                    }
1372                }
1373            }
1374            Ok(PostAction::Continue)
1375        });
1376
1377        let sock_token_1 = event_loop
1378            .handle()
1379            .register_dispatcher(dispatcher.clone())
1380            .unwrap();
1381
1382        // first dispatch, nothing is readable
1383        let mut dispatched = false;
1384        event_loop
1385            .dispatch(Duration::ZERO, &mut dispatched)
1386            .unwrap();
1387        assert!(!dispatched);
1388
1389        // write something, the socket becomes readable
1390        write(&sock2, &[1, 2, 3]).unwrap();
1391        dispatched = false;
1392        event_loop
1393            .dispatch(Duration::ZERO, &mut dispatched)
1394            .unwrap();
1395        assert!(dispatched);
1396
1397        // All has been read, no longer readable
1398        dispatched = false;
1399        event_loop
1400            .dispatch(Duration::ZERO, &mut dispatched)
1401            .unwrap();
1402        assert!(!dispatched);
1403
1404        // change the interests for writability instead
1405        dispatcher.as_source_mut().interest = Interest::WRITE;
1406        event_loop.handle().update(&sock_token_1).unwrap();
1407
1408        // the socket is writable
1409        dispatched = false;
1410        event_loop
1411            .dispatch(Duration::ZERO, &mut dispatched)
1412            .unwrap();
1413        assert!(dispatched);
1414
1415        // change back to readable
1416        dispatcher.as_source_mut().interest = Interest::READ;
1417        event_loop.handle().update(&sock_token_1).unwrap();
1418
1419        // the socket is not readable
1420        dispatched = false;
1421        event_loop
1422            .dispatch(Duration::ZERO, &mut dispatched)
1423            .unwrap();
1424        assert!(!dispatched);
1425    }
1426
1427    #[test]
1428    fn kill_source() {
1429        let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();
1430
1431        let handle = event_loop.handle();
1432        let (ping, ping_source) = make_ping().unwrap();
1433        let ping_token = event_loop
1434            .handle()
1435            .insert_source(ping_source, move |(), &mut (), opt_src| {
1436                if let Some(src) = opt_src.take() {
1437                    handle.remove(src);
1438                }
1439            })
1440            .unwrap();
1441
1442        ping.ping();
1443
1444        let mut opt_src = Some(ping_token);
1445
1446        event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();
1447
1448        assert!(opt_src.is_none());
1449    }
1450
1451    #[test]
1452    fn non_static_data() {
1453        use std::sync::mpsc;
1454
1455        let (sender, receiver) = mpsc::channel();
1456
1457        {
1458            struct RefSender<'a>(&'a mpsc::Sender<()>);
1459            let mut ref_sender = RefSender(&sender);
1460
1461            let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
1462            let (ping, ping_source) = make_ping().unwrap();
1463            let _ping_token = event_loop
1464                .handle()
1465                .insert_source(ping_source, |_, _, ref_sender| {
1466                    ref_sender.0.send(()).unwrap();
1467                })
1468                .unwrap();
1469
1470            ping.ping();
1471
1472            event_loop
1473                .dispatch(Duration::ZERO, &mut ref_sender)
1474                .unwrap();
1475        }
1476
1477        receiver.recv().unwrap();
1478        // sender still usable (e.g. for another EventLoop)
1479        drop(sender);
1480    }
1481
1482    #[cfg(feature = "block_on")]
1483    #[test]
1484    fn block_on_test() {
1485        use crate::sources::timer::TimeoutFuture;
1486        use std::time::Duration;
1487
1488        let mut evl = EventLoop::<()>::try_new().unwrap();
1489
1490        let mut data = 22;
1491        let timeout = {
1492            let data = &mut data;
1493            let evl_handle = evl.handle();
1494
1495            async move {
1496                TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1497                *data = 32;
1498                11
1499            }
1500        };
1501
1502        let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1503        assert_eq!(result, Some(11));
1504        assert_eq!(data, 32);
1505    }
1506
1507    #[cfg(feature = "block_on")]
1508    #[test]
1509    fn block_on_early_cancel() {
1510        use crate::sources::timer;
1511        use std::time::Duration;
1512
1513        let mut evl = EventLoop::<()>::try_new().unwrap();
1514
1515        let mut data = 22;
1516        let timeout = {
1517            let data = &mut data;
1518            let evl_handle = evl.handle();
1519
1520            async move {
1521                timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1522                *data = 32;
1523                11
1524            }
1525        };
1526
1527        let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1528        let handle = evl.get_signal();
1529        let _timer_token = evl
1530            .handle()
1531            .insert_source(timer_source, move |_, _, _| {
1532                handle.stop();
1533                timer::TimeoutAction::Drop
1534            })
1535            .unwrap();
1536
1537        let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1538        assert_eq!(result, None);
1539        assert_eq!(data, 22);
1540    }
1541
1542    #[test]
1543    fn reuse() {
1544        use crate::sources::timer;
1545        use std::sync::{Arc, Mutex};
1546        use std::time::{Duration, Instant};
1547
1548        let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
1549        let handle = evl.handle();
1550
1551        let data = Arc::new(Mutex::new(1));
1552        let data_cloned = data.clone();
1553
1554        let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1555        let mut first_timer_token = evl
1556            .handle()
1557            .insert_source(timer_source, move |_, _, own_token| {
1558                handle.remove(*own_token);
1559                let data_cloned = data_cloned.clone();
1560                let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
1561                    *data_cloned.lock().unwrap() = 2;
1562                    timer::TimeoutAction::Drop
1563                });
1564                timer::TimeoutAction::Drop
1565            })
1566            .unwrap();
1567
1568        let now = Instant::now();
1569        loop {
1570            evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
1571                .unwrap();
1572            if Instant::now().duration_since(now) > Duration::from_secs(3) {
1573                break;
1574            }
1575        }
1576
1577        assert_eq!(*data.lock().unwrap(), 2);
1578    }
1579
1580    #[test]
1581    fn drop_of_subsource() {
1582        struct WithSubSource {
1583            token: Option<Token>,
1584        }
1585
1586        impl crate::EventSource for WithSubSource {
1587            type Event = ();
1588            type Metadata = ();
1589            type Ret = ();
1590            type Error = crate::Error;
1591            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1592
1593            fn process_events<F>(
1594                &mut self,
1595                _: Readiness,
1596                _: Token,
1597                mut callback: F,
1598            ) -> Result<PostAction, Self::Error>
1599            where
1600                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1601            {
1602                callback((), &mut ());
1603                // Drop the source
1604                Ok(PostAction::Remove)
1605            }
1606
1607            fn register(&mut self, _: &mut Poll, fact: &mut TokenFactory) -> crate::Result<()> {
1608                // produce a few tokens to emulate a subsource
1609                fact.token();
1610                fact.token();
1611                self.token = Some(fact.token());
1612                Ok(())
1613            }
1614
1615            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1616                Ok(())
1617            }
1618
1619            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1620                Ok(())
1621            }
1622
1623            // emulate a readiness
1624            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1625                Ok(self.token.map(|token| {
1626                    (
1627                        Readiness {
1628                            readable: true,
1629                            writable: false,
1630                            error: false,
1631                        },
1632                        token,
1633                    )
1634                }))
1635            }
1636        }
1637
1638        // Now the actual test
1639        let mut evl = EventLoop::<bool>::try_new().unwrap();
1640        evl.handle()
1641            .insert_source(WithSubSource { token: None }, |_, _, ran| {
1642                *ran = true;
1643            })
1644            .unwrap();
1645
1646        let mut ran = false;
1647
1648        evl.dispatch(Some(Duration::ZERO), &mut ran).unwrap();
1649
1650        assert!(ran);
1651    }
1652
1653    // A dummy EventSource to test insertion and removal of sources
1654    struct DummySource;
1655
1656    impl crate::EventSource for DummySource {
1657        type Event = ();
1658        type Metadata = ();
1659        type Ret = ();
1660        type Error = crate::Error;
1661
1662        fn process_events<F>(
1663            &mut self,
1664            _: Readiness,
1665            _: Token,
1666            mut callback: F,
1667        ) -> Result<PostAction, Self::Error>
1668        where
1669            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1670        {
1671            callback((), &mut ());
1672            Ok(PostAction::Continue)
1673        }
1674
1675        fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1676            Ok(())
1677        }
1678
1679        fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1680            Ok(())
1681        }
1682
1683        fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1684            Ok(())
1685        }
1686    }
1687}