calloop/sources/
channel.rs

1//! An MPSC channel whose receiving end is an event source
2//!
//! Create a channel using [`channel()`](channel), which returns a
//! [`Sender`] that can be cloned and sent across threads if `T: Send`,
//! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop).
6//! It will generate one event per message.
7//!
8//! A synchronous version of the channel is provided by [`sync_channel`], in which
9//! the [`SyncSender`] will block when the channel is full.
10
11use std::sync::mpsc;
12
13use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
14
15use super::ping::{make_ping, Ping, PingError, PingSource};
16
/// Events produced by a [`Channel`] once it is inserted into an event loop
#[derive(Debug)]
pub enum Event<T> {
    /// A value that travelled through the channel, handed over as-is
    Msg(T),
    /// The channel has been closed
    ///
    /// Every `Sender` tied to this channel has been dropped, so no
    /// further message will ever arrive.
    Closed,
}
28
29/// The sender end of a channel
30///
31/// It can be cloned and sent accross threads (if `T` is).
32#[derive(Debug)]
33pub struct Sender<T> {
34    sender: mpsc::Sender<T>,
35    ping: Ping,
36}
37
38impl<T> Clone for Sender<T> {
39    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
40    fn clone(&self) -> Sender<T> {
41        Sender {
42            sender: self.sender.clone(),
43            ping: self.ping.clone(),
44        }
45    }
46}
47
48impl<T> Sender<T> {
49    /// Send a message to the channel
50    ///
51    /// This will wake the event loop and deliver an `Event::Msg` to
52    /// it containing the provided value.
53    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
54        self.sender.send(t).map(|()| self.ping.ping())
55    }
56}
57
58impl<T> Drop for Sender<T> {
59    fn drop(&mut self) {
60        // ping on drop, to notify about channel closure
61        self.ping.ping();
62    }
63}
64
65/// The sender end of a synchronous channel
66///
67/// It can be cloned and sent accross threads (if `T` is).
68#[derive(Debug)]
69pub struct SyncSender<T> {
70    sender: mpsc::SyncSender<T>,
71    ping: Ping,
72}
73
74impl<T> Clone for SyncSender<T> {
75    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
76    fn clone(&self) -> SyncSender<T> {
77        SyncSender {
78            sender: self.sender.clone(),
79            ping: self.ping.clone(),
80        }
81    }
82}
83
84impl<T> SyncSender<T> {
85    /// Send a message to the synchronous channel
86    ///
87    /// This will wake the event loop and deliver an `Event::Msg` to
88    /// it containing the provided value. If the channel is full, this
89    /// function will block until the event loop empties it and it can
90    /// deliver the message.
91    ///
92    /// Due to the blocking behavior, this method should not be used on the
93    /// same thread as the one running the event loop, as it could cause deadlocks.
94    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
95        let ret = self.try_send(t);
96        match ret {
97            Ok(()) => Ok(()),
98            Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()),
99            Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)),
100        }
101    }
102
103    /// Send a message to the synchronous channel
104    ///
105    /// This will wake the event loop and deliver an `Event::Msg` to
106    /// it containing the provided value. If the channel is full, this
107    /// function will return an error, but the event loop will still be
108    /// signaled for readiness.
109    pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
110        let ret = self.sender.try_send(t);
111        if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret {
112            self.ping.ping();
113        }
114        ret
115    }
116}
117
118/// The receiving end of the channel
119///
120/// This is the event source to be inserted into your `EventLoop`.
121#[derive(Debug)]
122pub struct Channel<T> {
123    receiver: mpsc::Receiver<T>,
124    source: PingSource,
125}
126
127// This impl is safe because the Channel is only able to move around threads
128// when it is not inserted into an event loop. (Otherwise it is stuck into
129// a Source<_> and the internals of calloop, which are not Send).
130// At this point, the Arc<Receiver> has a count of 1, and it is obviously
131// safe to Send between threads.
132unsafe impl<T: Send> Send for Channel<T> {}
133
134impl<T> Channel<T> {
135    /// Proxy for [`mpsc::Receiver::recv`] to manually poll events.
136    ///
137    /// *Note*: Normally you would want to use the `Channel` by inserting
138    /// it into an event loop instead. Use this for example to immediately
139    /// dispatch pending events after creation.
140    pub fn recv(&self) -> Result<T, mpsc::RecvError> {
141        self.receiver.recv()
142    }
143
144    /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events.
145    ///
146    /// *Note*: Normally you would want to use the `Channel` by inserting
147    /// it into an event loop instead. Use this for example to immediately
148    /// dispatch pending events after creation.
149    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
150        self.receiver.try_recv()
151    }
152}
153
154/// Create a new asynchronous channel
155pub fn channel<T>() -> (Sender<T>, Channel<T>) {
156    let (sender, receiver) = mpsc::channel();
157    let (ping, source) = make_ping().expect("Failed to create a Ping.");
158    (Sender { sender, ping }, Channel { receiver, source })
159}
160
161/// Create a new synchronous, bounded channel
162pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
163    let (sender, receiver) = mpsc::sync_channel(bound);
164    let (ping, source) = make_ping().expect("Failed to create a Ping.");
165    (SyncSender { sender, ping }, Channel { receiver, source })
166}
167
168impl<T> EventSource for Channel<T> {
169    type Event = Event<T>;
170    type Metadata = ();
171    type Ret = ();
172    type Error = ChannelError;
173
174    fn process_events<C>(
175        &mut self,
176        readiness: Readiness,
177        token: Token,
178        mut callback: C,
179    ) -> Result<PostAction, Self::Error>
180    where
181        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
182    {
183        let receiver = &self.receiver;
184        self.source
185            .process_events(readiness, token, |(), &mut ()| loop {
186                match receiver.try_recv() {
187                    Ok(val) => callback(Event::Msg(val), &mut ()),
188                    Err(mpsc::TryRecvError::Empty) => break,
189                    Err(mpsc::TryRecvError::Disconnected) => {
190                        callback(Event::Closed, &mut ());
191                        break;
192                    }
193                }
194            })
195            .map_err(ChannelError)
196    }
197
198    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
199        self.source.register(poll, token_factory)
200    }
201
202    fn reregister(
203        &mut self,
204        poll: &mut Poll,
205        token_factory: &mut TokenFactory,
206    ) -> crate::Result<()> {
207        self.source.reregister(poll, token_factory)
208    }
209
210    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
211        self.source.unregister(poll)
212    }
213}
214
215/// An error arising from processing events for a channel.
216#[derive(thiserror::Error, Debug)]
217#[error(transparent)]
218pub struct ChannelError(PingError);
219
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// A message and then the closure must each surface as their own event.
    #[test]
    fn basic_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();

        let (sender, source) = channel::<()>();

        // (message seen, closure seen)
        let mut seen = (false, false);

        let _channel_token = loop_handle
            .insert_source(source, move |event, &mut (), seen: &mut (bool, bool)| {
                match event {
                    Event::Msg(()) => seen.0 = true,
                    Event::Closed => seen.1 = true,
                }
            })
            .unwrap();

        // Idle channel: a dispatch must not report anything.
        event_loop.dispatch(Some(Duration::ZERO), &mut seen).unwrap();
        assert_eq!(seen, (false, false));

        // A message is sent.
        sender.send(()).unwrap();
        event_loop.dispatch(Some(Duration::ZERO), &mut seen).unwrap();
        assert_eq!(seen, (true, false));

        // The sender is dropped.
        drop(sender);
        event_loop.dispatch(Some(Duration::ZERO), &mut seen).unwrap();
        assert_eq!(seen, (true, true));
    }

    /// A bounded channel must report every queued message, then the closure.
    #[test]
    fn basic_sync_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();

        let (sender, source) = sync_channel::<()>(2);

        // (number of messages seen, closure seen)
        let mut tally = (0_u32, false);

        let _channel_token = loop_handle
            .insert_source(source, move |event, &mut (), tally: &mut (u32, bool)| {
                match event {
                    Event::Msg(()) => tally.0 += 1,
                    Event::Closed => tally.1 = true,
                }
            })
            .unwrap();

        // Idle channel: a dispatch must not report anything.
        event_loop.dispatch(Some(Duration::ZERO), &mut tally).unwrap();
        assert_eq!(tally.0, 0);
        assert!(!tally.1);

        // Fill the channel up to its bound; one more must be refused.
        sender.send(()).unwrap();
        sender.send(()).unwrap();
        assert!(sender.try_send(()).is_err());

        // Empty it.
        event_loop.dispatch(Some(Duration::ZERO), &mut tally).unwrap();
        assert_eq!(tally.0, 2);
        assert!(!tally.1);

        // Send a final message and drop the sender.
        sender.send(()).unwrap();
        drop(sender);

        // Final read of the channel.
        event_loop.dispatch(Some(Duration::ZERO), &mut tally).unwrap();
        assert_eq!(tally.0, 3);
        assert!(tally.1);
    }
}
327}