async_broadcast/
lib.rs

1//! Async broadcast channel
2//!
3//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
4//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
5//! that implement [`Clone`].
6//!
7//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
8//! among multiple threads.
9//!
10//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
11//! closed, no more messages can be sent, but remaining messages can still be received.
12//!
13//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
14//!
15//! ## Examples
16//!
17//! ```rust
18//! use async_broadcast::{broadcast, TryRecvError};
19//! use futures_lite::{future::block_on, stream::StreamExt};
20//!
21//! block_on(async move {
22//!     let (s1, mut r1) = broadcast(2);
23//!     let s2 = s1.clone();
24//!     let mut r2 = r1.clone();
25//!
26//!     // Send 2 messages from two different senders.
27//!     s1.broadcast(7).await.unwrap();
28//!     s2.broadcast(8).await.unwrap();
29//!
30//!     // Channel is now at capacity so sending more messages will result in an error.
31//!     assert!(s2.try_broadcast(9).unwrap_err().is_full());
32//!     assert!(s1.try_broadcast(10).unwrap_err().is_full());
33//!
34//!     // We can use `recv` method of the `Stream` implementation to receive messages.
35//!     assert_eq!(r1.next().await.unwrap(), 7);
36//!     assert_eq!(r1.recv().await.unwrap(), 8);
37//!     assert_eq!(r2.next().await.unwrap(), 7);
38//!     assert_eq!(r2.recv().await.unwrap(), 8);
39//!
40//!     // All receiver got all messages so channel is now empty.
41//!     assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
42//!     assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
43//!
44//!     // Drop both senders, which closes the channel.
45//!     drop(s1);
46//!     drop(s2);
47//!
48//!     assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
49//!     assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
50//! })
51//! ```
52//!
53//! ## Difference with `async-channel`
54//!
55//! This crate is similar to [`async-channel`] in that they both provide an MPMC channel but the
56//! main difference being that in `async-channel`, each message sent on the channel is only received
57//! by one of the receivers. `async-broadcast` on the other hand, delivers each message to every
58//! receiver (IOW broadcast) by cloning it for each receiver.
59//!
60//! [`async-channel`]: https://crates.io/crates/async-channel
61//!
62//! ## Difference with other broadcast crates
63//!
64//! * [`broadcaster`]: The main difference would be that `broadcaster` doesn't have a sender and
65//!   receiver split and both sides use clones of the same BroadcastChannel instance. The messages
66//!   are sent are sent to all channel clones. While this can work for many cases, the lack of
67//!   sender and receiver split, means that often times, you'll find yourself having to drain the
68//!   channel on the sending side yourself.
69//!
70//! * [`postage`]: this crate provides a [broadcast API][pba] similar to `async_broadcast`. However,
71//!   it:
72//!   - (at the time of this writing) duplicates [futures] API, which isn't ideal.
73//!   - Does not support overflow mode nor has the concept of inactive receivers, so a slow or
74//!     inactive receiver blocking the whole channel is not a solvable problem.
75//!   - Provides all kinds of channels, which is generally good but if you just need a broadcast
76//!     channel, `async_broadcast` is probably a better choice.
77//!
78//! * [`tokio::sync`]: Tokio's `sync` module provides a [broadcast channel][tbc] API. The differences
79//!    here are:
80//!   - While this implementation does provide [overflow mode][tom], it is the default behavior and not
81//!     opt-in.
82//!   - There is no equivalent of inactive receivers.
83//!   - While it's possible to build tokio with only the `sync` module, it comes with other APIs that
84//!     you may not need.
85//!
86//! [`broadcaster`]: https://crates.io/crates/broadcaster
87//! [`postage`]: https://crates.io/crates/postage
88//! [pba]: https://docs.rs/postage/0.4.1/postage/broadcast/fn.channel.html
89//! [futures]: https://crates.io/crates/futures
90//! [`tokio::sync`]: https://docs.rs/tokio/1.6.0/tokio/sync
91//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
92//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
93//!
94#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
95#![deny(missing_debug_implementations, nonstandard_style)]
96#![warn(missing_docs, rustdoc::missing_doc_code_examples, unreachable_pub)]
97
98#[cfg(doctest)]
99mod doctests {
100    doc_comment::doctest!("../README.md");
101}
102
103use std::collections::VecDeque;
104use std::convert::TryInto;
105use std::error;
106use std::fmt;
107use std::future::Future;
108use std::pin::Pin;
109use std::sync::{Arc, RwLock};
110use std::task::{Context, Poll};
111
112use event_listener::{Event, EventListener};
113use futures_core::{ready, stream::Stream};
114
115/// Create a new broadcast channel.
116///
117/// The created channel has space to hold at most `cap` messages at a time.
118///
119/// # Panics
120///
121/// Capacity must be a positive number. If `cap` is zero, this function will panic.
122///
123/// # Examples
124///
125/// ```
126/// # futures_lite::future::block_on(async {
127/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
128///
129/// let (s, mut r1) = broadcast(1);
130/// let mut r2 = r1.clone();
131///
132/// assert_eq!(s.broadcast(10).await, Ok(None));
133/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
134///
135/// assert_eq!(r1.recv().await, Ok(10));
136/// assert_eq!(r2.recv().await, Ok(10));
137/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
138/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
139/// # });
140/// ```
141pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
142    assert!(cap > 0, "capacity cannot be zero");
143
144    let inner = Arc::new(RwLock::new(Inner {
145        queue: VecDeque::with_capacity(cap),
146        capacity: cap,
147        overflow: false,
148        await_active: true,
149        receiver_count: 1,
150        inactive_receiver_count: 0,
151        sender_count: 1,
152        head_pos: 0,
153        is_closed: false,
154        send_ops: Event::new(),
155        recv_ops: Event::new(),
156    }));
157
158    let s = Sender {
159        inner: inner.clone(),
160    };
161    let r = Receiver {
162        inner,
163        pos: 0,
164        listener: None,
165    };
166
167    (s, r)
168}
169
170#[derive(Debug)]
171struct Inner<T> {
172    queue: VecDeque<(T, usize)>,
173    // We assign the same capacity to the queue but that's just specifying the minimum capacity and
174    // the actual capacity could be anything. Hence the need to keep track of our own set capacity.
175    capacity: usize,
176    receiver_count: usize,
177    inactive_receiver_count: usize,
178    sender_count: usize,
179    /// Send sequence number of the front of the queue
180    head_pos: u64,
181    overflow: bool,
182    await_active: bool,
183
184    is_closed: bool,
185
186    /// Send operations waiting while the channel is full.
187    send_ops: Event,
188
189    /// Receive operations waiting while the channel is empty and not closed.
190    recv_ops: Event,
191}
192
193impl<T> Inner<T> {
194    /// Try receiving at the given position, returning either the element or a reference to it.
195    ///
196    /// Result is used here instead of Cow because we don't have a Clone bound on T.
197    fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
198        let i = match pos.checked_sub(self.head_pos) {
199            Some(i) => i
200                .try_into()
201                .expect("Head position more than usize::MAX behind a receiver"),
202            None => {
203                let count = self.head_pos - *pos;
204                *pos = self.head_pos;
205                return Err(TryRecvError::Overflowed(count));
206            }
207        };
208
209        let last_waiter;
210        if let Some((_elt, waiters)) = self.queue.get_mut(i) {
211            *pos += 1;
212            *waiters -= 1;
213            last_waiter = *waiters == 0;
214        } else {
215            debug_assert_eq!(i, self.queue.len());
216            if self.is_closed {
217                return Err(TryRecvError::Closed);
218            } else {
219                return Err(TryRecvError::Empty);
220            }
221        }
222
223        // If we read from the front of the queue and this is the last receiver reading it
224        // we can pop the queue instead of cloning the message
225        if last_waiter {
226            // Only the first element of the queue should have 0 waiters
227            assert_eq!(i, 0);
228
229            // Remove the element from the queue, adjust space, and notify senders
230            let elt = self.queue.pop_front().unwrap().0;
231            self.head_pos += 1;
232            if !self.overflow {
233                // Notify 1 awaiting senders that there is now room. If there is still room in the
234                // queue, the notified operation will notify another awaiting sender.
235                self.send_ops.notify(1);
236            }
237
238            Ok(Ok(elt))
239        } else {
240            Ok(Err(&self.queue[i].0))
241        }
242    }
243
244    /// Closes the channel and notifies all waiting operations.
245    ///
246    /// Returns `true` if this call has closed the channel and it was not closed already.
247    fn close(&mut self) -> bool {
248        if self.is_closed {
249            return false;
250        }
251
252        self.is_closed = true;
253        // Notify all waiting senders and receivers.
254        self.send_ops.notify(usize::MAX);
255        self.recv_ops.notify(usize::MAX);
256
257        true
258    }
259
260    /// Set the channel capacity.
261    ///
262    /// There are times when you need to change the channel's capacity after creating it. If the
263    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
264    /// dropped to shrink the channel.
265    fn set_capacity(&mut self, new_cap: usize) {
266        self.capacity = new_cap;
267        if new_cap > self.queue.capacity() {
268            let diff = new_cap - self.queue.capacity();
269            self.queue.reserve(diff);
270        }
271
272        // Ensure queue doesn't have more than `new_cap` messages.
273        if new_cap < self.queue.len() {
274            let diff = self.queue.len() - new_cap;
275            self.queue.drain(0..diff);
276            self.head_pos += diff as u64;
277        }
278    }
279
280    /// Close the channel if there aren't any receivers present anymore
281    fn close_channel(&mut self) {
282        if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
283            self.close();
284        }
285    }
286}
287
288/// The sending side of the broadcast channel.
289///
290/// Senders can be cloned and shared among threads. When all senders associated with a channel are
291/// dropped, the channel becomes closed.
292///
293/// The channel can also be closed manually by calling [`Sender::close()`].
294#[derive(Debug)]
295pub struct Sender<T> {
296    inner: Arc<RwLock<Inner<T>>>,
297}
298
299impl<T> Sender<T> {
300    /// Returns the channel capacity.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// use async_broadcast::broadcast;
306    ///
307    /// let (s, r) = broadcast::<i32>(5);
308    /// assert_eq!(s.capacity(), 5);
309    /// ```
310    pub fn capacity(&self) -> usize {
311        self.inner.read().unwrap().capacity
312    }
313
314    /// Set the channel capacity.
315    ///
316    /// There are times when you need to change the channel's capacity after creating it. If the
317    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
318    /// dropped to shrink the channel.
319    ///
320    /// # Examples
321    ///
322    /// ```
323    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
324    ///
325    /// let (mut s, mut r) = broadcast::<i32>(3);
326    /// assert_eq!(s.capacity(), 3);
327    /// s.try_broadcast(1).unwrap();
328    /// s.try_broadcast(2).unwrap();
329    /// s.try_broadcast(3).unwrap();
330    ///
331    /// s.set_capacity(1);
332    /// assert_eq!(s.capacity(), 1);
333    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
334    /// assert_eq!(r.try_recv().unwrap(), 3);
335    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
336    /// s.try_broadcast(1).unwrap();
337    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
338    ///
339    /// s.set_capacity(2);
340    /// assert_eq!(s.capacity(), 2);
341    /// s.try_broadcast(2).unwrap();
342    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
343    /// ```
344    pub fn set_capacity(&mut self, new_cap: usize) {
345        self.inner.write().unwrap().set_capacity(new_cap);
346    }
347
348    /// If overflow mode is enabled on this channel.
349    ///
350    /// # Examples
351    ///
352    /// ```
353    /// use async_broadcast::broadcast;
354    ///
355    /// let (s, r) = broadcast::<i32>(5);
356    /// assert!(!s.overflow());
357    /// ```
358    pub fn overflow(&self) -> bool {
359        self.inner.read().unwrap().overflow
360    }
361
362    /// Set overflow mode on the channel.
363    ///
364    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
365    /// full. It achieves that by removing the oldest message from the channel.
366    ///
367    /// # Examples
368    ///
369    /// ```
370    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
371    ///
372    /// let (mut s, mut r) = broadcast::<i32>(2);
373    /// s.try_broadcast(1).unwrap();
374    /// s.try_broadcast(2).unwrap();
375    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
376    /// s.set_overflow(true);
377    /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
378    /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
379    ///
380    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
381    /// assert_eq!(r.try_recv().unwrap(), 3);
382    /// assert_eq!(r.try_recv().unwrap(), 4);
383    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
384    /// ```
385    pub fn set_overflow(&mut self, overflow: bool) {
386        self.inner.write().unwrap().overflow = overflow;
387    }
388
389    /// If sender will wait for active receivers.
390    ///
391    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
392    /// `true`.
393    ///
394    /// # Examples
395    ///
396    /// ```
397    /// use async_broadcast::broadcast;
398    ///
399    /// let (s, _) = broadcast::<i32>(5);
400    /// assert!(s.await_active());
401    /// ```
402    pub fn await_active(&self) -> bool {
403        self.inner.read().unwrap().await_active
404    }
405
406    /// Specify if sender will wait for active receivers.
407    ///
408    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
409    /// `true`.
410    ///
411    /// # Examples
412    ///
413    /// ```
414    /// # futures_lite::future::block_on(async {
415    /// use async_broadcast::broadcast;
416    ///
417    /// let (mut s, mut r) = broadcast::<i32>(2);
418    /// s.broadcast(1).await.unwrap();
419    ///
420    /// let _ = r.deactivate();
421    /// s.set_await_active(false);
422    /// assert!(s.broadcast(2).await.is_err());
423    /// # });
424    /// ```
425    pub fn set_await_active(&mut self, await_active: bool) {
426        self.inner.write().unwrap().await_active = await_active;
427    }
428
429    /// Closes the channel.
430    ///
431    /// Returns `true` if this call has closed the channel and it was not closed already.
432    ///
433    /// The remaining messages can still be received.
434    ///
435    /// # Examples
436    ///
437    /// ```
438    /// # futures_lite::future::block_on(async {
439    /// use async_broadcast::{broadcast, RecvError};
440    ///
441    /// let (s, mut r) = broadcast(1);
442    /// s.broadcast(1).await.unwrap();
443    /// assert!(s.close());
444    ///
445    /// assert_eq!(r.recv().await.unwrap(), 1);
446    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
447    /// # });
448    /// ```
449    pub fn close(&self) -> bool {
450        self.inner.write().unwrap().close()
451    }
452
453    /// Returns `true` if the channel is closed.
454    ///
455    /// # Examples
456    ///
457    /// ```
458    /// # futures_lite::future::block_on(async {
459    /// use async_broadcast::{broadcast, RecvError};
460    ///
461    /// let (s, r) = broadcast::<()>(1);
462    /// assert!(!s.is_closed());
463    ///
464    /// drop(r);
465    /// assert!(s.is_closed());
466    /// # });
467    /// ```
468    pub fn is_closed(&self) -> bool {
469        self.inner.read().unwrap().is_closed
470    }
471
472    /// Returns `true` if the channel is empty.
473    ///
474    /// # Examples
475    ///
476    /// ```
477    /// # futures_lite::future::block_on(async {
478    /// use async_broadcast::broadcast;
479    ///
480    /// let (s, r) = broadcast(1);
481    ///
482    /// assert!(s.is_empty());
483    /// s.broadcast(1).await;
484    /// assert!(!s.is_empty());
485    /// # });
486    /// ```
487    pub fn is_empty(&self) -> bool {
488        self.inner.read().unwrap().queue.is_empty()
489    }
490
491    /// Returns `true` if the channel is full.
492    ///
493    /// # Examples
494    ///
495    /// ```
496    /// # futures_lite::future::block_on(async {
497    /// use async_broadcast::broadcast;
498    ///
499    /// let (s, r) = broadcast(1);
500    ///
501    /// assert!(!s.is_full());
502    /// s.broadcast(1).await;
503    /// assert!(s.is_full());
504    /// # });
505    /// ```
506    pub fn is_full(&self) -> bool {
507        let inner = self.inner.read().unwrap();
508
509        inner.queue.len() == inner.capacity
510    }
511
512    /// Returns the number of messages in the channel.
513    ///
514    /// # Examples
515    ///
516    /// ```
517    /// # futures_lite::future::block_on(async {
518    /// use async_broadcast::broadcast;
519    ///
520    /// let (s, r) = broadcast(2);
521    /// assert_eq!(s.len(), 0);
522    ///
523    /// s.broadcast(1).await;
524    /// s.broadcast(2).await;
525    /// assert_eq!(s.len(), 2);
526    /// # });
527    /// ```
528    pub fn len(&self) -> usize {
529        self.inner.read().unwrap().queue.len()
530    }
531
532    /// Returns the number of receivers for the channel.
533    ///
534    /// This does not include inactive receivers. Use [`Sender::inactive_receiver_count`] if you
535    /// are interested in that.
536    ///
537    /// # Examples
538    ///
539    /// ```
540    /// use async_broadcast::broadcast;
541    ///
542    /// let (s, r) = broadcast::<()>(1);
543    /// assert_eq!(s.receiver_count(), 1);
544    /// let r = r.deactivate();
545    /// assert_eq!(s.receiver_count(), 0);
546    ///
547    /// let r2 = r.activate_cloned();
548    /// assert_eq!(r.receiver_count(), 1);
549    /// assert_eq!(r.inactive_receiver_count(), 1);
550    /// ```
551    pub fn receiver_count(&self) -> usize {
552        self.inner.read().unwrap().receiver_count
553    }
554
555    /// Returns the number of inactive receivers for the channel.
556    ///
557    /// # Examples
558    ///
559    /// ```
560    /// use async_broadcast::broadcast;
561    ///
562    /// let (s, r) = broadcast::<()>(1);
563    /// assert_eq!(s.receiver_count(), 1);
564    /// let r = r.deactivate();
565    /// assert_eq!(s.receiver_count(), 0);
566    ///
567    /// let r2 = r.activate_cloned();
568    /// assert_eq!(r.receiver_count(), 1);
569    /// assert_eq!(r.inactive_receiver_count(), 1);
570    /// ```
571    pub fn inactive_receiver_count(&self) -> usize {
572        self.inner.read().unwrap().inactive_receiver_count
573    }
574
575    /// Returns the number of senders for the channel.
576    ///
577    /// # Examples
578    ///
579    /// ```
580    /// # futures_lite::future::block_on(async {
581    /// use async_broadcast::broadcast;
582    ///
583    /// let (s, r) = broadcast::<()>(1);
584    /// assert_eq!(s.sender_count(), 1);
585    ///
586    /// let s2 = s.clone();
587    /// assert_eq!(s.sender_count(), 2);
588    /// # });
589    /// ```
590    pub fn sender_count(&self) -> usize {
591        self.inner.read().unwrap().sender_count
592    }
593
594    /// Produce a new Receiver for this channel.
595    ///
596    /// The new receiver starts with zero messages available.  This will not re-open the channel if
597    /// it was closed due to all receivers being dropped.
598    ///
599    /// # Examples
600    ///
601    /// ```
602    /// # futures_lite::future::block_on(async {
603    /// use async_broadcast::{broadcast, RecvError};
604    ///
605    /// let (s, mut r1) = broadcast(2);
606    ///
607    /// assert_eq!(s.broadcast(1).await, Ok(None));
608    ///
609    /// let mut r2 = s.new_receiver();
610    ///
611    /// assert_eq!(s.broadcast(2).await, Ok(None));
612    /// drop(s);
613    ///
614    /// assert_eq!(r1.recv().await, Ok(1));
615    /// assert_eq!(r1.recv().await, Ok(2));
616    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
617    ///
618    /// assert_eq!(r2.recv().await, Ok(2));
619    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
620    /// # });
621    /// ```
622    pub fn new_receiver(&self) -> Receiver<T> {
623        let mut inner = self.inner.write().unwrap();
624        inner.receiver_count += 1;
625        Receiver {
626            inner: self.inner.clone(),
627            pos: inner.head_pos + inner.queue.len() as u64,
628            listener: None,
629        }
630    }
631}
632
633impl<T: Clone> Sender<T> {
634    /// Broadcasts a message on the channel.
635    ///
636    /// If the channel is full, this method waits until there is space for a message unless:
637    ///
638    /// 1. overflow mode (set through [`Sender::set_overflow`]) is enabled, in which case it removes
639    ///    the oldest message from the channel to make room for the new message. The removed message
640    ///    is returned to the caller.
641    /// 2. this behavior is disabled using [`Sender::set_await_active`], in which case, it returns
642    ///    [`SendError`] immediately.
643    ///
644    /// If the channel is closed, this method returns an error.
645    ///
646    /// # Examples
647    ///
648    /// ```
649    /// # futures_lite::future::block_on(async {
650    /// use async_broadcast::{broadcast, SendError};
651    ///
652    /// let (s, r) = broadcast(1);
653    ///
654    /// assert_eq!(s.broadcast(1).await, Ok(None));
655    /// drop(r);
656    /// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
657    /// # });
658    /// ```
659    pub fn broadcast(&self, msg: T) -> Send<'_, T> {
660        Send {
661            sender: self,
662            listener: None,
663            msg: Some(msg),
664        }
665    }
666
667    /// Attempts to broadcast a message on the channel.
668    ///
669    /// If the channel is full, this method returns an error unless overflow mode (set through
670    /// [`Sender::set_overflow`]) is enabled. If the overflow mode is enabled, it removes the
671    /// oldest message from the channel to make room for the new message. The removed message
672    /// is returned to the caller.
673    ///
674    /// If the channel is closed, this method returns an error.
675    ///
676    /// # Examples
677    ///
678    /// ```
679    /// use async_broadcast::{broadcast, TrySendError};
680    ///
681    /// let (s, r) = broadcast(1);
682    ///
683    /// assert_eq!(s.try_broadcast(1), Ok(None));
684    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
685    ///
686    /// drop(r);
687    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
688    /// ```
689    pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
690        let mut ret = None;
691        let mut inner = self.inner.write().unwrap();
692
693        if inner.is_closed {
694            return Err(TrySendError::Closed(msg));
695        } else if inner.receiver_count == 0 {
696            assert!(inner.inactive_receiver_count != 0);
697
698            return Err(TrySendError::Inactive(msg));
699        } else if inner.queue.len() == inner.capacity {
700            if inner.overflow {
701                // Make room by popping a message.
702                ret = inner.queue.pop_front().map(|(m, _)| m);
703            } else {
704                return Err(TrySendError::Full(msg));
705            }
706        }
707        let receiver_count = inner.receiver_count;
708        inner.queue.push_back((msg, receiver_count));
709        if ret.is_some() {
710            inner.head_pos += 1;
711        }
712
713        // Notify all awaiting receive operations.
714        inner.recv_ops.notify(usize::MAX);
715
716        Ok(ret)
717    }
718}
719
720impl<T> Drop for Sender<T> {
721    fn drop(&mut self) {
722        let mut inner = self.inner.write().unwrap();
723
724        inner.sender_count -= 1;
725
726        if inner.sender_count == 0 {
727            inner.close();
728        }
729    }
730}
731
732impl<T> Clone for Sender<T> {
733    fn clone(&self) -> Self {
734        self.inner.write().unwrap().sender_count += 1;
735
736        Sender {
737            inner: self.inner.clone(),
738        }
739    }
740}
741
742/// The receiving side of a channel.
743///
744/// Receivers can be cloned and shared among threads. When all (active) receivers associated with a
745/// channel are dropped, the channel becomes closed. You can deactivate a receiver using
746/// [`Receiver::deactivate`] if you would like the channel to remain open without keeping active
747/// receivers around.
748#[derive(Debug)]
749pub struct Receiver<T> {
750    inner: Arc<RwLock<Inner<T>>>,
751    pos: u64,
752
753    /// Listens for a send or close event to unblock this stream.
754    listener: Option<EventListener>,
755}
756
757impl<T> Receiver<T> {
758    /// Returns the channel capacity.
759    ///
760    /// # Examples
761    ///
762    /// ```
763    /// use async_broadcast::broadcast;
764    ///
765    /// let (_s, r) = broadcast::<i32>(5);
766    /// assert_eq!(r.capacity(), 5);
767    /// ```
768    pub fn capacity(&self) -> usize {
769        self.inner.read().unwrap().capacity
770    }
771
772    /// Set the channel capacity.
773    ///
774    /// There are times when you need to change the channel's capacity after creating it. If the
775    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
776    /// dropped to shrink the channel.
777    ///
778    /// # Examples
779    ///
780    /// ```
781    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
782    ///
783    /// let (s, mut r) = broadcast::<i32>(3);
784    /// assert_eq!(r.capacity(), 3);
785    /// s.try_broadcast(1).unwrap();
786    /// s.try_broadcast(2).unwrap();
787    /// s.try_broadcast(3).unwrap();
788    ///
789    /// r.set_capacity(1);
790    /// assert_eq!(r.capacity(), 1);
791    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
792    /// assert_eq!(r.try_recv().unwrap(), 3);
793    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
794    /// s.try_broadcast(1).unwrap();
795    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
796    ///
797    /// r.set_capacity(2);
798    /// assert_eq!(r.capacity(), 2);
799    /// s.try_broadcast(2).unwrap();
800    /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
801    /// ```
802    pub fn set_capacity(&mut self, new_cap: usize) {
803        self.inner.write().unwrap().set_capacity(new_cap);
804    }
805
806    /// If overflow mode is enabled on this channel.
807    ///
808    /// # Examples
809    ///
810    /// ```
811    /// use async_broadcast::broadcast;
812    ///
813    /// let (_s, r) = broadcast::<i32>(5);
814    /// assert!(!r.overflow());
815    /// ```
816    pub fn overflow(&self) -> bool {
817        self.inner.read().unwrap().overflow
818    }
819
820    /// Set overflow mode on the channel.
821    ///
822    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
823    /// full. It achieves that by removing the oldest message from the channel.
824    ///
825    /// # Examples
826    ///
827    /// ```
828    /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
829    ///
830    /// let (s, mut r) = broadcast::<i32>(2);
831    /// s.try_broadcast(1).unwrap();
832    /// s.try_broadcast(2).unwrap();
833    /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
834    /// r.set_overflow(true);
835    /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
836    /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
837    ///
838    /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
839    /// assert_eq!(r.try_recv().unwrap(), 3);
840    /// assert_eq!(r.try_recv().unwrap(), 4);
841    /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
842    /// ```
843    pub fn set_overflow(&mut self, overflow: bool) {
844        self.inner.write().unwrap().overflow = overflow;
845    }
846
847    /// If sender will wait for active receivers.
848    ///
849    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
850    /// `true`.
851    ///
852    /// # Examples
853    ///
854    /// ```
855    /// use async_broadcast::broadcast;
856    ///
857    /// let (_, r) = broadcast::<i32>(5);
858    /// assert!(r.await_active());
859    /// ```
860    pub fn await_active(&self) -> bool {
861        self.inner.read().unwrap().await_active
862    }
863
864    /// Specify if sender will wait for active receivers.
865    ///
866    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
867    /// `true`.
868    ///
869    /// # Examples
870    ///
871    /// ```
872    /// # futures_lite::future::block_on(async {
873    /// use async_broadcast::broadcast;
874    ///
875    /// let (s, mut r) = broadcast::<i32>(2);
876    /// s.broadcast(1).await.unwrap();
877    ///
878    /// r.set_await_active(false);
879    /// let _ = r.deactivate();
880    /// assert!(s.broadcast(2).await.is_err());
881    /// # });
882    /// ```
883    pub fn set_await_active(&mut self, await_active: bool) {
884        self.inner.write().unwrap().await_active = await_active;
885    }
886
887    /// Closes the channel.
888    ///
889    /// Returns `true` if this call has closed the channel and it was not closed already.
890    ///
891    /// The remaining messages can still be received.
892    ///
893    /// # Examples
894    ///
895    /// ```
896    /// # futures_lite::future::block_on(async {
897    /// use async_broadcast::{broadcast, RecvError};
898    ///
899    /// let (s, mut r) = broadcast(1);
900    /// s.broadcast(1).await.unwrap();
901    /// assert!(s.close());
902    ///
903    /// assert_eq!(r.recv().await.unwrap(), 1);
904    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
905    /// # });
906    /// ```
907    pub fn close(&self) -> bool {
908        self.inner.write().unwrap().close()
909    }
910
911    /// Returns `true` if the channel is closed.
912    ///
913    /// # Examples
914    ///
915    /// ```
916    /// # futures_lite::future::block_on(async {
917    /// use async_broadcast::{broadcast, RecvError};
918    ///
919    /// let (s, r) = broadcast::<()>(1);
920    /// assert!(!s.is_closed());
921    ///
922    /// drop(r);
923    /// assert!(s.is_closed());
924    /// # });
925    /// ```
926    pub fn is_closed(&self) -> bool {
927        self.inner.read().unwrap().is_closed
928    }
929
930    /// Returns `true` if the channel is empty.
931    ///
932    /// # Examples
933    ///
934    /// ```
935    /// # futures_lite::future::block_on(async {
936    /// use async_broadcast::broadcast;
937    ///
938    /// let (s, r) = broadcast(1);
939    ///
940    /// assert!(s.is_empty());
941    /// s.broadcast(1).await;
942    /// assert!(!s.is_empty());
943    /// # });
944    /// ```
945    pub fn is_empty(&self) -> bool {
946        self.inner.read().unwrap().queue.is_empty()
947    }
948
949    /// Returns `true` if the channel is full.
950    ///
951    /// # Examples
952    ///
953    /// ```
954    /// # futures_lite::future::block_on(async {
955    /// use async_broadcast::broadcast;
956    ///
957    /// let (s, r) = broadcast(1);
958    ///
959    /// assert!(!s.is_full());
960    /// s.broadcast(1).await;
961    /// assert!(s.is_full());
962    /// # });
963    /// ```
964    pub fn is_full(&self) -> bool {
965        let inner = self.inner.read().unwrap();
966
967        inner.queue.len() == inner.capacity
968    }
969
970    /// Returns the number of messages in the channel.
971    ///
972    /// # Examples
973    ///
974    /// ```
975    /// # futures_lite::future::block_on(async {
976    /// use async_broadcast::broadcast;
977    ///
978    /// let (s, r) = broadcast(2);
979    /// assert_eq!(s.len(), 0);
980    ///
981    /// s.broadcast(1).await;
982    /// s.broadcast(2).await;
983    /// assert_eq!(s.len(), 2);
984    /// # });
985    /// ```
986    pub fn len(&self) -> usize {
987        self.inner.read().unwrap().queue.len()
988    }
989
990    /// Returns the number of receivers for the channel.
991    ///
992    /// This does not include inactive receivers. Use [`Receiver::inactive_receiver_count`] if you
993    /// are interested in that.
994    ///
995    /// # Examples
996    ///
997    /// ```
998    /// use async_broadcast::broadcast;
999    ///
1000    /// let (s, r) = broadcast::<()>(1);
1001    /// assert_eq!(s.receiver_count(), 1);
1002    /// let r = r.deactivate();
1003    /// assert_eq!(s.receiver_count(), 0);
1004    ///
1005    /// let r2 = r.activate_cloned();
1006    /// assert_eq!(r.receiver_count(), 1);
1007    /// assert_eq!(r.inactive_receiver_count(), 1);
1008    /// ```
1009    pub fn receiver_count(&self) -> usize {
1010        self.inner.read().unwrap().receiver_count
1011    }
1012
1013    /// Returns the number of inactive receivers for the channel.
1014    ///
1015    /// # Examples
1016    ///
1017    /// ```
1018    /// use async_broadcast::broadcast;
1019    ///
1020    /// let (s, r) = broadcast::<()>(1);
1021    /// assert_eq!(s.receiver_count(), 1);
1022    /// let r = r.deactivate();
1023    /// assert_eq!(s.receiver_count(), 0);
1024    ///
1025    /// let r2 = r.activate_cloned();
1026    /// assert_eq!(r.receiver_count(), 1);
1027    /// assert_eq!(r.inactive_receiver_count(), 1);
1028    /// ```
1029    pub fn inactive_receiver_count(&self) -> usize {
1030        self.inner.read().unwrap().inactive_receiver_count
1031    }
1032
1033    /// Returns the number of senders for the channel.
1034    ///
1035    /// # Examples
1036    ///
1037    /// ```
1038    /// # futures_lite::future::block_on(async {
1039    /// use async_broadcast::broadcast;
1040    ///
1041    /// let (s, r) = broadcast::<()>(1);
1042    /// assert_eq!(s.sender_count(), 1);
1043    ///
1044    /// let s2 = s.clone();
1045    /// assert_eq!(s.sender_count(), 2);
1046    /// # });
1047    /// ```
1048    pub fn sender_count(&self) -> usize {
1049        self.inner.read().unwrap().sender_count
1050    }
1051
1052    /// Downgrade to a [`InactiveReceiver`].
1053    ///
1054    /// An inactive receiver is one that can not and does not receive any messages. Its only purpose
1055    /// is keep the associated channel open even when there are no (active) receivers. An inactive
1056    /// receiver can be upgraded into a [`Receiver`] using [`InactiveReceiver::activate`] or
1057    /// [`InactiveReceiver::activate_cloned`].
1058    ///
1059    /// [`Sender::try_broadcast`] will return [`TrySendError::Inactive`] if only inactive
1060    /// receivers exists for the associated channel and [`Sender::broadcast`] will wait until an
1061    /// active receiver is available.
1062    ///
1063    /// # Examples
1064    ///
1065    /// ```
1066    /// # futures_lite::future::block_on(async {
1067    /// use async_broadcast::{broadcast, TrySendError};
1068    ///
1069    /// let (s, r) = broadcast(1);
1070    /// let inactive = r.deactivate();
1071    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1072    ///
1073    /// let mut r = inactive.activate();
1074    /// assert_eq!(s.broadcast(10).await, Ok(None));
1075    /// assert_eq!(r.recv().await, Ok(10));
1076    /// # });
1077    /// ```
1078    pub fn deactivate(self) -> InactiveReceiver<T> {
1079        // Drop::drop impl of Receiver will take care of `receiver_count`.
1080        self.inner.write().unwrap().inactive_receiver_count += 1;
1081
1082        InactiveReceiver {
1083            inner: self.inner.clone(),
1084        }
1085    }
1086}
1087
1088impl<T: Clone> Receiver<T> {
1089    /// Receives a message from the channel.
1090    ///
1091    /// If the channel is empty, this method waits until there is a message.
1092    ///
1093    /// If the channel is closed, this method receives a message or returns an error if there are
1094    /// no more messages.
1095    ///
1096    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1097    /// this method returns an error and readjusts its cursor to point to the first available
1098    /// message.
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```
1103    /// # futures_lite::future::block_on(async {
1104    /// use async_broadcast::{broadcast, RecvError};
1105    ///
1106    /// let (s, mut r1) = broadcast(1);
1107    /// let mut r2 = r1.clone();
1108    ///
1109    /// assert_eq!(s.broadcast(1).await, Ok(None));
1110    /// drop(s);
1111    ///
1112    /// assert_eq!(r1.recv().await, Ok(1));
1113    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1114    /// assert_eq!(r2.recv().await, Ok(1));
1115    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1116    /// # });
1117    /// ```
1118    pub fn recv(&mut self) -> Recv<'_, T> {
1119        Recv {
1120            receiver: self,
1121            listener: None,
1122        }
1123    }
1124
1125    /// Attempts to receive a message from the channel.
1126    ///
1127    /// If the channel is empty or closed, this method returns an error.
1128    ///
1129    /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1130    /// this method returns an error and readjusts its cursor to point to the first available
1131    /// message.
1132    ///
1133    /// # Examples
1134    ///
1135    /// ```
1136    /// # futures_lite::future::block_on(async {
1137    /// use async_broadcast::{broadcast, TryRecvError};
1138    ///
1139    /// let (s, mut r1) = broadcast(1);
1140    /// let mut r2 = r1.clone();
1141    /// assert_eq!(s.broadcast(1).await, Ok(None));
1142    ///
1143    /// assert_eq!(r1.try_recv(), Ok(1));
1144    /// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
1145    /// assert_eq!(r2.try_recv(), Ok(1));
1146    /// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
1147    ///
1148    /// drop(s);
1149    /// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
1150    /// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
1151    /// # });
1152    /// ```
1153    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1154        self.inner
1155            .write()
1156            .unwrap()
1157            .try_recv_at(&mut self.pos)
1158            .map(|cow| cow.unwrap_or_else(T::clone))
1159    }
1160
1161    /// Produce a new Sender for this channel.
1162    ///
1163    /// This will not re-open the channel if it was closed due to all senders being dropped.
1164    ///
1165    /// # Examples
1166    ///
1167    /// ```
1168    /// # futures_lite::future::block_on(async {
1169    /// use async_broadcast::{broadcast, RecvError};
1170    ///
1171    /// let (s1, mut r) = broadcast(2);
1172    ///
1173    /// assert_eq!(s1.broadcast(1).await, Ok(None));
1174    ///
1175    /// let mut s2 = r.new_sender();
1176    ///
1177    /// assert_eq!(s2.broadcast(2).await, Ok(None));
1178    /// drop(s1);
1179    /// drop(s2);
1180    ///
1181    /// assert_eq!(r.recv().await, Ok(1));
1182    /// assert_eq!(r.recv().await, Ok(2));
1183    /// assert_eq!(r.recv().await, Err(RecvError::Closed));
1184    /// # });
1185    /// ```
1186    pub fn new_sender(&self) -> Sender<T> {
1187        self.inner.write().unwrap().sender_count += 1;
1188
1189        Sender {
1190            inner: self.inner.clone(),
1191        }
1192    }
1193
1194    /// Produce a new Receiver for this channel.
1195    ///
1196    /// Unlike [`Receiver::clone`], this method creates a new receiver that starts with zero
1197    /// messages available.  This is slightly faster than a real clone.
1198    ///
1199    /// # Examples
1200    ///
1201    /// ```
1202    /// # futures_lite::future::block_on(async {
1203    /// use async_broadcast::{broadcast, RecvError};
1204    ///
1205    /// let (s, mut r1) = broadcast(2);
1206    ///
1207    /// assert_eq!(s.broadcast(1).await, Ok(None));
1208    ///
1209    /// let mut r2 = r1.new_receiver();
1210    ///
1211    /// assert_eq!(s.broadcast(2).await, Ok(None));
1212    /// drop(s);
1213    ///
1214    /// assert_eq!(r1.recv().await, Ok(1));
1215    /// assert_eq!(r1.recv().await, Ok(2));
1216    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1217    ///
1218    /// assert_eq!(r2.recv().await, Ok(2));
1219    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1220    /// # });
1221    /// ```
1222    pub fn new_receiver(&self) -> Self {
1223        let mut inner = self.inner.write().unwrap();
1224        inner.receiver_count += 1;
1225        Receiver {
1226            inner: self.inner.clone(),
1227            pos: inner.head_pos + inner.queue.len() as u64,
1228            listener: None,
1229        }
1230    }
1231}
1232
1233impl<T> Drop for Receiver<T> {
1234    fn drop(&mut self) {
1235        let mut inner = self.inner.write().unwrap();
1236
1237        // Remove ourself from each item's counter
1238        loop {
1239            match inner.try_recv_at(&mut self.pos) {
1240                Ok(_) => continue,
1241                Err(TryRecvError::Overflowed(_)) => continue,
1242                Err(TryRecvError::Closed) => break,
1243                Err(TryRecvError::Empty) => break,
1244            }
1245        }
1246
1247        inner.receiver_count -= 1;
1248
1249        inner.close_channel();
1250    }
1251}
1252
1253impl<T> Clone for Receiver<T> {
1254    /// Produce a clone of this Receiver that has the same messages queued.
1255    ///
1256    /// # Examples
1257    ///
1258    /// ```
1259    /// # futures_lite::future::block_on(async {
1260    /// use async_broadcast::{broadcast, RecvError};
1261    ///
1262    /// let (s, mut r1) = broadcast(1);
1263    ///
1264    /// assert_eq!(s.broadcast(1).await, Ok(None));
1265    /// drop(s);
1266    ///
1267    /// let mut r2 = r1.clone();
1268    ///
1269    /// assert_eq!(r1.recv().await, Ok(1));
1270    /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1271    /// assert_eq!(r2.recv().await, Ok(1));
1272    /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1273    /// # });
1274    /// ```
1275    fn clone(&self) -> Self {
1276        let mut inner = self.inner.write().unwrap();
1277        inner.receiver_count += 1;
1278        // increment the waiter count on all items not yet received by this object
1279        let n = self.pos.saturating_sub(inner.head_pos) as usize;
1280        for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
1281            *waiters += 1;
1282        }
1283        Receiver {
1284            inner: self.inner.clone(),
1285            pos: self.pos,
1286            listener: None,
1287        }
1288    }
1289}
1290
1291impl<T: Clone> Stream for Receiver<T> {
1292    type Item = T;
1293
1294    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1295        loop {
1296            // If this stream is listening for events, first wait for a notification.
1297            if let Some(listener) = self.listener.as_mut() {
1298                ready!(Pin::new(listener).poll(cx));
1299                self.listener = None;
1300            }
1301
1302            loop {
1303                // Attempt to receive a message.
1304                match self.try_recv() {
1305                    Ok(msg) => {
1306                        // The stream is not blocked on an event - drop the listener.
1307                        self.listener = None;
1308                        return Poll::Ready(Some(msg));
1309                    }
1310                    Err(TryRecvError::Closed) => {
1311                        // The stream is not blocked on an event - drop the listener.
1312                        self.listener = None;
1313                        return Poll::Ready(None);
1314                    }
1315                    Err(TryRecvError::Overflowed(_)) => continue,
1316                    Err(TryRecvError::Empty) => {}
1317                }
1318
1319                // Receiving failed - now start listening for notifications or wait for one.
1320                match self.listener.as_mut() {
1321                    None => {
1322                        // Start listening and then try receiving again.
1323                        self.listener = {
1324                            let inner = self.inner.write().unwrap();
1325                            Some(inner.recv_ops.listen())
1326                        };
1327                    }
1328                    Some(_) => {
1329                        // Go back to the outer loop to poll the listener.
1330                        break;
1331                    }
1332                }
1333            }
1334        }
1335    }
1336}
1337
1338impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
1339    fn is_terminated(&self) -> bool {
1340        let inner = self.inner.read().unwrap();
1341
1342        inner.is_closed && inner.queue.is_empty()
1343    }
1344}
1345
1346/// An error returned from [`Sender::broadcast()`].
1347///
1348/// Received because the channel is closed or no active receivers were present while `await-active`
1349/// was set to `false` (See [`Sender::set_await_active`] for details).
1350#[derive(PartialEq, Eq, Clone, Copy)]
1351pub struct SendError<T>(pub T);
1352
1353impl<T> SendError<T> {
1354    /// Unwraps the message that couldn't be sent.
1355    pub fn into_inner(self) -> T {
1356        self.0
1357    }
1358}
1359
1360impl<T> error::Error for SendError<T> {}
1361
1362impl<T> fmt::Debug for SendError<T> {
1363    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1364        write!(f, "SendError(..)")
1365    }
1366}
1367
1368impl<T> fmt::Display for SendError<T> {
1369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1370        write!(f, "sending into a closed channel")
1371    }
1372}
1373
1374/// An error returned from [`Sender::try_broadcast()`].
1375#[derive(PartialEq, Eq, Clone, Copy)]
1376pub enum TrySendError<T> {
1377    /// The channel is full but not closed.
1378    Full(T),
1379
1380    /// The channel is closed.
1381    Closed(T),
1382
1383    /// There are currently no active receivers, only inactive ones.
1384    Inactive(T),
1385}
1386
1387impl<T> TrySendError<T> {
1388    /// Unwraps the message that couldn't be sent.
1389    pub fn into_inner(self) -> T {
1390        match self {
1391            TrySendError::Full(t) => t,
1392            TrySendError::Closed(t) => t,
1393            TrySendError::Inactive(t) => t,
1394        }
1395    }
1396
1397    /// Returns `true` if the channel is full but not closed.
1398    pub fn is_full(&self) -> bool {
1399        match self {
1400            TrySendError::Full(_) => true,
1401            TrySendError::Closed(_) | TrySendError::Inactive(_) => false,
1402        }
1403    }
1404
1405    /// Returns `true` if the channel is closed.
1406    pub fn is_closed(&self) -> bool {
1407        match self {
1408            TrySendError::Full(_) | TrySendError::Inactive(_) => false,
1409            TrySendError::Closed(_) => true,
1410        }
1411    }
1412
1413    /// Returns `true` if there are currently no active receivers, only inactive ones.
1414    pub fn is_disconnected(&self) -> bool {
1415        match self {
1416            TrySendError::Full(_) | TrySendError::Closed(_) => false,
1417            TrySendError::Inactive(_) => true,
1418        }
1419    }
1420}
1421
1422impl<T> error::Error for TrySendError<T> {}
1423
1424impl<T> fmt::Debug for TrySendError<T> {
1425    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1426        match *self {
1427            TrySendError::Full(..) => write!(f, "Full(..)"),
1428            TrySendError::Closed(..) => write!(f, "Closed(..)"),
1429            TrySendError::Inactive(..) => write!(f, "Inactive(..)"),
1430        }
1431    }
1432}
1433
1434impl<T> fmt::Display for TrySendError<T> {
1435    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1436        match *self {
1437            TrySendError::Full(..) => write!(f, "sending into a full channel"),
1438            TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
1439            TrySendError::Inactive(..) => write!(f, "sending into the void (no active receivers)"),
1440        }
1441    }
1442}
1443
1444/// An error returned from [`Receiver::recv()`].
1445#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1446pub enum RecvError {
1447    /// The channel has overflowed since the last element was seen.  Future recv operations will
1448    /// succeed, but some messages have been skipped.
1449    ///
1450    /// Contains the number of messages missed.
1451    Overflowed(u64),
1452
1453    /// The channel is empty and closed.
1454    Closed,
1455}
1456
1457impl error::Error for RecvError {}
1458
1459impl fmt::Display for RecvError {
1460    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1461        match self {
1462            Self::Overflowed(n) => write!(f, "receiving skipped {} messages", n),
1463            Self::Closed => write!(f, "receiving from an empty and closed channel"),
1464        }
1465    }
1466}
1467
1468/// An error returned from [`Receiver::try_recv()`].
1469#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1470pub enum TryRecvError {
1471    /// The channel has overflowed since the last element was seen.  Future recv operations will
1472    /// succeed, but some messages have been skipped.
1473    Overflowed(u64),
1474
1475    /// The channel is empty but not closed.
1476    Empty,
1477
1478    /// The channel is empty and closed.
1479    Closed,
1480}
1481
1482impl TryRecvError {
1483    /// Returns `true` if the channel is empty but not closed.
1484    pub fn is_empty(&self) -> bool {
1485        match self {
1486            TryRecvError::Empty => true,
1487            TryRecvError::Closed => false,
1488            TryRecvError::Overflowed(_) => false,
1489        }
1490    }
1491
1492    /// Returns `true` if the channel is empty and closed.
1493    pub fn is_closed(&self) -> bool {
1494        match self {
1495            TryRecvError::Empty => false,
1496            TryRecvError::Closed => true,
1497            TryRecvError::Overflowed(_) => false,
1498        }
1499    }
1500
1501    /// Returns `true` if this error indicates the receiver missed messages.
1502    pub fn is_overflowed(&self) -> bool {
1503        match self {
1504            TryRecvError::Empty => false,
1505            TryRecvError::Closed => false,
1506            TryRecvError::Overflowed(_) => true,
1507        }
1508    }
1509}
1510
1511impl error::Error for TryRecvError {}
1512
1513impl fmt::Display for TryRecvError {
1514    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1515        match *self {
1516            TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1517            TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1518            TryRecvError::Overflowed(n) => {
1519                write!(f, "receiving operation observed {} lost messages", n)
1520            }
1521        }
1522    }
1523}
1524
1525/// A future returned by [`Sender::broadcast()`].
1526#[derive(Debug)]
1527#[must_use = "futures do nothing unless .awaited"]
1528pub struct Send<'a, T> {
1529    sender: &'a Sender<T>,
1530    listener: Option<EventListener>,
1531    msg: Option<T>,
1532}
1533
1534impl<'a, T> Unpin for Send<'a, T> {}
1535
1536impl<'a, T: Clone> Future for Send<'a, T> {
1537    type Output = Result<Option<T>, SendError<T>>;
1538
1539    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1540        let mut this = Pin::new(self);
1541
1542        loop {
1543            let msg = this.msg.take().unwrap();
1544            let inner = &this.sender.inner;
1545
1546            // Attempt to send a message.
1547            match this.sender.try_broadcast(msg) {
1548                Ok(msg) => {
1549                    let inner = inner.write().unwrap();
1550
1551                    if inner.queue.len() < inner.capacity {
1552                        // Not full still, so notify the next awaiting sender.
1553                        inner.send_ops.notify(1);
1554                    }
1555
1556                    return Poll::Ready(Ok(msg));
1557                }
1558                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1559                Err(TrySendError::Full(m)) => this.msg = Some(m),
1560                Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => {
1561                    this.msg = Some(m)
1562                }
1563                Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
1564            }
1565
1566            // Sending failed - now start listening for notifications or wait for one.
1567            match &mut this.listener {
1568                None => {
1569                    // Start listening and then try sending again.
1570                    let inner = inner.write().unwrap();
1571                    this.listener = Some(inner.send_ops.listen());
1572                }
1573                Some(l) => {
1574                    // Wait for a notification.
1575                    ready!(Pin::new(l).poll(cx));
1576                    this.listener = None;
1577                }
1578            }
1579        }
1580    }
1581}
1582
1583/// A future returned by [`Receiver::recv()`].
1584#[derive(Debug)]
1585#[must_use = "futures do nothing unless .awaited"]
1586pub struct Recv<'a, T> {
1587    receiver: &'a mut Receiver<T>,
1588    listener: Option<EventListener>,
1589}
1590
1591impl<'a, T> Unpin for Recv<'a, T> {}
1592
1593impl<'a, T: Clone> Future for Recv<'a, T> {
1594    type Output = Result<T, RecvError>;
1595
1596    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1597        let mut this = Pin::new(self);
1598
1599        loop {
1600            // Attempt to receive a message.
1601            match this.receiver.try_recv() {
1602                Ok(msg) => return Poll::Ready(Ok(msg)),
1603                Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1604                Err(TryRecvError::Overflowed(n)) => {
1605                    return Poll::Ready(Err(RecvError::Overflowed(n)));
1606                }
1607                Err(TryRecvError::Empty) => {}
1608            }
1609
1610            // Receiving failed - now start listening for notifications or wait for one.
1611            match &mut this.listener {
1612                None => {
1613                    // Start listening and then try receiving again.
1614                    this.listener = {
1615                        let inner = this.receiver.inner.write().unwrap();
1616                        Some(inner.recv_ops.listen())
1617                    };
1618                }
1619                Some(l) => {
1620                    // Wait for a notification.
1621                    ready!(Pin::new(l).poll(cx));
1622                    this.listener = None;
1623                }
1624            }
1625        }
1626    }
1627}
1628
1629/// An inactive  receiver.
1630///
1631/// An inactive receiver is a receiver that is unable to receive messages. It's only useful for
1632/// keeping a channel open even when no associated active receivers exist.
1633#[derive(Debug)]
1634pub struct InactiveReceiver<T> {
1635    inner: Arc<RwLock<Inner<T>>>,
1636}
1637
1638impl<T> InactiveReceiver<T> {
1639    /// Convert to an activate [`Receiver`].
1640    ///
1641    /// Consumes `self`. Use [`InactiveReceiver::activate_cloned`] if you want to keep `self`.
1642    ///
1643    /// # Examples
1644    ///
1645    /// ```
1646    /// use async_broadcast::{broadcast, TrySendError};
1647    ///
1648    /// let (s, r) = broadcast(1);
1649    /// let inactive = r.deactivate();
1650    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1651    ///
1652    /// let mut r = inactive.activate();
1653    /// assert_eq!(s.try_broadcast(10), Ok(None));
1654    /// assert_eq!(r.try_recv(), Ok(10));
1655    /// ```
1656    pub fn activate(self) -> Receiver<T> {
1657        self.activate_cloned()
1658    }
1659
1660    /// Create an activate [`Receiver`] for the associated channel.
1661    ///
1662    /// # Examples
1663    ///
1664    /// ```
1665    /// use async_broadcast::{broadcast, TrySendError};
1666    ///
1667    /// let (s, r) = broadcast(1);
1668    /// let inactive = r.deactivate();
1669    /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1670    ///
1671    /// let mut r = inactive.activate_cloned();
1672    /// assert_eq!(s.try_broadcast(10), Ok(None));
1673    /// assert_eq!(r.try_recv(), Ok(10));
1674    /// ```
1675    pub fn activate_cloned(&self) -> Receiver<T> {
1676        let mut inner = self.inner.write().unwrap();
1677        inner.receiver_count += 1;
1678
1679        if inner.receiver_count == 1 {
1680            // Notify 1 awaiting senders that there is now a receiver. If there is still room in the
1681            // queue, the notified operation will notify another awaiting sender.
1682            inner.send_ops.notify(1);
1683        }
1684
1685        Receiver {
1686            inner: self.inner.clone(),
1687            pos: inner.head_pos + inner.queue.len() as u64,
1688            listener: None,
1689        }
1690    }
1691
1692    /// Returns the channel capacity.
1693    ///
1694    /// See [`Receiver::capacity`] documentation for examples.
1695    pub fn capacity(&self) -> usize {
1696        self.inner.read().unwrap().capacity
1697    }
1698
1699    /// Set the channel capacity.
1700    ///
1701    /// There are times when you need to change the channel's capacity after creating it. If the
1702    /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
1703    /// dropped to shrink the channel.
1704    ///
1705    /// See [`Receiver::set_capacity`] documentation for examples.
1706    pub fn set_capacity(&mut self, new_cap: usize) {
1707        self.inner.write().unwrap().set_capacity(new_cap);
1708    }
1709
1710    /// If overflow mode is enabled on this channel.
1711    ///
1712    /// See [`Receiver::overflow`] documentation for examples.
1713    pub fn overflow(&self) -> bool {
1714        self.inner.read().unwrap().overflow
1715    }
1716
1717    /// Set overflow mode on the channel.
1718    ///
1719    /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
1720    /// full. It achieves that by removing the oldest message from the channel.
1721    ///
1722    /// See [`Receiver::set_overflow`] documentation for examples.
1723    pub fn set_overflow(&mut self, overflow: bool) {
1724        self.inner.write().unwrap().overflow = overflow;
1725    }
1726
1727    /// If sender will wait for active receivers.
1728    ///
1729    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1730    /// `true`.
1731    ///
1732    /// # Examples
1733    ///
1734    /// ```
1735    /// use async_broadcast::broadcast;
1736    ///
1737    /// let (_, r) = broadcast::<i32>(5);
1738    /// let r = r.deactivate();
1739    /// assert!(r.await_active());
1740    /// ```
1741    pub fn await_active(&self) -> bool {
1742        self.inner.read().unwrap().await_active
1743    }
1744
1745    /// Specify if sender will wait for active receivers.
1746    ///
1747    /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1748    /// `true`.
1749    ///
1750    /// # Examples
1751    ///
1752    /// ```
1753    /// # futures_lite::future::block_on(async {
1754    /// use async_broadcast::broadcast;
1755    ///
1756    /// let (s, r) = broadcast::<i32>(2);
1757    /// s.broadcast(1).await.unwrap();
1758    ///
1759    /// let mut r = r.deactivate();
1760    /// r.set_await_active(false);
1761    /// assert!(s.broadcast(2).await.is_err());
1762    /// # });
1763    /// ```
1764    pub fn set_await_active(&mut self, await_active: bool) {
1765        self.inner.write().unwrap().await_active = await_active;
1766    }
1767
1768    /// Closes the channel.
1769    ///
1770    /// Returns `true` if this call has closed the channel and it was not closed already.
1771    ///
1772    /// The remaining messages can still be received.
1773    ///
1774    /// See [`Receiver::close`] documentation for examples.
1775    pub fn close(&self) -> bool {
1776        self.inner.write().unwrap().close()
1777    }
1778
1779    /// Returns `true` if the channel is closed.
1780    ///
1781    /// See [`Receiver::is_closed`] documentation for examples.
1782    pub fn is_closed(&self) -> bool {
1783        self.inner.read().unwrap().is_closed
1784    }
1785
1786    /// Returns `true` if the channel is empty.
1787    ///
1788    /// See [`Receiver::is_empty`] documentation for examples.
1789    pub fn is_empty(&self) -> bool {
1790        self.inner.read().unwrap().queue.is_empty()
1791    }
1792
1793    /// Returns `true` if the channel is full.
1794    ///
1795    /// See [`Receiver::is_full`] documentation for examples.
1796    pub fn is_full(&self) -> bool {
1797        let inner = self.inner.read().unwrap();
1798
1799        inner.queue.len() == inner.capacity
1800    }
1801
1802    /// Returns the number of messages in the channel.
1803    ///
1804    /// See [`Receiver::len`] documentation for examples.
1805    pub fn len(&self) -> usize {
1806        self.inner.read().unwrap().queue.len()
1807    }
1808
1809    /// Returns the number of receivers for the channel.
1810    ///
1811    /// This does not include inactive receivers. Use [`InactiveReceiver::inactive_receiver_count`]
1812    /// if you're interested in that.
1813    ///
1814    /// # Examples
1815    ///
1816    /// ```
1817    /// use async_broadcast::broadcast;
1818    ///
1819    /// let (s, r) = broadcast::<()>(1);
1820    /// assert_eq!(s.receiver_count(), 1);
1821    /// let r = r.deactivate();
1822    /// assert_eq!(s.receiver_count(), 0);
1823    ///
1824    /// let r2 = r.activate_cloned();
1825    /// assert_eq!(r.receiver_count(), 1);
1826    /// assert_eq!(r.inactive_receiver_count(), 1);
1827    /// ```
1828    pub fn receiver_count(&self) -> usize {
1829        self.inner.read().unwrap().receiver_count
1830    }
1831
1832    /// Returns the number of inactive receivers for the channel.
1833    ///
1834    /// # Examples
1835    ///
1836    /// ```
1837    /// use async_broadcast::broadcast;
1838    ///
1839    /// let (s, r) = broadcast::<()>(1);
1840    /// assert_eq!(s.receiver_count(), 1);
1841    /// let r = r.deactivate();
1842    /// assert_eq!(s.receiver_count(), 0);
1843    ///
1844    /// let r2 = r.activate_cloned();
1845    /// assert_eq!(r.receiver_count(), 1);
1846    /// assert_eq!(r.inactive_receiver_count(), 1);
1847    /// ```
1848    pub fn inactive_receiver_count(&self) -> usize {
1849        self.inner.read().unwrap().inactive_receiver_count
1850    }
1851
1852    /// Returns the number of senders for the channel.
1853    ///
1854    /// See [`Receiver::sender_count`] documentation for examples.
1855    pub fn sender_count(&self) -> usize {
1856        self.inner.read().unwrap().sender_count
1857    }
1858}
1859
1860impl<T> Clone for InactiveReceiver<T> {
1861    fn clone(&self) -> Self {
1862        self.inner.write().unwrap().inactive_receiver_count += 1;
1863
1864        InactiveReceiver {
1865            inner: self.inner.clone(),
1866        }
1867    }
1868}
1869
1870impl<T> Drop for InactiveReceiver<T> {
1871    fn drop(&mut self) {
1872        let mut inner = self.inner.write().unwrap();
1873
1874        inner.inactive_receiver_count -= 1;
1875        inner.close_channel();
1876    }
1877}