calloop/
sys.rs

1use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::Arc, time::Duration};
2
3#[cfg(unix)]
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd as Borrowed, RawFd as Raw};
5
6#[cfg(windows)]
7use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket as Borrowed, RawSocket as Raw};
8
9use polling::{Event, Events, PollMode, Poller};
10
11use crate::sources::timer::TimerWheel;
12use crate::token::TokenInner;
13use crate::RegistrationToken;
14
/// Possible modes for registering a file descriptor
///
/// Modes can be compared with `==`, like [`Token`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Mode {
    /// Single event generation
    ///
    /// This FD will be disabled as soon as it has generated one event.
    ///
    /// The user will need to use `LoopHandle::update()` to re-enable it if
    /// desired.
    OneShot,

    /// Level-triggering
    ///
    /// This FD will report events on every poll as long as the requested interests
    /// are available.
    Level,

    /// Edge-triggering
    ///
    /// This FD will report events only when it *gains* one of the requested interests.
    /// It must thus be fully processed before it'll generate events again.
    ///
    /// This mode is not supported on certain platforms, and an error will be returned
    /// if it is used.
    ///
    /// ## Supported Platforms
    ///
    /// As of the time of writing, the platforms that support edge triggered polling are
    /// as follows:
    ///
    /// - Linux/Android
    /// - macOS/iOS/tvOS/watchOS
    /// - FreeBSD/OpenBSD/NetBSD/DragonflyBSD
    Edge,
}
50
/// Interest to register regarding the file descriptor
///
/// Interests can be compared with `==`, which makes the shorthand constants
/// below usable in equality checks and assertions.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Interest {
    /// Wait for the FD to be readable
    pub readable: bool,

    /// Wait for the FD to be writable
    pub writable: bool,
}

impl Interest {
    /// Shorthand for empty interest
    pub const EMPTY: Self = Self {
        readable: false,
        writable: false,
    };

    /// Shorthand for read interest
    pub const READ: Self = Self {
        readable: true,
        writable: false,
    };

    /// Shorthand for write interest
    pub const WRITE: Self = Self {
        readable: false,
        writable: true,
    };

    /// Shorthand for read and write interest
    pub const BOTH: Self = Self {
        readable: true,
        writable: true,
    };
}
86
/// Readiness for a file descriptor notification
///
/// Readiness values can be compared with `==`, e.g. against
/// [`Readiness::EMPTY`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Readiness {
    /// Is the FD readable
    pub readable: bool,

    /// Is the FD writable
    pub writable: bool,

    /// Is the FD in an error state
    pub error: bool,
}

impl Readiness {
    /// Shorthand for empty readiness
    pub const EMPTY: Self = Self {
        readable: false,
        writable: false,
        error: false,
    };
}
108
/// A single event returned by `Poll::poll()`: the readiness that was observed,
/// paired with the token identifying which registration it belongs to.
#[derive(Debug)]
pub(crate) struct PollEvent {
    /// What the source is ready for.
    pub(crate) readiness: Readiness,
    /// The token of the registration (or timer) that produced this event.
    pub(crate) token: Token,
}
114
115/// Factory for creating tokens in your registrations
116///
117/// When composing event sources, each sub-source needs to
118/// have its own token to identify itself. This factory is
119/// provided to produce such unique tokens.
120
121#[derive(Debug)]
122pub struct TokenFactory {
123    next_token: TokenInner,
124}
125
126impl TokenFactory {
127    pub(crate) fn new(token: TokenInner) -> TokenFactory {
128        TokenFactory {
129            next_token: token.forget_sub_id(),
130        }
131    }
132
133    /// Get the "raw" registration token of this TokenFactory
134    pub(crate) fn registration_token(&self) -> RegistrationToken {
135        RegistrationToken::new(self.next_token.forget_sub_id())
136    }
137
138    /// Produce a new unique token
139    pub fn token(&mut self) -> Token {
140        let token = self.next_token;
141        self.next_token = token.increment_sub_id();
142        Token { inner: token }
143    }
144}
145
/// A token (for implementation of the [`EventSource`](crate::EventSource) trait)
///
/// This token is produced by the [`TokenFactory`] and is used when calling the
/// [`EventSource`](crate::EventSource) implementations to process events, in order
/// to identify which sub-source produced them.
///
/// You should forward it to the [`Poll`] when registering your file descriptors.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Token {
    /// The crate-internal token value; converted to and from the poller's event key.
    pub(crate) inner: TokenInner,
}
157
/// The polling system
///
/// This type represents the polling system of calloop, on which you
/// can register your file descriptors. This interface is only accessible in
/// implementations of the [`EventSource`](crate::EventSource) trait.
///
/// You only need to interact with this type if you are implementing your
/// own event sources, while implementing the [`EventSource`](crate::EventSource) trait.
/// And even in this case, you can often just use the [`Generic`](crate::generic::Generic) event
/// source and delegate the implementations to it.
pub struct Poll {
    /// The handle to wepoll/epoll/kqueue/... used to poll for events.
    pub(crate) poller: Arc<Poller>,

    /// The buffer of events returned by the poller.
    events: RefCell<Events>,

    /// The sources registered as level triggered.
    ///
    /// Some platforms that `polling` supports do not support level-triggered events. As of the time
    /// of writing, this only includes Solaris and illumos. To work around this, we emulate level
    /// triggered events by keeping this map of file descriptors.
    ///
    /// One can emulate level triggered events on top of oneshot events by just re-registering the
    /// file descriptor every time it is polled. However, this is not ideal, as it requires a
    /// system call every time. It's better to use the integrated system, if available.
    ///
    /// This is `None` when no emulation is needed. Otherwise the map is keyed by the event key
    /// and stores the raw source together with the event to re-arm it with.
    level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,

    /// The timer wheel; `poll()` uses it to bound its timeout and to report expired timers.
    pub(crate) timers: Rc<RefCell<TimerWheel>>,
}
188
189impl std::fmt::Debug for Poll {
190    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        f.write_str("Poll { ... }")
193    }
194}
195
196impl Poll {
197    pub(crate) fn new() -> crate::Result<Poll> {
198        Self::new_inner(false)
199    }
200
201    fn new_inner(force_fallback_lt: bool) -> crate::Result<Poll> {
202        let poller = Poller::new()?;
203        let level_triggered = if poller.supports_level() && !force_fallback_lt {
204            None
205        } else {
206            Some(RefCell::new(HashMap::new()))
207        };
208
209        Ok(Poll {
210            poller: Arc::new(poller),
211            events: RefCell::new(Events::new()),
212            timers: Rc::new(RefCell::new(TimerWheel::new())),
213            level_triggered,
214        })
215    }
216
217    pub(crate) fn poll(&self, mut timeout: Option<Duration>) -> crate::Result<Vec<PollEvent>> {
218        let now = std::time::Instant::now();
219
220        // Adjust the timeout for the timers.
221        if let Some(next_timeout) = self.timers.borrow().next_deadline() {
222            if next_timeout <= now {
223                timeout = Some(Duration::ZERO);
224            } else if let Some(deadline) = timeout {
225                timeout = Some(std::cmp::min(deadline, next_timeout - now));
226            } else {
227                timeout = Some(next_timeout - now);
228            }
229        };
230
231        let mut events = self.events.borrow_mut();
232        events.clear();
233        self.poller.wait(&mut events, timeout)?;
234
235        // Convert `polling` events to `calloop` events.
236        let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
237        let mut poll_events = events
238            .iter()
239            .map(|ev| {
240                // If we need to emulate level-triggered events...
241                if let Some(level_triggered) = level_triggered.as_ref() {
242                    // ...and this event is from a level-triggered source...
243                    if let Some((source, interest)) = level_triggered.get(&ev.key) {
244                        // ...then we need to re-register the source.
245                        // SAFETY: The source is valid.
246                        self.poller
247                            .modify(unsafe { Borrowed::borrow_raw(*source) }, *interest)?;
248                    }
249                }
250
251                Ok(PollEvent {
252                    readiness: Readiness {
253                        readable: ev.readable,
254                        writable: ev.writable,
255                        error: false,
256                    },
257                    token: Token {
258                        inner: TokenInner::from(ev.key),
259                    },
260                })
261            })
262            .collect::<std::io::Result<Vec<_>>>()?;
263
264        drop(events);
265
266        // Update 'now' as some time may have elapsed in poll()
267        let now = std::time::Instant::now();
268        let mut timers = self.timers.borrow_mut();
269        while let Some((_, token)) = timers.next_expired(now) {
270            poll_events.push(PollEvent {
271                readiness: Readiness {
272                    readable: true,
273                    writable: false,
274                    error: false,
275                },
276                token,
277            });
278        }
279
280        Ok(poll_events)
281    }
282
283    /// Register a new file descriptor for polling
284    ///
285    /// The file descriptor will be registered with given interest,
286    /// mode and token. This function will fail if given a
287    /// bad file descriptor or if the provided file descriptor is already
288    /// registered.
289    ///
290    /// # Safety
291    ///
292    /// The registered source must not be dropped before it is unregistered.
293    ///
294    /// # Leaking tokens
295    ///
296    /// If your event source is dropped without being unregistered, the token
297    /// passed in here will remain on the heap and continue to be used by the
298    /// polling system even though no event source will match it.
299    pub unsafe fn register(
300        &self,
301        #[cfg(unix)] fd: impl AsFd,
302        #[cfg(windows)] fd: impl AsSocket,
303        interest: Interest,
304        mode: Mode,
305        token: Token,
306    ) -> crate::Result<()> {
307        let raw = {
308            #[cfg(unix)]
309            {
310                fd.as_fd().as_raw_fd()
311            }
312
313            #[cfg(windows)]
314            {
315                fd.as_socket().as_raw_socket()
316            }
317        };
318
319        let ev = cvt_interest(interest, token);
320
321        // SAFETY: See invariant on function.
322        unsafe {
323            self.poller
324                .add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
325        }
326
327        // If this is level triggered and we're emulating level triggered mode...
328        if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
329            // ...then we need to keep track of the source.
330            let mut level_triggered = level_triggered.borrow_mut();
331            level_triggered.insert(ev.key, (raw, ev));
332        }
333
334        Ok(())
335    }
336
337    /// Update the registration for a file descriptor
338    ///
339    /// This allows you to change the interest, mode or token of a file
340    /// descriptor. Fails if the provided fd is not currently registered.
341    ///
342    /// See note on [`register()`](Self::register()) regarding leaking.
343    pub fn reregister(
344        &self,
345        #[cfg(unix)] fd: impl AsFd,
346        #[cfg(windows)] fd: impl AsSocket,
347        interest: Interest,
348        mode: Mode,
349        token: Token,
350    ) -> crate::Result<()> {
351        let (borrowed, raw) = {
352            #[cfg(unix)]
353            {
354                (fd.as_fd(), fd.as_fd().as_raw_fd())
355            }
356
357            #[cfg(windows)]
358            {
359                (fd.as_socket(), fd.as_socket().as_raw_socket())
360            }
361        };
362
363        let ev = cvt_interest(interest, token);
364        self.poller
365            .modify_with_mode(borrowed, ev, cvt_mode(mode, self.poller.supports_level()))?;
366
367        // If this is level triggered and we're emulating level triggered mode...
368        if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
369            // ...then we need to keep track of the source.
370            let mut level_triggered = level_triggered.borrow_mut();
371            level_triggered.insert(ev.key, (raw, ev));
372        }
373
374        Ok(())
375    }
376
377    /// Unregister a file descriptor
378    ///
379    /// This file descriptor will no longer generate events. Fails if the
380    /// provided file descriptor is not currently registered.
381    pub fn unregister(
382        &self,
383        #[cfg(unix)] fd: impl AsFd,
384        #[cfg(windows)] fd: impl AsSocket,
385    ) -> crate::Result<()> {
386        let (borrowed, raw) = {
387            #[cfg(unix)]
388            {
389                (fd.as_fd(), fd.as_fd().as_raw_fd())
390            }
391
392            #[cfg(windows)]
393            {
394                (fd.as_socket(), fd.as_socket().as_raw_socket())
395            }
396        };
397        self.poller.delete(borrowed)?;
398
399        if let Some(level_triggered) = self.level_triggered.as_ref() {
400            let mut level_triggered = level_triggered.borrow_mut();
401            level_triggered.retain(|_, (source, _)| *source != raw);
402        }
403
404        Ok(())
405    }
406
407    /// Get a thread-safe handle which can be used to wake up the `Poll`.
408    pub(crate) fn notifier(&self) -> Notifier {
409        Notifier(self.poller.clone())
410    }
411
    /// Get a reference to the poller.
    ///
    /// This returns the shared `Arc` itself, so callers may clone it if they
    /// need their own handle.
    pub(crate) fn poller(&self) -> &Arc<Poller> {
        &self.poller
    }
416}
417
418/// Thread-safe handle which can be used to wake up the `Poll`.
419#[derive(Clone)]
420pub(crate) struct Notifier(Arc<Poller>);
421
422impl Notifier {
423    pub(crate) fn notify(&self) -> crate::Result<()> {
424        self.0.notify()?;
425
426        Ok(())
427    }
428}
429
430fn cvt_interest(interest: Interest, tok: Token) -> Event {
431    let mut event = Event::none(tok.inner.into());
432    event.readable = interest.readable;
433    event.writable = interest.writable;
434    event
435}
436
437fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
438    if !supports_other_modes {
439        return PollMode::Oneshot;
440    }
441
442    match mode {
443        Mode::Edge => PollMode::Edge,
444        Mode::Level => PollMode::Level,
445        Mode::OneShot => PollMode::Oneshot,
446    }
447}