futures_lite/io.rs
1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::borrow::{Borrow, BorrowMut};
24use std::cmp;
25use std::fmt;
26use std::future::Future;
27use std::io::{IoSlice, IoSliceMut};
28use std::mem;
29use std::pin::Pin;
30use std::sync::{Arc, Mutex};
31use std::task::{Context, Poll};
32
33use futures_core::stream::Stream;
34use pin_project_lite::pin_project;
35
36use crate::future;
37use crate::ready;
38
// Buffer capacity (8 KiB) used by `BufReader::new()` and `BufWriter::new()`.
const DEFAULT_BUF_SIZE: usize = 8192;
40
41/// Copies the entire contents of a reader into a writer.
42///
43/// This function will read data from `reader` and write it into `writer` in a streaming fashion
44/// until `reader` returns EOF.
45///
46/// On success, returns the total number of bytes copied.
47///
48/// # Examples
49///
50/// ```
51/// use futures_lite::io::{self, BufReader, BufWriter};
52///
53/// # spin_on::spin_on(async {
54/// let input: &[u8] = b"hello";
55/// let reader = BufReader::new(input);
56///
57/// let mut output = Vec::new();
58/// let writer = BufWriter::new(&mut output);
59///
60/// io::copy(reader, writer).await?;
61/// # std::io::Result::Ok(()) });
62/// ```
63pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
64where
65 R: AsyncRead,
66 W: AsyncWrite,
67{
68 pin_project! {
69 struct CopyFuture<R, W> {
70 #[pin]
71 reader: R,
72 #[pin]
73 writer: W,
74 amt: u64,
75 }
76 }
77
78 impl<R, W> Future for CopyFuture<R, W>
79 where
80 R: AsyncBufRead,
81 W: AsyncWrite,
82 {
83 type Output = Result<u64>;
84
85 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86 let mut this = self.project();
87 loop {
88 let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
89 if buffer.is_empty() {
90 ready!(this.writer.as_mut().poll_flush(cx))?;
91 return Poll::Ready(Ok(*this.amt));
92 }
93
94 let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
95 if i == 0 {
96 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
97 }
98 *this.amt += i as u64;
99 this.reader.as_mut().consume(i);
100 }
101 }
102 }
103
104 let future = CopyFuture {
105 reader: BufReader::new(reader),
106 writer,
107 amt: 0,
108 };
109 future.await
110}
111
112/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
113///
114/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
115/// This is usually the case for in-memory buffered I/O.
116///
117/// # Examples
118///
119/// ```
120/// use futures_lite::io::{AssertAsync, AsyncReadExt};
121///
122/// let reader: &[u8] = b"hello";
123///
124/// # spin_on::spin_on(async {
125/// let mut async_reader = AssertAsync::new(reader);
126/// let mut contents = String::new();
127///
128/// // This line works in async manner - note that there is await:
129/// async_reader.read_to_string(&mut contents).await?;
130/// # std::io::Result::Ok(()) });
131/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AssertAsync<T>(T);

// The async trait impls for this wrapper reach the inner handle directly through
// `Pin<&mut Self>`, which requires the wrapper to be `Unpin` for every `T`.
impl<T> Unpin for AssertAsync<T> {}

impl<T> AssertAsync<T> {
    /// Creates a wrapper around an I/O handle that implements [`std::io`] traits.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// ```
    #[inline(always)]
    pub fn new(io: T) -> Self {
        Self(io)
    }

    /// Borrows the wrapped I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_ref();
    /// ```
    #[inline(always)]
    pub fn get_ref(&self) -> &T {
        let AssertAsync(io) = self;
        io
    }

    /// Mutably borrows the wrapped I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let mut async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_mut();
    /// ```
    #[inline(always)]
    pub fn get_mut(&mut self) -> &mut T {
        let AssertAsync(io) = self;
        io
    }

    /// Consumes the wrapper and returns the wrapped I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let inner = async_reader.into_inner();
    /// ```
    #[inline(always)]
    pub fn into_inner(self) -> T {
        let AssertAsync(io) = self;
        io
    }
}
205
// Runs the blocking operation `f` and reports its result as an immediately-ready poll.
//
// Calls that fail with `ErrorKind::Interrupted` are retried; any other outcome
// (success or a different error) is returned wrapped in `Poll::Ready`.
fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
where
    F: FnMut() -> std::io::Result<T>,
{
    loop {
        let outcome = f();
        let interrupted = match &outcome {
            Err(err) => err.kind() == ErrorKind::Interrupted,
            Ok(_) => false,
        };
        if !interrupted {
            break Poll::Ready(outcome);
        }
    }
}
217
218impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
219 #[inline]
220 fn poll_read(
221 mut self: Pin<&mut Self>,
222 _: &mut Context<'_>,
223 buf: &mut [u8],
224 ) -> Poll<Result<usize>> {
225 assert_async_wrapio(move || self.0.read(buf))
226 }
227
228 #[inline]
229 fn poll_read_vectored(
230 mut self: Pin<&mut Self>,
231 _: &mut Context<'_>,
232 bufs: &mut [IoSliceMut<'_>],
233 ) -> Poll<Result<usize>> {
234 assert_async_wrapio(move || self.0.read_vectored(bufs))
235 }
236}
237
238impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
239 #[inline]
240 fn poll_write(
241 mut self: Pin<&mut Self>,
242 _: &mut Context<'_>,
243 buf: &[u8],
244 ) -> Poll<Result<usize>> {
245 assert_async_wrapio(move || self.0.write(buf))
246 }
247
248 #[inline]
249 fn poll_write_vectored(
250 mut self: Pin<&mut Self>,
251 _: &mut Context<'_>,
252 bufs: &[IoSlice<'_>],
253 ) -> Poll<Result<usize>> {
254 assert_async_wrapio(move || self.0.write_vectored(bufs))
255 }
256
257 #[inline]
258 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
259 assert_async_wrapio(move || self.0.flush())
260 }
261
262 #[inline]
263 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
264 self.poll_flush(cx)
265 }
266}
267
268impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
269 #[inline]
270 fn poll_seek(
271 mut self: Pin<&mut Self>,
272 _: &mut Context<'_>,
273 pos: SeekFrom,
274 ) -> Poll<Result<u64>> {
275 assert_async_wrapio(move || self.0.seek(pos))
276 }
277}
278
279/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
280/// polls to `WouldBlock` errors.
281///
282/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
283/// that take `Read` as a parameter.
284///
285/// # Examples
286///
287/// ```
288/// use std::io::Read;
289/// use std::task::{Poll, Context};
290///
291/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
292/// // Assume we have a library that's built around `Read` and `Write` traits.
293/// use cooltls::Session;
294///
295/// // We want to use it with our writer that implements `AsyncWrite`.
296/// let writer = Stream::new();
297///
298/// // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
299/// use futures_lite::io::AsyncAsSync;
300/// let writer = AsyncAsSync::new(cx, writer);
301///
302/// // Now, we can use it with `cooltls`.
303/// let mut session = Session::new(writer);
304///
305/// // Match on the result of `read()` and translate it to poll.
306/// match session.read(&mut [0; 1024]) {
307/// Ok(n) => Poll::Ready(n),
308/// Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
309/// Err(err) => panic!("unexpected error: {}", err),
310/// }
311/// }
312///
313/// // Usually, poll-based functions are best wrapped using `poll_fn`.
314/// use futures_lite::future::poll_fn;
315/// # futures_lite::future::block_on(async {
316/// poll_fn(|cx| poll_for_io(cx)).await;
317/// # });
318/// # struct Stream;
319/// # impl Stream {
320/// # fn new() -> Stream {
321/// # Stream
322/// # }
323/// # }
324/// # impl futures_lite::io::AsyncRead for Stream {
325/// # fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
326/// # Poll::Ready(Ok(0))
327/// # }
328/// # }
329/// # mod cooltls {
330/// # pub struct Session<W> {
331/// # reader: W,
332/// # }
333/// # impl<W> Session<W> {
334/// # pub fn new(reader: W) -> Session<W> {
335/// # Session { reader }
336/// # }
337/// # }
338/// # impl<W: std::io::Read> std::io::Read for Session<W> {
339/// # fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
340/// # self.reader.read(buf)
341/// # }
342/// # }
343/// # }
344/// ```
345#[derive(Debug)]
346pub struct AsyncAsSync<'r, 'ctx, T> {
347 /// The context we are using to poll the future.
348 pub context: &'r mut Context<'ctx>,
349
350 /// The actual reader/writer we are wrapping.
351 pub inner: T,
352}
353
354impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
355 /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// use futures_lite::io::AsyncAsSync;
361 /// use std::task::Context;
362 /// use waker_fn::waker_fn;
363 ///
364 /// let reader: &[u8] = b"hello";
365 /// let waker = waker_fn(|| {});
366 /// let mut context = Context::from_waker(&waker);
367 ///
368 /// let async_reader = AsyncAsSync::new(&mut context, reader);
369 /// ```
370 #[inline]
371 pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
372 AsyncAsSync { context, inner }
373 }
374
375 /// Attempt to shutdown the I/O handle.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// use futures_lite::io::AsyncAsSync;
381 /// use std::task::Context;
382 /// use waker_fn::waker_fn;
383 ///
384 /// let reader: Vec<u8> = b"hello".to_vec();
385 /// let waker = waker_fn(|| {});
386 /// let mut context = Context::from_waker(&waker);
387 ///
388 /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
389 /// async_reader.close().unwrap();
390 /// ```
391 #[inline]
392 pub fn close(&mut self) -> Result<()>
393 where
394 T: AsyncWrite + Unpin,
395 {
396 self.poll_with(|io, cx| io.poll_close(cx))
397 }
398
399 /// Poll this `AsyncAsSync` for some function.
400 ///
401 /// # Examples
402 ///
403 /// ```
404 /// use futures_lite::io::{AsyncAsSync, AsyncRead};
405 /// use std::task::Context;
406 /// use waker_fn::waker_fn;
407 ///
408 /// let reader: &[u8] = b"hello";
409 /// let waker = waker_fn(|| {});
410 /// let mut context = Context::from_waker(&waker);
411 ///
412 /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
413 /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
414 /// assert_eq!(r.unwrap(), 5);
415 /// ```
416 #[inline]
417 pub fn poll_with<R>(
418 &mut self,
419 f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
420 ) -> Result<R>
421 where
422 T: Unpin,
423 {
424 match f(Pin::new(&mut self.inner), self.context) {
425 Poll::Ready(res) => res,
426 Poll::Pending => Err(ErrorKind::WouldBlock.into()),
427 }
428 }
429}
430
431impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
432 #[inline]
433 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
434 self.poll_with(|io, cx| io.poll_read(cx, buf))
435 }
436
437 #[inline]
438 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
439 self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
440 }
441}
442
443impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
444 #[inline]
445 fn write(&mut self, buf: &[u8]) -> Result<usize> {
446 self.poll_with(|io, cx| io.poll_write(cx, buf))
447 }
448
449 #[inline]
450 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
451 self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
452 }
453
454 #[inline]
455 fn flush(&mut self) -> Result<()> {
456 self.poll_with(|io, cx| io.poll_flush(cx))
457 }
458}
459
460impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
461 #[inline]
462 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
463 self.poll_with(|io, cx| io.poll_seek(cx, pos))
464 }
465}
466
467impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
468 #[inline]
469 fn as_ref(&self) -> &T {
470 &self.inner
471 }
472}
473
474impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
475 #[inline]
476 fn as_mut(&mut self) -> &mut T {
477 &mut self.inner
478 }
479}
480
481impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
482 #[inline]
483 fn borrow(&self) -> &T {
484 &self.inner
485 }
486}
487
488impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
489 #[inline]
490 fn borrow_mut(&mut self) -> &mut T {
491 &mut self.inner
492 }
493}
494
495/// Blocks on all async I/O operations and implements [`std::io`] traits.
496///
497/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
498/// manually all the time becomes too tedious, use this type for more convenient blocking on async
499/// I/O operations.
500///
501/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
502/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
503/// [`AsyncSeek`], respectively.
504///
505/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
506/// dropping the [`BlockOn`] handle or some buffered data might get lost.
507///
508/// # Examples
509///
510/// ```
511/// use futures_lite::io::BlockOn;
512/// use futures_lite::pin;
513/// use std::io::Read;
514///
515/// let reader: &[u8] = b"hello";
516/// pin!(reader);
517///
518/// let mut blocking_reader = BlockOn::new(reader);
519/// let mut contents = String::new();
520///
521/// // This line blocks - note that there is no await:
522/// blocking_reader.read_to_string(&mut contents)?;
523/// # std::io::Result::Ok(())
524/// ```
#[derive(Debug)]
pub struct BlockOn<T>(T);

impl<T> BlockOn<T> {
    /// Wraps an async I/O handle so it can be driven through a blocking interface.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// ```
    pub fn new(io: T) -> BlockOn<T> {
        Self(io)
    }

    /// Borrows the wrapped async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        let BlockOn(io) = self;
        io
    }

    /// Mutably borrows the wrapped async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let mut blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        let BlockOn(io) = self;
        io
    }

    /// Consumes the wrapper and returns the wrapped async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let inner = blocking_reader.into_inner();
    /// ```
    pub fn into_inner(self) -> T {
        let BlockOn(io) = self;
        io
    }
}
600
601impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
602 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
603 future::block_on(self.0.read(buf))
604 }
605}
606
607impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
608 fn fill_buf(&mut self) -> Result<&[u8]> {
609 future::block_on(self.0.fill_buf())
610 }
611
612 fn consume(&mut self, amt: usize) {
613 Pin::new(&mut self.0).consume(amt)
614 }
615}
616
617impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
618 fn write(&mut self, buf: &[u8]) -> Result<usize> {
619 future::block_on(self.0.write(buf))
620 }
621
622 fn flush(&mut self) -> Result<()> {
623 future::block_on(self.0.flush())
624 }
625}
626
627impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
628 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
629 future::block_on(self.0.seek(pos))
630 }
631}
632
633pin_project! {
634 /// Adds buffering to a reader.
635 ///
636 /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
637 /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
638 /// maintains an in-memory buffer of the incoming byte stream.
639 ///
640 /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
641 /// the same file or networking socket. It does not help when reading very large amounts at
642 /// once, or reading just once or a few times. It also provides no advantage when reading from
643 /// a source that is already in memory, like a `Vec<u8>`.
644 ///
645 /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
646 /// multiple instances of [`BufReader`] on the same reader can cause data loss.
647 ///
648 /// # Examples
649 ///
650 /// ```
651 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
652 ///
653 /// # spin_on::spin_on(async {
654 /// let input: &[u8] = b"hello";
655 /// let mut reader = BufReader::new(input);
656 ///
657 /// let mut line = String::new();
658 /// reader.read_line(&mut line).await?;
659 /// # std::io::Result::Ok(()) });
660 /// ```
661 pub struct BufReader<R> {
662 #[pin]
663 inner: R,
664 buf: Box<[u8]>,
665 pos: usize,
666 cap: usize,
667 }
668}
669
670impl<R: AsyncRead> BufReader<R> {
671 /// Creates a buffered reader with the default buffer capacity.
672 ///
673 /// The default capacity is currently 8 KB, but that may change in the future.
674 ///
675 /// # Examples
676 ///
677 /// ```
678 /// use futures_lite::io::BufReader;
679 ///
680 /// let input: &[u8] = b"hello";
681 /// let reader = BufReader::new(input);
682 /// ```
683 pub fn new(inner: R) -> BufReader<R> {
684 BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
685 }
686
687 /// Creates a buffered reader with the specified capacity.
688 ///
689 /// # Examples
690 ///
691 /// ```
692 /// use futures_lite::io::BufReader;
693 ///
694 /// let input: &[u8] = b"hello";
695 /// let reader = BufReader::with_capacity(1024, input);
696 /// ```
697 pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
698 BufReader {
699 inner,
700 buf: vec![0; capacity].into_boxed_slice(),
701 pos: 0,
702 cap: 0,
703 }
704 }
705}
706
707impl<R> BufReader<R> {
708 /// Gets a reference to the underlying reader.
709 ///
710 /// It is not advisable to directly read from the underlying reader.
711 ///
712 /// # Examples
713 ///
714 /// ```
715 /// use futures_lite::io::BufReader;
716 ///
717 /// let input: &[u8] = b"hello";
718 /// let reader = BufReader::new(input);
719 ///
720 /// let r = reader.get_ref();
721 /// ```
722 pub fn get_ref(&self) -> &R {
723 &self.inner
724 }
725
726 /// Gets a mutable reference to the underlying reader.
727 ///
728 /// It is not advisable to directly read from the underlying reader.
729 ///
730 /// # Examples
731 ///
732 /// ```
733 /// use futures_lite::io::BufReader;
734 ///
735 /// let input: &[u8] = b"hello";
736 /// let mut reader = BufReader::new(input);
737 ///
738 /// let r = reader.get_mut();
739 /// ```
740 pub fn get_mut(&mut self) -> &mut R {
741 &mut self.inner
742 }
743
744 /// Gets a pinned mutable reference to the underlying reader.
745 ///
746 /// It is not advisable to directly read from the underlying reader.
747 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
748 self.project().inner
749 }
750
751 /// Returns a reference to the internal buffer.
752 ///
753 /// This method will not attempt to fill the buffer if it is empty.
754 ///
755 /// # Examples
756 ///
757 /// ```
758 /// use futures_lite::io::BufReader;
759 ///
760 /// let input: &[u8] = b"hello";
761 /// let reader = BufReader::new(input);
762 ///
763 /// // The internal buffer is empty until the first read request.
764 /// assert_eq!(reader.buffer(), &[]);
765 /// ```
766 pub fn buffer(&self) -> &[u8] {
767 &self.buf[self.pos..self.cap]
768 }
769
770 /// Unwraps the buffered reader, returning the underlying reader.
771 ///
772 /// Note that any leftover data in the internal buffer will be lost.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// use futures_lite::io::BufReader;
778 ///
779 /// let input: &[u8] = b"hello";
780 /// let reader = BufReader::new(input);
781 ///
782 /// assert_eq!(reader.into_inner(), input);
783 /// ```
784 pub fn into_inner(self) -> R {
785 self.inner
786 }
787
788 /// Invalidates all data in the internal buffer.
789 #[inline]
790 fn discard_buffer(self: Pin<&mut Self>) {
791 let this = self.project();
792 *this.pos = 0;
793 *this.cap = 0;
794 }
795}
796
797impl<R: AsyncRead> AsyncRead for BufReader<R> {
798 fn poll_read(
799 mut self: Pin<&mut Self>,
800 cx: &mut Context<'_>,
801 buf: &mut [u8],
802 ) -> Poll<Result<usize>> {
803 // If we don't have any buffered data and we're doing a massive read
804 // (larger than our internal buffer), bypass our internal buffer
805 // entirely.
806 if self.pos == self.cap && buf.len() >= self.buf.len() {
807 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
808 self.discard_buffer();
809 return Poll::Ready(res);
810 }
811 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
812 let nread = std::io::Read::read(&mut rem, buf)?;
813 self.consume(nread);
814 Poll::Ready(Ok(nread))
815 }
816
817 fn poll_read_vectored(
818 mut self: Pin<&mut Self>,
819 cx: &mut Context<'_>,
820 bufs: &mut [IoSliceMut<'_>],
821 ) -> Poll<Result<usize>> {
822 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
823 if self.pos == self.cap && total_len >= self.buf.len() {
824 let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
825 self.discard_buffer();
826 return Poll::Ready(res);
827 }
828 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
829 let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
830 self.consume(nread);
831 Poll::Ready(Ok(nread))
832 }
833}
834
835impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
836 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
837 let mut this = self.project();
838
839 // If we've reached the end of our internal buffer then we need to fetch
840 // some more data from the underlying reader.
841 // Branch using `>=` instead of the more correct `==`
842 // to tell the compiler that the pos..cap slice is always valid.
843 if *this.pos >= *this.cap {
844 debug_assert!(*this.pos == *this.cap);
845 *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
846 *this.pos = 0;
847 }
848 Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
849 }
850
851 fn consume(self: Pin<&mut Self>, amt: usize) {
852 let this = self.project();
853 *this.pos = cmp::min(*this.pos + amt, *this.cap);
854 }
855}
856
857impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
858 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
859 f.debug_struct("BufReader")
860 .field("reader", &self.inner)
861 .field(
862 "buffer",
863 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
864 )
865 .finish()
866 }
867}
868
869impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
870 /// Seeks to an offset, in bytes, in the underlying reader.
871 ///
872 /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
873 /// reader would be at if the [`BufReader`] had no internal buffer.
874 ///
875 /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
876 /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
877 /// immediately after a seek yields the underlying reader at the same position.
878 ///
879 /// See [`AsyncSeek`] for more details.
880 ///
881 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
882 /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
883 /// the second seek returns `Err`, the underlying reader will be left at the same position it
884 /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
885 fn poll_seek(
886 mut self: Pin<&mut Self>,
887 cx: &mut Context<'_>,
888 pos: SeekFrom,
889 ) -> Poll<Result<u64>> {
890 let result: u64;
891 if let SeekFrom::Current(n) = pos {
892 let remainder = (self.cap - self.pos) as i64;
893 // it should be safe to assume that remainder fits within an i64 as the alternative
894 // means we managed to allocate 8 exbibytes and that's absurd.
895 // But it's not out of the realm of possibility for some weird underlying reader to
896 // support seeking by i64::min_value() so we need to handle underflow when subtracting
897 // remainder.
898 if let Some(offset) = n.checked_sub(remainder) {
899 result = ready!(self
900 .as_mut()
901 .get_pin_mut()
902 .poll_seek(cx, SeekFrom::Current(offset)))?;
903 } else {
904 // seek backwards by our remainder, and then by the offset
905 ready!(self
906 .as_mut()
907 .get_pin_mut()
908 .poll_seek(cx, SeekFrom::Current(-remainder)))?;
909 self.as_mut().discard_buffer();
910 result = ready!(self
911 .as_mut()
912 .get_pin_mut()
913 .poll_seek(cx, SeekFrom::Current(n)))?;
914 }
915 } else {
916 // Seeking with Start/End doesn't care about our buffer length.
917 result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
918 }
919 self.discard_buffer();
920 Poll::Ready(Ok(result))
921 }
922}
923
924impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
925 fn poll_write(
926 mut self: Pin<&mut Self>,
927 cx: &mut Context<'_>,
928 buf: &[u8],
929 ) -> Poll<Result<usize>> {
930 self.as_mut().get_pin_mut().poll_write(cx, buf)
931 }
932
933 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
934 self.as_mut().get_pin_mut().poll_flush(cx)
935 }
936
937 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
938 self.as_mut().get_pin_mut().poll_close(cx)
939 }
940}
941
942pin_project! {
943 /// Adds buffering to a writer.
944 ///
945 /// It can be excessively inefficient to work directly with something that implements
946 /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
947 /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
948 /// writes it to the underlying writer in large, infrequent batches.
949 ///
950 /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
951 /// the same file or networking socket. It does not help when writing very large amounts at
952 /// once, or writing just once or a few times. It also provides no advantage when writing to a
953 /// destination that is in memory, like a `Vec<u8>`.
954 ///
955 /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
956 /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
957 /// dropping the [`BufWriter`].
958 ///
959 /// # Examples
960 ///
961 /// ```
962 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
963 ///
964 /// # spin_on::spin_on(async {
965 /// let mut output = Vec::new();
966 /// let mut writer = BufWriter::new(&mut output);
967 ///
968 /// writer.write_all(b"hello").await?;
969 /// writer.flush().await?;
970 /// # std::io::Result::Ok(()) });
971 /// ```
972 pub struct BufWriter<W> {
973 #[pin]
974 inner: W,
975 buf: Vec<u8>,
976 written: usize,
977 }
978}
979
980impl<W: AsyncWrite> BufWriter<W> {
981 /// Creates a buffered writer with the default buffer capacity.
982 ///
983 /// The default capacity is currently 8 KB, but that may change in the future.
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// use futures_lite::io::BufWriter;
989 ///
990 /// let mut output = Vec::new();
991 /// let writer = BufWriter::new(&mut output);
992 /// ```
993 pub fn new(inner: W) -> BufWriter<W> {
994 BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
995 }
996
997 /// Creates a buffered writer with the specified buffer capacity.
998 ///
999 /// # Examples
1000 ///
1001 /// ```
1002 /// use futures_lite::io::BufWriter;
1003 ///
1004 /// let mut output = Vec::new();
1005 /// let writer = BufWriter::with_capacity(100, &mut output);
1006 /// ```
1007 pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
1008 BufWriter {
1009 inner,
1010 buf: Vec::with_capacity(capacity),
1011 written: 0,
1012 }
1013 }
1014
1015 /// Gets a reference to the underlying writer.
1016 ///
1017 /// # Examples
1018 ///
1019 /// ```
1020 /// use futures_lite::io::BufWriter;
1021 ///
1022 /// let mut output = Vec::new();
1023 /// let writer = BufWriter::new(&mut output);
1024 ///
1025 /// let r = writer.get_ref();
1026 /// ```
1027 pub fn get_ref(&self) -> &W {
1028 &self.inner
1029 }
1030
1031 /// Gets a mutable reference to the underlying writer.
1032 ///
1033 /// It is not advisable to directly write to the underlying writer.
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```
1038 /// use futures_lite::io::BufWriter;
1039 ///
1040 /// let mut output = Vec::new();
1041 /// let mut writer = BufWriter::new(&mut output);
1042 ///
1043 /// let r = writer.get_mut();
1044 /// ```
1045 pub fn get_mut(&mut self) -> &mut W {
1046 &mut self.inner
1047 }
1048
1049 /// Gets a pinned mutable reference to the underlying writer.
1050 ///
1051 /// It is not not advisable to directly write to the underlying writer.
1052 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
1053 self.project().inner
1054 }
1055
1056 /// Unwraps the buffered writer, returning the underlying writer.
1057 ///
1058 /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
1059 /// that data, flush the buffered writer before unwrapping it.
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
1065 ///
1066 /// # spin_on::spin_on(async {
1067 /// let mut output = vec![1, 2, 3];
1068 /// let mut writer = BufWriter::new(&mut output);
1069 ///
1070 /// writer.write_all(&[4]).await?;
1071 /// writer.flush().await?;
1072 /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
1073 /// # std::io::Result::Ok(()) });
1074 /// ```
1075 pub fn into_inner(self) -> W {
1076 self.inner
1077 }
1078
1079 /// Returns a reference to the internal buffer.
1080 ///
1081 /// # Examples
1082 ///
1083 /// ```
1084 /// use futures_lite::io::BufWriter;
1085 ///
1086 /// let mut output = Vec::new();
1087 /// let writer = BufWriter::new(&mut output);
1088 ///
1089 /// // The internal buffer is empty until the first write request.
1090 /// assert_eq!(writer.buffer(), &[]);
1091 /// ```
1092 pub fn buffer(&self) -> &[u8] {
1093 &self.buf
1094 }
1095
1096 /// Flush the buffer.
1097 fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1098 let mut this = self.project();
1099 let len = this.buf.len();
1100 let mut ret = Ok(());
1101
1102 while *this.written < len {
1103 match this
1104 .inner
1105 .as_mut()
1106 .poll_write(cx, &this.buf[*this.written..])
1107 {
1108 Poll::Ready(Ok(0)) => {
1109 ret = Err(Error::new(
1110 ErrorKind::WriteZero,
1111 "Failed to write buffered data",
1112 ));
1113 break;
1114 }
1115 Poll::Ready(Ok(n)) => *this.written += n,
1116 Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
1117 Poll::Ready(Err(e)) => {
1118 ret = Err(e);
1119 break;
1120 }
1121 Poll::Pending => return Poll::Pending,
1122 }
1123 }
1124
1125 if *this.written > 0 {
1126 this.buf.drain(..*this.written);
1127 }
1128 *this.written = 0;
1129
1130 Poll::Ready(ret)
1131 }
1132}
1133
1134impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
1135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136 f.debug_struct("BufWriter")
1137 .field("writer", &self.inner)
1138 .field("buf", &self.buf)
1139 .finish()
1140 }
1141}
1142
1143impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
1144 fn poll_write(
1145 mut self: Pin<&mut Self>,
1146 cx: &mut Context<'_>,
1147 buf: &[u8],
1148 ) -> Poll<Result<usize>> {
1149 if self.buf.len() + buf.len() > self.buf.capacity() {
1150 ready!(self.as_mut().poll_flush_buf(cx))?;
1151 }
1152 if buf.len() >= self.buf.capacity() {
1153 self.get_pin_mut().poll_write(cx, buf)
1154 } else {
1155 Pin::new(&mut *self.project().buf).poll_write(cx, buf)
1156 }
1157 }
1158
1159 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1160 ready!(self.as_mut().poll_flush_buf(cx))?;
1161 self.get_pin_mut().poll_flush(cx)
1162 }
1163
1164 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1165 ready!(self.as_mut().poll_flush_buf(cx))?;
1166 self.get_pin_mut().poll_close(cx)
1167 }
1168}
1169
1170impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
1171 /// Seek to the offset, in bytes, in the underlying writer.
1172 ///
1173 /// Seeking always writes out the internal buffer before seeking.
1174 fn poll_seek(
1175 mut self: Pin<&mut Self>,
1176 cx: &mut Context<'_>,
1177 pos: SeekFrom,
1178 ) -> Poll<Result<u64>> {
1179 ready!(self.as_mut().poll_flush_buf(cx))?;
1180 self.get_pin_mut().poll_seek(cx, pos)
1181 }
1182}
1183
/// Wraps an in-memory buffer with a cursor so it can be read from and written to.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut bytes = b"hello".to_vec();
/// let mut cursor = Cursor::new(&mut bytes);
///
/// // Overwrite 'h' with 'H'.
/// cursor.write_all(b"H").await?;
///
/// // Move the cursor one byte forward.
/// cursor.seek(SeekFrom::Current(1)).await?;
///
/// // Read a byte.
/// let mut byte = [0];
/// cursor.read_exact(&mut byte).await?;
/// assert_eq!(&byte, b"l");
///
/// // Check the final buffer.
/// assert_eq!(bytes, b"Hello");
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Clone, Debug, Default)]
pub struct Cursor<T> {
    // All cursor bookkeeping is delegated to the synchronous standard-library cursor.
    inner: std::io::Cursor<T>,
}

impl<T> Cursor<T> {
    /// Builds a cursor over an in-memory buffer.
    ///
    /// The cursor starts at position 0 even when the buffer already holds data. Writes through
    /// a [`Cursor`] therefore overwrite the existing contents unless the cursor is first moved
    /// to the end of the buffer with [`set_position()`][`Cursor::set_position()`] or
    /// [`seek()`][`AsyncSeekExt::seek()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// ```
    pub fn new(inner: T) -> Cursor<T> {
        let inner = std::io::Cursor::new(inner);
        Self { inner }
    }

    /// Borrows the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        std::io::Cursor::get_ref(&self.inner)
    }

    /// Mutably borrows the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        std::io::Cursor::get_mut(&mut self.inner)
    }

    /// Consumes the cursor and returns the buffer it wrapped.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(vec![1, 2, 3]);
    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
    /// ```
    pub fn into_inner(self) -> T {
        std::io::Cursor::into_inner(self.inner)
    }

    /// Reports where the cursor currently is.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
    ///
    /// # spin_on::spin_on(async {
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.seek(SeekFrom::Start(2)).await?;
    /// assert_eq!(cursor.position(), 2);
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn position(&self) -> u64 {
        std::io::Cursor::position(&self.inner)
    }

    /// Moves the cursor to `pos`.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.set_position(2);
    /// assert_eq!(cursor.position(), 2);
    /// ```
    pub fn set_position(&mut self, pos: u64) {
        std::io::Cursor::set_position(&mut self.inner, pos)
    }
}
1314
1315impl<T> AsyncSeek for Cursor<T>
1316where
1317 T: AsRef<[u8]> + Unpin,
1318{
1319 fn poll_seek(
1320 mut self: Pin<&mut Self>,
1321 _: &mut Context<'_>,
1322 pos: SeekFrom,
1323 ) -> Poll<Result<u64>> {
1324 Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1325 }
1326}
1327
1328impl<T> AsyncRead for Cursor<T>
1329where
1330 T: AsRef<[u8]> + Unpin,
1331{
1332 fn poll_read(
1333 mut self: Pin<&mut Self>,
1334 _cx: &mut Context<'_>,
1335 buf: &mut [u8],
1336 ) -> Poll<Result<usize>> {
1337 Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1338 }
1339
1340 fn poll_read_vectored(
1341 mut self: Pin<&mut Self>,
1342 _: &mut Context<'_>,
1343 bufs: &mut [IoSliceMut<'_>],
1344 ) -> Poll<Result<usize>> {
1345 Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1346 }
1347}
1348
1349impl<T> AsyncBufRead for Cursor<T>
1350where
1351 T: AsRef<[u8]> + Unpin,
1352{
1353 fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1354 Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1355 }
1356
1357 fn consume(mut self: Pin<&mut Self>, amt: usize) {
1358 std::io::BufRead::consume(&mut self.inner, amt)
1359 }
1360}
1361
1362impl AsyncWrite for Cursor<&mut [u8]> {
1363 fn poll_write(
1364 mut self: Pin<&mut Self>,
1365 _: &mut Context<'_>,
1366 buf: &[u8],
1367 ) -> Poll<Result<usize>> {
1368 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1369 }
1370
1371 fn poll_write_vectored(
1372 mut self: Pin<&mut Self>,
1373 _: &mut Context<'_>,
1374 bufs: &[IoSlice<'_>],
1375 ) -> Poll<Result<usize>> {
1376 Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1377 }
1378
1379 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1380 Poll::Ready(std::io::Write::flush(&mut self.inner))
1381 }
1382
1383 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1384 self.poll_flush(cx)
1385 }
1386}
1387
1388impl AsyncWrite for Cursor<&mut Vec<u8>> {
1389 fn poll_write(
1390 mut self: Pin<&mut Self>,
1391 _: &mut Context<'_>,
1392 buf: &[u8],
1393 ) -> Poll<Result<usize>> {
1394 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1395 }
1396
1397 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1398 self.poll_flush(cx)
1399 }
1400
1401 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1402 Poll::Ready(std::io::Write::flush(&mut self.inner))
1403 }
1404}
1405
1406impl AsyncWrite for Cursor<Vec<u8>> {
1407 fn poll_write(
1408 mut self: Pin<&mut Self>,
1409 _: &mut Context<'_>,
1410 buf: &[u8],
1411 ) -> Poll<Result<usize>> {
1412 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1413 }
1414
1415 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1416 self.poll_flush(cx)
1417 }
1418
1419 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1420 Poll::Ready(std::io::Write::flush(&mut self.inner))
1421 }
1422}
1423
/// Returns a reader that never yields any bytes.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::empty();
///
/// let mut contents = Vec::new();
/// reader.read_to_end(&mut contents).await?;
/// assert!(contents.is_empty());
/// # std::io::Result::Ok(()) });
/// ```
pub fn empty() -> Empty {
    let reader = Empty { _private: () };
    reader
}

/// The reader returned by [`empty()`].
pub struct Empty {
    // Private field so the type can only be built through `empty()`.
    _private: (),
}

impl fmt::Debug for Empty {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Formatter::pad(f, "Empty { .. }")
    }
}
1453
1454impl AsyncRead for Empty {
1455 #[inline]
1456 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
1457 Poll::Ready(Ok(0))
1458 }
1459}
1460
1461impl AsyncBufRead for Empty {
1462 #[inline]
1463 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
1464 Poll::Ready(Ok(&[]))
1465 }
1466
1467 #[inline]
1468 fn consume(self: Pin<&mut Self>, _: usize) {}
1469}
1470
/// Returns a reader that endlessly produces one and the same byte.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::repeat(b'a');
///
/// let mut contents = vec![0; 5];
/// reader.read_exact(&mut contents).await?;
/// assert_eq!(contents, b"aaaaa");
/// # std::io::Result::Ok(()) });
/// ```
pub fn repeat(byte: u8) -> Repeat {
    let reader = Repeat { byte };
    reader
}

/// The reader returned by [`repeat()`].
#[derive(Debug)]
pub struct Repeat {
    // The byte handed out on every read.
    byte: u8,
}
1495
1496impl AsyncRead for Repeat {
1497 #[inline]
1498 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1499 for b in &mut *buf {
1500 *b = self.byte;
1501 }
1502 Poll::Ready(Ok(buf.len()))
1503 }
1504}
1505
/// Returns a writer that accepts all data and throws it away.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncWriteExt};
///
/// # spin_on::spin_on(async {
/// let mut writer = io::sink();
/// writer.write_all(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn sink() -> Sink {
    let writer = Sink { _private: () };
    writer
}

/// The writer returned by [`sink()`].
#[derive(Debug)]
pub struct Sink {
    // Private field so the type can only be built through `sink()`.
    _private: (),
}
1527
1528impl AsyncWrite for Sink {
1529 #[inline]
1530 fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
1531 Poll::Ready(Ok(buf.len()))
1532 }
1533
1534 #[inline]
1535 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1536 Poll::Ready(Ok(()))
1537 }
1538
1539 #[inline]
1540 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1541 Poll::Ready(Ok(()))
1542 }
1543}
1544
1545/// Extension trait for [`AsyncBufRead`].
1546pub trait AsyncBufReadExt: AsyncBufRead {
1547 /// Returns the contents of the internal buffer, filling it with more data if empty.
1548 ///
1549 /// If the stream has reached EOF, an empty buffer will be returned.
1550 ///
1551 /// # Examples
1552 ///
1553 /// ```
1554 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1555 /// use std::pin::Pin;
1556 ///
1557 /// # spin_on::spin_on(async {
1558 /// let input: &[u8] = b"hello world";
1559 /// let mut reader = BufReader::with_capacity(5, input);
1560 ///
1561 /// assert_eq!(reader.fill_buf().await?, b"hello");
1562 /// reader.consume(2);
1563 /// assert_eq!(reader.fill_buf().await?, b"llo");
1564 /// reader.consume(3);
1565 /// assert_eq!(reader.fill_buf().await?, b" worl");
1566 /// # std::io::Result::Ok(()) });
1567 /// ```
1568 fn fill_buf(&mut self) -> FillBuf<'_, Self>
1569 where
1570 Self: Unpin,
1571 {
1572 FillBuf { reader: Some(self) }
1573 }
1574
1575 /// Consumes `amt` buffered bytes.
1576 ///
1577 /// This method does not perform any I/O, it simply consumes some amount of bytes from the
1578 /// internal buffer.
1579 ///
1580 /// The `amt` must be <= the number of bytes in the buffer returned by
1581 /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
1582 ///
1583 /// # Examples
1584 ///
1585 /// ```
1586 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1587 /// use std::pin::Pin;
1588 ///
1589 /// # spin_on::spin_on(async {
1590 /// let input: &[u8] = b"hello";
1591 /// let mut reader = BufReader::with_capacity(4, input);
1592 ///
1593 /// assert_eq!(reader.fill_buf().await?, b"hell");
1594 /// reader.consume(2);
1595 /// assert_eq!(reader.fill_buf().await?, b"ll");
1596 /// # std::io::Result::Ok(()) });
1597 /// ```
1598 fn consume(&mut self, amt: usize)
1599 where
1600 Self: Unpin,
1601 {
1602 AsyncBufRead::consume(Pin::new(self), amt);
1603 }
1604
1605 /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
1606 ///
1607 /// This method will read bytes from the underlying stream until the delimiter or EOF is
1608 /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
1609 ///
1610 /// If successful, returns the total number of bytes read.
1611 ///
1612 /// # Examples
1613 ///
1614 /// ```
1615 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1616 ///
1617 /// # spin_on::spin_on(async {
1618 /// let input: &[u8] = b"hello";
1619 /// let mut reader = BufReader::new(input);
1620 ///
1621 /// let mut buf = Vec::new();
1622 /// let n = reader.read_until(b'\n', &mut buf).await?;
1623 /// # std::io::Result::Ok(()) });
1624 /// ```
1625 fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'a, Self>
1626 where
1627 Self: Unpin,
1628 {
1629 ReadUntilFuture {
1630 reader: self,
1631 byte,
1632 buf,
1633 read: 0,
1634 }
1635 }
1636
1637 /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
1638 ///
1639 /// This method will read bytes from the underlying stream until the newline delimiter (the
1640 /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
1641 /// will be appended to `buf`.
1642 ///
1643 /// If successful, returns the total number of bytes read.
1644 ///
1645 /// # Examples
1646 ///
1647 /// ```
1648 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1649 ///
1650 /// # spin_on::spin_on(async {
1651 /// let input: &[u8] = b"hello";
1652 /// let mut reader = BufReader::new(input);
1653 ///
1654 /// let mut line = String::new();
1655 /// let n = reader.read_line(&mut line).await?;
1656 /// # std::io::Result::Ok(()) });
1657 /// ```
1658 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self>
1659 where
1660 Self: Unpin,
1661 {
1662 ReadLineFuture {
1663 reader: self,
1664 buf,
1665 bytes: Vec::new(),
1666 read: 0,
1667 }
1668 }
1669
1670 /// Returns a stream over the lines of this byte stream.
1671 ///
1672 /// The stream returned from this method yields items of type
1673 /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
1674 /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
1675 /// at the end.
1676 ///
1677 /// # Examples
1678 ///
1679 /// ```
1680 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1681 /// use futures_lite::stream::StreamExt;
1682 ///
1683 /// # spin_on::spin_on(async {
1684 /// let input: &[u8] = b"hello\nworld\n";
1685 /// let mut reader = BufReader::new(input);
1686 /// let mut lines = reader.lines();
1687 ///
1688 /// while let Some(line) = lines.next().await {
1689 /// println!("{}", line?);
1690 /// }
1691 /// # std::io::Result::Ok(()) });
1692 /// ```
1693 fn lines(self) -> Lines<Self>
1694 where
1695 Self: Sized,
1696 {
1697 Lines {
1698 reader: self,
1699 buf: String::new(),
1700 bytes: Vec::new(),
1701 read: 0,
1702 }
1703 }
1704
1705 /// Returns a stream over the contents of this reader split on the specified `byte`.
1706 ///
1707 /// The stream returned from this method yields items of type
1708 /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
1709 /// Each vector returned will *not* have the delimiter byte at the end.
1710 ///
1711 /// # Examples
1712 ///
1713 /// ```
1714 /// use futures_lite::io::{AsyncBufReadExt, Cursor};
1715 /// use futures_lite::stream::StreamExt;
1716 ///
1717 /// # spin_on::spin_on(async {
1718 /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
1719 /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
1720 ///
1721 /// assert_eq!(items[0], b"lorem");
1722 /// assert_eq!(items[1], b"ipsum");
1723 /// assert_eq!(items[2], b"dolor");
1724 /// # std::io::Result::Ok(()) });
1725 /// ```
1726 fn split(self, byte: u8) -> Split<Self>
1727 where
1728 Self: Sized,
1729 {
1730 Split {
1731 reader: self,
1732 buf: Vec::new(),
1733 delim: byte,
1734 read: 0,
1735 }
1736 }
1737}
1738
1739impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1740
1741/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
1742#[derive(Debug)]
1743#[must_use = "futures do nothing unless you `.await` or poll them"]
1744pub struct FillBuf<'a, R: ?Sized> {
1745 reader: Option<&'a mut R>,
1746}
1747
1748impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1749
1750impl<'a, R> Future for FillBuf<'a, R>
1751where
1752 R: AsyncBufRead + Unpin + ?Sized,
1753{
1754 type Output = Result<&'a [u8]>;
1755
1756 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1757 let this = &mut *self;
1758 let reader = this
1759 .reader
1760 .take()
1761 .expect("polled `FillBuf` after completion");
1762
1763 match Pin::new(&mut *reader).poll_fill_buf(cx) {
1764 Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
1765 Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
1766 poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
1767 },
1768 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1769 Poll::Pending => {
1770 this.reader = Some(reader);
1771 Poll::Pending
1772 }
1773 }
1774 }
1775}
1776
1777/// Future for the [`AsyncBufReadExt::read_until()`] method.
1778#[derive(Debug)]
1779#[must_use = "futures do nothing unless you `.await` or poll them"]
1780pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
1781 reader: &'a mut R,
1782 byte: u8,
1783 buf: &'a mut Vec<u8>,
1784 read: usize,
1785}
1786
1787impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1788
1789impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1790 type Output = Result<usize>;
1791
1792 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1793 let Self {
1794 reader,
1795 byte,
1796 buf,
1797 read,
1798 } = &mut *self;
1799 read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1800 }
1801}
1802
1803fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1804 mut reader: Pin<&mut R>,
1805 cx: &mut Context<'_>,
1806 byte: u8,
1807 buf: &mut Vec<u8>,
1808 read: &mut usize,
1809) -> Poll<Result<usize>> {
1810 loop {
1811 let (done, used) = {
1812 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1813
1814 if let Some(i) = memchr(byte, available) {
1815 buf.extend_from_slice(&available[..=i]);
1816 (true, i + 1)
1817 } else {
1818 buf.extend_from_slice(available);
1819 (false, available.len())
1820 }
1821 };
1822
1823 reader.as_mut().consume(used);
1824 *read += used;
1825
1826 if done || used == 0 {
1827 return Poll::Ready(Ok(mem::replace(read, 0)));
1828 }
1829 }
1830}
1831
1832/// Future for the [`AsyncBufReadExt::read_line()`] method.
1833#[derive(Debug)]
1834#[must_use = "futures do nothing unless you `.await` or poll them"]
1835pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
1836 reader: &'a mut R,
1837 buf: &'a mut String,
1838 bytes: Vec<u8>,
1839 read: usize,
1840}
1841
1842impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1843
1844impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1845 type Output = Result<usize>;
1846
1847 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1848 let Self {
1849 reader,
1850 buf,
1851 bytes,
1852 read,
1853 } = &mut *self;
1854 read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1855 }
1856}
1857
1858pin_project! {
1859 /// Stream for the [`AsyncBufReadExt::lines()`] method.
1860 #[derive(Debug)]
1861 #[must_use = "streams do nothing unless polled"]
1862 pub struct Lines<R> {
1863 #[pin]
1864 reader: R,
1865 buf: String,
1866 bytes: Vec<u8>,
1867 read: usize,
1868 }
1869}
1870
1871impl<R: AsyncBufRead> Stream for Lines<R> {
1872 type Item = Result<String>;
1873
1874 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1875 let this = self.project();
1876
1877 let n = ready!(read_line_internal(
1878 this.reader,
1879 cx,
1880 this.buf,
1881 this.bytes,
1882 this.read
1883 ))?;
1884 if n == 0 && this.buf.is_empty() {
1885 return Poll::Ready(None);
1886 }
1887
1888 if this.buf.ends_with('\n') {
1889 this.buf.pop();
1890 if this.buf.ends_with('\r') {
1891 this.buf.pop();
1892 }
1893 }
1894 Poll::Ready(Some(Ok(mem::take(this.buf))))
1895 }
1896}
1897
1898fn read_line_internal<R: AsyncBufRead + ?Sized>(
1899 reader: Pin<&mut R>,
1900 cx: &mut Context<'_>,
1901 buf: &mut String,
1902 bytes: &mut Vec<u8>,
1903 read: &mut usize,
1904) -> Poll<Result<usize>> {
1905 let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1906
1907 match String::from_utf8(mem::take(bytes)) {
1908 Ok(s) => {
1909 debug_assert!(buf.is_empty());
1910 debug_assert_eq!(*read, 0);
1911 *buf = s;
1912 Poll::Ready(ret)
1913 }
1914 Err(_) => Poll::Ready(ret.and_then(|_| {
1915 Err(Error::new(
1916 ErrorKind::InvalidData,
1917 "stream did not contain valid UTF-8",
1918 ))
1919 })),
1920 }
1921}
1922
1923pin_project! {
1924 /// Stream for the [`AsyncBufReadExt::split()`] method.
1925 #[derive(Debug)]
1926 #[must_use = "streams do nothing unless polled"]
1927 pub struct Split<R> {
1928 #[pin]
1929 reader: R,
1930 buf: Vec<u8>,
1931 read: usize,
1932 delim: u8,
1933 }
1934}
1935
1936impl<R: AsyncBufRead> Stream for Split<R> {
1937 type Item = Result<Vec<u8>>;
1938
1939 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1940 let this = self.project();
1941
1942 let n = ready!(read_until_internal(
1943 this.reader,
1944 cx,
1945 *this.delim,
1946 this.buf,
1947 this.read
1948 ))?;
1949 if n == 0 && this.buf.is_empty() {
1950 return Poll::Ready(None);
1951 }
1952
1953 if this.buf[this.buf.len() - 1] == *this.delim {
1954 this.buf.pop();
1955 }
1956 Poll::Ready(Some(Ok(mem::take(this.buf))))
1957 }
1958}
1959
1960/// Extension trait for [`AsyncRead`].
1961pub trait AsyncReadExt: AsyncRead {
1962 /// Reads some bytes from the byte stream.
1963 ///
1964 /// On success, returns the total number of bytes read.
1965 ///
1966 /// If the return value is `Ok(n)`, then it must be guaranteed that
1967 /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1968 /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1969 /// scenarios:
1970 ///
1971 /// 1. This reader has reached its "end of file" and will likely no longer be able to
1972 /// produce bytes. Note that this does not mean that the reader will always no
1973 /// longer be able to produce bytes.
1974 /// 2. The buffer specified was 0 bytes in length.
1975 ///
1976 /// # Examples
1977 ///
1978 /// ```
1979 /// use futures_lite::io::{AsyncReadExt, BufReader};
1980 ///
1981 /// # spin_on::spin_on(async {
1982 /// let input: &[u8] = b"hello";
1983 /// let mut reader = BufReader::new(input);
1984 ///
1985 /// let mut buf = vec![0; 1024];
1986 /// let n = reader.read(&mut buf).await?;
1987 /// # std::io::Result::Ok(()) });
1988 /// ```
1989 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1990 where
1991 Self: Unpin,
1992 {
1993 ReadFuture { reader: self, buf }
1994 }
1995
1996 /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
1997 ///
1998 /// Data is copied to fill each buffer in order, with the final buffer possibly being
1999 /// only partially filled. This method must behave same as a single call to
2000 /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
2001 fn read_vectored<'a>(
2002 &'a mut self,
2003 bufs: &'a mut [IoSliceMut<'a>],
2004 ) -> ReadVectoredFuture<'a, Self>
2005 where
2006 Self: Unpin,
2007 {
2008 ReadVectoredFuture { reader: self, bufs }
2009 }
2010
2011 /// Reads the entire contents and appends them to a [`Vec`].
2012 ///
2013 /// On success, returns the total number of bytes read.
2014 ///
2015 /// # Examples
2016 ///
2017 /// ```
2018 /// use futures_lite::io::{AsyncReadExt, Cursor};
2019 ///
2020 /// # spin_on::spin_on(async {
2021 /// let mut reader = Cursor::new(vec![1, 2, 3]);
2022 /// let mut contents = Vec::new();
2023 ///
2024 /// let n = reader.read_to_end(&mut contents).await?;
2025 /// assert_eq!(n, 3);
2026 /// assert_eq!(contents, [1, 2, 3]);
2027 /// # std::io::Result::Ok(()) });
2028 /// ```
2029 fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
2030 where
2031 Self: Unpin,
2032 {
2033 let start_len = buf.len();
2034 ReadToEndFuture {
2035 reader: self,
2036 buf,
2037 start_len,
2038 }
2039 }
2040
2041 /// Reads the entire contents and appends them to a [`String`].
2042 ///
2043 /// On success, returns the total number of bytes read.
2044 ///
2045 /// # Examples
2046 ///
2047 /// ```
2048 /// use futures_lite::io::{AsyncReadExt, Cursor};
2049 ///
2050 /// # spin_on::spin_on(async {
2051 /// let mut reader = Cursor::new(&b"hello");
2052 /// let mut contents = String::new();
2053 ///
2054 /// let n = reader.read_to_string(&mut contents).await?;
2055 /// assert_eq!(n, 5);
2056 /// assert_eq!(contents, "hello");
2057 /// # std::io::Result::Ok(()) });
2058 /// ```
2059 fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
2060 where
2061 Self: Unpin,
2062 {
2063 ReadToStringFuture {
2064 reader: self,
2065 buf,
2066 bytes: Vec::new(),
2067 start_len: 0,
2068 }
2069 }
2070
2071 /// Reads the exact number of bytes required to fill `buf`.
2072 ///
2073 /// # Examples
2074 ///
2075 /// ```
2076 /// use futures_lite::io::{AsyncReadExt, Cursor};
2077 ///
2078 /// # spin_on::spin_on(async {
2079 /// let mut reader = Cursor::new(&b"hello");
2080 /// let mut contents = vec![0; 3];
2081 ///
2082 /// reader.read_exact(&mut contents).await?;
2083 /// assert_eq!(contents, b"hel");
2084 /// # std::io::Result::Ok(()) });
2085 /// ```
2086 fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
2087 where
2088 Self: Unpin,
2089 {
2090 ReadExactFuture { reader: self, buf }
2091 }
2092
2093 /// Creates an adapter which will read at most `limit` bytes from it.
2094 ///
2095 /// This method returns a new instance of [`AsyncRead`] which will read at most
2096 /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
2097 ///
2098 /// # Examples
2099 ///
2100 /// ```
2101 /// use futures_lite::io::{AsyncReadExt, Cursor};
2102 ///
2103 /// # spin_on::spin_on(async {
2104 /// let mut reader = Cursor::new(&b"hello");
2105 /// let mut contents = String::new();
2106 ///
2107 /// let n = reader.take(3).read_to_string(&mut contents).await?;
2108 /// assert_eq!(n, 3);
2109 /// assert_eq!(contents, "hel");
2110 /// # std::io::Result::Ok(()) });
2111 /// ```
2112 fn take(self, limit: u64) -> Take<Self>
2113 where
2114 Self: Sized,
2115 {
2116 Take { inner: self, limit }
2117 }
2118
2119 /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
2120 ///
2121 /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
2122 ///
2123 /// ```
2124 /// use futures_lite::io::{AsyncReadExt, Cursor};
2125 /// use futures_lite::stream::StreamExt;
2126 ///
2127 /// # spin_on::spin_on(async {
2128 /// let reader = Cursor::new(&b"hello");
2129 /// let mut bytes = reader.bytes();
2130 ///
2131 /// while let Some(byte) = bytes.next().await {
2132 /// println!("byte: {}", byte?);
2133 /// }
2134 /// # std::io::Result::Ok(()) });
2135 /// ```
2136 fn bytes(self) -> Bytes<Self>
2137 where
2138 Self: Sized,
2139 {
2140 Bytes { inner: self }
2141 }
2142
2143 /// Creates an adapter which will chain this stream with another.
2144 ///
2145 /// The returned [`AsyncRead`] instance will first read all bytes from this reader
2146 /// until EOF is found, and then continue with `next`.
2147 ///
2148 /// # Examples
2149 ///
2150 /// ```
2151 /// use futures_lite::io::{AsyncReadExt, Cursor};
2152 ///
2153 /// # spin_on::spin_on(async {
2154 /// let r1 = Cursor::new(&b"hello");
2155 /// let r2 = Cursor::new(&b"world");
2156 /// let mut reader = r1.chain(r2);
2157 ///
2158 /// let mut contents = String::new();
2159 /// reader.read_to_string(&mut contents).await?;
2160 /// assert_eq!(contents, "helloworld");
2161 /// # std::io::Result::Ok(()) });
2162 /// ```
2163 fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
2164 where
2165 Self: Sized,
2166 {
2167 Chain {
2168 first: self,
2169 second: next,
2170 done_first: false,
2171 }
2172 }
2173
2174 /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
2175 ///
2176 /// # Examples
2177 ///
2178 /// ```
2179 /// use futures_lite::io::AsyncReadExt;
2180 ///
2181 /// let reader = [1, 2, 3].boxed_reader();
2182 /// ```
2183 #[cfg(feature = "alloc")]
2184 fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
2185 where
2186 Self: Sized + Send + 'a,
2187 {
2188 Box::pin(self)
2189 }
2190}
2191
2192impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
2193
2194/// Future for the [`AsyncReadExt::read()`] method.
2195#[derive(Debug)]
2196#[must_use = "futures do nothing unless you `.await` or poll them"]
2197pub struct ReadFuture<'a, R: Unpin + ?Sized> {
2198 reader: &'a mut R,
2199 buf: &'a mut [u8],
2200}
2201
2202impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
2203
2204impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
2205 type Output = Result<usize>;
2206
2207 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2208 let Self { reader, buf } = &mut *self;
2209 Pin::new(reader).poll_read(cx, buf)
2210 }
2211}
2212
2213/// Future for the [`AsyncReadExt::read_vectored()`] method.
2214#[derive(Debug)]
2215#[must_use = "futures do nothing unless you `.await` or poll them"]
2216pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
2217 reader: &'a mut R,
2218 bufs: &'a mut [IoSliceMut<'a>],
2219}
2220
2221impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
2222
2223impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
2224 type Output = Result<usize>;
2225
2226 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2227 let Self { reader, bufs } = &mut *self;
2228 Pin::new(reader).poll_read_vectored(cx, bufs)
2229 }
2230}
2231
2232/// Future for the [`AsyncReadExt::read_to_end()`] method.
2233#[derive(Debug)]
2234#[must_use = "futures do nothing unless you `.await` or poll them"]
2235pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2236 reader: &'a mut R,
2237 buf: &'a mut Vec<u8>,
2238 start_len: usize,
2239}
2240
2241impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2242
2243impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2244 type Output = Result<usize>;
2245
2246 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2247 let Self {
2248 reader,
2249 buf,
2250 start_len,
2251 } = &mut *self;
2252 read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2253 }
2254}
2255
2256/// Future for the [`AsyncReadExt::read_to_string()`] method.
2257#[derive(Debug)]
2258#[must_use = "futures do nothing unless you `.await` or poll them"]
2259pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2260 reader: &'a mut R,
2261 buf: &'a mut String,
2262 bytes: Vec<u8>,
2263 start_len: usize,
2264}
2265
2266impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2267
2268impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2269 type Output = Result<usize>;
2270
2271 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2272 let Self {
2273 reader,
2274 buf,
2275 bytes,
2276 start_len,
2277 } = &mut *self;
2278 let reader = Pin::new(reader);
2279
2280 let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2281
2282 match String::from_utf8(mem::take(bytes)) {
2283 Ok(s) => {
2284 debug_assert!(buf.is_empty());
2285 **buf = s;
2286 Poll::Ready(ret)
2287 }
2288 Err(_) => Poll::Ready(ret.and_then(|_| {
2289 Err(Error::new(
2290 ErrorKind::InvalidData,
2291 "stream did not contain valid UTF-8",
2292 ))
2293 })),
2294 }
2295 }
2296}
2297
2298// This uses an adaptive system to extend the vector when it fills. We want to
2299// avoid paying to allocate and zero a huge chunk of memory if the reader only
2300// has 4 bytes while still making large reads if the reader does have a ton
2301// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2302// time is 4,500 times (!) slower than this if the reader has a very small
2303// amount of data to return.
2304//
2305// Because we're extending the buffer with uninitialized data for trusted
2306// readers, we need to make sure to truncate that if any of this panics.
2307fn read_to_end_internal<R: AsyncRead + ?Sized>(
2308 mut rd: Pin<&mut R>,
2309 cx: &mut Context<'_>,
2310 buf: &mut Vec<u8>,
2311 start_len: usize,
2312) -> Poll<Result<usize>> {
2313 struct Guard<'a> {
2314 buf: &'a mut Vec<u8>,
2315 len: usize,
2316 }
2317
2318 impl Drop for Guard<'_> {
2319 fn drop(&mut self) {
2320 self.buf.resize(self.len, 0);
2321 }
2322 }
2323
2324 let mut g = Guard {
2325 len: buf.len(),
2326 buf,
2327 };
2328 let ret;
2329 loop {
2330 if g.len == g.buf.len() {
2331 g.buf.reserve(32);
2332 let capacity = g.buf.capacity();
2333 g.buf.resize(capacity, 0);
2334 }
2335
2336 match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2337 Ok(0) => {
2338 ret = Poll::Ready(Ok(g.len - start_len));
2339 break;
2340 }
2341 Ok(n) => g.len += n,
2342 Err(e) => {
2343 ret = Poll::Ready(Err(e));
2344 break;
2345 }
2346 }
2347 }
2348
2349 ret
2350}
2351
2352/// Future for the [`AsyncReadExt::read_exact()`] method.
2353#[derive(Debug)]
2354#[must_use = "futures do nothing unless you `.await` or poll them"]
2355pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2356 reader: &'a mut R,
2357 buf: &'a mut [u8],
2358}
2359
2360impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2361
2362impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2363 type Output = Result<()>;
2364
2365 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2366 let Self { reader, buf } = &mut *self;
2367
2368 while !buf.is_empty() {
2369 let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2370 let (_, rest) = mem::take(buf).split_at_mut(n);
2371 *buf = rest;
2372
2373 if n == 0 {
2374 return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2375 }
2376 }
2377
2378 Poll::Ready(Ok(()))
2379 }
2380}
2381
pin_project! {
    /// Reader for the [`AsyncReadExt::take()`] method.
    ///
    /// Wraps a reader and keeps a running count of how many more bytes may be read through it.
    #[derive(Debug)]
    pub struct Take<R> {
        // The wrapped reader, pinned structurally so it can be polled in place.
        #[pin]
        inner: R,
        // Number of bytes that may still be read before this adapter reports EOF.
        limit: u64,
    }
}
2391
impl<R> Take<R> {
    /// Returns the number of bytes before this adapter will return EOF.
    ///
    /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let reader = reader.take(3);
    /// assert_eq!(reader.limit(), 3);
    /// ```
    pub fn limit(&self) -> u64 {
        self.limit
    }

    /// Puts a limit on the number of bytes.
    ///
    /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
    /// The previous limit is simply overwritten with the new value.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let mut reader = reader.take(10);
    /// assert_eq!(reader.limit(), 10);
    ///
    /// reader.set_limit(3);
    /// assert_eq!(reader.limit(), 3);
    /// ```
    pub fn set_limit(&mut self, limit: u64) {
        self.limit = limit;
    }

    /// Gets a reference to the underlying reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let reader = reader.take(3);
    /// let r = reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &R {
        &self.inner
    }

    /// Gets a mutable reference to the underlying reader.
    ///
    /// Reading from the underlying reader directly bypasses this adapter, so the limit is not
    /// updated by such reads.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let mut reader = reader.take(3);
    /// let r = reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Unwraps the adapter, returning the underlying reader.
    ///
    /// The remaining limit is discarded.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let reader = Cursor::new("hello");
    ///
    /// let reader = reader.take(3);
    /// let reader = reader.into_inner();
    /// ```
    pub fn into_inner(self) -> R {
        self.inner
    }
}
2480
2481impl<R: AsyncRead> AsyncRead for Take<R> {
2482 fn poll_read(
2483 self: Pin<&mut Self>,
2484 cx: &mut Context<'_>,
2485 buf: &mut [u8],
2486 ) -> Poll<Result<usize>> {
2487 let this = self.project();
2488 take_read_internal(this.inner, cx, buf, this.limit)
2489 }
2490}
2491
2492fn take_read_internal<R: AsyncRead + ?Sized>(
2493 mut rd: Pin<&mut R>,
2494 cx: &mut Context<'_>,
2495 buf: &mut [u8],
2496 limit: &mut u64,
2497) -> Poll<Result<usize>> {
2498 // Don't call into inner reader at all at EOF because it may still block
2499 if *limit == 0 {
2500 return Poll::Ready(Ok(0));
2501 }
2502
2503 let max = cmp::min(buf.len() as u64, *limit) as usize;
2504
2505 match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2506 Ok(n) => {
2507 *limit -= n as u64;
2508 Poll::Ready(Ok(n))
2509 }
2510 Err(e) => Poll::Ready(Err(e)),
2511 }
2512}
2513
2514impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2515 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2516 let this = self.project();
2517
2518 if *this.limit == 0 {
2519 return Poll::Ready(Ok(&[]));
2520 }
2521
2522 match ready!(this.inner.poll_fill_buf(cx)) {
2523 Ok(buf) => {
2524 let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2525 Poll::Ready(Ok(&buf[..cap]))
2526 }
2527 Err(e) => Poll::Ready(Err(e)),
2528 }
2529 }
2530
2531 fn consume(self: Pin<&mut Self>, amt: usize) {
2532 let this = self.project();
2533 // Don't let callers reset the limit by passing an overlarge value
2534 let amt = cmp::min(amt as u64, *this.limit) as usize;
2535 *this.limit -= amt as u64;
2536
2537 this.inner.consume(amt);
2538 }
2539}
2540
pin_project! {
    /// Reader for the [`AsyncReadExt::bytes()`] method.
    ///
    /// Wraps a reader; the wrapper can be used as a stream of single bytes when the reader is
    /// `Unpin`, and it also forwards plain reads to the wrapped reader.
    #[derive(Debug)]
    pub struct Bytes<R> {
        // The wrapped reader, pinned structurally so reads can be forwarded to it in place.
        #[pin]
        inner: R,
    }
}
2549
2550impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2551 type Item = Result<u8>;
2552
2553 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2554 let mut byte = 0;
2555
2556 let rd = Pin::new(&mut self.inner);
2557
2558 match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2559 Ok(0) => Poll::Ready(None),
2560 Ok(..) => Poll::Ready(Some(Ok(byte))),
2561 Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2562 Err(e) => Poll::Ready(Some(Err(e))),
2563 }
2564 }
2565}
2566
2567impl<R: AsyncRead> AsyncRead for Bytes<R> {
2568 fn poll_read(
2569 self: Pin<&mut Self>,
2570 cx: &mut Context<'_>,
2571 buf: &mut [u8],
2572 ) -> Poll<Result<usize>> {
2573 self.project().inner.poll_read(cx, buf)
2574 }
2575
2576 fn poll_read_vectored(
2577 self: Pin<&mut Self>,
2578 cx: &mut Context<'_>,
2579 bufs: &mut [IoSliceMut<'_>],
2580 ) -> Poll<Result<usize>> {
2581 self.project().inner.poll_read_vectored(cx, bufs)
2582 }
2583}
2584
pin_project! {
    /// Reader for the [`AsyncReadExt::chain()`] method.
    ///
    /// Reads from the first reader until it reports EOF, then continues with the second one.
    pub struct Chain<R1, R2> {
        // Reader that is drained first.
        #[pin]
        first: R1,
        // Reader that is used once `first` has reported EOF.
        #[pin]
        second: R2,
        // Set to `true` once `first` has reported EOF; never reset afterwards.
        done_first: bool,
    }
}
2595
impl<R1, R2> Chain<R1, R2> {
    /// Gets references to the underlying readers.
    ///
    /// The first element of the tuple is the reader that is read first.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let r1 = Cursor::new(b"hello");
    /// let r2 = Cursor::new(b"world");
    ///
    /// let reader = r1.chain(r2);
    /// let (r1, r2) = reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> (&R1, &R2) {
        (&self.first, &self.second)
    }

    /// Gets mutable references to the underlying readers.
    ///
    /// The first element of the tuple is the reader that is read first.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let r1 = Cursor::new(b"hello");
    /// let r2 = Cursor::new(b"world");
    ///
    /// let mut reader = r1.chain(r2);
    /// let (r1, r2) = reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
        (&mut self.first, &mut self.second)
    }

    /// Unwraps the adapter, returning the underlying readers.
    ///
    /// The readers are returned in the order in which they were chained.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncReadExt, Cursor};
    ///
    /// let r1 = Cursor::new(b"hello");
    /// let r2 = Cursor::new(b"world");
    ///
    /// let reader = r1.chain(r2);
    /// let (r1, r2) = reader.into_inner();
    /// ```
    pub fn into_inner(self) -> (R1, R2) {
        (self.first, self.second)
    }
}
2648
2649impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2650 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2651 f.debug_struct("Chain")
2652 .field("r1", &self.first)
2653 .field("r2", &self.second)
2654 .finish()
2655 }
2656}
2657
2658impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2659 fn poll_read(
2660 self: Pin<&mut Self>,
2661 cx: &mut Context<'_>,
2662 buf: &mut [u8],
2663 ) -> Poll<Result<usize>> {
2664 let this = self.project();
2665 if !*this.done_first {
2666 match ready!(this.first.poll_read(cx, buf)) {
2667 Ok(0) if !buf.is_empty() => *this.done_first = true,
2668 Ok(n) => return Poll::Ready(Ok(n)),
2669 Err(err) => return Poll::Ready(Err(err)),
2670 }
2671 }
2672
2673 this.second.poll_read(cx, buf)
2674 }
2675
2676 fn poll_read_vectored(
2677 self: Pin<&mut Self>,
2678 cx: &mut Context<'_>,
2679 bufs: &mut [IoSliceMut<'_>],
2680 ) -> Poll<Result<usize>> {
2681 let this = self.project();
2682 if !*this.done_first {
2683 match ready!(this.first.poll_read_vectored(cx, bufs)) {
2684 Ok(0) if !bufs.is_empty() => *this.done_first = true,
2685 Ok(n) => return Poll::Ready(Ok(n)),
2686 Err(err) => return Poll::Ready(Err(err)),
2687 }
2688 }
2689
2690 this.second.poll_read_vectored(cx, bufs)
2691 }
2692}
2693
2694impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2695 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2696 let this = self.project();
2697 if !*this.done_first {
2698 match ready!(this.first.poll_fill_buf(cx)) {
2699 Ok([]) => *this.done_first = true,
2700 Ok(buf) => return Poll::Ready(Ok(buf)),
2701 Err(err) => return Poll::Ready(Err(err)),
2702 }
2703 }
2704
2705 this.second.poll_fill_buf(cx)
2706 }
2707
2708 fn consume(self: Pin<&mut Self>, amt: usize) {
2709 let this = self.project();
2710 if !*this.done_first {
2711 this.first.consume(amt)
2712 } else {
2713 this.second.consume(amt)
2714 }
2715 }
2716}
2717
/// Extension trait for [`AsyncSeek`].
///
/// Adds a future-returning [`seek()`][`AsyncSeekExt::seek()`] method on top of the poll-based
/// [`AsyncSeek`] trait.
pub trait AsyncSeekExt: AsyncSeek {
    /// Seeks to a new position in a byte stream.
    ///
    /// Returns the new position in the byte stream.
    ///
    /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
    ///
    /// The returned [`SeekFuture`] borrows the seeker mutably for as long as it is alive.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
    ///
    /// # spin_on::spin_on(async {
    /// let mut cursor = Cursor::new("hello");
    ///
    /// // Move the cursor to the end.
    /// cursor.seek(SeekFrom::End(0)).await?;
    ///
    /// // Check the current position.
    /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
    /// # std::io::Result::Ok(()) });
    /// ```
    fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
    where
        Self: Unpin,
    {
        SeekFuture { seeker: self, pos }
    }
}
2748
// Blanket implementation: every `AsyncSeek` type (sized or not) gets the `AsyncSeekExt` methods.
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2750
2751/// Future for the [`AsyncSeekExt::seek()`] method.
2752#[derive(Debug)]
2753#[must_use = "futures do nothing unless you `.await` or poll them"]
2754pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2755 seeker: &'a mut S,
2756 pos: SeekFrom,
2757}
2758
2759impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2760
2761impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2762 type Output = Result<u64>;
2763
2764 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2765 let pos = self.pos;
2766 Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2767 }
2768}
2769
/// Extension trait for [`AsyncWrite`].
///
/// Adds future-returning convenience methods on top of the poll-based [`AsyncWrite`] trait.
/// The returned futures borrow the writer mutably and do nothing until they are awaited or
/// polled.
pub trait AsyncWriteExt: AsyncWrite {
    /// Writes some bytes into the byte stream.
    ///
    /// Returns the number of bytes written from the start of the buffer.
    ///
    /// If the return value is `Ok(n)` then it must be guaranteed that
    /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
    /// object is no longer able to accept bytes and will likely not be able to in the
    /// future as well, or that the provided buffer is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// let n = writer.write(b"hello").await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
    where
        Self: Unpin,
    {
        WriteFuture { writer: self, buf }
    }

    /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
    ///
    /// Data is copied from each buffer in order, with the final buffer possibly being only
    /// partially consumed. This method must behave the same as a call to
    /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
    where
        Self: Unpin,
    {
        WriteVectoredFuture { writer: self, bufs }
    }

    /// Writes an entire buffer into the byte stream.
    ///
    /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
    /// data to be written or an error occurs. It will not return before the entire buffer is
    /// successfully written or an error occurs.
    ///
    /// On success the future resolves to `()`; no byte count is reported.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
    where
        Self: Unpin,
    {
        WriteAllFuture { writer: self, buf }
    }

    /// Flushes the stream to ensure that all buffered contents reach their destination.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// writer.flush().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn flush(&mut self) -> FlushFuture<'_, Self>
    where
        Self: Unpin,
    {
        FlushFuture { writer: self }
    }

    /// Closes the writer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.close().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn close(&mut self) -> CloseFuture<'_, Self>
    where
        Self: Unpin,
    {
        CloseFuture { writer: self }
    }

    /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
    ///
    /// Only available when the `alloc` feature is enabled.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AsyncWriteExt;
    ///
    /// let writer = Vec::<u8>::new().boxed_writer();
    /// ```
    #[cfg(feature = "alloc")]
    fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
    where
        Self: Sized + Send + 'a,
    {
        Box::pin(self)
    }
}
2897
// Blanket implementation: every `AsyncWrite` type (sized or not) gets the `AsyncWriteExt` methods.
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2899
2900/// Future for the [`AsyncWriteExt::write()`] method.
2901#[derive(Debug)]
2902#[must_use = "futures do nothing unless you `.await` or poll them"]
2903pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2904 writer: &'a mut W,
2905 buf: &'a [u8],
2906}
2907
2908impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2909
2910impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2911 type Output = Result<usize>;
2912
2913 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2914 let buf = self.buf;
2915 Pin::new(&mut *self.writer).poll_write(cx, buf)
2916 }
2917}
2918
2919/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2920#[derive(Debug)]
2921#[must_use = "futures do nothing unless you `.await` or poll them"]
2922pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2923 writer: &'a mut W,
2924 bufs: &'a [IoSlice<'a>],
2925}
2926
2927impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2928
2929impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2930 type Output = Result<usize>;
2931
2932 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2933 let bufs = self.bufs;
2934 Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2935 }
2936}
2937
2938/// Future for the [`AsyncWriteExt::write_all()`] method.
2939#[derive(Debug)]
2940#[must_use = "futures do nothing unless you `.await` or poll them"]
2941pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2942 writer: &'a mut W,
2943 buf: &'a [u8],
2944}
2945
2946impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2947
2948impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2949 type Output = Result<()>;
2950
2951 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2952 let Self { writer, buf } = &mut *self;
2953
2954 while !buf.is_empty() {
2955 let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2956 let (_, rest) = mem::take(buf).split_at(n);
2957 *buf = rest;
2958
2959 if n == 0 {
2960 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2961 }
2962 }
2963
2964 Poll::Ready(Ok(()))
2965 }
2966}
2967
2968/// Future for the [`AsyncWriteExt::flush()`] method.
2969#[derive(Debug)]
2970#[must_use = "futures do nothing unless you `.await` or poll them"]
2971pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2972 writer: &'a mut W,
2973}
2974
2975impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2976
2977impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2978 type Output = Result<()>;
2979
2980 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2981 Pin::new(&mut *self.writer).poll_flush(cx)
2982 }
2983}
2984
2985/// Future for the [`AsyncWriteExt::close()`] method.
2986#[derive(Debug)]
2987#[must_use = "futures do nothing unless you `.await` or poll them"]
2988pub struct CloseFuture<'a, W: Unpin + ?Sized> {
2989 writer: &'a mut W,
2990}
2991
2992impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2993
2994impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2995 type Output = Result<()>;
2996
2997 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2998 Pin::new(&mut *self.writer).poll_close(cx)
2999 }
3000}
3001
/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
///
/// Only available when the `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
3013
/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
///
/// Only available when the `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
3025
3026/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
3027///
3028/// # Examples
3029///
3030/// ```
3031/// use futures_lite::io::{self, Cursor};
3032///
3033/// # spin_on::spin_on(async {
3034/// let stream = Cursor::new(vec![]);
3035/// let (mut reader, mut writer) = io::split(stream);
3036/// # std::io::Result::Ok(()) });
3037/// ```
3038pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
3039where
3040 T: AsyncRead + AsyncWrite + Unpin,
3041{
3042 let inner = Arc::new(Mutex::new(stream));
3043 (ReadHalf(inner.clone()), WriteHalf(inner))
3044}
3045
/// The read half returned by [`split()`].
///
/// Holds a shared, mutex-protected handle to the stream that was split.
#[derive(Debug)]
pub struct ReadHalf<T>(Arc<Mutex<T>>);
3049
/// The write half returned by [`split()`].
///
/// Holds a shared, mutex-protected handle to the stream that was split.
#[derive(Debug)]
pub struct WriteHalf<T>(Arc<Mutex<T>>);
3053
3054impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
3055 fn poll_read(
3056 self: Pin<&mut Self>,
3057 cx: &mut Context<'_>,
3058 buf: &mut [u8],
3059 ) -> Poll<Result<usize>> {
3060 let mut inner = self.0.lock().unwrap();
3061 Pin::new(&mut *inner).poll_read(cx, buf)
3062 }
3063
3064 fn poll_read_vectored(
3065 self: Pin<&mut Self>,
3066 cx: &mut Context<'_>,
3067 bufs: &mut [IoSliceMut<'_>],
3068 ) -> Poll<Result<usize>> {
3069 let mut inner = self.0.lock().unwrap();
3070 Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
3071 }
3072}
3073
3074impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
3075 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
3076 let mut inner = self.0.lock().unwrap();
3077 Pin::new(&mut *inner).poll_write(cx, buf)
3078 }
3079
3080 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3081 let mut inner = self.0.lock().unwrap();
3082 Pin::new(&mut *inner).poll_flush(cx)
3083 }
3084
3085 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3086 let mut inner = self.0.lock().unwrap();
3087 Pin::new(&mut *inner).poll_close(cx)
3088 }
3089}
3090
3091#[cfg(feature = "memchr")]
3092use memchr::memchr;
3093
/// Unoptimized memchr fallback.
///
/// Returns the index of the first byte in `haystack` equal to `needle`, or `None` if there is
/// no such byte.
#[cfg(not(feature = "memchr"))]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
    for (index, &byte) in haystack.iter().enumerate() {
        if byte == needle {
            return Some(index);
        }
    }
    None
}