piper/
lib.rs

1//! A bounded single-producer single-consumer pipe.
2//!
3//! This crate provides a ring buffer that can be asynchronously read from and written to. It is
4//! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles.
5//! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively.
6//!
7//! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut`
8//! access to read or write to them. If multiple-producer/multiple-consumer handles are needed,
9//! consider wrapping them in an `Arc<Mutex<...>>` or similar.
10//!
11//! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
12//! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
13//!
14//! When the receiver is dropped, the pipe is closed and no more bytes and be written into it.
15//! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
16//!
17//! # Version 0.2.0 Notes
18//!
19//! Previously, this crate contained other synchronization primitives, such as bounded channels, locks,
20//! and event listeners. These have been split out into their own crates:
21//!
22//! - [`async-channel`](https://docs.rs/async-channel)
23//! - [`async-dup`](https://docs.rs/async-dup)
24//! - [`async-lock`](https://docs.rs/async-lock)
25//! - [`async-mutex`](https://docs.rs/async-mutex)
26//! - [`event-listener`](https://docs.rs/event-listener)
27//!
28//! # Examples
29//!
30//! ## Asynchronous Tasks
31//!
32//! Communicate between asynchronous tasks, potentially on other threads.
33//!
34//! ```
35//! use async_channel::unbounded;
36//! use async_executor::Executor;
37//! use easy_parallel::Parallel;
38//! use futures_lite::{future, prelude::*};
39//! use std::time::Duration;
40//!
41//! # if cfg!(miri) { return; }
42//!
43//! // Create a pair of handles.
44//! let (mut reader, mut writer) = piper::pipe(1024);
45//!
46//! // Create the executor.
47//! let ex = Executor::new();
48//! let (signal, shutdown) = unbounded::<()>();
49//!
50//! // Spawn a detached task for random data to the pipe.
51//! let writer = ex.spawn(async move {
52//!     for _ in 0..1_000 {
53//!         // Generate 8 random numnbers.
54//!         let random = fastrand::u64(..).to_le_bytes();
55//!
56//!         // Write them to the pipe.
57//!         writer.write_all(&random).await.unwrap();
58//!
59//!         // Wait a bit.
60//!         async_io::Timer::after(Duration::from_millis(5)).await;
61//!     }
62//!
63//!     // Drop the writer to close the pipe.
64//!     drop(writer);
65//! });
66//!
67//! // Detach the task so that it runs in the background.
68//! writer.detach();
69//!
70//! // Spawn a task for reading from the pipe.
71//! let reader = ex.spawn(async move {
72//!     let mut buf = vec![];
73//!
74//!     // Read all bytes from the pipe.
75//!     reader.read_to_end(&mut buf).await.unwrap();
76//!
77//!     println!("Random data: {:#?}", buf);
78//! });
79//!
80//! Parallel::new()
81//!     // Run four executor threads.
82//!     .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
83//!     // Run the main future on the current thread.
84//!     .finish(|| future::block_on(async {
85//!         // Wait for the reader to finish.
86//!         reader.await;
87//!
88//!         // Signal the executor threads to shut down.
89//!         drop(signal);
90//!     }));
91//! ```
92//!
93//! ## Blocking I/O
94//!
95//! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example
96//! spawns another thread for reading a file and writing it to a pipe.
97//!
98//! ```no_run
99//! use futures_lite::{future, prelude::*};
100//! use std::fs::File;
101//! use std::io::prelude::*;
102//! use std::thread;
103//!
104//! // Create a pair of handles.
105//! let (mut r, mut w) = piper::pipe(1024);
106//!
107//! // Spawn a thread for reading a file.
108//! thread::spawn(move || {
109//!     let mut file = File::open("Cargo.toml").unwrap();
110//!
111//!     // Read the file into a buffer.
112//!     let mut buf = [0u8; 16384];
113//!     future::block_on(async move {
114//!         loop {
115//!             // Read a chunk of bytes from the file.
116//!             // Blocking is okay here, since this is a separate thread.
117//!             let n = file.read(&mut buf).unwrap();
118//!             if n == 0 {
119//!                 break;
120//!             }
121//!
122//!             // Write the chunk to the pipe.
123//!             w.write_all(&buf[..n]).await.unwrap();
124//!         }
125//!
126//!         // Close the pipe.
127//!         drop(w);
128//!     });
129//! });
130//!
131//! # future::block_on(async move {
132//! // Read bytes from the pipe.
133//! let mut buf = vec![];
134//! r.read_to_end(&mut buf).await.unwrap();
135//!
136//! println!("Read {} bytes", buf.len());
137//! # });
138//! ```
139//!
140//! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write`
141//! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from
142//! the file into the pipe. This approach should be preferred when possible, as it avoids an extra
143//! copy.
144//!
145//! ```no_run
146//! # use futures_lite::future;
147//! # use std::fs::File;
148//! # let mut file: File = unimplemented!();
149//! # let mut w: piper::Writer = unimplemented!();
150//! // In the `future::block_on` call above...
151//! # future::block_on(async move {
152//! loop {
153//!     let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
154//!     if n == 0 {
155//!         break;
156//!     }
157//! }
158//! # });
159//! ```
160//!
161//! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for
162//! thread management and pipes.
163//!
164//! [`poll_fill`]: struct.Writer.html#method.poll_fill
165//! [`poll_drain`]: struct.Reader.html#method.poll_drain
166//! [`blocking`]: https://docs.rs/blocking
167
168#![cfg_attr(not(feature = "std"), no_std)]
169#![forbid(missing_docs)]
170#![doc(
171    html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
172)]
173#![doc(
174    html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
175)]
176
177extern crate alloc;
178
179use core::convert::Infallible;
180use core::mem;
181use core::slice;
182use core::task::{Context, Poll};
183
184use alloc::vec::Vec;
185
186use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
187use sync::Arc;
188
189#[cfg(feature = "std")]
190use std::{
191    io::{self, Read, Write},
192    pin::Pin,
193};
194
195use atomic_waker::AtomicWaker;
196
197#[cfg(feature = "std")]
198use futures_io::{AsyncRead, AsyncWrite};
199
200macro_rules! ready {
201    ($e:expr) => {{
202        match $e {
203            Poll::Ready(t) => t,
204            Poll::Pending => return Poll::Pending,
205        }
206    }};
207}
208
209/// Creates a bounded single-producer single-consumer pipe.
210///
211/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
212///
213/// See the [crate-level documentation](index.html) for more details.
214///
215/// # Panics
216///
217/// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`.
218#[allow(clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12280
219pub fn pipe(cap: usize) -> (Reader, Writer) {
220    assert!(cap > 0, "capacity must be positive");
221    assert!(cap.checked_mul(2).is_some(), "capacity is too large");
222
223    // Allocate the ring buffer.
224    let mut v = Vec::with_capacity(cap);
225    let buffer = v.as_mut_ptr();
226    mem::forget(v);
227
228    let inner = Arc::new(Pipe {
229        head: AtomicUsize::new(0),
230        tail: AtomicUsize::new(0),
231        reader: AtomicWaker::new(),
232        writer: AtomicWaker::new(),
233        closed: AtomicBool::new(false),
234        buffer,
235        cap,
236    });
237
238    // Use a random number generator to randomize fair yielding behavior.
239    let mut rng = rng();
240
241    let r = Reader {
242        inner: inner.clone(),
243        head: 0,
244        tail: 0,
245        rng: rng.fork(),
246    };
247
248    let w = Writer {
249        inner,
250        head: 0,
251        tail: 0,
252        zeroed_until: 0,
253        rng,
254    };
255
256    (r, w)
257}
258
259/// The reading side of a pipe.
260///
261/// This type is created by the [`pipe`] function. See its documentation for more details.
262pub struct Reader {
263    /// The inner ring buffer.
264    inner: Arc<Pipe>,
265
266    /// The head index, moved by the reader, in the range `0..2*cap`.
267    ///
268    /// This index always matches `inner.head`.
269    head: usize,
270
271    /// The tail index, moved by the writer, in the range `0..2*cap`.
272    ///
273    /// This index is a snapshot of `index.tail` that might become stale at any point.
274    tail: usize,
275
276    /// Random number generator.
277    rng: fastrand::Rng,
278}
279
280/// The writing side of a pipe.
281///
282/// This type is created by the [`pipe`] function. See its documentation for more details.
283pub struct Writer {
284    /// The inner ring buffer.
285    inner: Arc<Pipe>,
286
287    /// The head index, moved by the reader, in the range `0..2*cap`.
288    ///
289    /// This index is a snapshot of `index.head` that might become stale at any point.
290    head: usize,
291
292    /// The tail index, moved by the writer, in the range `0..2*cap`.
293    ///
294    /// This index always matches `inner.tail`.
295    tail: usize,
296
297    /// How many bytes at the beginning of the buffer have been zeroed.
298    ///
299    /// The pipe allocates an uninitialized buffer, and we must be careful about passing
300    /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
301    /// expensive, so we zero it in smaller chunks as the writer makes progress.
302    zeroed_until: usize,
303
304    /// Random number generator.
305    rng: fastrand::Rng,
306}
307
308/// The inner ring buffer.
309///
310/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
311/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
312///
313/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
314/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
315/// could mean the pipe is either empty or full, but we don't know which!
316struct Pipe {
317    /// The head index, moved by the reader, in the range `0..2*cap`.
318    head: AtomicUsize,
319
320    /// The tail index, moved by the writer, in the range `0..2*cap`.
321    tail: AtomicUsize,
322
323    /// A waker representing the blocked reader.
324    reader: AtomicWaker,
325
326    /// A waker representing the blocked writer.
327    writer: AtomicWaker,
328
329    /// Set to `true` if the reader or writer was dropped.
330    closed: AtomicBool,
331
332    /// The byte buffer.
333    buffer: *mut u8,
334
335    /// The buffer capacity.
336    cap: usize,
337}
338
339unsafe impl Sync for Pipe {}
340unsafe impl Send for Pipe {}
341
342impl Drop for Pipe {
343    fn drop(&mut self) {
344        // Deallocate the byte buffer.
345        unsafe {
346            Vec::from_raw_parts(self.buffer, 0, self.cap);
347        }
348    }
349}
350
351impl Drop for Reader {
352    fn drop(&mut self) {
353        // Dropping closes the pipe and then wakes the writer.
354        self.inner.closed.store(true, Ordering::SeqCst);
355        self.inner.writer.wake();
356    }
357}
358
359impl Drop for Writer {
360    fn drop(&mut self) {
361        // Dropping closes the pipe and then wakes the reader.
362        self.inner.closed.store(true, Ordering::SeqCst);
363        self.inner.reader.wake();
364    }
365}
366
367impl Pipe {
368    /// Get the length of the data in the pipe.
369    fn len(&self) -> usize {
370        let head = self.head.load(Ordering::Acquire);
371        let tail = self.tail.load(Ordering::Acquire);
372
373        if head <= tail {
374            tail - head
375        } else {
376            (2 * self.cap) - (head - tail)
377        }
378    }
379}
380
381impl Reader {
382    /// Gets the total length of the data in the pipe.
383    ///
384    /// This method returns the number of bytes that have been written into the pipe but haven't been
385    /// read yet.
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// let (mut reader, mut writer) = piper::pipe(10);
391    /// let _ = writer.try_fill(&[0u8; 5]);
392    /// assert_eq!(reader.len(), 5);
393    /// ```
394    pub fn len(&self) -> usize {
395        self.inner.len()
396    }
397
398    /// Tell whether or not the pipe is empty.
399    ///
400    /// This method returns `true` if the pipe is empty, and `false` otherwise.
401    ///
402    /// # Examples
403    ///
404    /// ```
405    /// let (mut reader, mut writer) = piper::pipe(10);
406    /// assert!(reader.is_empty());
407    /// let _ = writer.try_fill(&[0u8; 5]);
408    /// assert!(!reader.is_empty());
409    /// ```
410    pub fn is_empty(&self) -> bool {
411        self.inner.len() == 0
412    }
413
414    /// Gets the total capacity of the pipe.
415    ///
416    /// This method returns the number of bytes that the pipe can hold at a time.
417    ///
418    /// # Examples
419    ///
420    /// ```
421    /// # futures_lite::future::block_on(async {
422    /// let (reader, _) = piper::pipe(10);
423    /// assert_eq!(reader.capacity(), 10);
424    /// # });
425    /// ```
426    pub fn capacity(&self) -> usize {
427        self.inner.cap
428    }
429
430    /// Tell whether or not the pipe is full.
431    ///
432    /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
433    /// writes will block until some data is read from the pipe.
434    ///
435    /// This method returns `true` if the pipe is full, and `false` otherwise.
436    ///
437    /// # Examples
438    ///
439    /// ```
440    /// let (mut reader, mut writer) = piper::pipe(10);
441    /// assert!(!reader.is_full());
442    /// let _ = writer.try_fill(&[0u8; 10]);
443    /// assert!(reader.is_full());
444    /// let _ = reader.try_drain(&mut [0u8; 5]);
445    /// assert!(!reader.is_full());
446    /// ```
447    pub fn is_full(&self) -> bool {
448        self.inner.len() == self.inner.cap
449    }
450
451    /// Tell whether or not the pipe is closed.
452    ///
453    /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
454    /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
455    /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
456    ///
457    /// # Examples
458    ///
459    /// ```
460    /// # futures_lite::future::block_on(async {
461    /// let (mut reader, mut writer) = piper::pipe(10);
462    /// assert!(!reader.is_closed());
463    /// drop(writer);
464    /// assert!(reader.is_closed());
465    /// # });
466    /// ```
467    pub fn is_closed(&self) -> bool {
468        self.inner.closed.load(Ordering::SeqCst)
469    }
470
471    /// Reads bytes from this reader and writes into blocking `dest`.
472    ///
473    /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy,
474    /// but it may block the thread if `dest` blocks.
475    ///
476    /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method
477    /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`.
478    /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written.
479    ///
480    /// This method is only available when the `std` feature is enabled. For `no_std` environments,
481    /// consider using [`poll_drain_bytes`] instead.
482    ///
483    /// [`poll_drain_bytes`]: #method.poll_drain_bytes
484    ///
485    /// # Examples
486    ///
487    /// ```
488    /// use futures_lite::{future, prelude::*};
489    /// # future::block_on(async {
490    ///
491    /// let (mut r, mut w) = piper::pipe(1024);
492    ///
493    /// // Write some data to the pipe.
494    /// w.write_all(b"hello world").await.unwrap();
495    ///
496    /// // Try reading from the pipe.
497    /// let mut buf = [0; 1024];
498    /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap();
499    ///
500    /// // The data was written to the buffer.
501    /// assert_eq!(&buf[..n], b"hello world");
502    /// # });
503    /// ```
504    #[cfg(feature = "std")]
505    pub fn poll_drain(
506        &mut self,
507        cx: &mut Context<'_>,
508        dest: impl Write,
509    ) -> Poll<io::Result<usize>> {
510        self.drain_inner(Some(cx), dest)
511    }
512
513    /// Reads bytes from this reader.
514    ///
515    /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into.
516    /// Because of this, it is infallible and can be used in `no_std` environments.
517    ///
518    /// The same conditions that apply to [`poll_drain`] apply to this method.
519    ///
520    /// [`poll_drain`]: #method.poll_drain
521    ///
522    /// # Examples
523    ///
524    /// ```
525    /// use futures_lite::{future, prelude::*};
526    /// # future::block_on(async {
527    /// let (mut r, mut w) = piper::pipe(1024);
528    ///
529    /// // Write some data to the pipe.
530    /// w.write_all(b"hello world").await.unwrap();
531    ///
532    /// // Try reading from the pipe.
533    /// let mut buf = [0; 1024];
534    /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await;
535    ///
536    /// // The data was written to the buffer.
537    /// assert_eq!(&buf[..n], b"hello world");
538    /// # });
539    /// ```
540    pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> {
541        match self.drain_inner(Some(cx), WriteBytes(dest)) {
542            Poll::Ready(Ok(n)) => Poll::Ready(n),
543            Poll::Ready(Err(e)) => match e {},
544            Poll::Pending => Poll::Pending,
545        }
546    }
547
548    /// Tries to read bytes from this reader.
549    ///
550    /// Returns the total number of bytes that were read from this reader.
551    ///
552    /// # Examples
553    ///
554    /// ```
555    /// let (mut r, mut w) = piper::pipe(1024);
556    ///
557    /// // `try_drain()` returns 0 off the bat.
558    /// let mut buf = [0; 10];
559    /// assert_eq!(r.try_drain(&mut buf), 0);
560    ///
561    /// // After a write it returns the data.
562    /// w.try_fill(&[0, 1, 2, 3, 4]);
563    /// assert_eq!(r.try_drain(&mut buf), 5);
564    /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
565    /// ```
566    pub fn try_drain(&mut self, dest: &mut [u8]) -> usize {
567        match self.drain_inner(None, WriteBytes(dest)) {
568            Poll::Ready(Ok(n)) => n,
569            Poll::Ready(Err(e)) => match e {},
570            Poll::Pending => 0,
571        }
572    }
573
574    /// Reads bytes from this reader and writes into blocking `dest`.
575    #[inline]
576    fn drain_inner<W: WriteLike>(
577        &mut self,
578        mut cx: Option<&mut Context<'_>>,
579        mut dest: W,
580    ) -> Poll<Result<usize, W::Error>> {
581        let cap = self.inner.cap;
582
583        // Calculates the distance between two indices.
584        let distance = |a: usize, b: usize| {
585            if a <= b {
586                b - a
587            } else {
588                2 * cap - (a - b)
589            }
590        };
591
592        // If the pipe appears to be empty...
593        if distance(self.head, self.tail) == 0 {
594            // Reload the tail in case it's become stale.
595            self.tail = self.inner.tail.load(Ordering::Acquire);
596
597            // If the pipe is now really empty...
598            if distance(self.head, self.tail) == 0 {
599                // Register the waker.
600                if let Some(cx) = cx.as_mut() {
601                    self.inner.reader.register(cx.waker());
602                }
603                atomic::fence(Ordering::SeqCst);
604
605                // Reload the tail after registering the waker.
606                self.tail = self.inner.tail.load(Ordering::Acquire);
607
608                // If the pipe is still empty...
609                if distance(self.head, self.tail) == 0 {
610                    // Check whether the pipe is closed or just empty.
611                    if self.inner.closed.load(Ordering::Relaxed) {
612                        return Poll::Ready(Ok(0));
613                    } else {
614                        return Poll::Pending;
615                    }
616                }
617            }
618        }
619
620        // The pipe is not empty so remove the waker.
621        self.inner.reader.take();
622
623        // Yield with some small probability - this improves fairness.
624        if let Some(cx) = cx {
625            ready!(maybe_yield(&mut self.rng, cx));
626        }
627
628        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
629        let real_index = |i: usize| {
630            if i < cap {
631                i
632            } else {
633                i - cap
634            }
635        };
636
637        // Number of bytes read so far.
638        let mut count = 0;
639
640        loop {
641            // Calculate how many bytes to read in this iteration.
642            let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
643                .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
644                .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
645
646            // Create a slice of data in the pipe buffer.
647            let pipe_slice =
648                unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
649
650            // Copy bytes from the pipe buffer into `dest`.
651            let n = dest.write(pipe_slice)?;
652            count += n;
653
654            // If pipe is empty or `dest` is full, return.
655            if n == 0 {
656                return Poll::Ready(Ok(count));
657            }
658
659            // Move the head forward.
660            if self.head + n < 2 * cap {
661                self.head += n;
662            } else {
663                self.head = 0;
664            }
665
666            // Store the current head index.
667            self.inner.head.store(self.head, Ordering::Release);
668
669            // Wake the writer because the pipe is not full.
670            self.inner.writer.wake();
671        }
672    }
673}
674
675#[cfg(feature = "std")]
676impl AsyncRead for Reader {
677    fn poll_read(
678        mut self: Pin<&mut Self>,
679        cx: &mut Context<'_>,
680        buf: &mut [u8],
681    ) -> Poll<io::Result<usize>> {
682        self.poll_drain_bytes(cx, buf).map(Ok)
683    }
684}
685
686impl Writer {
687    /// Gets the total length of the data in the pipe.
688    ///
689    /// This method returns the number of bytes that have been written into the pipe but haven't been
690    /// read yet.
691    ///
692    /// # Examples
693    ///
694    /// ```
695    /// let (_reader, mut writer) = piper::pipe(10);
696    /// let _ = writer.try_fill(&[0u8; 5]);
697    /// assert_eq!(writer.len(), 5);
698    /// ```
699    pub fn len(&self) -> usize {
700        self.inner.len()
701    }
702
703    /// Tell whether or not the pipe is empty.
704    ///
705    /// This method returns `true` if the pipe is empty, and `false` otherwise.
706    ///
707    /// # Examples
708    ///
709    /// ```
710    /// let (_reader, mut writer) = piper::pipe(10);
711    /// assert!(writer.is_empty());
712    /// let _ = writer.try_fill(&[0u8; 5]);
713    /// assert!(!writer.is_empty());
714    /// ```
715    pub fn is_empty(&self) -> bool {
716        self.inner.len() == 0
717    }
718
719    /// Gets the total capacity of the pipe.
720    ///
721    /// This method returns the number of bytes that the pipe can hold at a time.
722    ///
723    /// # Examples
724    ///
725    /// ```
726    /// # futures_lite::future::block_on(async {
727    /// let (_, writer) = piper::pipe(10);
728    /// assert_eq!(writer.capacity(), 10);
729    /// # });
730    /// ```
731    pub fn capacity(&self) -> usize {
732        self.inner.cap
733    }
734
735    /// Tell whether or not the pipe is full.
736    ///
737    /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
738    /// writes will block until some data is read from the pipe.
739    ///
740    /// This method returns `true` if the pipe is full, and `false` otherwise.
741    ///
742    /// # Examples
743    ///
744    /// ```
745    /// let (mut reader, mut writer) = piper::pipe(10);
746    /// assert!(!writer.is_full());
747    /// let _ = writer.try_fill(&[0u8; 10]);
748    /// assert!(writer.is_full());
749    /// let _ = reader.try_drain(&mut [0u8; 5]);
750    /// assert!(!writer.is_full());
751    /// ```
752    pub fn is_full(&self) -> bool {
753        self.inner.len() == self.inner.cap
754    }
755
756    /// Tell whether or not the pipe is closed.
757    ///
758    /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
759    /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
760    /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
761    ///
762    /// # Examples
763    ///
764    /// ```
765    /// # futures_lite::future::block_on(async {
766    /// let (reader, writer) = piper::pipe(10);
767    /// assert!(!writer.is_closed());
768    /// drop(reader);
769    /// assert!(writer.is_closed());
770    /// # });
771    /// ```
772    pub fn is_closed(&self) -> bool {
773        self.inner.closed.load(Ordering::SeqCst)
774    }
775
776    /// Reads bytes from blocking `src` and writes into this writer.
777    ///
778    /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy,
779    /// but it may block the thread if `src` blocks.
780    ///
781    /// If the pipe is full, this method returns `Poll::Pending`. If the pipe is closed, this method
782    /// returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled up through `Poll::Ready(Err(e))`.
783    /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes read.
784    ///
785    /// This method is only available when the `std` feature is enabled. For `no_std` environments,
786    /// consider using [`poll_fill_bytes`] instead.
787    ///
788    /// [`poll_fill_bytes`]: #method.poll_fill_bytes
789    ///
790    /// # Examples
791    ///
792    /// ```
793    /// use futures_lite::{future, prelude::*};
794    /// # future::block_on(async {
795    ///
796    /// // Create a pipe.
797    /// let (mut reader, mut writer) = piper::pipe(1024);
798    ///
799    /// // Fill the pipe with some bytes.
800    /// let data = b"hello world";
801    /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap();
802    /// assert_eq!(n, data.len());
803    ///
804    /// // Read the bytes back.
805    /// let mut buf = [0; 1024];
806    /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
807    /// assert_eq!(&buf[..data.len()], data);
808    /// # });
809    /// ```
810    #[cfg(feature = "std")]
811    pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
812        self.fill_inner(Some(cx), src)
813    }
814
815    /// Writes bytes into this writer.
816    ///
817    /// Rather than taking a `Read` trait object, this method takes a slice of bytes to read from.
818    /// Because of this, it is infallible and can be used in `no_std` environments.
819    ///
820    /// The same conditions that apply to [`poll_fill`] apply to this method.
821    ///
822    /// [`poll_fill`]: #method.poll_fill
823    ///
824    /// # Examples
825    ///
826    /// ```
827    /// use futures_lite::{future, prelude::*};
828    /// # future::block_on(async {
829    ///
830    /// // Create a pipe.
831    /// let (mut reader, mut writer) = piper::pipe(1024);
832    ///
833    /// // Fill the pipe with some bytes.
834    /// let data = b"hello world";
835    /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await;
836    /// assert_eq!(n, data.len());
837    ///
838    /// // Read the bytes back.
839    /// let mut buf = [0; 1024];
840    /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
841    /// assert_eq!(&buf[..data.len()], data);
842    /// # });
843    /// ```
844    pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> {
845        match self.fill_inner(Some(cx), ReadBytes(bytes)) {
846            Poll::Ready(Ok(n)) => Poll::Ready(n),
847            Poll::Ready(Err(e)) => match e {},
848            Poll::Pending => Poll::Pending,
849        }
850    }
851
852    /// Tries to write bytes to this writer.
853    ///
854    /// Returns the total number of bytes that were read from this reader.
855    ///
856    /// # Examples
857    ///
858    /// ```
859    /// let (mut r, mut w) = piper::pipe(1024);
860    ///
861    /// let mut buf = [0; 10];
862    /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5);
863    /// assert_eq!(r.try_drain(&mut buf), 5);
864    /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
865    /// ```
866    pub fn try_fill(&mut self, dest: &[u8]) -> usize {
867        match self.fill_inner(None, ReadBytes(dest)) {
868            Poll::Ready(Ok(n)) => n,
869            Poll::Ready(Err(e)) => match e {},
870            Poll::Pending => 0,
871        }
872    }
873
874    /// Reads bytes from blocking `src` and writes into this writer.
875    #[inline]
876    fn fill_inner<R: ReadLike>(
877        &mut self,
878        mut cx: Option<&mut Context<'_>>,
879        mut src: R,
880    ) -> Poll<Result<usize, R::Error>> {
881        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
882        if self.inner.closed.load(Ordering::Relaxed) {
883            return Poll::Ready(Ok(0));
884        }
885
886        // Calculates the distance between two indices.
887        let cap = self.inner.cap;
888        let distance = |a: usize, b: usize| {
889            if a <= b {
890                b - a
891            } else {
892                2 * cap - (a - b)
893            }
894        };
895
896        // If the pipe appears to be full...
897        if distance(self.head, self.tail) == cap {
898            // Reload the head in case it's become stale.
899            self.head = self.inner.head.load(Ordering::Acquire);
900
901            // If the pipe is now really empty...
902            if distance(self.head, self.tail) == cap {
903                // Register the waker.
904                if let Some(cx) = cx.as_mut() {
905                    self.inner.writer.register(cx.waker());
906                }
907                atomic::fence(Ordering::SeqCst);
908
909                // Reload the head after registering the waker.
910                self.head = self.inner.head.load(Ordering::Acquire);
911
912                // If the pipe is still full...
913                if distance(self.head, self.tail) == cap {
914                    // Check whether the pipe is closed or just full.
915                    if self.inner.closed.load(Ordering::Relaxed) {
916                        return Poll::Ready(Ok(0));
917                    } else {
918                        return Poll::Pending;
919                    }
920                }
921            }
922        }
923
924        // The pipe is not full so remove the waker.
925        self.inner.writer.take();
926
927        // Yield with some small probability - this improves fairness.
928        if let Some(cx) = cx {
929            ready!(maybe_yield(&mut self.rng, cx));
930        }
931
932        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
933        let real_index = |i: usize| {
934            if i < cap {
935                i
936            } else {
937                i - cap
938            }
939        };
940
941        // Number of bytes written so far.
942        let mut count = 0;
943
944        loop {
945            // Calculate how many bytes to write in this iteration.
946            let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
947                .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
948                .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
949                .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.
950
951            // Create a slice of available space in the pipe buffer.
952            let pipe_slice_mut = unsafe {
953                let from = real_index(self.tail);
954                let to = from + n;
955
956                // Make sure all bytes in the slice are initialized.
957                if self.zeroed_until < to {
958                    self.inner
959                        .buffer
960                        .add(self.zeroed_until)
961                        .write_bytes(0u8, to - self.zeroed_until);
962                    self.zeroed_until = to;
963                }
964
965                slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
966            };
967
968            // Copy bytes from `src` into the piper buffer.
969            let n = src.read(pipe_slice_mut)?;
970            count += n;
971
972            // If the pipe is full or closed, or `src` is empty, return.
973            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
974                return Poll::Ready(Ok(count));
975            }
976
977            // Move the tail forward.
978            if self.tail + n < 2 * cap {
979                self.tail += n;
980            } else {
981                self.tail = 0;
982            }
983
984            // Store the current tail index.
985            self.inner.tail.store(self.tail, Ordering::Release);
986
987            // Wake the reader because the pipe is not empty.
988            self.inner.reader.wake();
989        }
990    }
991}
992
993#[cfg(feature = "std")]
994impl AsyncWrite for Writer {
995    fn poll_write(
996        mut self: Pin<&mut Self>,
997        cx: &mut Context<'_>,
998        buf: &[u8],
999    ) -> Poll<io::Result<usize>> {
1000        self.poll_fill_bytes(cx, buf).map(Ok)
1001    }
1002
1003    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1004        // Nothing to flush.
1005        Poll::Ready(Ok(()))
1006    }
1007
1008    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1009        // Set the closed flag.
1010        self.inner.closed.store(true, Ordering::Release);
1011
1012        // Wake up any tasks that may be waiting on the pipe.
1013        self.inner.reader.wake();
1014        self.inner.writer.wake();
1015
1016        // The pipe is now closed.
1017        Poll::Ready(Ok(()))
1018    }
1019}
1020
1021/// A trait for reading bytes into a pipe.
1022trait ReadLike {
1023    /// The error type.
1024    type Error;
1025
1026    /// Reads bytes into the given buffer.
1027    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
1028}
1029
1030#[cfg(feature = "std")]
1031impl<R: Read> ReadLike for R {
1032    type Error = io::Error;
1033
1034    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1035        Read::read(self, buf)
1036    }
1037}
1038
1039/// Implements `no_std` reading around a byte slice.
1040struct ReadBytes<'a>(&'a [u8]);
1041
1042impl ReadLike for ReadBytes<'_> {
1043    type Error = Infallible;
1044
1045    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
1046        let n = self.0.len().min(buf.len());
1047        buf[..n].copy_from_slice(&self.0[..n]);
1048        self.0 = &self.0[n..];
1049        Ok(n)
1050    }
1051}
1052
1053/// A trait for writing bytes from a pipe.
1054trait WriteLike {
1055    /// The error type.
1056    type Error;
1057
1058    /// Writes bytes from the given buffer.
1059    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
1060}
1061
1062#[cfg(feature = "std")]
1063impl<W: Write> WriteLike for W {
1064    type Error = io::Error;
1065
1066    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1067        Write::write(self, buf)
1068    }
1069}
1070
1071/// Implements `no_std` writing around a byte slice.
1072struct WriteBytes<'a>(&'a mut [u8]);
1073
1074impl WriteLike for WriteBytes<'_> {
1075    type Error = Infallible;
1076
1077    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
1078        let n = self.0.len().min(buf.len());
1079        self.0[..n].copy_from_slice(&buf[..n]);
1080
1081        // mem::take() is not available on 1.36
1082        #[allow(clippy::mem_replace_with_default)]
1083        {
1084            let slice = mem::replace(&mut self.0, &mut []);
1085            self.0 = &mut slice[n..];
1086        }
1087
1088        Ok(n)
1089    }
1090}
1091
1092/// Yield with some small probability.
1093fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> {
1094    if rng.usize(..100) == 0 {
1095        cx.waker().wake_by_ref();
1096        Poll::Pending
1097    } else {
1098        Poll::Ready(())
1099    }
1100}
1101
1102/// Get a random number generator.
1103#[cfg(feature = "std")]
1104#[inline]
1105fn rng() -> fastrand::Rng {
1106    fastrand::Rng::new()
1107}
1108
1109/// Get a random number generator.
1110///
1111/// This uses a fixed seed due to the lack of a good RNG in `no_std` environments.
1112#[cfg(not(feature = "std"))]
1113#[inline]
1114fn rng() -> fastrand::Rng {
1115    // Chosen by fair roll of the dice.
1116    fastrand::Rng::with_seed(0x7e9b496634c97ec6)
1117}
1118
1119/// ```
1120/// use piper::{Reader, Writer};
1121/// fn _send_sync<T: Send + Sync>() {}
1122/// _send_sync::<Reader>();
1123/// _send_sync::<Writer>();
1124/// ```
1125fn _assert_send_sync() {}
1126
1127mod sync {
1128    #[cfg(not(feature = "portable-atomic"))]
1129    pub use core::sync::atomic;
1130
1131    #[cfg(not(feature = "portable-atomic"))]
1132    pub use alloc::sync::Arc;
1133
1134    #[cfg(feature = "portable-atomic")]
1135    pub use portable_atomic_crate as atomic;
1136
1137    #[cfg(feature = "portable-atomic")]
1138    pub use portable_atomic_util::Arc;
1139}