piper/lib.rs
1//! A bounded single-producer single-consumer pipe.
2//!
3//! This crate provides a ring buffer that can be asynchronously read from and written to. It is
4//! created via the [`pipe`] function, which returns a pair of [`Reader`] and [`Writer`] handles.
5//! They implement the [`AsyncRead`] and [`AsyncWrite`] traits, respectively.
6//!
7//! The handles are single-producer/single-consumer; to clarify, they cannot be cloned and need `&mut`
8//! access to read or write to them. If multiple-producer/multiple-consumer handles are needed,
9//! consider wrapping them in an `Arc<Mutex<...>>` or similar.
10//!
11//! When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
12//! to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
13//!
//! When the receiver is dropped, the pipe is closed and no more bytes can be written into it.
15//! Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
16//!
17//! # Version 0.2.0 Notes
18//!
19//! Previously, this crate contained other synchronization primitives, such as bounded channels, locks,
20//! and event listeners. These have been split out into their own crates:
21//!
22//! - [`async-channel`](https://docs.rs/async-channel)
23//! - [`async-dup`](https://docs.rs/async-dup)
24//! - [`async-lock`](https://docs.rs/async-lock)
25//! - [`async-mutex`](https://docs.rs/async-mutex)
26//! - [`event-listener`](https://docs.rs/event-listener)
27//!
28//! # Examples
29//!
30//! ## Asynchronous Tasks
31//!
32//! Communicate between asynchronous tasks, potentially on other threads.
33//!
34//! ```
35//! use async_channel::unbounded;
36//! use async_executor::Executor;
37//! use easy_parallel::Parallel;
38//! use futures_lite::{future, prelude::*};
39//! use std::time::Duration;
40//!
41//! # if cfg!(miri) { return; }
42//!
43//! // Create a pair of handles.
44//! let (mut reader, mut writer) = piper::pipe(1024);
45//!
46//! // Create the executor.
47//! let ex = Executor::new();
48//! let (signal, shutdown) = unbounded::<()>();
49//!
50//! // Spawn a detached task for random data to the pipe.
51//! let writer = ex.spawn(async move {
52//! for _ in 0..1_000 {
//!         // Generate 8 random bytes.
54//! let random = fastrand::u64(..).to_le_bytes();
55//!
56//! // Write them to the pipe.
57//! writer.write_all(&random).await.unwrap();
58//!
59//! // Wait a bit.
60//! async_io::Timer::after(Duration::from_millis(5)).await;
61//! }
62//!
63//! // Drop the writer to close the pipe.
64//! drop(writer);
65//! });
66//!
67//! // Detach the task so that it runs in the background.
68//! writer.detach();
69//!
70//! // Spawn a task for reading from the pipe.
71//! let reader = ex.spawn(async move {
72//! let mut buf = vec![];
73//!
74//! // Read all bytes from the pipe.
75//! reader.read_to_end(&mut buf).await.unwrap();
76//!
77//! println!("Random data: {:#?}", buf);
78//! });
79//!
80//! Parallel::new()
81//! // Run four executor threads.
82//! .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
83//! // Run the main future on the current thread.
84//! .finish(|| future::block_on(async {
85//! // Wait for the reader to finish.
86//! reader.await;
87//!
88//! // Signal the executor threads to shut down.
89//! drop(signal);
90//! }));
91//! ```
92//!
93//! ## Blocking I/O
94//!
95//! File I/O is blocking; therefore, in `async` code, you must run it on another thread. This example
96//! spawns another thread for reading a file and writing it to a pipe.
97//!
98//! ```no_run
99//! use futures_lite::{future, prelude::*};
100//! use std::fs::File;
101//! use std::io::prelude::*;
102//! use std::thread;
103//!
104//! // Create a pair of handles.
105//! let (mut r, mut w) = piper::pipe(1024);
106//!
107//! // Spawn a thread for reading a file.
108//! thread::spawn(move || {
109//! let mut file = File::open("Cargo.toml").unwrap();
110//!
111//! // Read the file into a buffer.
112//! let mut buf = [0u8; 16384];
113//! future::block_on(async move {
114//! loop {
115//! // Read a chunk of bytes from the file.
116//! // Blocking is okay here, since this is a separate thread.
117//! let n = file.read(&mut buf).unwrap();
118//! if n == 0 {
119//! break;
120//! }
121//!
122//! // Write the chunk to the pipe.
123//! w.write_all(&buf[..n]).await.unwrap();
124//! }
125//!
126//! // Close the pipe.
127//! drop(w);
128//! });
129//! });
130//!
131//! # future::block_on(async move {
132//! // Read bytes from the pipe.
133//! let mut buf = vec![];
134//! r.read_to_end(&mut buf).await.unwrap();
135//!
136//! println!("Read {} bytes", buf.len());
137//! # });
138//! ```
139//!
140//! However, the lower-level [`poll_fill`] and [`poll_drain`] methods take `impl Read` and `impl Write`
141//! arguments, respectively. This allows you to skip the buffer entirely and read/write directly from
142//! the file into the pipe. This approach should be preferred when possible, as it avoids an extra
143//! copy.
144//!
145//! ```no_run
146//! # use futures_lite::future;
147//! # use std::fs::File;
148//! # let mut file: File = unimplemented!();
149//! # let mut w: piper::Writer = unimplemented!();
150//! // In the `future::block_on` call above...
151//! # future::block_on(async move {
152//! loop {
153//! let n = future::poll_fn(|cx| w.poll_fill(cx, &mut file)).await.unwrap();
154//! if n == 0 {
155//! break;
156//! }
157//! }
158//! # });
159//! ```
160//!
161//! The [`blocking`] crate is preferred in this use case, since it uses more efficient strategies for
162//! thread management and pipes.
163//!
164//! [`poll_fill`]: struct.Writer.html#method.poll_fill
165//! [`poll_drain`]: struct.Reader.html#method.poll_drain
166//! [`blocking`]: https://docs.rs/blocking
167
168#![cfg_attr(not(feature = "std"), no_std)]
169#![forbid(missing_docs)]
170#![doc(
171 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
172)]
173#![doc(
174 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
175)]
176
177extern crate alloc;
178
179use core::convert::Infallible;
180use core::mem;
181use core::slice;
182use core::task::{Context, Poll};
183
184use alloc::vec::Vec;
185
186use sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
187use sync::Arc;
188
189#[cfg(feature = "std")]
190use std::{
191 io::{self, Read, Write},
192 pin::Pin,
193};
194
195use atomic_waker::AtomicWaker;
196
197#[cfg(feature = "std")]
198use futures_io::{AsyncRead, AsyncWrite};
199
/// Extracts the value out of a `Poll::Ready`, or returns `Poll::Pending` from the enclosing
/// function when the expression is not ready yet.
macro_rules! ready {
    ($poll:expr) => {{
        if let Poll::Ready(value) = $poll {
            value
        } else {
            return Poll::Pending;
        }
    }};
}
208
209/// Creates a bounded single-producer single-consumer pipe.
210///
211/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
212///
213/// See the [crate-level documentation](index.html) for more details.
214///
215/// # Panics
216///
217/// This function panics if `cap` is 0 or if `cap * 2` overflows a `usize`.
218#[allow(clippy::incompatible_msrv)] // false positive: https://github.com/rust-lang/rust-clippy/issues/12280
219pub fn pipe(cap: usize) -> (Reader, Writer) {
220 assert!(cap > 0, "capacity must be positive");
221 assert!(cap.checked_mul(2).is_some(), "capacity is too large");
222
223 // Allocate the ring buffer.
224 let mut v = Vec::with_capacity(cap);
225 let buffer = v.as_mut_ptr();
226 mem::forget(v);
227
228 let inner = Arc::new(Pipe {
229 head: AtomicUsize::new(0),
230 tail: AtomicUsize::new(0),
231 reader: AtomicWaker::new(),
232 writer: AtomicWaker::new(),
233 closed: AtomicBool::new(false),
234 buffer,
235 cap,
236 });
237
238 // Use a random number generator to randomize fair yielding behavior.
239 let mut rng = rng();
240
241 let r = Reader {
242 inner: inner.clone(),
243 head: 0,
244 tail: 0,
245 rng: rng.fork(),
246 };
247
248 let w = Writer {
249 inner,
250 head: 0,
251 tail: 0,
252 zeroed_until: 0,
253 rng,
254 };
255
256 (r, w)
257}
258
/// The reading side of a pipe.
///
/// This type is created by the [`pipe`] function. See its documentation for more details.
pub struct Reader {
    /// The inner ring buffer, shared with the [`Writer`].
    inner: Arc<Pipe>,

    /// The head index, moved by the reader, in the range `0..2*cap`.
    ///
    /// This index always matches `inner.head`.
    head: usize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    ///
    /// This index is a snapshot of `inner.tail` that might become stale at any point.
    tail: usize,

    /// Random number generator, used to occasionally yield while draining.
    rng: fastrand::Rng,
}
279
/// The writing side of a pipe.
///
/// This type is created by the [`pipe`] function. See its documentation for more details.
pub struct Writer {
    /// The inner ring buffer, shared with the [`Reader`].
    inner: Arc<Pipe>,

    /// The head index, moved by the reader, in the range `0..2*cap`.
    ///
    /// This index is a snapshot of `inner.head` that might become stale at any point.
    head: usize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    ///
    /// This index always matches `inner.tail`.
    tail: usize,

    /// How many bytes at the beginning of the buffer have been zeroed.
    ///
    /// The pipe allocates an uninitialized buffer, and we must be careful about passing
    /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
    /// expensive, so we zero it in smaller chunks as the writer makes progress.
    zeroed_until: usize,

    /// Random number generator, used to occasionally yield while filling.
    rng: fastrand::Rng,
}
307
/// The inner ring buffer.
///
/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
///
/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
/// could mean the pipe is either empty or full, but we don't know which!
struct Pipe {
    /// The head index, moved by the reader, in the range `0..2*cap`.
    head: AtomicUsize,

    /// The tail index, moved by the writer, in the range `0..2*cap`.
    tail: AtomicUsize,

    /// A waker representing the blocked reader.
    reader: AtomicWaker,

    /// A waker representing the blocked writer.
    writer: AtomicWaker,

    /// Set to `true` if the reader or writer was dropped, or if the writer was closed through
    /// `AsyncWrite::poll_close`.
    closed: AtomicBool,

    /// The byte buffer: a raw allocation of `cap` bytes that is released in `Drop for Pipe`.
    buffer: *mut u8,

    /// The buffer capacity.
    cap: usize,
}
338
// SAFETY: NOTE(review) - rationale inferred from this file, please confirm. The raw `buffer`
// pointer is the only field that prevents these traits from being derived automatically. The
// buffer is only accessed through the single `Reader` and the single `Writer`, which appear to
// touch disjoint regions of it as coordinated by the atomic `head` and `tail` indices.
unsafe impl Sync for Pipe {}
unsafe impl Send for Pipe {}
341
342impl Drop for Pipe {
343 fn drop(&mut self) {
344 // Deallocate the byte buffer.
345 unsafe {
346 Vec::from_raw_parts(self.buffer, 0, self.cap);
347 }
348 }
349}
350
351impl Drop for Reader {
352 fn drop(&mut self) {
353 // Dropping closes the pipe and then wakes the writer.
354 self.inner.closed.store(true, Ordering::SeqCst);
355 self.inner.writer.wake();
356 }
357}
358
359impl Drop for Writer {
360 fn drop(&mut self) {
361 // Dropping closes the pipe and then wakes the reader.
362 self.inner.closed.store(true, Ordering::SeqCst);
363 self.inner.reader.wake();
364 }
365}
366
367impl Pipe {
368 /// Get the length of the data in the pipe.
369 fn len(&self) -> usize {
370 let head = self.head.load(Ordering::Acquire);
371 let tail = self.tail.load(Ordering::Acquire);
372
373 if head <= tail {
374 tail - head
375 } else {
376 (2 * self.cap) - (head - tail)
377 }
378 }
379}
380
381impl Reader {
382 /// Gets the total length of the data in the pipe.
383 ///
384 /// This method returns the number of bytes that have been written into the pipe but haven't been
385 /// read yet.
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// let (mut reader, mut writer) = piper::pipe(10);
391 /// let _ = writer.try_fill(&[0u8; 5]);
392 /// assert_eq!(reader.len(), 5);
393 /// ```
394 pub fn len(&self) -> usize {
395 self.inner.len()
396 }
397
398 /// Tell whether or not the pipe is empty.
399 ///
400 /// This method returns `true` if the pipe is empty, and `false` otherwise.
401 ///
402 /// # Examples
403 ///
404 /// ```
405 /// let (mut reader, mut writer) = piper::pipe(10);
406 /// assert!(reader.is_empty());
407 /// let _ = writer.try_fill(&[0u8; 5]);
408 /// assert!(!reader.is_empty());
409 /// ```
410 pub fn is_empty(&self) -> bool {
411 self.inner.len() == 0
412 }
413
414 /// Gets the total capacity of the pipe.
415 ///
416 /// This method returns the number of bytes that the pipe can hold at a time.
417 ///
418 /// # Examples
419 ///
420 /// ```
421 /// # futures_lite::future::block_on(async {
422 /// let (reader, _) = piper::pipe(10);
423 /// assert_eq!(reader.capacity(), 10);
424 /// # });
425 /// ```
426 pub fn capacity(&self) -> usize {
427 self.inner.cap
428 }
429
430 /// Tell whether or not the pipe is full.
431 ///
432 /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
433 /// writes will block until some data is read from the pipe.
434 ///
435 /// This method returns `true` if the pipe is full, and `false` otherwise.
436 ///
437 /// # Examples
438 ///
439 /// ```
440 /// let (mut reader, mut writer) = piper::pipe(10);
441 /// assert!(!reader.is_full());
442 /// let _ = writer.try_fill(&[0u8; 10]);
443 /// assert!(reader.is_full());
444 /// let _ = reader.try_drain(&mut [0u8; 5]);
445 /// assert!(!reader.is_full());
446 /// ```
447 pub fn is_full(&self) -> bool {
448 self.inner.len() == self.inner.cap
449 }
450
451 /// Tell whether or not the pipe is closed.
452 ///
453 /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
454 /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
455 /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// # futures_lite::future::block_on(async {
461 /// let (mut reader, mut writer) = piper::pipe(10);
462 /// assert!(!reader.is_closed());
463 /// drop(writer);
464 /// assert!(reader.is_closed());
465 /// # });
466 /// ```
467 pub fn is_closed(&self) -> bool {
468 self.inner.closed.load(Ordering::SeqCst)
469 }
470
471 /// Reads bytes from this reader and writes into blocking `dest`.
472 ///
473 /// This method reads directly from the pipe's internal buffer into `dest`. This avoids an extra copy,
474 /// but it may block the thread if `dest` blocks.
475 ///
476 /// If the pipe is empty, this method returns `Poll::Pending`. If the pipe is closed, this method
477 /// returns `Poll::Ready(Ok(0))`. Errors in `dest` are bubbled up through `Poll::Ready(Err(e))`.
478 /// Otherwise, this method returns `Poll::Ready(Ok(n))` where `n` is the number of bytes written.
479 ///
480 /// This method is only available when the `std` feature is enabled. For `no_std` environments,
481 /// consider using [`poll_drain_bytes`] instead.
482 ///
483 /// [`poll_drain_bytes`]: #method.poll_drain_bytes
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// use futures_lite::{future, prelude::*};
489 /// # future::block_on(async {
490 ///
491 /// let (mut r, mut w) = piper::pipe(1024);
492 ///
493 /// // Write some data to the pipe.
494 /// w.write_all(b"hello world").await.unwrap();
495 ///
496 /// // Try reading from the pipe.
497 /// let mut buf = [0; 1024];
498 /// let n = future::poll_fn(|cx| r.poll_drain(cx, &mut buf[..])).await.unwrap();
499 ///
500 /// // The data was written to the buffer.
501 /// assert_eq!(&buf[..n], b"hello world");
502 /// # });
503 /// ```
504 #[cfg(feature = "std")]
505 pub fn poll_drain(
506 &mut self,
507 cx: &mut Context<'_>,
508 dest: impl Write,
509 ) -> Poll<io::Result<usize>> {
510 self.drain_inner(Some(cx), dest)
511 }
512
513 /// Reads bytes from this reader.
514 ///
515 /// Rather than taking a `Write` trait object, this method takes a slice of bytes to write into.
516 /// Because of this, it is infallible and can be used in `no_std` environments.
517 ///
518 /// The same conditions that apply to [`poll_drain`] apply to this method.
519 ///
520 /// [`poll_drain`]: #method.poll_drain
521 ///
522 /// # Examples
523 ///
524 /// ```
525 /// use futures_lite::{future, prelude::*};
526 /// # future::block_on(async {
527 /// let (mut r, mut w) = piper::pipe(1024);
528 ///
529 /// // Write some data to the pipe.
530 /// w.write_all(b"hello world").await.unwrap();
531 ///
532 /// // Try reading from the pipe.
533 /// let mut buf = [0; 1024];
534 /// let n = future::poll_fn(|cx| r.poll_drain_bytes(cx, &mut buf[..])).await;
535 ///
536 /// // The data was written to the buffer.
537 /// assert_eq!(&buf[..n], b"hello world");
538 /// # });
539 /// ```
540 pub fn poll_drain_bytes(&mut self, cx: &mut Context<'_>, dest: &mut [u8]) -> Poll<usize> {
541 match self.drain_inner(Some(cx), WriteBytes(dest)) {
542 Poll::Ready(Ok(n)) => Poll::Ready(n),
543 Poll::Ready(Err(e)) => match e {},
544 Poll::Pending => Poll::Pending,
545 }
546 }
547
548 /// Tries to read bytes from this reader.
549 ///
550 /// Returns the total number of bytes that were read from this reader.
551 ///
552 /// # Examples
553 ///
554 /// ```
555 /// let (mut r, mut w) = piper::pipe(1024);
556 ///
557 /// // `try_drain()` returns 0 off the bat.
558 /// let mut buf = [0; 10];
559 /// assert_eq!(r.try_drain(&mut buf), 0);
560 ///
561 /// // After a write it returns the data.
562 /// w.try_fill(&[0, 1, 2, 3, 4]);
563 /// assert_eq!(r.try_drain(&mut buf), 5);
564 /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
565 /// ```
566 pub fn try_drain(&mut self, dest: &mut [u8]) -> usize {
567 match self.drain_inner(None, WriteBytes(dest)) {
568 Poll::Ready(Ok(n)) => n,
569 Poll::Ready(Err(e)) => match e {},
570 Poll::Pending => 0,
571 }
572 }
573
574 /// Reads bytes from this reader and writes into blocking `dest`.
575 #[inline]
576 fn drain_inner<W: WriteLike>(
577 &mut self,
578 mut cx: Option<&mut Context<'_>>,
579 mut dest: W,
580 ) -> Poll<Result<usize, W::Error>> {
581 let cap = self.inner.cap;
582
583 // Calculates the distance between two indices.
584 let distance = |a: usize, b: usize| {
585 if a <= b {
586 b - a
587 } else {
588 2 * cap - (a - b)
589 }
590 };
591
592 // If the pipe appears to be empty...
593 if distance(self.head, self.tail) == 0 {
594 // Reload the tail in case it's become stale.
595 self.tail = self.inner.tail.load(Ordering::Acquire);
596
597 // If the pipe is now really empty...
598 if distance(self.head, self.tail) == 0 {
599 // Register the waker.
600 if let Some(cx) = cx.as_mut() {
601 self.inner.reader.register(cx.waker());
602 }
603 atomic::fence(Ordering::SeqCst);
604
605 // Reload the tail after registering the waker.
606 self.tail = self.inner.tail.load(Ordering::Acquire);
607
608 // If the pipe is still empty...
609 if distance(self.head, self.tail) == 0 {
610 // Check whether the pipe is closed or just empty.
611 if self.inner.closed.load(Ordering::Relaxed) {
612 return Poll::Ready(Ok(0));
613 } else {
614 return Poll::Pending;
615 }
616 }
617 }
618 }
619
620 // The pipe is not empty so remove the waker.
621 self.inner.reader.take();
622
623 // Yield with some small probability - this improves fairness.
624 if let Some(cx) = cx {
625 ready!(maybe_yield(&mut self.rng, cx));
626 }
627
628 // Given an index in `0..2*cap`, returns the real index in `0..cap`.
629 let real_index = |i: usize| {
630 if i < cap {
631 i
632 } else {
633 i - cap
634 }
635 };
636
637 // Number of bytes read so far.
638 let mut count = 0;
639
640 loop {
641 // Calculate how many bytes to read in this iteration.
642 let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
643 .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
644 .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
645
646 // Create a slice of data in the pipe buffer.
647 let pipe_slice =
648 unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
649
650 // Copy bytes from the pipe buffer into `dest`.
651 let n = dest.write(pipe_slice)?;
652 count += n;
653
654 // If pipe is empty or `dest` is full, return.
655 if n == 0 {
656 return Poll::Ready(Ok(count));
657 }
658
659 // Move the head forward.
660 if self.head + n < 2 * cap {
661 self.head += n;
662 } else {
663 self.head = 0;
664 }
665
666 // Store the current head index.
667 self.inner.head.store(self.head, Ordering::Release);
668
669 // Wake the writer because the pipe is not full.
670 self.inner.writer.wake();
671 }
672 }
673}
674
#[cfg(feature = "std")]
impl AsyncRead for Reader {
    /// Reads available bytes from the pipe into `buf`; this never fails.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Draining into a byte slice is infallible, so the count is simply wrapped in `Ok`.
        match self.get_mut().poll_drain_bytes(cx, buf) {
            Poll::Ready(read) => Poll::Ready(Ok(read)),
            Poll::Pending => Poll::Pending,
        }
    }
}
685
impl Writer {
    /// Gets the total length of the data in the pipe.
    ///
    /// This method returns the number of bytes that have been written into the pipe but haven't been
    /// read yet.
    ///
    /// # Examples
    ///
    /// ```
    /// let (_reader, mut writer) = piper::pipe(10);
    /// let _ = writer.try_fill(&[0u8; 5]);
    /// assert_eq!(writer.len(), 5);
    /// ```
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Tell whether or not the pipe is empty.
    ///
    /// This method returns `true` if the pipe is empty, and `false` otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// let (_reader, mut writer) = piper::pipe(10);
    /// assert!(writer.is_empty());
    /// let _ = writer.try_fill(&[0u8; 5]);
    /// assert!(!writer.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.inner.len() == 0
    }

    /// Gets the total capacity of the pipe.
    ///
    /// This method returns the number of bytes that the pipe can hold at a time.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures_lite::future::block_on(async {
    /// let (_, writer) = piper::pipe(10);
    /// assert_eq!(writer.capacity(), 10);
    /// # });
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.cap
    }

    /// Tell whether or not the pipe is full.
    ///
    /// The pipe is full if the number of bytes written into it is equal to its capacity. At this point,
    /// writes will block until some data is read from the pipe.
    ///
    /// This method returns `true` if the pipe is full, and `false` otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// let (mut reader, mut writer) = piper::pipe(10);
    /// assert!(!writer.is_full());
    /// let _ = writer.try_fill(&[0u8; 10]);
    /// assert!(writer.is_full());
    /// let _ = reader.try_drain(&mut [0u8; 5]);
    /// assert!(!writer.is_full());
    /// ```
    pub fn is_full(&self) -> bool {
        self.inner.len() == self.inner.cap
    }

    /// Tell whether or not the pipe is closed.
    ///
    /// The pipe is closed if either the reader or the writer has been dropped. At this point, attempting
    /// to write into the pipe will return `Poll::Ready(Ok(0))` and attempting to read from the pipe after
    /// any previously written bytes are read will return `Poll::Ready(Ok(0))`.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures_lite::future::block_on(async {
    /// let (reader, writer) = piper::pipe(10);
    /// assert!(!writer.is_closed());
    /// drop(reader);
    /// assert!(writer.is_closed());
    /// # });
    /// ```
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::SeqCst)
    }

    /// Reads bytes from blocking `src` and writes into this writer.
    ///
    /// This method writes directly from `src` into the pipe's internal buffer. This avoids an extra copy,
    /// but it may block the thread if `src` blocks.
    ///
    /// If the pipe is full, this method registers the task's waker and returns `Poll::Pending`.
    /// If the pipe is closed, this method returns `Poll::Ready(Ok(0))`. Errors in `src` are bubbled
    /// up through `Poll::Ready(Err(e))`. Otherwise, this method returns `Poll::Ready(Ok(n))` where
    /// `n` is the number of bytes read from `src`.
    ///
    /// To improve fairness, this method occasionally returns `Poll::Pending` even though space is
    /// available; in that case the task is woken immediately.
    ///
    /// This method is only available when the `std` feature is enabled. For `no_std` environments,
    /// consider using [`poll_fill_bytes`] instead.
    ///
    /// [`poll_fill_bytes`]: #method.poll_fill_bytes
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::{future, prelude::*};
    /// # future::block_on(async {
    ///
    /// // Create a pipe.
    /// let (mut reader, mut writer) = piper::pipe(1024);
    ///
    /// // Fill the pipe with some bytes.
    /// let data = b"hello world";
    /// let n = future::poll_fn(|cx| writer.poll_fill(cx, &data[..])).await.unwrap();
    /// assert_eq!(n, data.len());
    ///
    /// // Read the bytes back.
    /// let mut buf = [0; 1024];
    /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
    /// assert_eq!(&buf[..data.len()], data);
    /// # });
    /// ```
    #[cfg(feature = "std")]
    pub fn poll_fill(&mut self, cx: &mut Context<'_>, src: impl Read) -> Poll<io::Result<usize>> {
        self.fill_inner(Some(cx), src)
    }

    /// Writes bytes into this writer.
    ///
    /// Rather than taking an `impl Read`, this method takes a slice of bytes to read from.
    /// Because of this, it is infallible and can be used in `no_std` environments.
    ///
    /// The same conditions that apply to [`poll_fill`] apply to this method.
    ///
    /// [`poll_fill`]: #method.poll_fill
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::{future, prelude::*};
    /// # future::block_on(async {
    ///
    /// // Create a pipe.
    /// let (mut reader, mut writer) = piper::pipe(1024);
    ///
    /// // Fill the pipe with some bytes.
    /// let data = b"hello world";
    /// let n = future::poll_fn(|cx| writer.poll_fill_bytes(cx, &data[..])).await;
    /// assert_eq!(n, data.len());
    ///
    /// // Read the bytes back.
    /// let mut buf = [0; 1024];
    /// reader.read_exact(&mut buf[..data.len()]).await.unwrap();
    /// assert_eq!(&buf[..data.len()], data);
    /// # });
    /// ```
    pub fn poll_fill_bytes(&mut self, cx: &mut Context<'_>, bytes: &[u8]) -> Poll<usize> {
        match self.fill_inner(Some(cx), ReadBytes(bytes)) {
            Poll::Ready(Ok(n)) => Poll::Ready(n),
            // `ReadBytes` cannot fail, so this arm is statically unreachable.
            Poll::Ready(Err(e)) => match e {},
            Poll::Pending => Poll::Pending,
        }
    }

    /// Tries to write bytes to this writer.
    ///
    /// Returns the total number of bytes that were written into this writer. This never registers
    /// a waker: if the pipe is full or closed, `0` is returned.
    ///
    /// Note that, despite its name, `dest` is the slice the bytes are copied *from*.
    ///
    /// # Examples
    ///
    /// ```
    /// let (mut r, mut w) = piper::pipe(1024);
    ///
    /// let mut buf = [0; 10];
    /// assert_eq!(w.try_fill(&[0, 1, 2, 3, 4]), 5);
    /// assert_eq!(r.try_drain(&mut buf), 5);
    /// assert_eq!(&buf[..5], &[0, 1, 2, 3, 4]);
    /// ```
    pub fn try_fill(&mut self, dest: &[u8]) -> usize {
        match self.fill_inner(None, ReadBytes(dest)) {
            Poll::Ready(Ok(n)) => n,
            // `ReadBytes` cannot fail, so this arm is statically unreachable.
            Poll::Ready(Err(e)) => match e {},
            Poll::Pending => 0,
        }
    }

    /// Reads bytes from `src` and writes them into this writer.
    ///
    /// This is the shared implementation behind `poll_fill`, `poll_fill_bytes` and `try_fill`.
    /// When `cx` is `None`, no waker is registered and no fairness yield is performed.
    #[inline]
    fn fill_inner<R: ReadLike>(
        &mut self,
        mut cx: Option<&mut Context<'_>>,
        mut src: R,
    ) -> Poll<Result<usize, R::Error>> {
        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
        if self.inner.closed.load(Ordering::Relaxed) {
            return Poll::Ready(Ok(0));
        }

        // Calculates the distance between two indices.
        let cap = self.inner.cap;
        let distance = |a: usize, b: usize| {
            if a <= b {
                b - a
            } else {
                2 * cap - (a - b)
            }
        };

        // If the pipe appears to be full...
        if distance(self.head, self.tail) == cap {
            // Reload the head in case it's become stale.
            self.head = self.inner.head.load(Ordering::Acquire);

            // If the pipe is now really full...
            if distance(self.head, self.tail) == cap {
                // Register the waker.
                if let Some(cx) = cx.as_mut() {
                    self.inner.writer.register(cx.waker());
                }
                atomic::fence(Ordering::SeqCst);

                // Reload the head after registering the waker.
                self.head = self.inner.head.load(Ordering::Acquire);

                // If the pipe is still full...
                if distance(self.head, self.tail) == cap {
                    // Check whether the pipe is closed or just full.
                    if self.inner.closed.load(Ordering::Relaxed) {
                        return Poll::Ready(Ok(0));
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }

        // The pipe is not full so remove the waker.
        self.inner.writer.take();

        // Yield with some small probability - this improves fairness.
        if let Some(cx) = cx {
            ready!(maybe_yield(&mut self.rng, cx));
        }

        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
        let real_index = |i: usize| {
            if i < cap {
                i
            } else {
                i - cap
            }
        };

        // Number of bytes written so far.
        let mut count = 0;

        loop {
            // Calculate how many bytes to write in this iteration.
            let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
                .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
                .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
                .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.

            // Create a slice of available space in the pipe buffer.
            let pipe_slice_mut = unsafe {
                let from = real_index(self.tail);
                let to = from + n;

                // Make sure all bytes in the slice are initialized.
                if self.zeroed_until < to {
                    self.inner
                        .buffer
                        .add(self.zeroed_until)
                        .write_bytes(0u8, to - self.zeroed_until);
                    self.zeroed_until = to;
                }

                slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
            };

            // Copy bytes from `src` into the pipe buffer.
            let n = src.read(pipe_slice_mut)?;
            count += n;

            // If the pipe is full or closed, or `src` is empty, return.
            //
            // NOTE(review): when the pipe is found closed here, `count` already includes the `n`
            // bytes just taken from `src`, although the tail is not advanced for them.
            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
                return Poll::Ready(Ok(count));
            }

            // Move the tail forward, wrapping around at `2 * cap`.
            if self.tail + n < 2 * cap {
                self.tail += n;
            } else {
                self.tail = 0;
            }

            // Store the current tail index.
            self.inner.tail.store(self.tail, Ordering::Release);

            // Wake the reader because the pipe is not empty.
            self.inner.reader.wake();
        }
    }
}
992
#[cfg(feature = "std")]
impl AsyncWrite for Writer {
    /// Writes as many bytes of `buf` as currently fit into the pipe; this never fails.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Filling from a byte slice is infallible, so the count is simply wrapped in `Ok`.
        match self.get_mut().poll_fill_bytes(cx, buf) {
            Poll::Ready(written) => Poll::Ready(Ok(written)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// Flushing is a no-op: written bytes are visible to the reader right away.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Closes the pipe and wakes both sides so they can observe the closed state.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let pipe = &*self.inner;

        // Raise the closed flag first...
        pipe.closed.store(true, Ordering::Release);

        // ...then wake up any tasks that may be waiting on the pipe.
        pipe.reader.wake();
        pipe.writer.wake();

        // Closing completes immediately.
        Poll::Ready(Ok(()))
    }
}
1020
/// Abstraction over a byte source that the pipe can be filled from.
trait ReadLike {
    /// The error produced when reading fails.
    type Error;

    /// Copies bytes from the source into `buf` and returns how many were copied.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
}

/// Every `std::io::Read` implementor can act as a source.
#[cfg(feature = "std")]
impl<R: Read> ReadLike for R {
    type Error = io::Error;

    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        <R as Read>::read(self, buf)
    }
}

/// A `no_std`-friendly source backed by a byte slice.
///
/// Each read consumes the copied bytes from the front of the slice.
struct ReadBytes<'a>(&'a [u8]);

impl ReadLike for ReadBytes<'_> {
    type Error = Infallible;

    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        // Copy as much as both the source and the destination allow.
        let count = self.0.len().min(buf.len());
        let (chunk, remaining) = self.0.split_at(count);
        buf[..count].copy_from_slice(chunk);

        // Keep only the bytes that have not been handed out yet.
        self.0 = remaining;
        Ok(count)
    }
}
1052
/// Abstraction over a byte sink that the pipe can be drained into.
trait WriteLike {
    /// The error produced when writing fails.
    type Error;

    /// Copies bytes from `buf` into the sink and returns how many were accepted.
    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
}

/// Every `std::io::Write` implementor can act as a sink.
#[cfg(feature = "std")]
impl<W: Write> WriteLike for W {
    type Error = io::Error;

    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        <W as Write>::write(self, buf)
    }
}

/// A `no_std`-friendly sink backed by a mutable byte slice.
///
/// Each write shrinks the slice from the front by the number of bytes stored.
struct WriteBytes<'a>(&'a mut [u8]);

impl WriteLike for WriteBytes<'_> {
    type Error = Infallible;

    fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
        // Accept as much as both the sink and the input allow.
        let count = self.0.len().min(buf.len());

        // Temporarily move the slice out so it can be split with its full lifetime.
        // mem::take() is not available on 1.36
        #[allow(clippy::mem_replace_with_default)]
        let whole = mem::replace(&mut self.0, &mut []);

        let (filled, rest) = whole.split_at_mut(count);
        filled.copy_from_slice(&buf[..count]);

        // Only the still-unwritten tail remains available for later writes.
        self.0 = rest;

        Ok(count)
    }
}
1091
1092/// Yield with some small probability.
1093fn maybe_yield(rng: &mut fastrand::Rng, cx: &mut Context<'_>) -> Poll<()> {
1094 if rng.usize(..100) == 0 {
1095 cx.waker().wake_by_ref();
1096 Poll::Pending
1097 } else {
1098 Poll::Ready(())
1099 }
1100}
1101
1102/// Get a random number generator.
1103#[cfg(feature = "std")]
1104#[inline]
1105fn rng() -> fastrand::Rng {
1106 fastrand::Rng::new()
1107}
1108
1109/// Get a random number generator.
1110///
1111/// This uses a fixed seed due to the lack of a good RNG in `no_std` environments.
1112#[cfg(not(feature = "std"))]
1113#[inline]
1114fn rng() -> fastrand::Rng {
1115 // Chosen by fair roll of the dice.
1116 fastrand::Rng::with_seed(0x7e9b496634c97ec6)
1117}
1118
/// Carrier for a doctest which checks at compile time that both pipe handles are `Send` and
/// `Sync`; the function itself does nothing.
///
/// ```
/// use piper::{Reader, Writer};
/// fn _send_sync<T: Send + Sync>() {}
/// _send_sync::<Reader>();
/// _send_sync::<Writer>();
/// ```
fn _assert_send_sync() {}
1126
1127mod sync {
1128 #[cfg(not(feature = "portable-atomic"))]
1129 pub use core::sync::atomic;
1130
1131 #[cfg(not(feature = "portable-atomic"))]
1132 pub use alloc::sync::Arc;
1133
1134 #[cfg(feature = "portable-atomic")]
1135 pub use portable_atomic_crate as atomic;
1136
1137 #[cfg(feature = "portable-atomic")]
1138 pub use portable_atomic_util::Arc;
1139}