async_channel/lib.rs
1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
9//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10//! among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![cfg_attr(not(feature = "std"), no_std)]
30#![forbid(unsafe_code)]
31#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32#![doc(
33 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34)]
35#![doc(
36 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
37)]
38
39extern crate alloc;
40
41use core::fmt;
42use core::future::Future;
43use core::marker::PhantomPinned;
44use core::pin::Pin;
45use core::sync::atomic::{AtomicUsize, Ordering};
46use core::task::{Context, Poll};
47
48use alloc::sync::Arc;
49
50use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
51use event_listener_strategy::{
52 easy_wrapper,
53 event_listener::{Event, EventListener},
54 EventListenerFuture, Strategy,
55};
56use futures_core::ready;
57use futures_core::stream::Stream;
58use pin_project_lite::pin_project;
59
60struct Channel<T> {
61 /// Inner message queue.
62 queue: ConcurrentQueue<T>,
63
64 /// Send operations waiting while the channel is full.
65 send_ops: Event,
66
67 /// Receive operations waiting while the channel is empty and not closed.
68 recv_ops: Event,
69
70 /// Stream operations while the channel is empty and not closed.
71 stream_ops: Event,
72
73 /// The number of currently active `Sender`s.
74 sender_count: AtomicUsize,
75
76 /// The number of currently active `Receivers`s.
77 receiver_count: AtomicUsize,
78}
79
80impl<T> Channel<T> {
81 /// Closes the channel and notifies all blocked operations.
82 ///
83 /// Returns `true` if this call has closed the channel and it was not closed already.
84 fn close(&self) -> bool {
85 if self.queue.close() {
86 // Notify all send operations.
87 self.send_ops.notify(usize::MAX);
88
89 // Notify all receive and stream operations.
90 self.recv_ops.notify(usize::MAX);
91 self.stream_ops.notify(usize::MAX);
92
93 true
94 } else {
95 false
96 }
97 }
98}
99
100/// Creates a bounded channel.
101///
102/// The created channel has space to hold at most `cap` messages at a time.
103///
104/// # Panics
105///
106/// Capacity must be a positive number. If `cap` is zero, this function will panic.
107///
108/// # Examples
109///
110/// ```
111/// # futures_lite::future::block_on(async {
112/// use async_channel::{bounded, TryRecvError, TrySendError};
113///
114/// let (s, r) = bounded(1);
115///
116/// assert_eq!(s.send(10).await, Ok(()));
117/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
118///
119/// assert_eq!(r.recv().await, Ok(10));
120/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
121/// # });
122/// ```
123pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
124 assert!(cap > 0, "capacity cannot be zero");
125
126 let channel = Arc::new(Channel {
127 queue: ConcurrentQueue::bounded(cap),
128 send_ops: Event::new(),
129 recv_ops: Event::new(),
130 stream_ops: Event::new(),
131 sender_count: AtomicUsize::new(1),
132 receiver_count: AtomicUsize::new(1),
133 });
134
135 let s = Sender {
136 channel: channel.clone(),
137 };
138 let r = Receiver {
139 listener: None,
140 channel,
141 _pin: PhantomPinned,
142 };
143 (s, r)
144}
145
146/// Creates an unbounded channel.
147///
148/// The created channel can hold an unlimited number of messages.
149///
150/// # Examples
151///
152/// ```
153/// # futures_lite::future::block_on(async {
154/// use async_channel::{unbounded, TryRecvError};
155///
156/// let (s, r) = unbounded();
157///
158/// assert_eq!(s.send(10).await, Ok(()));
159/// assert_eq!(s.send(20).await, Ok(()));
160///
161/// assert_eq!(r.recv().await, Ok(10));
162/// assert_eq!(r.recv().await, Ok(20));
163/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
164/// # });
165/// ```
166pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
167 let channel = Arc::new(Channel {
168 queue: ConcurrentQueue::unbounded(),
169 send_ops: Event::new(),
170 recv_ops: Event::new(),
171 stream_ops: Event::new(),
172 sender_count: AtomicUsize::new(1),
173 receiver_count: AtomicUsize::new(1),
174 });
175
176 let s = Sender {
177 channel: channel.clone(),
178 };
179 let r = Receiver {
180 listener: None,
181 channel,
182 _pin: PhantomPinned,
183 };
184 (s, r)
185}
186
187/// The sending side of a channel.
188///
189/// Senders can be cloned and shared among threads. When all senders associated with a channel are
190/// dropped, the channel becomes closed.
191///
192/// The channel can also be closed manually by calling [`Sender::close()`].
193pub struct Sender<T> {
194 /// Inner channel state.
195 channel: Arc<Channel<T>>,
196}
197
198impl<T> Sender<T> {
199 /// Attempts to send a message into the channel.
200 ///
201 /// If the channel is full or closed, this method returns an error.
202 ///
203 /// # Examples
204 ///
205 /// ```
206 /// use async_channel::{bounded, TrySendError};
207 ///
208 /// let (s, r) = bounded(1);
209 ///
210 /// assert_eq!(s.try_send(1), Ok(()));
211 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
212 ///
213 /// drop(r);
214 /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
215 /// ```
216 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
217 match self.channel.queue.push(msg) {
218 Ok(()) => {
219 // Notify a blocked receive operation. If the notified operation gets canceled,
220 // it will notify another blocked receive operation.
221 self.channel.recv_ops.notify_additional(1);
222
223 // Notify all blocked streams.
224 self.channel.stream_ops.notify(usize::MAX);
225
226 Ok(())
227 }
228 Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
229 Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
230 }
231 }
232
233 /// Sends a message into the channel.
234 ///
235 /// If the channel is full, this method waits until there is space for a message.
236 ///
237 /// If the channel is closed, this method returns an error.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// # futures_lite::future::block_on(async {
243 /// use async_channel::{unbounded, SendError};
244 ///
245 /// let (s, r) = unbounded();
246 ///
247 /// assert_eq!(s.send(1).await, Ok(()));
248 /// drop(r);
249 /// assert_eq!(s.send(2).await, Err(SendError(2)));
250 /// # });
251 /// ```
252 pub fn send(&self, msg: T) -> Send<'_, T> {
253 Send::_new(SendInner {
254 sender: self,
255 msg: Some(msg),
256 listener: None,
257 _pin: PhantomPinned,
258 })
259 }
260
261 /// Sends a message into this channel using the blocking strategy.
262 ///
263 /// If the channel is full, this method will block until there is room.
264 /// If the channel is closed, this method returns an error.
265 ///
266 /// # Blocking
267 ///
268 /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
269 /// this method will block the current thread until the message is sent.
270 ///
271 /// This method should not be used in an asynchronous context. It is intended
272 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
273 /// Calling this method in an asynchronous context may result in deadlocks.
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// use async_channel::{unbounded, SendError};
279 ///
280 /// let (s, r) = unbounded();
281 ///
282 /// assert_eq!(s.send_blocking(1), Ok(()));
283 /// drop(r);
284 /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
285 /// ```
286 #[cfg(all(feature = "std", not(target_family = "wasm")))]
287 pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
288 self.send(msg).wait()
289 }
290
291 /// Forcefully push a message into this channel.
292 ///
293 /// If the channel is full, this method will replace an existing message in the
294 /// channel and return it as `Ok(Some(value))`. If the channel is closed, this
295 /// method will return an error.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// # futures_lite::future::block_on(async {
301 /// use async_channel::{bounded, SendError};
302 ///
303 /// let (s, r) = bounded(3);
304 ///
305 /// assert_eq!(s.send(1).await, Ok(()));
306 /// assert_eq!(s.send(2).await, Ok(()));
307 /// assert_eq!(s.force_send(3), Ok(None));
308 /// assert_eq!(s.force_send(4), Ok(Some(1)));
309 ///
310 /// assert_eq!(r.recv().await, Ok(2));
311 /// assert_eq!(r.recv().await, Ok(3));
312 /// assert_eq!(r.recv().await, Ok(4));
313 /// # });
314 /// ```
315 pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
316 match self.channel.queue.force_push(msg) {
317 Ok(backlog) => {
318 // Notify a blocked receive operation. If the notified operation gets canceled,
319 // it will notify another blocked receive operation.
320 self.channel.recv_ops.notify_additional(1);
321
322 // Notify all blocked streams.
323 self.channel.stream_ops.notify(usize::MAX);
324
325 Ok(backlog)
326 }
327
328 Err(ForcePushError(reject)) => Err(SendError(reject)),
329 }
330 }
331
332 /// Closes the channel.
333 ///
334 /// Returns `true` if this call has closed the channel and it was not closed already.
335 ///
336 /// The remaining messages can still be received.
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// # futures_lite::future::block_on(async {
342 /// use async_channel::{unbounded, RecvError};
343 ///
344 /// let (s, r) = unbounded();
345 /// assert_eq!(s.send(1).await, Ok(()));
346 /// assert!(s.close());
347 ///
348 /// assert_eq!(r.recv().await, Ok(1));
349 /// assert_eq!(r.recv().await, Err(RecvError));
350 /// # });
351 /// ```
352 pub fn close(&self) -> bool {
353 self.channel.close()
354 }
355
356 /// Returns `true` if the channel is closed.
357 ///
358 /// # Examples
359 ///
360 /// ```
361 /// # futures_lite::future::block_on(async {
362 /// use async_channel::{unbounded, RecvError};
363 ///
364 /// let (s, r) = unbounded::<()>();
365 /// assert!(!s.is_closed());
366 ///
367 /// drop(r);
368 /// assert!(s.is_closed());
369 /// # });
370 /// ```
371 pub fn is_closed(&self) -> bool {
372 self.channel.queue.is_closed()
373 }
374
375 /// Returns `true` if the channel is empty.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// # futures_lite::future::block_on(async {
381 /// use async_channel::unbounded;
382 ///
383 /// let (s, r) = unbounded();
384 ///
385 /// assert!(s.is_empty());
386 /// s.send(1).await;
387 /// assert!(!s.is_empty());
388 /// # });
389 /// ```
390 pub fn is_empty(&self) -> bool {
391 self.channel.queue.is_empty()
392 }
393
394 /// Returns `true` if the channel is full.
395 ///
396 /// Unbounded channels are never full.
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// # futures_lite::future::block_on(async {
402 /// use async_channel::bounded;
403 ///
404 /// let (s, r) = bounded(1);
405 ///
406 /// assert!(!s.is_full());
407 /// s.send(1).await;
408 /// assert!(s.is_full());
409 /// # });
410 /// ```
411 pub fn is_full(&self) -> bool {
412 self.channel.queue.is_full()
413 }
414
415 /// Returns the number of messages in the channel.
416 ///
417 /// # Examples
418 ///
419 /// ```
420 /// # futures_lite::future::block_on(async {
421 /// use async_channel::unbounded;
422 ///
423 /// let (s, r) = unbounded();
424 /// assert_eq!(s.len(), 0);
425 ///
426 /// s.send(1).await;
427 /// s.send(2).await;
428 /// assert_eq!(s.len(), 2);
429 /// # });
430 /// ```
431 pub fn len(&self) -> usize {
432 self.channel.queue.len()
433 }
434
435 /// Returns the channel capacity if it's bounded.
436 ///
437 /// # Examples
438 ///
439 /// ```
440 /// use async_channel::{bounded, unbounded};
441 ///
442 /// let (s, r) = bounded::<i32>(5);
443 /// assert_eq!(s.capacity(), Some(5));
444 ///
445 /// let (s, r) = unbounded::<i32>();
446 /// assert_eq!(s.capacity(), None);
447 /// ```
448 pub fn capacity(&self) -> Option<usize> {
449 self.channel.queue.capacity()
450 }
451
452 /// Returns the number of receivers for the channel.
453 ///
454 /// # Examples
455 ///
456 /// ```
457 /// # futures_lite::future::block_on(async {
458 /// use async_channel::unbounded;
459 ///
460 /// let (s, r) = unbounded::<()>();
461 /// assert_eq!(s.receiver_count(), 1);
462 ///
463 /// let r2 = r.clone();
464 /// assert_eq!(s.receiver_count(), 2);
465 /// # });
466 /// ```
467 pub fn receiver_count(&self) -> usize {
468 self.channel.receiver_count.load(Ordering::SeqCst)
469 }
470
471 /// Returns the number of senders for the channel.
472 ///
473 /// # Examples
474 ///
475 /// ```
476 /// # futures_lite::future::block_on(async {
477 /// use async_channel::unbounded;
478 ///
479 /// let (s, r) = unbounded::<()>();
480 /// assert_eq!(s.sender_count(), 1);
481 ///
482 /// let s2 = s.clone();
483 /// assert_eq!(s.sender_count(), 2);
484 /// # });
485 /// ```
486 pub fn sender_count(&self) -> usize {
487 self.channel.sender_count.load(Ordering::SeqCst)
488 }
489
490 /// Downgrade the sender to a weak reference.
491 pub fn downgrade(&self) -> WeakSender<T> {
492 WeakSender {
493 channel: self.channel.clone(),
494 }
495 }
496}
497
498impl<T> Drop for Sender<T> {
499 fn drop(&mut self) {
500 // Decrement the sender count and close the channel if it drops down to zero.
501 if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
502 self.channel.close();
503 }
504 }
505}
506
507impl<T> fmt::Debug for Sender<T> {
508 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
509 write!(f, "Sender {{ .. }}")
510 }
511}
512
513impl<T> Clone for Sender<T> {
514 fn clone(&self) -> Sender<T> {
515 let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
516
517 // Make sure the count never overflows, even if lots of sender clones are leaked.
518 if count > usize::MAX / 2 {
519 abort();
520 }
521
522 Sender {
523 channel: self.channel.clone(),
524 }
525 }
526}
527
528pin_project! {
529 /// The receiving side of a channel.
530 ///
531 /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
532 /// are dropped, the channel becomes closed.
533 ///
534 /// The channel can also be closed manually by calling [`Receiver::close()`].
535 ///
536 /// Receivers implement the [`Stream`] trait.
537 pub struct Receiver<T> {
538 // Inner channel state.
539 channel: Arc<Channel<T>>,
540
541 // Listens for a send or close event to unblock this stream.
542 listener: Option<EventListener>,
543
544 // Keeping this type `!Unpin` enables future optimizations.
545 #[pin]
546 _pin: PhantomPinned
547 }
548
549 impl<T> PinnedDrop for Receiver<T> {
550 fn drop(this: Pin<&mut Self>) {
551 let this = this.project();
552
553 // Decrement the receiver count and close the channel if it drops down to zero.
554 if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
555 this.channel.close();
556 }
557 }
558 }
559}
560
561impl<T> Receiver<T> {
562 /// Attempts to receive a message from the channel.
563 ///
564 /// If the channel is empty, or empty and closed, this method returns an error.
565 ///
566 /// # Examples
567 ///
568 /// ```
569 /// # futures_lite::future::block_on(async {
570 /// use async_channel::{unbounded, TryRecvError};
571 ///
572 /// let (s, r) = unbounded();
573 /// assert_eq!(s.send(1).await, Ok(()));
574 ///
575 /// assert_eq!(r.try_recv(), Ok(1));
576 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
577 ///
578 /// drop(s);
579 /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
580 /// # });
581 /// ```
582 pub fn try_recv(&self) -> Result<T, TryRecvError> {
583 match self.channel.queue.pop() {
584 Ok(msg) => {
585 // Notify a blocked send operation. If the notified operation gets canceled, it
586 // will notify another blocked send operation.
587 self.channel.send_ops.notify_additional(1);
588
589 Ok(msg)
590 }
591 Err(PopError::Empty) => Err(TryRecvError::Empty),
592 Err(PopError::Closed) => Err(TryRecvError::Closed),
593 }
594 }
595
596 /// Receives a message from the channel.
597 ///
598 /// If the channel is empty, this method waits until there is a message.
599 ///
600 /// If the channel is closed, this method receives a message or returns an error if there are
601 /// no more messages.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// # futures_lite::future::block_on(async {
607 /// use async_channel::{unbounded, RecvError};
608 ///
609 /// let (s, r) = unbounded();
610 ///
611 /// assert_eq!(s.send(1).await, Ok(()));
612 /// drop(s);
613 ///
614 /// assert_eq!(r.recv().await, Ok(1));
615 /// assert_eq!(r.recv().await, Err(RecvError));
616 /// # });
617 /// ```
618 pub fn recv(&self) -> Recv<'_, T> {
619 Recv::_new(RecvInner {
620 receiver: self,
621 listener: None,
622 _pin: PhantomPinned,
623 })
624 }
625
626 /// Receives a message from the channel using the blocking strategy.
627 ///
628 /// If the channel is empty, this method waits until there is a message.
629 /// If the channel is closed, this method receives a message or returns an error if there are
630 /// no more messages.
631 ///
632 /// # Blocking
633 ///
634 /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
635 /// this method will block the current thread until the message is sent.
636 ///
637 /// This method should not be used in an asynchronous context. It is intended
638 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
639 /// Calling this method in an asynchronous context may result in deadlocks.
640 ///
641 /// # Examples
642 ///
643 /// ```
644 /// use async_channel::{unbounded, RecvError};
645 ///
646 /// let (s, r) = unbounded();
647 ///
648 /// assert_eq!(s.send_blocking(1), Ok(()));
649 /// drop(s);
650 ///
651 /// assert_eq!(r.recv_blocking(), Ok(1));
652 /// assert_eq!(r.recv_blocking(), Err(RecvError));
653 /// ```
654 #[cfg(all(feature = "std", not(target_family = "wasm")))]
655 pub fn recv_blocking(&self) -> Result<T, RecvError> {
656 self.recv().wait()
657 }
658
659 /// Closes the channel.
660 ///
661 /// Returns `true` if this call has closed the channel and it was not closed already.
662 ///
663 /// The remaining messages can still be received.
664 ///
665 /// # Examples
666 ///
667 /// ```
668 /// # futures_lite::future::block_on(async {
669 /// use async_channel::{unbounded, RecvError};
670 ///
671 /// let (s, r) = unbounded();
672 /// assert_eq!(s.send(1).await, Ok(()));
673 ///
674 /// assert!(r.close());
675 /// assert_eq!(r.recv().await, Ok(1));
676 /// assert_eq!(r.recv().await, Err(RecvError));
677 /// # });
678 /// ```
679 pub fn close(&self) -> bool {
680 self.channel.close()
681 }
682
683 /// Returns `true` if the channel is closed.
684 ///
685 /// # Examples
686 ///
687 /// ```
688 /// # futures_lite::future::block_on(async {
689 /// use async_channel::{unbounded, RecvError};
690 ///
691 /// let (s, r) = unbounded::<()>();
692 /// assert!(!r.is_closed());
693 ///
694 /// drop(s);
695 /// assert!(r.is_closed());
696 /// # });
697 /// ```
698 pub fn is_closed(&self) -> bool {
699 self.channel.queue.is_closed()
700 }
701
702 /// Returns `true` if the channel is empty.
703 ///
704 /// # Examples
705 ///
706 /// ```
707 /// # futures_lite::future::block_on(async {
708 /// use async_channel::unbounded;
709 ///
710 /// let (s, r) = unbounded();
711 ///
712 /// assert!(s.is_empty());
713 /// s.send(1).await;
714 /// assert!(!s.is_empty());
715 /// # });
716 /// ```
717 pub fn is_empty(&self) -> bool {
718 self.channel.queue.is_empty()
719 }
720
721 /// Returns `true` if the channel is full.
722 ///
723 /// Unbounded channels are never full.
724 ///
725 /// # Examples
726 ///
727 /// ```
728 /// # futures_lite::future::block_on(async {
729 /// use async_channel::bounded;
730 ///
731 /// let (s, r) = bounded(1);
732 ///
733 /// assert!(!r.is_full());
734 /// s.send(1).await;
735 /// assert!(r.is_full());
736 /// # });
737 /// ```
738 pub fn is_full(&self) -> bool {
739 self.channel.queue.is_full()
740 }
741
742 /// Returns the number of messages in the channel.
743 ///
744 /// # Examples
745 ///
746 /// ```
747 /// # futures_lite::future::block_on(async {
748 /// use async_channel::unbounded;
749 ///
750 /// let (s, r) = unbounded();
751 /// assert_eq!(r.len(), 0);
752 ///
753 /// s.send(1).await;
754 /// s.send(2).await;
755 /// assert_eq!(r.len(), 2);
756 /// # });
757 /// ```
758 pub fn len(&self) -> usize {
759 self.channel.queue.len()
760 }
761
762 /// Returns the channel capacity if it's bounded.
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// use async_channel::{bounded, unbounded};
768 ///
769 /// let (s, r) = bounded::<i32>(5);
770 /// assert_eq!(r.capacity(), Some(5));
771 ///
772 /// let (s, r) = unbounded::<i32>();
773 /// assert_eq!(r.capacity(), None);
774 /// ```
775 pub fn capacity(&self) -> Option<usize> {
776 self.channel.queue.capacity()
777 }
778
779 /// Returns the number of receivers for the channel.
780 ///
781 /// # Examples
782 ///
783 /// ```
784 /// # futures_lite::future::block_on(async {
785 /// use async_channel::unbounded;
786 ///
787 /// let (s, r) = unbounded::<()>();
788 /// assert_eq!(r.receiver_count(), 1);
789 ///
790 /// let r2 = r.clone();
791 /// assert_eq!(r.receiver_count(), 2);
792 /// # });
793 /// ```
794 pub fn receiver_count(&self) -> usize {
795 self.channel.receiver_count.load(Ordering::SeqCst)
796 }
797
798 /// Returns the number of senders for the channel.
799 ///
800 /// # Examples
801 ///
802 /// ```
803 /// # futures_lite::future::block_on(async {
804 /// use async_channel::unbounded;
805 ///
806 /// let (s, r) = unbounded::<()>();
807 /// assert_eq!(r.sender_count(), 1);
808 ///
809 /// let s2 = s.clone();
810 /// assert_eq!(r.sender_count(), 2);
811 /// # });
812 /// ```
813 pub fn sender_count(&self) -> usize {
814 self.channel.sender_count.load(Ordering::SeqCst)
815 }
816
817 /// Downgrade the receiver to a weak reference.
818 pub fn downgrade(&self) -> WeakReceiver<T> {
819 WeakReceiver {
820 channel: self.channel.clone(),
821 }
822 }
823}
824
825impl<T> fmt::Debug for Receiver<T> {
826 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
827 write!(f, "Receiver {{ .. }}")
828 }
829}
830
831impl<T> Clone for Receiver<T> {
832 fn clone(&self) -> Receiver<T> {
833 let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
834
835 // Make sure the count never overflows, even if lots of receiver clones are leaked.
836 if count > usize::MAX / 2 {
837 abort();
838 }
839
840 Receiver {
841 channel: self.channel.clone(),
842 listener: None,
843 _pin: PhantomPinned,
844 }
845 }
846}
847
848impl<T> Stream for Receiver<T> {
849 type Item = T;
850
851 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
852 loop {
853 // If this stream is listening for events, first wait for a notification.
854 {
855 let this = self.as_mut().project();
856 if let Some(listener) = this.listener.as_mut() {
857 ready!(Pin::new(listener).poll(cx));
858 *this.listener = None;
859 }
860 }
861
862 loop {
863 // Attempt to receive a message.
864 match self.try_recv() {
865 Ok(msg) => {
866 // The stream is not blocked on an event - drop the listener.
867 let this = self.as_mut().project();
868 *this.listener = None;
869 return Poll::Ready(Some(msg));
870 }
871 Err(TryRecvError::Closed) => {
872 // The stream is not blocked on an event - drop the listener.
873 let this = self.as_mut().project();
874 *this.listener = None;
875 return Poll::Ready(None);
876 }
877 Err(TryRecvError::Empty) => {}
878 }
879
880 // Receiving failed - now start listening for notifications or wait for one.
881 let this = self.as_mut().project();
882 if this.listener.is_some() {
883 // Go back to the outer loop to wait for a notification.
884 break;
885 } else {
886 *this.listener = Some(this.channel.stream_ops.listen());
887 }
888 }
889 }
890 }
891}
892
893impl<T> futures_core::stream::FusedStream for Receiver<T> {
894 fn is_terminated(&self) -> bool {
895 self.channel.queue.is_closed() && self.channel.queue.is_empty()
896 }
897}
898
899/// A [`Sender`] that prevents the channel from not being closed.
900///
901/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
902/// to be upgraded into a [`Sender`] through the `upgrade` method.
903pub struct WeakSender<T> {
904 channel: Arc<Channel<T>>,
905}
906
907impl<T> WeakSender<T> {
908 /// Upgrade the [`WeakSender`] into a [`Sender`].
909 pub fn upgrade(&self) -> Option<Sender<T>> {
910 if self.channel.queue.is_closed() {
911 None
912 } else {
913 match self.channel.sender_count.fetch_update(
914 Ordering::Relaxed,
915 Ordering::Relaxed,
916 |count| if count == 0 { None } else { Some(count + 1) },
917 ) {
918 Err(_) => None,
919 Ok(new_value) if new_value > usize::MAX / 2 => {
920 // Make sure the count never overflows, even if lots of sender clones are leaked.
921 abort();
922 }
923 Ok(_) => Some(Sender {
924 channel: self.channel.clone(),
925 }),
926 }
927 }
928 }
929}
930
931impl<T> Clone for WeakSender<T> {
932 fn clone(&self) -> Self {
933 WeakSender {
934 channel: self.channel.clone(),
935 }
936 }
937}
938
939impl<T> fmt::Debug for WeakSender<T> {
940 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
941 write!(f, "WeakSender {{ .. }}")
942 }
943}
944
945/// A [`Receiver`] that prevents the channel from not being closed.
946///
947/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
948/// to be upgraded into a [`Receiver`] through the `upgrade` method.
949pub struct WeakReceiver<T> {
950 channel: Arc<Channel<T>>,
951}
952
953impl<T> WeakReceiver<T> {
954 /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
955 pub fn upgrade(&self) -> Option<Receiver<T>> {
956 if self.channel.queue.is_closed() {
957 None
958 } else {
959 match self.channel.receiver_count.fetch_update(
960 Ordering::Relaxed,
961 Ordering::Relaxed,
962 |count| if count == 0 { None } else { Some(count + 1) },
963 ) {
964 Err(_) => None,
965 Ok(new_value) if new_value > usize::MAX / 2 => {
966 // Make sure the count never overflows, even if lots of receiver clones are leaked.
967 abort();
968 }
969 Ok(_) => Some(Receiver {
970 channel: self.channel.clone(),
971 listener: None,
972 _pin: PhantomPinned,
973 }),
974 }
975 }
976 }
977}
978
979impl<T> Clone for WeakReceiver<T> {
980 fn clone(&self) -> Self {
981 WeakReceiver {
982 channel: self.channel.clone(),
983 }
984 }
985}
986
987impl<T> fmt::Debug for WeakReceiver<T> {
988 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
989 write!(f, "WeakReceiver {{ .. }}")
990 }
991}
992
993/// An error returned from [`Sender::send()`].
994///
995/// Received because the channel is closed.
996#[derive(PartialEq, Eq, Clone, Copy)]
997pub struct SendError<T>(pub T);
998
999impl<T> SendError<T> {
1000 /// Unwraps the message that couldn't be sent.
1001 pub fn into_inner(self) -> T {
1002 self.0
1003 }
1004}
1005
1006#[cfg(feature = "std")]
1007impl<T> std::error::Error for SendError<T> {}
1008
1009impl<T> fmt::Debug for SendError<T> {
1010 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1011 write!(f, "SendError(..)")
1012 }
1013}
1014
1015impl<T> fmt::Display for SendError<T> {
1016 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1017 write!(f, "sending into a closed channel")
1018 }
1019}
1020
1021/// An error returned from [`Sender::try_send()`].
1022#[derive(PartialEq, Eq, Clone, Copy)]
1023pub enum TrySendError<T> {
1024 /// The channel is full but not closed.
1025 Full(T),
1026
1027 /// The channel is closed.
1028 Closed(T),
1029}
1030
1031impl<T> TrySendError<T> {
1032 /// Unwraps the message that couldn't be sent.
1033 pub fn into_inner(self) -> T {
1034 match self {
1035 TrySendError::Full(t) => t,
1036 TrySendError::Closed(t) => t,
1037 }
1038 }
1039
1040 /// Returns `true` if the channel is full but not closed.
1041 pub fn is_full(&self) -> bool {
1042 match self {
1043 TrySendError::Full(_) => true,
1044 TrySendError::Closed(_) => false,
1045 }
1046 }
1047
1048 /// Returns `true` if the channel is closed.
1049 pub fn is_closed(&self) -> bool {
1050 match self {
1051 TrySendError::Full(_) => false,
1052 TrySendError::Closed(_) => true,
1053 }
1054 }
1055}
1056
1057#[cfg(feature = "std")]
1058impl<T> std::error::Error for TrySendError<T> {}
1059
1060impl<T> fmt::Debug for TrySendError<T> {
1061 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1062 match *self {
1063 TrySendError::Full(..) => write!(f, "Full(..)"),
1064 TrySendError::Closed(..) => write!(f, "Closed(..)"),
1065 }
1066 }
1067}
1068
1069impl<T> fmt::Display for TrySendError<T> {
1070 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1071 match *self {
1072 TrySendError::Full(..) => write!(f, "sending into a full channel"),
1073 TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
1074 }
1075 }
1076}
1077
1078/// An error returned from [`Receiver::recv()`].
1079///
1080/// Received because the channel is empty and closed.
1081#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1082pub struct RecvError;
1083
1084#[cfg(feature = "std")]
1085impl std::error::Error for RecvError {}
1086
1087impl fmt::Display for RecvError {
1088 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1089 write!(f, "receiving from an empty and closed channel")
1090 }
1091}
1092
1093/// An error returned from [`Receiver::try_recv()`].
1094#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1095pub enum TryRecvError {
1096 /// The channel is empty but not closed.
1097 Empty,
1098
1099 /// The channel is empty and closed.
1100 Closed,
1101}
1102
1103impl TryRecvError {
1104 /// Returns `true` if the channel is empty but not closed.
1105 pub fn is_empty(&self) -> bool {
1106 match self {
1107 TryRecvError::Empty => true,
1108 TryRecvError::Closed => false,
1109 }
1110 }
1111
1112 /// Returns `true` if the channel is empty and closed.
1113 pub fn is_closed(&self) -> bool {
1114 match self {
1115 TryRecvError::Empty => false,
1116 TryRecvError::Closed => true,
1117 }
1118 }
1119}
1120
1121#[cfg(feature = "std")]
1122impl std::error::Error for TryRecvError {}
1123
1124impl fmt::Display for TryRecvError {
1125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1126 match *self {
1127 TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1128 TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1129 }
1130 }
1131}
1132
1133easy_wrapper! {
1134 /// A future returned by [`Sender::send()`].
1135 #[derive(Debug)]
1136 #[must_use = "futures do nothing unless you `.await` or poll them"]
1137 pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
1138 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1139 pub(crate) wait();
1140}
1141
1142pin_project! {
1143 #[derive(Debug)]
1144 #[project(!Unpin)]
1145 struct SendInner<'a, T> {
1146 // Reference to the original sender.
1147 sender: &'a Sender<T>,
1148
1149 // The message to send.
1150 msg: Option<T>,
1151
1152 // Listener waiting on the channel.
1153 listener: Option<EventListener>,
1154
1155 // Keeping this type `!Unpin` enables future optimizations.
1156 #[pin]
1157 _pin: PhantomPinned
1158 }
1159}
1160
1161impl<'a, T> EventListenerFuture for SendInner<'a, T> {
1162 type Output = Result<(), SendError<T>>;
1163
1164 /// Run this future with the given `Strategy`.
1165 fn poll_with_strategy<'x, S: Strategy<'x>>(
1166 self: Pin<&mut Self>,
1167 strategy: &mut S,
1168 context: &mut S::Context,
1169 ) -> Poll<Result<(), SendError<T>>> {
1170 let this = self.project();
1171
1172 loop {
1173 let msg = this.msg.take().unwrap();
1174 // Attempt to send a message.
1175 match this.sender.try_send(msg) {
1176 Ok(()) => return Poll::Ready(Ok(())),
1177 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1178 Err(TrySendError::Full(m)) => *this.msg = Some(m),
1179 }
1180
1181 // Sending failed - now start listening for notifications or wait for one.
1182 if this.listener.is_some() {
1183 // Poll using the given strategy
1184 ready!(S::poll(strategy, &mut *this.listener, context));
1185 } else {
1186 *this.listener = Some(this.sender.channel.send_ops.listen());
1187 }
1188 }
1189 }
1190}
1191
1192easy_wrapper! {
1193 /// A future returned by [`Receiver::recv()`].
1194 #[derive(Debug)]
1195 #[must_use = "futures do nothing unless you `.await` or poll them"]
1196 pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1197 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1198 pub(crate) wait();
1199}
1200
1201pin_project! {
1202 #[derive(Debug)]
1203 #[project(!Unpin)]
1204 struct RecvInner<'a, T> {
1205 // Reference to the receiver.
1206 receiver: &'a Receiver<T>,
1207
1208 // Listener waiting on the channel.
1209 listener: Option<EventListener>,
1210
1211 // Keeping this type `!Unpin` enables future optimizations.
1212 #[pin]
1213 _pin: PhantomPinned
1214 }
1215}
1216
1217impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
1218 type Output = Result<T, RecvError>;
1219
1220 /// Run this future with the given `Strategy`.
1221 fn poll_with_strategy<'x, S: Strategy<'x>>(
1222 self: Pin<&mut Self>,
1223 strategy: &mut S,
1224 cx: &mut S::Context,
1225 ) -> Poll<Result<T, RecvError>> {
1226 let this = self.project();
1227
1228 loop {
1229 // Attempt to receive a message.
1230 match this.receiver.try_recv() {
1231 Ok(msg) => return Poll::Ready(Ok(msg)),
1232 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1233 Err(TryRecvError::Empty) => {}
1234 }
1235
1236 // Receiving failed - now start listening for notifications or wait for one.
1237 if this.listener.is_some() {
1238 // Poll using the given strategy
1239 ready!(S::poll(strategy, &mut *this.listener, cx));
1240 } else {
1241 *this.listener = Some(this.receiver.channel.recv_ops.listen());
1242 }
1243 }
1244 }
1245}
1246
1247#[cfg(feature = "std")]
1248use std::process::abort;
1249
1250#[cfg(not(feature = "std"))]
1251fn abort() -> ! {
1252 struct PanicOnDrop;
1253
1254 impl Drop for PanicOnDrop {
1255 fn drop(&mut self) {
1256 panic!("Panic while panicking to abort");
1257 }
1258 }
1259
1260 let _bomb = PanicOnDrop;
1261 panic!("Panic while panicking to abort")
1262}