async_channel/
lib.rs

1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
//! A channel has a [`Sender`] side and a [`Receiver`] side. Both sides are cloneable and can be
//! shared among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![cfg_attr(not(feature = "std"), no_std)]
30#![forbid(unsafe_code)]
31#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32#![doc(
33    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34)]
35#![doc(
36    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
37)]
38
39extern crate alloc;
40
41use core::fmt;
42use core::future::Future;
43use core::marker::PhantomPinned;
44use core::pin::Pin;
45use core::sync::atomic::{AtomicUsize, Ordering};
46use core::task::{Context, Poll};
47
48use alloc::sync::Arc;
49
50use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
51use event_listener_strategy::{
52    easy_wrapper,
53    event_listener::{Event, EventListener},
54    EventListenerFuture, Strategy,
55};
56use futures_core::ready;
57use futures_core::stream::Stream;
58use pin_project_lite::pin_project;
59
/// State shared by all handles of one channel; `bounded()` and `unbounded()` place it in an `Arc`.
struct Channel<T> {
    /// Inner message queue.
    queue: ConcurrentQueue<T>,

    /// Send operations waiting while the channel is full.
    send_ops: Event,

    /// Receive operations waiting while the channel is empty and not closed.
    recv_ops: Event,

    /// Stream operations waiting while the channel is empty and not closed.
    stream_ops: Event,

    /// The number of currently active `Sender`s.
    sender_count: AtomicUsize,

    /// The number of currently active `Receiver`s.
    receiver_count: AtomicUsize,
}
79
80impl<T> Channel<T> {
81    /// Closes the channel and notifies all blocked operations.
82    ///
83    /// Returns `true` if this call has closed the channel and it was not closed already.
84    fn close(&self) -> bool {
85        if self.queue.close() {
86            // Notify all send operations.
87            self.send_ops.notify(usize::MAX);
88
89            // Notify all receive and stream operations.
90            self.recv_ops.notify(usize::MAX);
91            self.stream_ops.notify(usize::MAX);
92
93            true
94        } else {
95            false
96        }
97    }
98}
99
100/// Creates a bounded channel.
101///
102/// The created channel has space to hold at most `cap` messages at a time.
103///
104/// # Panics
105///
106/// Capacity must be a positive number. If `cap` is zero, this function will panic.
107///
108/// # Examples
109///
110/// ```
111/// # futures_lite::future::block_on(async {
112/// use async_channel::{bounded, TryRecvError, TrySendError};
113///
114/// let (s, r) = bounded(1);
115///
116/// assert_eq!(s.send(10).await, Ok(()));
117/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
118///
119/// assert_eq!(r.recv().await, Ok(10));
120/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
121/// # });
122/// ```
123pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
124    assert!(cap > 0, "capacity cannot be zero");
125
126    let channel = Arc::new(Channel {
127        queue: ConcurrentQueue::bounded(cap),
128        send_ops: Event::new(),
129        recv_ops: Event::new(),
130        stream_ops: Event::new(),
131        sender_count: AtomicUsize::new(1),
132        receiver_count: AtomicUsize::new(1),
133    });
134
135    let s = Sender {
136        channel: channel.clone(),
137    };
138    let r = Receiver {
139        listener: None,
140        channel,
141        _pin: PhantomPinned,
142    };
143    (s, r)
144}
145
146/// Creates an unbounded channel.
147///
148/// The created channel can hold an unlimited number of messages.
149///
150/// # Examples
151///
152/// ```
153/// # futures_lite::future::block_on(async {
154/// use async_channel::{unbounded, TryRecvError};
155///
156/// let (s, r) = unbounded();
157///
158/// assert_eq!(s.send(10).await, Ok(()));
159/// assert_eq!(s.send(20).await, Ok(()));
160///
161/// assert_eq!(r.recv().await, Ok(10));
162/// assert_eq!(r.recv().await, Ok(20));
163/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
164/// # });
165/// ```
166pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
167    let channel = Arc::new(Channel {
168        queue: ConcurrentQueue::unbounded(),
169        send_ops: Event::new(),
170        recv_ops: Event::new(),
171        stream_ops: Event::new(),
172        sender_count: AtomicUsize::new(1),
173        receiver_count: AtomicUsize::new(1),
174    });
175
176    let s = Sender {
177        channel: channel.clone(),
178    };
179    let r = Receiver {
180        listener: None,
181        channel,
182        _pin: PhantomPinned,
183    };
184    (s, r)
185}
186
/// The sending side of a channel.
///
/// Senders can be cloned and shared among threads. When all senders associated with a channel are
/// dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Sender::close()`].
pub struct Sender<T> {
    /// Inner channel state, held through an `Arc` that other handles of the channel also clone.
    channel: Arc<Channel<T>>,
}
197
198impl<T> Sender<T> {
199    /// Attempts to send a message into the channel.
200    ///
201    /// If the channel is full or closed, this method returns an error.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// use async_channel::{bounded, TrySendError};
207    ///
208    /// let (s, r) = bounded(1);
209    ///
210    /// assert_eq!(s.try_send(1), Ok(()));
211    /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
212    ///
213    /// drop(r);
214    /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
215    /// ```
216    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
217        match self.channel.queue.push(msg) {
218            Ok(()) => {
219                // Notify a blocked receive operation. If the notified operation gets canceled,
220                // it will notify another blocked receive operation.
221                self.channel.recv_ops.notify_additional(1);
222
223                // Notify all blocked streams.
224                self.channel.stream_ops.notify(usize::MAX);
225
226                Ok(())
227            }
228            Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
229            Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
230        }
231    }
232
233    /// Sends a message into the channel.
234    ///
235    /// If the channel is full, this method waits until there is space for a message.
236    ///
237    /// If the channel is closed, this method returns an error.
238    ///
239    /// # Examples
240    ///
241    /// ```
242    /// # futures_lite::future::block_on(async {
243    /// use async_channel::{unbounded, SendError};
244    ///
245    /// let (s, r) = unbounded();
246    ///
247    /// assert_eq!(s.send(1).await, Ok(()));
248    /// drop(r);
249    /// assert_eq!(s.send(2).await, Err(SendError(2)));
250    /// # });
251    /// ```
252    pub fn send(&self, msg: T) -> Send<'_, T> {
253        Send::_new(SendInner {
254            sender: self,
255            msg: Some(msg),
256            listener: None,
257            _pin: PhantomPinned,
258        })
259    }
260
261    /// Sends a message into this channel using the blocking strategy.
262    ///
263    /// If the channel is full, this method will block until there is room.
264    /// If the channel is closed, this method returns an error.
265    ///
266    /// # Blocking
267    ///
268    /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
269    /// this method will block the current thread until the message is sent.
270    ///
271    /// This method should not be used in an asynchronous context. It is intended
272    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
273    /// Calling this method in an asynchronous context may result in deadlocks.
274    ///
275    /// # Examples
276    ///
277    /// ```
278    /// use async_channel::{unbounded, SendError};
279    ///
280    /// let (s, r) = unbounded();
281    ///
282    /// assert_eq!(s.send_blocking(1), Ok(()));
283    /// drop(r);
284    /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
285    /// ```
286    #[cfg(all(feature = "std", not(target_family = "wasm")))]
287    pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
288        self.send(msg).wait()
289    }
290
291    /// Forcefully push a message into this channel.
292    ///
293    /// If the channel is full, this method will replace an existing message in the
294    /// channel and return it as `Ok(Some(value))`. If the channel is closed, this
295    /// method will return an error.
296    ///
297    /// # Examples
298    ///
299    /// ```
300    /// # futures_lite::future::block_on(async {
301    /// use async_channel::{bounded, SendError};
302    ///
303    /// let (s, r) = bounded(3);
304    ///
305    /// assert_eq!(s.send(1).await, Ok(()));
306    /// assert_eq!(s.send(2).await, Ok(()));
307    /// assert_eq!(s.force_send(3), Ok(None));
308    /// assert_eq!(s.force_send(4), Ok(Some(1)));
309    ///
310    /// assert_eq!(r.recv().await, Ok(2));
311    /// assert_eq!(r.recv().await, Ok(3));
312    /// assert_eq!(r.recv().await, Ok(4));
313    /// # });
314    /// ```
315    pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
316        match self.channel.queue.force_push(msg) {
317            Ok(backlog) => {
318                // Notify a blocked receive operation. If the notified operation gets canceled,
319                // it will notify another blocked receive operation.
320                self.channel.recv_ops.notify_additional(1);
321
322                // Notify all blocked streams.
323                self.channel.stream_ops.notify(usize::MAX);
324
325                Ok(backlog)
326            }
327
328            Err(ForcePushError(reject)) => Err(SendError(reject)),
329        }
330    }
331
332    /// Closes the channel.
333    ///
334    /// Returns `true` if this call has closed the channel and it was not closed already.
335    ///
336    /// The remaining messages can still be received.
337    ///
338    /// # Examples
339    ///
340    /// ```
341    /// # futures_lite::future::block_on(async {
342    /// use async_channel::{unbounded, RecvError};
343    ///
344    /// let (s, r) = unbounded();
345    /// assert_eq!(s.send(1).await, Ok(()));
346    /// assert!(s.close());
347    ///
348    /// assert_eq!(r.recv().await, Ok(1));
349    /// assert_eq!(r.recv().await, Err(RecvError));
350    /// # });
351    /// ```
352    pub fn close(&self) -> bool {
353        self.channel.close()
354    }
355
356    /// Returns `true` if the channel is closed.
357    ///
358    /// # Examples
359    ///
360    /// ```
361    /// # futures_lite::future::block_on(async {
362    /// use async_channel::{unbounded, RecvError};
363    ///
364    /// let (s, r) = unbounded::<()>();
365    /// assert!(!s.is_closed());
366    ///
367    /// drop(r);
368    /// assert!(s.is_closed());
369    /// # });
370    /// ```
371    pub fn is_closed(&self) -> bool {
372        self.channel.queue.is_closed()
373    }
374
375    /// Returns `true` if the channel is empty.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// # futures_lite::future::block_on(async {
381    /// use async_channel::unbounded;
382    ///
383    /// let (s, r) = unbounded();
384    ///
385    /// assert!(s.is_empty());
386    /// s.send(1).await;
387    /// assert!(!s.is_empty());
388    /// # });
389    /// ```
390    pub fn is_empty(&self) -> bool {
391        self.channel.queue.is_empty()
392    }
393
394    /// Returns `true` if the channel is full.
395    ///
396    /// Unbounded channels are never full.
397    ///
398    /// # Examples
399    ///
400    /// ```
401    /// # futures_lite::future::block_on(async {
402    /// use async_channel::bounded;
403    ///
404    /// let (s, r) = bounded(1);
405    ///
406    /// assert!(!s.is_full());
407    /// s.send(1).await;
408    /// assert!(s.is_full());
409    /// # });
410    /// ```
411    pub fn is_full(&self) -> bool {
412        self.channel.queue.is_full()
413    }
414
415    /// Returns the number of messages in the channel.
416    ///
417    /// # Examples
418    ///
419    /// ```
420    /// # futures_lite::future::block_on(async {
421    /// use async_channel::unbounded;
422    ///
423    /// let (s, r) = unbounded();
424    /// assert_eq!(s.len(), 0);
425    ///
426    /// s.send(1).await;
427    /// s.send(2).await;
428    /// assert_eq!(s.len(), 2);
429    /// # });
430    /// ```
431    pub fn len(&self) -> usize {
432        self.channel.queue.len()
433    }
434
435    /// Returns the channel capacity if it's bounded.
436    ///
437    /// # Examples
438    ///
439    /// ```
440    /// use async_channel::{bounded, unbounded};
441    ///
442    /// let (s, r) = bounded::<i32>(5);
443    /// assert_eq!(s.capacity(), Some(5));
444    ///
445    /// let (s, r) = unbounded::<i32>();
446    /// assert_eq!(s.capacity(), None);
447    /// ```
448    pub fn capacity(&self) -> Option<usize> {
449        self.channel.queue.capacity()
450    }
451
452    /// Returns the number of receivers for the channel.
453    ///
454    /// # Examples
455    ///
456    /// ```
457    /// # futures_lite::future::block_on(async {
458    /// use async_channel::unbounded;
459    ///
460    /// let (s, r) = unbounded::<()>();
461    /// assert_eq!(s.receiver_count(), 1);
462    ///
463    /// let r2 = r.clone();
464    /// assert_eq!(s.receiver_count(), 2);
465    /// # });
466    /// ```
467    pub fn receiver_count(&self) -> usize {
468        self.channel.receiver_count.load(Ordering::SeqCst)
469    }
470
471    /// Returns the number of senders for the channel.
472    ///
473    /// # Examples
474    ///
475    /// ```
476    /// # futures_lite::future::block_on(async {
477    /// use async_channel::unbounded;
478    ///
479    /// let (s, r) = unbounded::<()>();
480    /// assert_eq!(s.sender_count(), 1);
481    ///
482    /// let s2 = s.clone();
483    /// assert_eq!(s.sender_count(), 2);
484    /// # });
485    /// ```
486    pub fn sender_count(&self) -> usize {
487        self.channel.sender_count.load(Ordering::SeqCst)
488    }
489
490    /// Downgrade the sender to a weak reference.
491    pub fn downgrade(&self) -> WeakSender<T> {
492        WeakSender {
493            channel: self.channel.clone(),
494        }
495    }
496}
497
498impl<T> Drop for Sender<T> {
499    fn drop(&mut self) {
500        // Decrement the sender count and close the channel if it drops down to zero.
501        if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
502            self.channel.close();
503        }
504    }
505}
506
507impl<T> fmt::Debug for Sender<T> {
508    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
509        write!(f, "Sender {{ .. }}")
510    }
511}
512
513impl<T> Clone for Sender<T> {
514    fn clone(&self) -> Sender<T> {
515        let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
516
517        // Make sure the count never overflows, even if lots of sender clones are leaked.
518        if count > usize::MAX / 2 {
519            abort();
520        }
521
522        Sender {
523            channel: self.channel.clone(),
524        }
525    }
526}
527
pin_project! {
    /// The receiving side of a channel.
    ///
    /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
    /// are dropped, the channel becomes closed.
    ///
    /// The channel can also be closed manually by calling [`Receiver::close()`].
    ///
    /// Receivers implement the [`Stream`] trait.
    pub struct Receiver<T> {
        // Inner channel state.
        channel: Arc<Channel<T>>,

        // Listens for a send or close event to unblock this stream. It is `None` whenever the
        // stream is not waiting for a notification.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }

    impl<T> PinnedDrop for Receiver<T> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();

            // Decrement the receiver count and close the channel if it drops down to zero.
            // `fetch_sub` returns the previous value, so `1` means this was the last receiver.
            if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                this.channel.close();
            }
        }
    }
}
560
561impl<T> Receiver<T> {
562    /// Attempts to receive a message from the channel.
563    ///
564    /// If the channel is empty, or empty and closed, this method returns an error.
565    ///
566    /// # Examples
567    ///
568    /// ```
569    /// # futures_lite::future::block_on(async {
570    /// use async_channel::{unbounded, TryRecvError};
571    ///
572    /// let (s, r) = unbounded();
573    /// assert_eq!(s.send(1).await, Ok(()));
574    ///
575    /// assert_eq!(r.try_recv(), Ok(1));
576    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
577    ///
578    /// drop(s);
579    /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
580    /// # });
581    /// ```
582    pub fn try_recv(&self) -> Result<T, TryRecvError> {
583        match self.channel.queue.pop() {
584            Ok(msg) => {
585                // Notify a blocked send operation. If the notified operation gets canceled, it
586                // will notify another blocked send operation.
587                self.channel.send_ops.notify_additional(1);
588
589                Ok(msg)
590            }
591            Err(PopError::Empty) => Err(TryRecvError::Empty),
592            Err(PopError::Closed) => Err(TryRecvError::Closed),
593        }
594    }
595
596    /// Receives a message from the channel.
597    ///
598    /// If the channel is empty, this method waits until there is a message.
599    ///
600    /// If the channel is closed, this method receives a message or returns an error if there are
601    /// no more messages.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// # futures_lite::future::block_on(async {
607    /// use async_channel::{unbounded, RecvError};
608    ///
609    /// let (s, r) = unbounded();
610    ///
611    /// assert_eq!(s.send(1).await, Ok(()));
612    /// drop(s);
613    ///
614    /// assert_eq!(r.recv().await, Ok(1));
615    /// assert_eq!(r.recv().await, Err(RecvError));
616    /// # });
617    /// ```
618    pub fn recv(&self) -> Recv<'_, T> {
619        Recv::_new(RecvInner {
620            receiver: self,
621            listener: None,
622            _pin: PhantomPinned,
623        })
624    }
625
626    /// Receives a message from the channel using the blocking strategy.
627    ///
628    /// If the channel is empty, this method waits until there is a message.
629    /// If the channel is closed, this method receives a message or returns an error if there are
630    /// no more messages.
631    ///
632    /// # Blocking
633    ///
634    /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
635    /// this method will block the current thread until the message is sent.
636    ///
637    /// This method should not be used in an asynchronous context. It is intended
638    /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
639    /// Calling this method in an asynchronous context may result in deadlocks.
640    ///
641    /// # Examples
642    ///
643    /// ```
644    /// use async_channel::{unbounded, RecvError};
645    ///
646    /// let (s, r) = unbounded();
647    ///
648    /// assert_eq!(s.send_blocking(1), Ok(()));
649    /// drop(s);
650    ///
651    /// assert_eq!(r.recv_blocking(), Ok(1));
652    /// assert_eq!(r.recv_blocking(), Err(RecvError));
653    /// ```
654    #[cfg(all(feature = "std", not(target_family = "wasm")))]
655    pub fn recv_blocking(&self) -> Result<T, RecvError> {
656        self.recv().wait()
657    }
658
659    /// Closes the channel.
660    ///
661    /// Returns `true` if this call has closed the channel and it was not closed already.
662    ///
663    /// The remaining messages can still be received.
664    ///
665    /// # Examples
666    ///
667    /// ```
668    /// # futures_lite::future::block_on(async {
669    /// use async_channel::{unbounded, RecvError};
670    ///
671    /// let (s, r) = unbounded();
672    /// assert_eq!(s.send(1).await, Ok(()));
673    ///
674    /// assert!(r.close());
675    /// assert_eq!(r.recv().await, Ok(1));
676    /// assert_eq!(r.recv().await, Err(RecvError));
677    /// # });
678    /// ```
679    pub fn close(&self) -> bool {
680        self.channel.close()
681    }
682
683    /// Returns `true` if the channel is closed.
684    ///
685    /// # Examples
686    ///
687    /// ```
688    /// # futures_lite::future::block_on(async {
689    /// use async_channel::{unbounded, RecvError};
690    ///
691    /// let (s, r) = unbounded::<()>();
692    /// assert!(!r.is_closed());
693    ///
694    /// drop(s);
695    /// assert!(r.is_closed());
696    /// # });
697    /// ```
698    pub fn is_closed(&self) -> bool {
699        self.channel.queue.is_closed()
700    }
701
702    /// Returns `true` if the channel is empty.
703    ///
704    /// # Examples
705    ///
706    /// ```
707    /// # futures_lite::future::block_on(async {
708    /// use async_channel::unbounded;
709    ///
710    /// let (s, r) = unbounded();
711    ///
712    /// assert!(s.is_empty());
713    /// s.send(1).await;
714    /// assert!(!s.is_empty());
715    /// # });
716    /// ```
717    pub fn is_empty(&self) -> bool {
718        self.channel.queue.is_empty()
719    }
720
721    /// Returns `true` if the channel is full.
722    ///
723    /// Unbounded channels are never full.
724    ///
725    /// # Examples
726    ///
727    /// ```
728    /// # futures_lite::future::block_on(async {
729    /// use async_channel::bounded;
730    ///
731    /// let (s, r) = bounded(1);
732    ///
733    /// assert!(!r.is_full());
734    /// s.send(1).await;
735    /// assert!(r.is_full());
736    /// # });
737    /// ```
738    pub fn is_full(&self) -> bool {
739        self.channel.queue.is_full()
740    }
741
742    /// Returns the number of messages in the channel.
743    ///
744    /// # Examples
745    ///
746    /// ```
747    /// # futures_lite::future::block_on(async {
748    /// use async_channel::unbounded;
749    ///
750    /// let (s, r) = unbounded();
751    /// assert_eq!(r.len(), 0);
752    ///
753    /// s.send(1).await;
754    /// s.send(2).await;
755    /// assert_eq!(r.len(), 2);
756    /// # });
757    /// ```
758    pub fn len(&self) -> usize {
759        self.channel.queue.len()
760    }
761
762    /// Returns the channel capacity if it's bounded.
763    ///
764    /// # Examples
765    ///
766    /// ```
767    /// use async_channel::{bounded, unbounded};
768    ///
769    /// let (s, r) = bounded::<i32>(5);
770    /// assert_eq!(r.capacity(), Some(5));
771    ///
772    /// let (s, r) = unbounded::<i32>();
773    /// assert_eq!(r.capacity(), None);
774    /// ```
775    pub fn capacity(&self) -> Option<usize> {
776        self.channel.queue.capacity()
777    }
778
779    /// Returns the number of receivers for the channel.
780    ///
781    /// # Examples
782    ///
783    /// ```
784    /// # futures_lite::future::block_on(async {
785    /// use async_channel::unbounded;
786    ///
787    /// let (s, r) = unbounded::<()>();
788    /// assert_eq!(r.receiver_count(), 1);
789    ///
790    /// let r2 = r.clone();
791    /// assert_eq!(r.receiver_count(), 2);
792    /// # });
793    /// ```
794    pub fn receiver_count(&self) -> usize {
795        self.channel.receiver_count.load(Ordering::SeqCst)
796    }
797
798    /// Returns the number of senders for the channel.
799    ///
800    /// # Examples
801    ///
802    /// ```
803    /// # futures_lite::future::block_on(async {
804    /// use async_channel::unbounded;
805    ///
806    /// let (s, r) = unbounded::<()>();
807    /// assert_eq!(r.sender_count(), 1);
808    ///
809    /// let s2 = s.clone();
810    /// assert_eq!(r.sender_count(), 2);
811    /// # });
812    /// ```
813    pub fn sender_count(&self) -> usize {
814        self.channel.sender_count.load(Ordering::SeqCst)
815    }
816
817    /// Downgrade the receiver to a weak reference.
818    pub fn downgrade(&self) -> WeakReceiver<T> {
819        WeakReceiver {
820            channel: self.channel.clone(),
821        }
822    }
823}
824
825impl<T> fmt::Debug for Receiver<T> {
826    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
827        write!(f, "Receiver {{ .. }}")
828    }
829}
830
831impl<T> Clone for Receiver<T> {
832    fn clone(&self) -> Receiver<T> {
833        let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
834
835        // Make sure the count never overflows, even if lots of receiver clones are leaked.
836        if count > usize::MAX / 2 {
837            abort();
838        }
839
840        Receiver {
841            channel: self.channel.clone(),
842            listener: None,
843            _pin: PhantomPinned,
844        }
845    }
846}
847
// Stream view of a receiver: yields messages via `try_recv` and ends with `None` once
// `try_recv` reports `Closed`.
impl<T> Stream for Receiver<T> {
    type Item = T;

    /// Polls for the next message.
    ///
    /// Returns `Poll::Ready(Some(msg))` when a message could be received, `Poll::Ready(None)`
    /// when `try_recv` reports the channel as closed, and `Poll::Pending` while the stored
    /// listener has not been notified yet.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // If this stream is listening for events, first wait for a notification.
            {
                let this = self.as_mut().project();
                if let Some(listener) = this.listener.as_mut() {
                    // `ready!` returns `Poll::Pending` from `poll_next` if the listener has not
                    // fired; otherwise the used-up listener is discarded.
                    ready!(Pin::new(listener).poll(cx));
                    *this.listener = None;
                }
            }

            // At this point no listener is stored. The inner loop runs at most twice: one
            // attempt without a listener and one more right after registering one.
            loop {
                // Attempt to receive a message.
                match self.try_recv() {
                    Ok(msg) => {
                        // The stream is not blocked on an event - drop the listener.
                        let this = self.as_mut().project();
                        *this.listener = None;
                        return Poll::Ready(Some(msg));
                    }
                    Err(TryRecvError::Closed) => {
                        // The stream is not blocked on an event - drop the listener.
                        let this = self.as_mut().project();
                        *this.listener = None;
                        return Poll::Ready(None);
                    }
                    Err(TryRecvError::Empty) => {}
                }

                // Receiving failed - now start listening for notifications or wait for one.
                let this = self.as_mut().project();
                if this.listener.is_some() {
                    // Go back to the outer loop to wait for a notification.
                    break;
                } else {
                    // Register on `stream_ops`, then retry `try_recv` so that a message which
                    // arrived in the meantime is still picked up.
                    *this.listener = Some(this.channel.stream_ops.listen());
                }
            }
        }
    }
}
892
893impl<T> futures_core::stream::FusedStream for Receiver<T> {
894    fn is_terminated(&self) -> bool {
895        self.channel.queue.is_closed() && self.channel.queue.is_empty()
896    }
897}
898
/// A [`Sender`] that does not prevent the channel from being closed.
///
/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
/// to be upgraded into a [`Sender`] through the `upgrade` method.
pub struct WeakSender<T> {
    // Inner channel state. `Sender::downgrade` clones the `Arc` without touching `sender_count`.
    channel: Arc<Channel<T>>,
}
906
907impl<T> WeakSender<T> {
908    /// Upgrade the [`WeakSender`] into a [`Sender`].
909    pub fn upgrade(&self) -> Option<Sender<T>> {
910        if self.channel.queue.is_closed() {
911            None
912        } else {
913            match self.channel.sender_count.fetch_update(
914                Ordering::Relaxed,
915                Ordering::Relaxed,
916                |count| if count == 0 { None } else { Some(count + 1) },
917            ) {
918                Err(_) => None,
919                Ok(new_value) if new_value > usize::MAX / 2 => {
920                    // Make sure the count never overflows, even if lots of sender clones are leaked.
921                    abort();
922                }
923                Ok(_) => Some(Sender {
924                    channel: self.channel.clone(),
925                }),
926            }
927        }
928    }
929}
930
931impl<T> Clone for WeakSender<T> {
932    fn clone(&self) -> Self {
933        WeakSender {
934            channel: self.channel.clone(),
935        }
936    }
937}
938
939impl<T> fmt::Debug for WeakSender<T> {
940    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
941        write!(f, "WeakSender {{ .. }}")
942    }
943}
944
/// A [`Receiver`] that does not prevent the channel from being closed.
///
/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
/// to be upgraded into a [`Receiver`] through the `upgrade` method.
pub struct WeakReceiver<T> {
    // Inner channel state. `Receiver::downgrade` clones the `Arc` without touching
    // `receiver_count`.
    channel: Arc<Channel<T>>,
}
952
953impl<T> WeakReceiver<T> {
954    /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
955    pub fn upgrade(&self) -> Option<Receiver<T>> {
956        if self.channel.queue.is_closed() {
957            None
958        } else {
959            match self.channel.receiver_count.fetch_update(
960                Ordering::Relaxed,
961                Ordering::Relaxed,
962                |count| if count == 0 { None } else { Some(count + 1) },
963            ) {
964                Err(_) => None,
965                Ok(new_value) if new_value > usize::MAX / 2 => {
966                    // Make sure the count never overflows, even if lots of receiver clones are leaked.
967                    abort();
968                }
969                Ok(_) => Some(Receiver {
970                    channel: self.channel.clone(),
971                    listener: None,
972                    _pin: PhantomPinned,
973                }),
974            }
975        }
976    }
977}
978
979impl<T> Clone for WeakReceiver<T> {
980    fn clone(&self) -> Self {
981        WeakReceiver {
982            channel: self.channel.clone(),
983        }
984    }
985}
986
987impl<T> fmt::Debug for WeakReceiver<T> {
988    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
989        write!(f, "WeakReceiver {{ .. }}")
990    }
991}
992
/// The error produced by [`Sender::send()`] when a message cannot be delivered.
///
/// It is returned because the channel is closed. The undelivered message is stored in the
/// public field.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and gives back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

#[cfg(feature = "std")]
impl<T> std::error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    /// Prints a placeholder instead of the message, so `T` does not have to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Prints a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1020
/// The error produced by [`Sender::try_send()`].
///
/// Both variants carry the message that was not sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(undelivered) | Self::Closed(undelivered) => undelivered,
        }
    }

    /// Reports whether this is the `Full` variant (channel full but not closed).
    pub fn is_full(&self) -> bool {
        matches!(self, Self::Full(_))
    }

    /// Reports whether this is the `Closed` variant (channel closed).
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed(_))
    }
}

#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    /// Prints only the variant name with a placeholder, so no `T: Debug` bound is needed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Full(_) => "Full(..)",
            Self::Closed(_) => "Closed(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    /// Writes a fixed human-readable message per variant; the payload is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full(_) => "sending into a full channel",
            Self::Closed(_) => "sending into a closed channel",
        };
        f.write_str(text)
    }
}
1077
/// The error produced by [`Receiver::recv()`].
///
/// It is returned because the channel is empty and closed.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;

#[cfg(feature = "std")]
impl std::error::Error for RecvError {}

impl fmt::Display for RecvError {
    /// Writes a fixed human-readable message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving from an empty and closed channel")
    }
}
1092
/// The error produced by [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Reports whether this is the `Empty` variant (channel empty but not closed).
    pub fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }

    /// Reports whether this is the `Closed` variant (channel empty and closed).
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed)
    }
}

#[cfg(feature = "std")]
impl std::error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    /// Writes a fixed human-readable message per variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Empty => "receiving from an empty channel",
            Self::Closed => "receiving from an empty and closed channel",
        };
        f.write_str(text)
    }
}
1132
// Declares the public `Send` future as a wrapper around `SendInner`, with
// output type `Result<(), SendError<T>>`.
// NOTE(review): `easy_wrapper!` comes from `event_listener_strategy`; it
// presumably generates the `Future` impl that forwards to
// `SendInner::poll_with_strategy` — confirm against that crate's docs.
easy_wrapper! {
    /// A future returned by [`Sender::send()`].
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
    // The crate-private `wait()` is only requested when the `std` feature is on
    // and the target family is not wasm (presumably a blocking wait — confirm).
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    pub(crate) wait();
}
1141
// State behind the `Send` future: the borrowed sender, the message still
// waiting to go out, and the listener used to wait for room in the channel.
pin_project! {
    #[derive(Debug)]
    #[project(!Unpin)]
    struct SendInner<'a, T> {
        // Reference to the original sender.
        sender: &'a Sender<T>,

        // The message to send. It is taken out for every send attempt and put
        // back when the channel reports that it is full.
        msg: Option<T>,

        // Listener waiting on the channel. Stays `None` until a send attempt
        // has failed because the channel was full.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1160
1161impl<'a, T> EventListenerFuture for SendInner<'a, T> {
1162    type Output = Result<(), SendError<T>>;
1163
1164    /// Run this future with the given `Strategy`.
1165    fn poll_with_strategy<'x, S: Strategy<'x>>(
1166        self: Pin<&mut Self>,
1167        strategy: &mut S,
1168        context: &mut S::Context,
1169    ) -> Poll<Result<(), SendError<T>>> {
1170        let this = self.project();
1171
1172        loop {
1173            let msg = this.msg.take().unwrap();
1174            // Attempt to send a message.
1175            match this.sender.try_send(msg) {
1176                Ok(()) => return Poll::Ready(Ok(())),
1177                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1178                Err(TrySendError::Full(m)) => *this.msg = Some(m),
1179            }
1180
1181            // Sending failed - now start listening for notifications or wait for one.
1182            if this.listener.is_some() {
1183                // Poll using the given strategy
1184                ready!(S::poll(strategy, &mut *this.listener, context));
1185            } else {
1186                *this.listener = Some(this.sender.channel.send_ops.listen());
1187            }
1188        }
1189    }
1190}
1191
// Declares the public `Recv` future as a wrapper around `RecvInner`, with
// output type `Result<T, RecvError>`.
// NOTE(review): `easy_wrapper!` comes from `event_listener_strategy`; it
// presumably generates the `Future` impl that forwards to
// `RecvInner::poll_with_strategy` — confirm against that crate's docs.
easy_wrapper! {
    /// A future returned by [`Receiver::recv()`].
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
    // The crate-private `wait()` is only requested when the `std` feature is on
    // and the target family is not wasm (presumably a blocking wait — confirm).
    #[cfg(all(feature = "std", not(target_family = "wasm")))]
    pub(crate) wait();
}
1200
// State behind the `Recv` future: the borrowed receiver and the listener used
// to wait for a message to arrive.
pin_project! {
    #[derive(Debug)]
    #[project(!Unpin)]
    struct RecvInner<'a, T> {
        // Reference to the receiver.
        receiver: &'a Receiver<T>,

        // Listener waiting on the channel. Stays `None` until a receive attempt
        // has found the channel empty.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }
}
1216
1217impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
1218    type Output = Result<T, RecvError>;
1219
1220    /// Run this future with the given `Strategy`.
1221    fn poll_with_strategy<'x, S: Strategy<'x>>(
1222        self: Pin<&mut Self>,
1223        strategy: &mut S,
1224        cx: &mut S::Context,
1225    ) -> Poll<Result<T, RecvError>> {
1226        let this = self.project();
1227
1228        loop {
1229            // Attempt to receive a message.
1230            match this.receiver.try_recv() {
1231                Ok(msg) => return Poll::Ready(Ok(msg)),
1232                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1233                Err(TryRecvError::Empty) => {}
1234            }
1235
1236            // Receiving failed - now start listening for notifications or wait for one.
1237            if this.listener.is_some() {
1238                // Poll using the given strategy
1239                ready!(S::poll(strategy, &mut *this.listener, cx));
1240            } else {
1241                *this.listener = Some(this.receiver.channel.recv_ops.listen());
1242            }
1243        }
1244    }
1245}
1246
1247#[cfg(feature = "std")]
1248use std::process::abort;
1249
/// Stand-in for `std::process::abort` when the `std` feature is disabled.
///
/// It panics while a guard whose destructor also panics is still alive, so
/// that a second panic is raised while the first one is unwinding.
#[cfg(not(feature = "std"))]
fn abort() -> ! {
    // Guard type whose only job is to panic when dropped.
    struct Bomb;

    impl Drop for Bomb {
        fn drop(&mut self) {
            panic!("Panic while panicking to abort");
        }
    }

    // The guard must be live before the panic below so its destructor runs
    // during unwinding.
    let _armed = Bomb;
    panic!("Panic while panicking to abort")
}