crossbeam_channel/
channel.rs

1//! The channel interface.
2
3use std::fmt;
4use std::iter::FusedIterator;
5use std::mem;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use crate::context::Context;
11use crate::counter;
12use crate::err::{
13    RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
14};
15use crate::flavors;
16use crate::select::{Operation, SelectHandle, Token};
17
18/// Creates a channel of unbounded capacity.
19///
20/// This channel has a growable buffer that can hold any number of messages at a time.
21///
22/// # Examples
23///
24/// ```
25/// use std::thread;
26/// use crossbeam_channel::unbounded;
27///
28/// let (s, r) = unbounded();
29///
30/// // Computes the n-th Fibonacci number.
31/// fn fib(n: i32) -> i32 {
32///     if n <= 1 {
33///         n
34///     } else {
35///         fib(n - 1) + fib(n - 2)
36///     }
37/// }
38///
39/// // Spawn an asynchronous computation.
40/// thread::spawn(move || s.send(fib(20)).unwrap());
41///
42/// // Print the result of the computation.
43/// println!("{}", r.recv().unwrap());
44/// ```
45pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
46    let (s, r) = counter::new(flavors::list::Channel::new());
47    let s = Sender {
48        flavor: SenderFlavor::List(s),
49    };
50    let r = Receiver {
51        flavor: ReceiverFlavor::List(r),
52    };
53    (s, r)
54}
55
56/// Creates a channel of bounded capacity.
57///
58/// This channel has a buffer that can hold at most `cap` messages at a time.
59///
60/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
61/// receive operations must appear at the same time in order to pair up and pass the message over.
62///
63/// # Examples
64///
65/// A channel of capacity 1:
66///
67/// ```
68/// use std::thread;
69/// use std::time::Duration;
70/// use crossbeam_channel::bounded;
71///
72/// let (s, r) = bounded(1);
73///
74/// // This call returns immediately because there is enough space in the channel.
75/// s.send(1).unwrap();
76///
77/// thread::spawn(move || {
78///     // This call blocks the current thread because the channel is full.
79///     // It will be able to complete only after the first message is received.
80///     s.send(2).unwrap();
81/// });
82///
83/// thread::sleep(Duration::from_secs(1));
84/// assert_eq!(r.recv(), Ok(1));
85/// assert_eq!(r.recv(), Ok(2));
86/// ```
87///
88/// A zero-capacity channel:
89///
90/// ```
91/// use std::thread;
92/// use std::time::Duration;
93/// use crossbeam_channel::bounded;
94///
95/// let (s, r) = bounded(0);
96///
97/// thread::spawn(move || {
98///     // This call blocks the current thread until a receive operation appears
99///     // on the other side of the channel.
100///     s.send(1).unwrap();
101/// });
102///
103/// thread::sleep(Duration::from_secs(1));
104/// assert_eq!(r.recv(), Ok(1));
105/// ```
106pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
107    if cap == 0 {
108        let (s, r) = counter::new(flavors::zero::Channel::new());
109        let s = Sender {
110            flavor: SenderFlavor::Zero(s),
111        };
112        let r = Receiver {
113            flavor: ReceiverFlavor::Zero(r),
114        };
115        (s, r)
116    } else {
117        let (s, r) = counter::new(flavors::array::Channel::with_capacity(cap));
118        let s = Sender {
119            flavor: SenderFlavor::Array(s),
120        };
121        let r = Receiver {
122            flavor: ReceiverFlavor::Array(r),
123        };
124        (s, r)
125    }
126}
127
128/// Creates a receiver that delivers a message after a certain duration of time.
129///
130/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
131/// be sent into the channel after `duration` elapses. The message is the instant at which it is
132/// sent.
133///
134/// # Examples
135///
136/// Using an `after` channel for timeouts:
137///
138/// ```
139/// use std::time::Duration;
140/// use crossbeam_channel::{after, select, unbounded};
141///
142/// let (s, r) = unbounded::<i32>();
143/// let timeout = Duration::from_millis(100);
144///
145/// select! {
146///     recv(r) -> msg => println!("received {:?}", msg),
147///     recv(after(timeout)) -> _ => println!("timed out"),
148/// }
149/// ```
150///
151/// When the message gets sent:
152///
153/// ```
154/// use std::thread;
155/// use std::time::{Duration, Instant};
156/// use crossbeam_channel::after;
157///
158/// // Converts a number of milliseconds into a `Duration`.
159/// let ms = |ms| Duration::from_millis(ms);
160///
161/// // Returns `true` if `a` and `b` are very close `Instant`s.
162/// let eq = |a, b| a + ms(60) > b && b + ms(60) > a;
163///
164/// let start = Instant::now();
165/// let r = after(ms(100));
166///
167/// thread::sleep(ms(500));
168///
169/// // This message was sent 100 ms from the start and received 500 ms from the start.
170/// assert!(eq(r.recv().unwrap(), start + ms(100)));
171/// assert!(eq(Instant::now(), start + ms(500)));
172/// ```
173pub fn after(duration: Duration) -> Receiver<Instant> {
174    match Instant::now().checked_add(duration) {
175        Some(deadline) => Receiver {
176            flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
177        },
178        None => never(),
179    }
180}
181
182/// Creates a receiver that delivers a message at a certain instant in time.
183///
184/// The channel is bounded with capacity of 1 and never gets disconnected. Exactly one message will
185/// be sent into the channel at the moment in time `when`. The message is the instant at which it
186/// is sent, which is the same as `when`. If `when` is in the past, the message will be delivered
187/// instantly to the receiver.
188///
189/// # Examples
190///
191/// Using an `at` channel for timeouts:
192///
193/// ```
194/// use std::time::{Instant, Duration};
195/// use crossbeam_channel::{at, select, unbounded};
196///
197/// let (s, r) = unbounded::<i32>();
198/// let deadline = Instant::now() + Duration::from_millis(500);
199///
200/// select! {
201///     recv(r) -> msg => println!("received {:?}", msg),
202///     recv(at(deadline)) -> _ => println!("timed out"),
203/// }
204/// ```
205///
206/// When the message gets sent:
207///
208/// ```
209/// use std::time::{Duration, Instant};
210/// use crossbeam_channel::at;
211///
212/// // Converts a number of milliseconds into a `Duration`.
213/// let ms = |ms| Duration::from_millis(ms);
214///
215/// let start = Instant::now();
216/// let end = start + ms(100);
217///
218/// let r = at(end);
219///
220/// // This message was sent 100 ms from the start
221/// assert_eq!(r.recv().unwrap(), end);
222/// assert!(Instant::now() > start + ms(100));
223/// ```
224pub fn at(when: Instant) -> Receiver<Instant> {
225    Receiver {
226        flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(when))),
227    }
228}
229
230/// Creates a receiver that never delivers messages.
231///
232/// The channel is bounded with capacity of 0 and never gets disconnected.
233///
234/// # Examples
235///
236/// Using a `never` channel to optionally add a timeout to [`select!`]:
237///
238/// [`select!`]: crate::select!
239///
240/// ```
241/// use std::thread;
242/// use std::time::Duration;
243/// use crossbeam_channel::{after, select, never, unbounded};
244///
245/// let (s, r) = unbounded();
246///
247/// thread::spawn(move || {
248///     thread::sleep(Duration::from_secs(1));
249///     s.send(1).unwrap();
250/// });
251///
252/// // Suppose this duration can be a `Some` or a `None`.
253/// let duration = Some(Duration::from_millis(100));
254///
255/// // Create a channel that times out after the specified duration.
256/// let timeout = duration
257///     .map(|d| after(d))
258///     .unwrap_or(never());
259///
260/// select! {
261///     recv(r) -> msg => assert_eq!(msg, Ok(1)),
262///     recv(timeout) -> _ => println!("timed out"),
263/// }
264/// ```
265pub fn never<T>() -> Receiver<T> {
266    Receiver {
267        flavor: ReceiverFlavor::Never(flavors::never::Channel::new()),
268    }
269}
270
271/// Creates a receiver that delivers messages periodically.
272///
273/// The channel is bounded with capacity of 1 and never gets disconnected. Messages will be
274/// sent into the channel in intervals of `duration`. Each message is the instant at which it is
275/// sent.
276///
277/// # Examples
278///
279/// Using a `tick` channel to periodically print elapsed time:
280///
281/// ```
282/// use std::time::{Duration, Instant};
283/// use crossbeam_channel::tick;
284///
285/// let start = Instant::now();
286/// let ticker = tick(Duration::from_millis(100));
287///
288/// for _ in 0..5 {
289///     ticker.recv().unwrap();
290///     println!("elapsed: {:?}", start.elapsed());
291/// }
292/// ```
293///
294/// When messages get sent:
295///
296/// ```
297/// use std::thread;
298/// use std::time::{Duration, Instant};
299/// use crossbeam_channel::tick;
300///
301/// // Converts a number of milliseconds into a `Duration`.
302/// let ms = |ms| Duration::from_millis(ms);
303///
304/// // Returns `true` if `a` and `b` are very close `Instant`s.
305/// let eq = |a, b| a + ms(65) > b && b + ms(65) > a;
306///
307/// let start = Instant::now();
308/// let r = tick(ms(100));
309///
310/// // This message was sent 100 ms from the start and received 100 ms from the start.
311/// assert!(eq(r.recv().unwrap(), start + ms(100)));
312/// assert!(eq(Instant::now(), start + ms(100)));
313///
314/// thread::sleep(ms(500));
315///
316/// // This message was sent 200 ms from the start and received 600 ms from the start.
317/// assert!(eq(r.recv().unwrap(), start + ms(200)));
318/// assert!(eq(Instant::now(), start + ms(600)));
319///
320/// // This message was sent 700 ms from the start and received 700 ms from the start.
321/// assert!(eq(r.recv().unwrap(), start + ms(700)));
322/// assert!(eq(Instant::now(), start + ms(700)));
323/// ```
324pub fn tick(duration: Duration) -> Receiver<Instant> {
325    match Instant::now().checked_add(duration) {
326        Some(delivery_time) => Receiver {
327            flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
328                delivery_time,
329                duration,
330            ))),
331        },
332        None => never(),
333    }
334}
335
336/// The sending side of a channel.
337///
338/// # Examples
339///
340/// ```
341/// use std::thread;
342/// use crossbeam_channel::unbounded;
343///
344/// let (s1, r) = unbounded();
345/// let s2 = s1.clone();
346///
347/// thread::spawn(move || s1.send(1).unwrap());
348/// thread::spawn(move || s2.send(2).unwrap());
349///
350/// let msg1 = r.recv().unwrap();
351/// let msg2 = r.recv().unwrap();
352///
353/// assert_eq!(msg1 + msg2, 3);
354/// ```
355pub struct Sender<T> {
356    flavor: SenderFlavor<T>,
357}
358
359/// Sender flavors.
360enum SenderFlavor<T> {
361    /// Bounded channel based on a preallocated array.
362    Array(counter::Sender<flavors::array::Channel<T>>),
363
364    /// Unbounded channel implemented as a linked list.
365    List(counter::Sender<flavors::list::Channel<T>>),
366
367    /// Zero-capacity channel.
368    Zero(counter::Sender<flavors::zero::Channel<T>>),
369}
370
371unsafe impl<T: Send> Send for Sender<T> {}
372unsafe impl<T: Send> Sync for Sender<T> {}
373
374impl<T> UnwindSafe for Sender<T> {}
375impl<T> RefUnwindSafe for Sender<T> {}
376
377impl<T> Sender<T> {
378    /// Attempts to send a message into the channel without blocking.
379    ///
380    /// This method will either send a message into the channel immediately or return an error if
381    /// the channel is full or disconnected. The returned error contains the original message.
382    ///
383    /// If called on a zero-capacity channel, this method will send the message only if there
384    /// happens to be a receive operation on the other side of the channel at the same time.
385    ///
386    /// # Examples
387    ///
388    /// ```
389    /// use crossbeam_channel::{bounded, TrySendError};
390    ///
391    /// let (s, r) = bounded(1);
392    ///
393    /// assert_eq!(s.try_send(1), Ok(()));
394    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
395    ///
396    /// drop(r);
397    /// assert_eq!(s.try_send(3), Err(TrySendError::Disconnected(3)));
398    /// ```
399    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
400        match &self.flavor {
401            SenderFlavor::Array(chan) => chan.try_send(msg),
402            SenderFlavor::List(chan) => chan.try_send(msg),
403            SenderFlavor::Zero(chan) => chan.try_send(msg),
404        }
405    }
406
407    /// Blocks the current thread until a message is sent or the channel is disconnected.
408    ///
409    /// If the channel is full and not disconnected, this call will block until the send operation
410    /// can proceed. If the channel becomes disconnected, this call will wake up and return an
411    /// error. The returned error contains the original message.
412    ///
413    /// If called on a zero-capacity channel, this method will wait for a receive operation to
414    /// appear on the other side of the channel.
415    ///
416    /// # Examples
417    ///
418    /// ```
419    /// use std::thread;
420    /// use std::time::Duration;
421    /// use crossbeam_channel::{bounded, SendError};
422    ///
423    /// let (s, r) = bounded(1);
424    /// assert_eq!(s.send(1), Ok(()));
425    ///
426    /// thread::spawn(move || {
427    ///     assert_eq!(r.recv(), Ok(1));
428    ///     thread::sleep(Duration::from_secs(1));
429    ///     drop(r);
430    /// });
431    ///
432    /// assert_eq!(s.send(2), Ok(()));
433    /// assert_eq!(s.send(3), Err(SendError(3)));
434    /// ```
435    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
436        match &self.flavor {
437            SenderFlavor::Array(chan) => chan.send(msg, None),
438            SenderFlavor::List(chan) => chan.send(msg, None),
439            SenderFlavor::Zero(chan) => chan.send(msg, None),
440        }
441        .map_err(|err| match err {
442            SendTimeoutError::Disconnected(msg) => SendError(msg),
443            SendTimeoutError::Timeout(_) => unreachable!(),
444        })
445    }
446
447    /// Waits for a message to be sent into the channel, but only for a limited time.
448    ///
449    /// If the channel is full and not disconnected, this call will block until the send operation
450    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
451    /// wake up and return an error. The returned error contains the original message.
452    ///
453    /// If called on a zero-capacity channel, this method will wait for a receive operation to
454    /// appear on the other side of the channel.
455    ///
456    /// # Examples
457    ///
458    /// ```
459    /// use std::thread;
460    /// use std::time::Duration;
461    /// use crossbeam_channel::{bounded, SendTimeoutError};
462    ///
463    /// let (s, r) = bounded(0);
464    ///
465    /// thread::spawn(move || {
466    ///     thread::sleep(Duration::from_secs(1));
467    ///     assert_eq!(r.recv(), Ok(2));
468    ///     drop(r);
469    /// });
470    ///
471    /// assert_eq!(
472    ///     s.send_timeout(1, Duration::from_millis(500)),
473    ///     Err(SendTimeoutError::Timeout(1)),
474    /// );
475    /// assert_eq!(
476    ///     s.send_timeout(2, Duration::from_secs(1)),
477    ///     Ok(()),
478    /// );
479    /// assert_eq!(
480    ///     s.send_timeout(3, Duration::from_millis(500)),
481    ///     Err(SendTimeoutError::Disconnected(3)),
482    /// );
483    /// ```
484    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
485        match Instant::now().checked_add(timeout) {
486            Some(deadline) => self.send_deadline(msg, deadline),
487            None => self.send(msg).map_err(SendTimeoutError::from),
488        }
489    }
490
491    /// Waits for a message to be sent into the channel, but only until a given deadline.
492    ///
493    /// If the channel is full and not disconnected, this call will block until the send operation
494    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
495    /// wake up and return an error. The returned error contains the original message.
496    ///
497    /// If called on a zero-capacity channel, this method will wait for a receive operation to
498    /// appear on the other side of the channel.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// use std::thread;
504    /// use std::time::{Duration, Instant};
505    /// use crossbeam_channel::{bounded, SendTimeoutError};
506    ///
507    /// let (s, r) = bounded(0);
508    ///
509    /// thread::spawn(move || {
510    ///     thread::sleep(Duration::from_secs(1));
511    ///     assert_eq!(r.recv(), Ok(2));
512    ///     drop(r);
513    /// });
514    ///
515    /// let now = Instant::now();
516    ///
517    /// assert_eq!(
518    ///     s.send_deadline(1, now + Duration::from_millis(500)),
519    ///     Err(SendTimeoutError::Timeout(1)),
520    /// );
521    /// assert_eq!(
522    ///     s.send_deadline(2, now + Duration::from_millis(1500)),
523    ///     Ok(()),
524    /// );
525    /// assert_eq!(
526    ///     s.send_deadline(3, now + Duration::from_millis(2000)),
527    ///     Err(SendTimeoutError::Disconnected(3)),
528    /// );
529    /// ```
530    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
531        match &self.flavor {
532            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
533            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
534            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
535        }
536    }
537
538    /// Returns `true` if the channel is empty.
539    ///
540    /// Note: Zero-capacity channels are always empty.
541    ///
542    /// # Examples
543    ///
544    /// ```
545    /// use crossbeam_channel::unbounded;
546    ///
547    /// let (s, r) = unbounded();
548    /// assert!(s.is_empty());
549    ///
550    /// s.send(0).unwrap();
551    /// assert!(!s.is_empty());
552    /// ```
553    pub fn is_empty(&self) -> bool {
554        match &self.flavor {
555            SenderFlavor::Array(chan) => chan.is_empty(),
556            SenderFlavor::List(chan) => chan.is_empty(),
557            SenderFlavor::Zero(chan) => chan.is_empty(),
558        }
559    }
560
561    /// Returns `true` if the channel is full.
562    ///
563    /// Note: Zero-capacity channels are always full.
564    ///
565    /// # Examples
566    ///
567    /// ```
568    /// use crossbeam_channel::bounded;
569    ///
570    /// let (s, r) = bounded(1);
571    ///
572    /// assert!(!s.is_full());
573    /// s.send(0).unwrap();
574    /// assert!(s.is_full());
575    /// ```
576    pub fn is_full(&self) -> bool {
577        match &self.flavor {
578            SenderFlavor::Array(chan) => chan.is_full(),
579            SenderFlavor::List(chan) => chan.is_full(),
580            SenderFlavor::Zero(chan) => chan.is_full(),
581        }
582    }
583
584    /// Returns the number of messages in the channel.
585    ///
586    /// # Examples
587    ///
588    /// ```
589    /// use crossbeam_channel::unbounded;
590    ///
591    /// let (s, r) = unbounded();
592    /// assert_eq!(s.len(), 0);
593    ///
594    /// s.send(1).unwrap();
595    /// s.send(2).unwrap();
596    /// assert_eq!(s.len(), 2);
597    /// ```
598    pub fn len(&self) -> usize {
599        match &self.flavor {
600            SenderFlavor::Array(chan) => chan.len(),
601            SenderFlavor::List(chan) => chan.len(),
602            SenderFlavor::Zero(chan) => chan.len(),
603        }
604    }
605
606    /// If the channel is bounded, returns its capacity.
607    ///
608    /// # Examples
609    ///
610    /// ```
611    /// use crossbeam_channel::{bounded, unbounded};
612    ///
613    /// let (s, _) = unbounded::<i32>();
614    /// assert_eq!(s.capacity(), None);
615    ///
616    /// let (s, _) = bounded::<i32>(5);
617    /// assert_eq!(s.capacity(), Some(5));
618    ///
619    /// let (s, _) = bounded::<i32>(0);
620    /// assert_eq!(s.capacity(), Some(0));
621    /// ```
622    pub fn capacity(&self) -> Option<usize> {
623        match &self.flavor {
624            SenderFlavor::Array(chan) => chan.capacity(),
625            SenderFlavor::List(chan) => chan.capacity(),
626            SenderFlavor::Zero(chan) => chan.capacity(),
627        }
628    }
629
630    /// Returns `true` if senders belong to the same channel.
631    ///
632    /// # Examples
633    ///
634    /// ```rust
635    /// use crossbeam_channel::unbounded;
636    ///
637    /// let (s, _) = unbounded::<usize>();
638    ///
639    /// let s2 = s.clone();
640    /// assert!(s.same_channel(&s2));
641    ///
642    /// let (s3, _) = unbounded();
643    /// assert!(!s.same_channel(&s3));
644    /// ```
645    pub fn same_channel(&self, other: &Sender<T>) -> bool {
646        match (&self.flavor, &other.flavor) {
647            (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
648            (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
649            (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
650            _ => false,
651        }
652    }
653}
654
655impl<T> Drop for Sender<T> {
656    fn drop(&mut self) {
657        unsafe {
658            match &self.flavor {
659                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
660                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
661                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
662            }
663        }
664    }
665}
666
667impl<T> Clone for Sender<T> {
668    fn clone(&self) -> Self {
669        let flavor = match &self.flavor {
670            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
671            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
672            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
673        };
674
675        Sender { flavor }
676    }
677}
678
679impl<T> fmt::Debug for Sender<T> {
680    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
681        f.pad("Sender { .. }")
682    }
683}
684
685/// The receiving side of a channel.
686///
687/// # Examples
688///
689/// ```
690/// use std::thread;
691/// use std::time::Duration;
692/// use crossbeam_channel::unbounded;
693///
694/// let (s, r) = unbounded();
695///
696/// thread::spawn(move || {
697///     let _ = s.send(1);
698///     thread::sleep(Duration::from_secs(1));
699///     let _ = s.send(2);
700/// });
701///
702/// assert_eq!(r.recv(), Ok(1)); // Received immediately.
703/// assert_eq!(r.recv(), Ok(2)); // Received after 1 second.
704/// ```
705pub struct Receiver<T> {
706    flavor: ReceiverFlavor<T>,
707}
708
709/// Receiver flavors.
710enum ReceiverFlavor<T> {
711    /// Bounded channel based on a preallocated array.
712    Array(counter::Receiver<flavors::array::Channel<T>>),
713
714    /// Unbounded channel implemented as a linked list.
715    List(counter::Receiver<flavors::list::Channel<T>>),
716
717    /// Zero-capacity channel.
718    Zero(counter::Receiver<flavors::zero::Channel<T>>),
719
720    /// The after flavor.
721    At(Arc<flavors::at::Channel>),
722
723    /// The tick flavor.
724    Tick(Arc<flavors::tick::Channel>),
725
726    /// The never flavor.
727    Never(flavors::never::Channel<T>),
728}
729
730unsafe impl<T: Send> Send for Receiver<T> {}
731unsafe impl<T: Send> Sync for Receiver<T> {}
732
733impl<T> UnwindSafe for Receiver<T> {}
734impl<T> RefUnwindSafe for Receiver<T> {}
735
736impl<T> Receiver<T> {
737    /// Attempts to receive a message from the channel without blocking.
738    ///
739    /// This method will either receive a message from the channel immediately or return an error
740    /// if the channel is empty.
741    ///
742    /// If called on a zero-capacity channel, this method will receive a message only if there
743    /// happens to be a send operation on the other side of the channel at the same time.
744    ///
745    /// # Examples
746    ///
747    /// ```
748    /// use crossbeam_channel::{unbounded, TryRecvError};
749    ///
750    /// let (s, r) = unbounded();
751    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
752    ///
753    /// s.send(5).unwrap();
754    /// drop(s);
755    ///
756    /// assert_eq!(r.try_recv(), Ok(5));
757    /// assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
758    /// ```
759    pub fn try_recv(&self) -> Result<T, TryRecvError> {
760        match &self.flavor {
761            ReceiverFlavor::Array(chan) => chan.try_recv(),
762            ReceiverFlavor::List(chan) => chan.try_recv(),
763            ReceiverFlavor::Zero(chan) => chan.try_recv(),
764            ReceiverFlavor::At(chan) => {
765                let msg = chan.try_recv();
766                unsafe {
767                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
768                        &msg,
769                    )
770                }
771            }
772            ReceiverFlavor::Tick(chan) => {
773                let msg = chan.try_recv();
774                unsafe {
775                    mem::transmute_copy::<Result<Instant, TryRecvError>, Result<T, TryRecvError>>(
776                        &msg,
777                    )
778                }
779            }
780            ReceiverFlavor::Never(chan) => chan.try_recv(),
781        }
782    }
783
784    /// Blocks the current thread until a message is received or the channel is empty and
785    /// disconnected.
786    ///
787    /// If the channel is empty and not disconnected, this call will block until the receive
788    /// operation can proceed. If the channel is empty and becomes disconnected, this call will
789    /// wake up and return an error.
790    ///
791    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
792    /// on the other side of the channel.
793    ///
794    /// # Examples
795    ///
796    /// ```
797    /// use std::thread;
798    /// use std::time::Duration;
799    /// use crossbeam_channel::{unbounded, RecvError};
800    ///
801    /// let (s, r) = unbounded();
802    ///
803    /// thread::spawn(move || {
804    ///     thread::sleep(Duration::from_secs(1));
805    ///     s.send(5).unwrap();
806    ///     drop(s);
807    /// });
808    ///
809    /// assert_eq!(r.recv(), Ok(5));
810    /// assert_eq!(r.recv(), Err(RecvError));
811    /// ```
812    pub fn recv(&self) -> Result<T, RecvError> {
813        match &self.flavor {
814            ReceiverFlavor::Array(chan) => chan.recv(None),
815            ReceiverFlavor::List(chan) => chan.recv(None),
816            ReceiverFlavor::Zero(chan) => chan.recv(None),
817            ReceiverFlavor::At(chan) => {
818                let msg = chan.recv(None);
819                unsafe {
820                    mem::transmute_copy::<
821                        Result<Instant, RecvTimeoutError>,
822                        Result<T, RecvTimeoutError>,
823                    >(&msg)
824                }
825            }
826            ReceiverFlavor::Tick(chan) => {
827                let msg = chan.recv(None);
828                unsafe {
829                    mem::transmute_copy::<
830                        Result<Instant, RecvTimeoutError>,
831                        Result<T, RecvTimeoutError>,
832                    >(&msg)
833                }
834            }
835            ReceiverFlavor::Never(chan) => chan.recv(None),
836        }
837        .map_err(|_| RecvError)
838    }
839
840    /// Waits for a message to be received from the channel, but only for a limited time.
841    ///
842    /// If the channel is empty and not disconnected, this call will block until the receive
843    /// operation can proceed or the operation times out. If the channel is empty and becomes
844    /// disconnected, this call will wake up and return an error.
845    ///
846    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
847    /// on the other side of the channel.
848    ///
849    /// # Examples
850    ///
851    /// ```
852    /// use std::thread;
853    /// use std::time::Duration;
854    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
855    ///
856    /// let (s, r) = unbounded();
857    ///
858    /// thread::spawn(move || {
859    ///     thread::sleep(Duration::from_secs(1));
860    ///     s.send(5).unwrap();
861    ///     drop(s);
862    /// });
863    ///
864    /// assert_eq!(
865    ///     r.recv_timeout(Duration::from_millis(500)),
866    ///     Err(RecvTimeoutError::Timeout),
867    /// );
868    /// assert_eq!(
869    ///     r.recv_timeout(Duration::from_secs(1)),
870    ///     Ok(5),
871    /// );
872    /// assert_eq!(
873    ///     r.recv_timeout(Duration::from_secs(1)),
874    ///     Err(RecvTimeoutError::Disconnected),
875    /// );
876    /// ```
877    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
878        match Instant::now().checked_add(timeout) {
879            Some(deadline) => self.recv_deadline(deadline),
880            None => self.recv().map_err(RecvTimeoutError::from),
881        }
882    }
883
884    /// Waits for a message to be received from the channel, but only before a given deadline.
885    ///
886    /// If the channel is empty and not disconnected, this call will block until the receive
887    /// operation can proceed or the operation times out. If the channel is empty and becomes
888    /// disconnected, this call will wake up and return an error.
889    ///
890    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
891    /// on the other side of the channel.
892    ///
893    /// # Examples
894    ///
895    /// ```
896    /// use std::thread;
897    /// use std::time::{Instant, Duration};
898    /// use crossbeam_channel::{unbounded, RecvTimeoutError};
899    ///
900    /// let (s, r) = unbounded();
901    ///
902    /// thread::spawn(move || {
903    ///     thread::sleep(Duration::from_secs(1));
904    ///     s.send(5).unwrap();
905    ///     drop(s);
906    /// });
907    ///
908    /// let now = Instant::now();
909    ///
910    /// assert_eq!(
911    ///     r.recv_deadline(now + Duration::from_millis(500)),
912    ///     Err(RecvTimeoutError::Timeout),
913    /// );
914    /// assert_eq!(
915    ///     r.recv_deadline(now + Duration::from_millis(1500)),
916    ///     Ok(5),
917    /// );
918    /// assert_eq!(
919    ///     r.recv_deadline(now + Duration::from_secs(5)),
920    ///     Err(RecvTimeoutError::Disconnected),
921    /// );
922    /// ```
923    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
924        match &self.flavor {
925            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
926            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
927            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
928            ReceiverFlavor::At(chan) => {
929                let msg = chan.recv(Some(deadline));
930                unsafe {
931                    mem::transmute_copy::<
932                        Result<Instant, RecvTimeoutError>,
933                        Result<T, RecvTimeoutError>,
934                    >(&msg)
935                }
936            }
937            ReceiverFlavor::Tick(chan) => {
938                let msg = chan.recv(Some(deadline));
939                unsafe {
940                    mem::transmute_copy::<
941                        Result<Instant, RecvTimeoutError>,
942                        Result<T, RecvTimeoutError>,
943                    >(&msg)
944                }
945            }
946            ReceiverFlavor::Never(chan) => chan.recv(Some(deadline)),
947        }
948    }
949
950    /// Returns `true` if the channel is empty.
951    ///
952    /// Note: Zero-capacity channels are always empty.
953    ///
954    /// # Examples
955    ///
956    /// ```
957    /// use crossbeam_channel::unbounded;
958    ///
959    /// let (s, r) = unbounded();
960    ///
961    /// assert!(r.is_empty());
962    /// s.send(0).unwrap();
963    /// assert!(!r.is_empty());
964    /// ```
965    pub fn is_empty(&self) -> bool {
966        match &self.flavor {
967            ReceiverFlavor::Array(chan) => chan.is_empty(),
968            ReceiverFlavor::List(chan) => chan.is_empty(),
969            ReceiverFlavor::Zero(chan) => chan.is_empty(),
970            ReceiverFlavor::At(chan) => chan.is_empty(),
971            ReceiverFlavor::Tick(chan) => chan.is_empty(),
972            ReceiverFlavor::Never(chan) => chan.is_empty(),
973        }
974    }
975
976    /// Returns `true` if the channel is full.
977    ///
978    /// Note: Zero-capacity channels are always full.
979    ///
980    /// # Examples
981    ///
982    /// ```
983    /// use crossbeam_channel::bounded;
984    ///
985    /// let (s, r) = bounded(1);
986    ///
987    /// assert!(!r.is_full());
988    /// s.send(0).unwrap();
989    /// assert!(r.is_full());
990    /// ```
991    pub fn is_full(&self) -> bool {
992        match &self.flavor {
993            ReceiverFlavor::Array(chan) => chan.is_full(),
994            ReceiverFlavor::List(chan) => chan.is_full(),
995            ReceiverFlavor::Zero(chan) => chan.is_full(),
996            ReceiverFlavor::At(chan) => chan.is_full(),
997            ReceiverFlavor::Tick(chan) => chan.is_full(),
998            ReceiverFlavor::Never(chan) => chan.is_full(),
999        }
1000    }
1001
1002    /// Returns the number of messages in the channel.
1003    ///
1004    /// # Examples
1005    ///
1006    /// ```
1007    /// use crossbeam_channel::unbounded;
1008    ///
1009    /// let (s, r) = unbounded();
1010    /// assert_eq!(r.len(), 0);
1011    ///
1012    /// s.send(1).unwrap();
1013    /// s.send(2).unwrap();
1014    /// assert_eq!(r.len(), 2);
1015    /// ```
1016    pub fn len(&self) -> usize {
1017        match &self.flavor {
1018            ReceiverFlavor::Array(chan) => chan.len(),
1019            ReceiverFlavor::List(chan) => chan.len(),
1020            ReceiverFlavor::Zero(chan) => chan.len(),
1021            ReceiverFlavor::At(chan) => chan.len(),
1022            ReceiverFlavor::Tick(chan) => chan.len(),
1023            ReceiverFlavor::Never(chan) => chan.len(),
1024        }
1025    }
1026
1027    /// If the channel is bounded, returns its capacity.
1028    ///
1029    /// # Examples
1030    ///
1031    /// ```
1032    /// use crossbeam_channel::{bounded, unbounded};
1033    ///
1034    /// let (_, r) = unbounded::<i32>();
1035    /// assert_eq!(r.capacity(), None);
1036    ///
1037    /// let (_, r) = bounded::<i32>(5);
1038    /// assert_eq!(r.capacity(), Some(5));
1039    ///
1040    /// let (_, r) = bounded::<i32>(0);
1041    /// assert_eq!(r.capacity(), Some(0));
1042    /// ```
1043    pub fn capacity(&self) -> Option<usize> {
1044        match &self.flavor {
1045            ReceiverFlavor::Array(chan) => chan.capacity(),
1046            ReceiverFlavor::List(chan) => chan.capacity(),
1047            ReceiverFlavor::Zero(chan) => chan.capacity(),
1048            ReceiverFlavor::At(chan) => chan.capacity(),
1049            ReceiverFlavor::Tick(chan) => chan.capacity(),
1050            ReceiverFlavor::Never(chan) => chan.capacity(),
1051        }
1052    }
1053
1054    /// A blocking iterator over messages in the channel.
1055    ///
1056    /// Each call to [`next`] blocks waiting for the next message and then returns it. However, if
1057    /// the channel becomes empty and disconnected, it returns [`None`] without blocking.
1058    ///
1059    /// [`next`]: Iterator::next
1060    ///
1061    /// # Examples
1062    ///
1063    /// ```
1064    /// use std::thread;
1065    /// use crossbeam_channel::unbounded;
1066    ///
1067    /// let (s, r) = unbounded();
1068    ///
1069    /// thread::spawn(move || {
1070    ///     s.send(1).unwrap();
1071    ///     s.send(2).unwrap();
1072    ///     s.send(3).unwrap();
1073    ///     drop(s); // Disconnect the channel.
1074    /// });
1075    ///
1076    /// // Collect all messages from the channel.
1077    /// // Note that the call to `collect` blocks until the sender is dropped.
1078    /// let v: Vec<_> = r.iter().collect();
1079    ///
1080    /// assert_eq!(v, [1, 2, 3]);
1081    /// ```
1082    pub fn iter(&self) -> Iter<'_, T> {
1083        Iter { receiver: self }
1084    }
1085
1086    /// A non-blocking iterator over messages in the channel.
1087    ///
1088    /// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1089    /// never blocks waiting for the next message.
1090    ///
1091    /// [`next`]: Iterator::next
1092    ///
1093    /// # Examples
1094    ///
1095    /// ```
1096    /// use std::thread;
1097    /// use std::time::Duration;
1098    /// use crossbeam_channel::unbounded;
1099    ///
1100    /// let (s, r) = unbounded::<i32>();
1101    ///
1102    /// thread::spawn(move || {
1103    ///     s.send(1).unwrap();
1104    ///     thread::sleep(Duration::from_secs(1));
1105    ///     s.send(2).unwrap();
1106    ///     thread::sleep(Duration::from_secs(2));
1107    ///     s.send(3).unwrap();
1108    /// });
1109    ///
1110    /// thread::sleep(Duration::from_secs(2));
1111    ///
1112    /// // Collect all messages from the channel without blocking.
1113    /// // The third message hasn't been sent yet so we'll collect only the first two.
1114    /// let v: Vec<_> = r.try_iter().collect();
1115    ///
1116    /// assert_eq!(v, [1, 2]);
1117    /// ```
1118    pub fn try_iter(&self) -> TryIter<'_, T> {
1119        TryIter { receiver: self }
1120    }
1121
1122    /// Returns `true` if receivers belong to the same channel.
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```rust
1127    /// use crossbeam_channel::unbounded;
1128    ///
1129    /// let (_, r) = unbounded::<usize>();
1130    ///
1131    /// let r2 = r.clone();
1132    /// assert!(r.same_channel(&r2));
1133    ///
1134    /// let (_, r3) = unbounded();
1135    /// assert!(!r.same_channel(&r3));
1136    /// ```
1137    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1138        match (&self.flavor, &other.flavor) {
1139            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1140            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1141            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1142            (ReceiverFlavor::At(a), ReceiverFlavor::At(b)) => Arc::ptr_eq(a, b),
1143            (ReceiverFlavor::Tick(a), ReceiverFlavor::Tick(b)) => Arc::ptr_eq(a, b),
1144            (ReceiverFlavor::Never(_), ReceiverFlavor::Never(_)) => true,
1145            _ => false,
1146        }
1147    }
1148}
1149
1150impl<T> Drop for Receiver<T> {
1151    fn drop(&mut self) {
1152        unsafe {
1153            match &self.flavor {
1154                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1155                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1156                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1157                ReceiverFlavor::At(_) => {}
1158                ReceiverFlavor::Tick(_) => {}
1159                ReceiverFlavor::Never(_) => {}
1160            }
1161        }
1162    }
1163}
1164
1165impl<T> Clone for Receiver<T> {
1166    fn clone(&self) -> Self {
1167        let flavor = match &self.flavor {
1168            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1169            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1170            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1171            ReceiverFlavor::At(chan) => ReceiverFlavor::At(chan.clone()),
1172            ReceiverFlavor::Tick(chan) => ReceiverFlavor::Tick(chan.clone()),
1173            ReceiverFlavor::Never(_) => ReceiverFlavor::Never(flavors::never::Channel::new()),
1174        };
1175
1176        Receiver { flavor }
1177    }
1178}
1179
1180impl<T> fmt::Debug for Receiver<T> {
1181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1182        f.pad("Receiver { .. }")
1183    }
1184}
1185
1186impl<'a, T> IntoIterator for &'a Receiver<T> {
1187    type Item = T;
1188    type IntoIter = Iter<'a, T>;
1189
1190    fn into_iter(self) -> Self::IntoIter {
1191        self.iter()
1192    }
1193}
1194
1195impl<T> IntoIterator for Receiver<T> {
1196    type Item = T;
1197    type IntoIter = IntoIter<T>;
1198
1199    fn into_iter(self) -> Self::IntoIter {
1200        IntoIter { receiver: self }
1201    }
1202}
1203
1204/// A blocking iterator over messages in a channel.
1205///
1206/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1207/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1208///
1209/// [`next`]: Iterator::next
1210///
1211/// # Examples
1212///
1213/// ```
1214/// use std::thread;
1215/// use crossbeam_channel::unbounded;
1216///
1217/// let (s, r) = unbounded();
1218///
1219/// thread::spawn(move || {
1220///     s.send(1).unwrap();
1221///     s.send(2).unwrap();
1222///     s.send(3).unwrap();
1223///     drop(s); // Disconnect the channel.
1224/// });
1225///
1226/// // Collect all messages from the channel.
1227/// // Note that the call to `collect` blocks until the sender is dropped.
1228/// let v: Vec<_> = r.iter().collect();
1229///
1230/// assert_eq!(v, [1, 2, 3]);
1231/// ```
1232pub struct Iter<'a, T> {
1233    receiver: &'a Receiver<T>,
1234}
1235
1236impl<T> FusedIterator for Iter<'_, T> {}
1237
1238impl<T> Iterator for Iter<'_, T> {
1239    type Item = T;
1240
1241    fn next(&mut self) -> Option<Self::Item> {
1242        self.receiver.recv().ok()
1243    }
1244}
1245
1246impl<T> fmt::Debug for Iter<'_, T> {
1247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1248        f.pad("Iter { .. }")
1249    }
1250}
1251
1252/// A non-blocking iterator over messages in a channel.
1253///
1254/// Each call to [`next`] returns a message if there is one ready to be received. The iterator
1255/// never blocks waiting for the next message.
1256///
1257/// [`next`]: Iterator::next
1258///
1259/// # Examples
1260///
1261/// ```
1262/// use std::thread;
1263/// use std::time::Duration;
1264/// use crossbeam_channel::unbounded;
1265///
1266/// let (s, r) = unbounded::<i32>();
1267///
1268/// thread::spawn(move || {
1269///     s.send(1).unwrap();
1270///     thread::sleep(Duration::from_secs(1));
1271///     s.send(2).unwrap();
1272///     thread::sleep(Duration::from_secs(2));
1273///     s.send(3).unwrap();
1274/// });
1275///
1276/// thread::sleep(Duration::from_secs(2));
1277///
1278/// // Collect all messages from the channel without blocking.
1279/// // The third message hasn't been sent yet so we'll collect only the first two.
1280/// let v: Vec<_> = r.try_iter().collect();
1281///
1282/// assert_eq!(v, [1, 2]);
1283/// ```
1284pub struct TryIter<'a, T> {
1285    receiver: &'a Receiver<T>,
1286}
1287
1288impl<T> Iterator for TryIter<'_, T> {
1289    type Item = T;
1290
1291    fn next(&mut self) -> Option<Self::Item> {
1292        self.receiver.try_recv().ok()
1293    }
1294}
1295
1296impl<T> fmt::Debug for TryIter<'_, T> {
1297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1298        f.pad("TryIter { .. }")
1299    }
1300}
1301
1302/// A blocking iterator over messages in a channel.
1303///
1304/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the
1305/// channel becomes empty and disconnected, it returns [`None`] without blocking.
1306///
1307/// [`next`]: Iterator::next
1308///
1309/// # Examples
1310///
1311/// ```
1312/// use std::thread;
1313/// use crossbeam_channel::unbounded;
1314///
1315/// let (s, r) = unbounded();
1316///
1317/// thread::spawn(move || {
1318///     s.send(1).unwrap();
1319///     s.send(2).unwrap();
1320///     s.send(3).unwrap();
1321///     drop(s); // Disconnect the channel.
1322/// });
1323///
1324/// // Collect all messages from the channel.
1325/// // Note that the call to `collect` blocks until the sender is dropped.
1326/// let v: Vec<_> = r.into_iter().collect();
1327///
1328/// assert_eq!(v, [1, 2, 3]);
1329/// ```
1330pub struct IntoIter<T> {
1331    receiver: Receiver<T>,
1332}
1333
1334impl<T> FusedIterator for IntoIter<T> {}
1335
1336impl<T> Iterator for IntoIter<T> {
1337    type Item = T;
1338
1339    fn next(&mut self) -> Option<Self::Item> {
1340        self.receiver.recv().ok()
1341    }
1342}
1343
1344impl<T> fmt::Debug for IntoIter<T> {
1345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346        f.pad("IntoIter { .. }")
1347    }
1348}
1349
1350impl<T> SelectHandle for Sender<T> {
1351    fn try_select(&self, token: &mut Token) -> bool {
1352        match &self.flavor {
1353            SenderFlavor::Array(chan) => chan.sender().try_select(token),
1354            SenderFlavor::List(chan) => chan.sender().try_select(token),
1355            SenderFlavor::Zero(chan) => chan.sender().try_select(token),
1356        }
1357    }
1358
1359    fn deadline(&self) -> Option<Instant> {
1360        None
1361    }
1362
1363    fn register(&self, oper: Operation, cx: &Context) -> bool {
1364        match &self.flavor {
1365            SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1366            SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1367            SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1368        }
1369    }
1370
1371    fn unregister(&self, oper: Operation) {
1372        match &self.flavor {
1373            SenderFlavor::Array(chan) => chan.sender().unregister(oper),
1374            SenderFlavor::List(chan) => chan.sender().unregister(oper),
1375            SenderFlavor::Zero(chan) => chan.sender().unregister(oper),
1376        }
1377    }
1378
1379    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1380        match &self.flavor {
1381            SenderFlavor::Array(chan) => chan.sender().accept(token, cx),
1382            SenderFlavor::List(chan) => chan.sender().accept(token, cx),
1383            SenderFlavor::Zero(chan) => chan.sender().accept(token, cx),
1384        }
1385    }
1386
1387    fn is_ready(&self) -> bool {
1388        match &self.flavor {
1389            SenderFlavor::Array(chan) => chan.sender().is_ready(),
1390            SenderFlavor::List(chan) => chan.sender().is_ready(),
1391            SenderFlavor::Zero(chan) => chan.sender().is_ready(),
1392        }
1393    }
1394
1395    fn watch(&self, oper: Operation, cx: &Context) -> bool {
1396        match &self.flavor {
1397            SenderFlavor::Array(chan) => chan.sender().watch(oper, cx),
1398            SenderFlavor::List(chan) => chan.sender().watch(oper, cx),
1399            SenderFlavor::Zero(chan) => chan.sender().watch(oper, cx),
1400        }
1401    }
1402
1403    fn unwatch(&self, oper: Operation) {
1404        match &self.flavor {
1405            SenderFlavor::Array(chan) => chan.sender().unwatch(oper),
1406            SenderFlavor::List(chan) => chan.sender().unwatch(oper),
1407            SenderFlavor::Zero(chan) => chan.sender().unwatch(oper),
1408        }
1409    }
1410}
1411
1412impl<T> SelectHandle for Receiver<T> {
1413    fn try_select(&self, token: &mut Token) -> bool {
1414        match &self.flavor {
1415            ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
1416            ReceiverFlavor::List(chan) => chan.receiver().try_select(token),
1417            ReceiverFlavor::Zero(chan) => chan.receiver().try_select(token),
1418            ReceiverFlavor::At(chan) => chan.try_select(token),
1419            ReceiverFlavor::Tick(chan) => chan.try_select(token),
1420            ReceiverFlavor::Never(chan) => chan.try_select(token),
1421        }
1422    }
1423
1424    fn deadline(&self) -> Option<Instant> {
1425        match &self.flavor {
1426            ReceiverFlavor::Array(_) => None,
1427            ReceiverFlavor::List(_) => None,
1428            ReceiverFlavor::Zero(_) => None,
1429            ReceiverFlavor::At(chan) => chan.deadline(),
1430            ReceiverFlavor::Tick(chan) => chan.deadline(),
1431            ReceiverFlavor::Never(chan) => chan.deadline(),
1432        }
1433    }
1434
1435    fn register(&self, oper: Operation, cx: &Context) -> bool {
1436        match &self.flavor {
1437            ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1438            ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1439            ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1440            ReceiverFlavor::At(chan) => chan.register(oper, cx),
1441            ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1442            ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1443        }
1444    }
1445
1446    fn unregister(&self, oper: Operation) {
1447        match &self.flavor {
1448            ReceiverFlavor::Array(chan) => chan.receiver().unregister(oper),
1449            ReceiverFlavor::List(chan) => chan.receiver().unregister(oper),
1450            ReceiverFlavor::Zero(chan) => chan.receiver().unregister(oper),
1451            ReceiverFlavor::At(chan) => chan.unregister(oper),
1452            ReceiverFlavor::Tick(chan) => chan.unregister(oper),
1453            ReceiverFlavor::Never(chan) => chan.unregister(oper),
1454        }
1455    }
1456
1457    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
1458        match &self.flavor {
1459            ReceiverFlavor::Array(chan) => chan.receiver().accept(token, cx),
1460            ReceiverFlavor::List(chan) => chan.receiver().accept(token, cx),
1461            ReceiverFlavor::Zero(chan) => chan.receiver().accept(token, cx),
1462            ReceiverFlavor::At(chan) => chan.accept(token, cx),
1463            ReceiverFlavor::Tick(chan) => chan.accept(token, cx),
1464            ReceiverFlavor::Never(chan) => chan.accept(token, cx),
1465        }
1466    }
1467
1468    fn is_ready(&self) -> bool {
1469        match &self.flavor {
1470            ReceiverFlavor::Array(chan) => chan.receiver().is_ready(),
1471            ReceiverFlavor::List(chan) => chan.receiver().is_ready(),
1472            ReceiverFlavor::Zero(chan) => chan.receiver().is_ready(),
1473            ReceiverFlavor::At(chan) => chan.is_ready(),
1474            ReceiverFlavor::Tick(chan) => chan.is_ready(),
1475            ReceiverFlavor::Never(chan) => chan.is_ready(),
1476        }
1477    }
1478
1479    fn watch(&self, oper: Operation, cx: &Context) -> bool {
1480        match &self.flavor {
1481            ReceiverFlavor::Array(chan) => chan.receiver().watch(oper, cx),
1482            ReceiverFlavor::List(chan) => chan.receiver().watch(oper, cx),
1483            ReceiverFlavor::Zero(chan) => chan.receiver().watch(oper, cx),
1484            ReceiverFlavor::At(chan) => chan.watch(oper, cx),
1485            ReceiverFlavor::Tick(chan) => chan.watch(oper, cx),
1486            ReceiverFlavor::Never(chan) => chan.watch(oper, cx),
1487        }
1488    }
1489
1490    fn unwatch(&self, oper: Operation) {
1491        match &self.flavor {
1492            ReceiverFlavor::Array(chan) => chan.receiver().unwatch(oper),
1493            ReceiverFlavor::List(chan) => chan.receiver().unwatch(oper),
1494            ReceiverFlavor::Zero(chan) => chan.receiver().unwatch(oper),
1495            ReceiverFlavor::At(chan) => chan.unwatch(oper),
1496            ReceiverFlavor::Tick(chan) => chan.unwatch(oper),
1497            ReceiverFlavor::Never(chan) => chan.unwatch(oper),
1498        }
1499    }
1500}
1501
1502/// Writes a message into the channel.
1503pub(crate) unsafe fn write<T>(s: &Sender<T>, token: &mut Token, msg: T) -> Result<(), T> {
1504    match &s.flavor {
1505        SenderFlavor::Array(chan) => chan.write(token, msg),
1506        SenderFlavor::List(chan) => chan.write(token, msg),
1507        SenderFlavor::Zero(chan) => chan.write(token, msg),
1508    }
1509}
1510
1511/// Reads a message from the channel.
1512pub(crate) unsafe fn read<T>(r: &Receiver<T>, token: &mut Token) -> Result<T, ()> {
1513    match &r.flavor {
1514        ReceiverFlavor::Array(chan) => chan.read(token),
1515        ReceiverFlavor::List(chan) => chan.read(token),
1516        ReceiverFlavor::Zero(chan) => chan.read(token),
1517        ReceiverFlavor::At(chan) => {
1518            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1519        }
1520        ReceiverFlavor::Tick(chan) => {
1521            mem::transmute_copy::<Result<Instant, ()>, Result<T, ()>>(&chan.read(token))
1522        }
1523        ReceiverFlavor::Never(chan) => chan.read(token),
1524    }
1525}