1use std::sync::mpsc;
12
13use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
14
15use super::ping::{make_ping, Ping, PingError, PingSource};
16
17#[derive(Debug)]
19pub enum Event<T> {
20 Msg(T),
22 Closed,
27}
28
29#[derive(Debug)]
33pub struct Sender<T> {
34 sender: mpsc::Sender<T>,
35 ping: Ping,
36}
37
38impl<T> Clone for Sender<T> {
39 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
40 fn clone(&self) -> Sender<T> {
41 Sender {
42 sender: self.sender.clone(),
43 ping: self.ping.clone(),
44 }
45 }
46}
47
48impl<T> Sender<T> {
49 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
54 self.sender.send(t).map(|()| self.ping.ping())
55 }
56}
57
58impl<T> Drop for Sender<T> {
59 fn drop(&mut self) {
60 self.ping.ping();
62 }
63}
64
65#[derive(Debug)]
69pub struct SyncSender<T> {
70 sender: mpsc::SyncSender<T>,
71 ping: Ping,
72}
73
74impl<T> Clone for SyncSender<T> {
75 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
76 fn clone(&self) -> SyncSender<T> {
77 SyncSender {
78 sender: self.sender.clone(),
79 ping: self.ping.clone(),
80 }
81 }
82}
83
84impl<T> SyncSender<T> {
85 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
95 let ret = self.try_send(t);
96 match ret {
97 Ok(()) => Ok(()),
98 Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()),
99 Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)),
100 }
101 }
102
103 pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
110 let ret = self.sender.try_send(t);
111 if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret {
112 self.ping.ping();
113 }
114 ret
115 }
116}
117
118#[derive(Debug)]
122pub struct Channel<T> {
123 receiver: mpsc::Receiver<T>,
124 source: PingSource,
125}
126
127unsafe impl<T: Send> Send for Channel<T> {}
133
134impl<T> Channel<T> {
135 pub fn recv(&self) -> Result<T, mpsc::RecvError> {
141 self.receiver.recv()
142 }
143
144 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
150 self.receiver.try_recv()
151 }
152}
153
154pub fn channel<T>() -> (Sender<T>, Channel<T>) {
156 let (sender, receiver) = mpsc::channel();
157 let (ping, source) = make_ping().expect("Failed to create a Ping.");
158 (Sender { sender, ping }, Channel { receiver, source })
159}
160
161pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
163 let (sender, receiver) = mpsc::sync_channel(bound);
164 let (ping, source) = make_ping().expect("Failed to create a Ping.");
165 (SyncSender { sender, ping }, Channel { receiver, source })
166}
167
168impl<T> EventSource for Channel<T> {
169 type Event = Event<T>;
170 type Metadata = ();
171 type Ret = ();
172 type Error = ChannelError;
173
174 fn process_events<C>(
175 &mut self,
176 readiness: Readiness,
177 token: Token,
178 mut callback: C,
179 ) -> Result<PostAction, Self::Error>
180 where
181 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
182 {
183 let receiver = &self.receiver;
184 self.source
185 .process_events(readiness, token, |(), &mut ()| loop {
186 match receiver.try_recv() {
187 Ok(val) => callback(Event::Msg(val), &mut ()),
188 Err(mpsc::TryRecvError::Empty) => break,
189 Err(mpsc::TryRecvError::Disconnected) => {
190 callback(Event::Closed, &mut ());
191 break;
192 }
193 }
194 })
195 .map_err(ChannelError)
196 }
197
198 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
199 self.source.register(poll, token_factory)
200 }
201
202 fn reregister(
203 &mut self,
204 poll: &mut Poll,
205 token_factory: &mut TokenFactory,
206 ) -> crate::Result<()> {
207 self.source.reregister(poll, token_factory)
208 }
209
210 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
211 self.source.unregister(poll)
212 }
213}
214
215#[derive(thiserror::Error, Debug)]
217#[error(transparent)]
218pub struct ChannelError(PingError);
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223
224 #[test]
225 fn basic_channel() {
226 let mut event_loop = crate::EventLoop::try_new().unwrap();
227
228 let handle = event_loop.handle();
229
230 let (tx, rx) = channel::<()>();
231
232 let mut got = (false, false);
234
235 let _channel_token = handle
236 .insert_source(rx, move |evt, &mut (), got: &mut (bool, bool)| match evt {
237 Event::Msg(()) => {
238 got.0 = true;
239 }
240 Event::Closed => {
241 got.1 = true;
242 }
243 })
244 .unwrap();
245
246 event_loop
248 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
249 .unwrap();
250
251 assert_eq!(got, (false, false));
252
253 tx.send(()).unwrap();
255 event_loop
256 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
257 .unwrap();
258
259 assert_eq!(got, (true, false));
260
261 ::std::mem::drop(tx);
263 event_loop
264 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
265 .unwrap();
266
267 assert_eq!(got, (true, true));
268 }
269
270 #[test]
271 fn basic_sync_channel() {
272 let mut event_loop = crate::EventLoop::try_new().unwrap();
273
274 let handle = event_loop.handle();
275
276 let (tx, rx) = sync_channel::<()>(2);
277
278 let mut received = (0, false);
279
280 let _channel_token = handle
281 .insert_source(
282 rx,
283 move |evt, &mut (), received: &mut (u32, bool)| match evt {
284 Event::Msg(()) => {
285 received.0 += 1;
286 }
287 Event::Closed => {
288 received.1 = true;
289 }
290 },
291 )
292 .unwrap();
293
294 event_loop
296 .dispatch(Some(::std::time::Duration::ZERO), &mut received)
297 .unwrap();
298
299 assert_eq!(received.0, 0);
300 assert!(!received.1);
301
302 tx.send(()).unwrap();
304 tx.send(()).unwrap();
305 assert!(tx.try_send(()).is_err());
306
307 event_loop
309 .dispatch(Some(::std::time::Duration::ZERO), &mut received)
310 .unwrap();
311
312 assert_eq!(received.0, 2);
313 assert!(!received.1);
314
315 tx.send(()).unwrap();
317 std::mem::drop(tx);
318
319 event_loop
321 .dispatch(Some(::std::time::Duration::ZERO), &mut received)
322 .unwrap();
323
324 assert_eq!(received.0, 3);
325 assert!(received.1);
326 }
327}