calloop_wayland_source/
lib.rs

1// SPDX-License-Identifier: MIT
2
3//! Utilities for using an [`EventQueue`] from wayland-client with an event loop
4//! that performs polling with [`calloop`](https://crates.io/crates/calloop).
5//!
6//! # Example
7//!
8//! ```no_run,rust
9//! use calloop::EventLoop;
10//! use calloop_wayland_source::WaylandSource;
11//! use wayland_client::{Connection, QueueHandle};
12//!
13//! // Create a Wayland connection and a queue.
14//! let connection = Connection::connect_to_env().unwrap();
15//! let event_queue = connection.new_event_queue();
16//! let queue_handle = event_queue.handle();
17//!
18//! // Create the calloop event loop to drive everytihng.
19//! let mut event_loop: EventLoop<()> = EventLoop::try_new().unwrap();
20//! let loop_handle = event_loop.handle();
21//!
22//! // Insert the wayland source into the calloop's event loop.
23//! WaylandSource::new(connection, event_queue).insert(loop_handle).unwrap();
24//!
25//! // This will start dispatching the event loop and processing pending wayland requests.
26//! while let Ok(_) = event_loop.dispatch(None, &mut ()) {
27//!     // Your logic here.
28//! }
29//! ```
30
31#![deny(unsafe_op_in_unsafe_fn)]
32use std::io;
33
34use calloop::generic::Generic;
35use calloop::{
36    EventSource, InsertError, Interest, LoopHandle, Mode, Poll, PostAction, Readiness,
37    RegistrationToken, Token, TokenFactory,
38};
39use rustix::io::Errno;
40use wayland_backend::client::{ReadEventsGuard, WaylandError};
41use wayland_client::{Connection, DispatchError, EventQueue};
42
43#[cfg(feature = "log")]
44use log::error as log_error;
45#[cfg(not(feature = "log"))]
46use std::eprintln as log_error;
47
48/// An adapter to insert an [`EventQueue`] into a calloop
49/// [`EventLoop`](calloop::EventLoop).
50///
51/// This type implements [`EventSource`] which generates an event whenever
52/// events on the event queue need to be dispatched. The event queue available
53/// in the callback calloop registers may be used to dispatch pending
54/// events using [`EventQueue::dispatch_pending`].
55///
56/// [`WaylandSource::insert`] can be used to insert this source into an event
57/// loop and automatically dispatch pending events on the event queue.
58#[derive(Debug)]
59pub struct WaylandSource<D> {
60    // In theory, we could use the same event queue inside `connection_source`
61    // However, calloop's safety requirements mean that we cannot then give
62    // mutable access to the queue, which is incompatible with our current interface
63    // Additionally, `Connection` is cheaply cloneable, so it's not a huge burden
64    queue: EventQueue<D>,
65    connection_source: Generic<Connection>,
66    read_guard: Option<ReadEventsGuard>,
67    /// Calloop's before_will_sleep method allows
68    /// skipping the sleeping by returning a `Token`.
69    /// We cannot produce this on the fly, so store it here instead
70    fake_token: Option<Token>,
71    // Some calloop event handlers don't support error handling, so we have to store the error
72    // for a short time until we reach a method which allows it
73    stored_error: Result<(), io::Error>,
74}
75
76impl<D> WaylandSource<D> {
77    /// Wrap an [`EventQueue`] as a [`WaylandSource`].
78    ///
79    /// `queue` must be from the connection `Connection`.
80    /// This is not a safety invariant, but not following this may cause
81    /// freezes or hangs
82    pub fn new(connection: Connection, queue: EventQueue<D>) -> WaylandSource<D> {
83        let connection_source = Generic::new(connection, Interest::READ, Mode::Level);
84
85        WaylandSource {
86            queue,
87            connection_source,
88            read_guard: None,
89            fake_token: None,
90            stored_error: Ok(()),
91        }
92    }
93
94    /// Access the underlying event queue
95    ///
96    /// Note that you should not replace this queue with a queue from a
97    /// different `Connection`, as that may cause freezes or other hangs.
98    pub fn queue(&mut self) -> &mut EventQueue<D> {
99        &mut self.queue
100    }
101
102    /// Access the connection to the Wayland server
103    pub fn connection(&self) -> &Connection {
104        self.connection_source.get_ref()
105    }
106
107    /// Insert this source into the given event loop.
108    ///
109    /// This adapter will pass the event loop's shared data as the `D` type for
110    /// the event loop.
111    pub fn insert(self, handle: LoopHandle<D>) -> Result<RegistrationToken, InsertError<Self>>
112    where
113        D: 'static,
114    {
115        handle.insert_source(self, |_, queue, data| queue.dispatch_pending(data))
116    }
117}
118
119impl<D> EventSource for WaylandSource<D> {
120    type Error = calloop::Error;
121    type Event = ();
122    /// The underlying event queue.
123    ///
124    /// You should call [`EventQueue::dispatch_pending`] inside your callback
125    /// using this queue.
126    type Metadata = EventQueue<D>;
127    type Ret = Result<usize, DispatchError>;
128
129    const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
130
131    fn process_events<F>(
132        &mut self,
133        _: Readiness,
134        _: Token,
135        mut callback: F,
136    ) -> Result<PostAction, Self::Error>
137    where
138        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
139    {
140        debug_assert!(self.read_guard.is_none());
141
142        // Take the stored error
143        std::mem::replace(&mut self.stored_error, Ok(()))?;
144
145        // We know that the event will either be a fake event
146        // produced in `before_will_sleep`, or a "real" event from the underlying
147        // source (self.queue_events). Our behaviour in both cases is the same.
148        // In theory we might want to call the process_events handler on the underlying
149        // event source. However, we know that Generic's `process_events` call is a
150        // no-op, so we just handle the event ourselves.
151
152        let queue = &mut self.queue;
153        // Dispatch any pending events in the queue
154        Self::loop_callback_pending(queue, &mut callback)?;
155
156        // Once dispatching is finished, flush the responses to the compositor
157        flush_queue(queue)?;
158
159        Ok(PostAction::Continue)
160    }
161
162    fn register(
163        &mut self,
164        poll: &mut Poll,
165        token_factory: &mut TokenFactory,
166    ) -> calloop::Result<()> {
167        self.fake_token = Some(token_factory.token());
168        self.connection_source.register(poll, token_factory)
169    }
170
171    fn reregister(
172        &mut self,
173        poll: &mut Poll,
174        token_factory: &mut TokenFactory,
175    ) -> calloop::Result<()> {
176        self.connection_source.reregister(poll, token_factory)
177    }
178
179    fn unregister(&mut self, poll: &mut Poll) -> calloop::Result<()> {
180        self.connection_source.unregister(poll)
181    }
182
183    fn before_sleep(&mut self) -> calloop::Result<Option<(Readiness, Token)>> {
184        debug_assert!(self.read_guard.is_none());
185
186        flush_queue(&mut self.queue)?;
187
188        self.read_guard = self.queue.prepare_read();
189        match self.read_guard {
190            Some(_) => Ok(None),
191            // If getting the guard failed, that means that there are
192            // events in the queue, and so we need to handle the events instantly
193            // rather than waiting on an event in polling. We tell calloop this
194            // by returning Some here. Note that the readiness value is
195            // never used, so we just need some marker
196            None => Ok(Some((Readiness::EMPTY, self.fake_token.unwrap()))),
197        }
198    }
199
200    fn before_handle_events(&mut self, events: calloop::EventIterator<'_>) {
201        // It's important that the guard isn't held whilst process_events calls occur
202        // This can use arbitrary user-provided code, which may want to use the wayland
203        // socket For example, creating a Vulkan surface needs access to the
204        // connection
205        let guard = self.read_guard.take();
206        if events.count() > 0 {
207            // Read events from the socket if any are available
208            if let Some(Err(WaylandError::Io(err))) = guard.map(ReadEventsGuard::read) {
209                // If some other thread read events before us, concurrently, that's an expected
210                // case, so this error isn't an issue. Other error kinds do need to be returned,
211                // however
212                if err.kind() != io::ErrorKind::WouldBlock {
213                    // before_handle_events doesn't allow returning errors
214                    // For now, cache it in self until process_events is called
215                    self.stored_error = Err(err);
216                }
217            }
218        }
219    }
220}
221
222fn flush_queue<D>(queue: &mut EventQueue<D>) -> Result<(), calloop::Error> {
223    if let Err(WaylandError::Io(err)) = queue.flush() {
224        // WouldBlock error means the compositor could not process all
225        // our messages quickly. Either it is slowed
226        // down or we are a spammer. Should not really
227        // happen, if it does we do nothing and will flush again later
228        if err.kind() != io::ErrorKind::WouldBlock {
229            // in case of error, forward it and fast-exit
230            log_error!("Error trying to flush the wayland display: {}", err);
231            return Err(err.into());
232        }
233    }
234    Ok(())
235}
236
237impl<D> WaylandSource<D> {
238    /// Loop over the callback until all pending messages have been dispatched.
239    fn loop_callback_pending<F>(queue: &mut EventQueue<D>, callback: &mut F) -> io::Result<()>
240    where
241        F: FnMut((), &mut EventQueue<D>) -> Result<usize, DispatchError>,
242    {
243        // Loop on the callback until no pending events are left.
244        loop {
245            match callback((), queue) {
246                // No more pending events.
247                Ok(0) => break Ok(()),
248                Ok(_) => continue,
249                Err(DispatchError::Backend(WaylandError::Io(err))) => {
250                    return Err(err);
251                },
252                Err(DispatchError::Backend(WaylandError::Protocol(err))) => {
253                    log_error!("Protocol error received on display: {}", err);
254
255                    break Err(Errno::PROTO.into());
256                },
257                Err(DispatchError::BadMessage { interface, sender_id, opcode }) => {
258                    log_error!(
259                        "Bad message on interface \"{}\": (sender_id: {}, opcode: {})",
260                        interface,
261                        sender_id,
262                        opcode,
263                    );
264
265                    break Err(Errno::PROTO.into());
266                },
267            }
268        }
269    }
270}