futures_lite/
io.rs

1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::borrow::{Borrow, BorrowMut};
24use std::cmp;
25use std::fmt;
26use std::future::Future;
27use std::io::{IoSlice, IoSliceMut};
28use std::mem;
29use std::pin::Pin;
30use std::sync::{Arc, Mutex};
31use std::task::{Context, Poll};
32
33use futures_core::stream::Stream;
34use pin_project_lite::pin_project;
35
36use crate::future;
37use crate::ready;
38
/// Capacity, in bytes, of the buffers allocated by [`BufReader::new`] and [`BufWriter::new`]
/// (8 KiB).
const DEFAULT_BUF_SIZE: usize = 8_192;
40
41/// Copies the entire contents of a reader into a writer.
42///
43/// This function will read data from `reader` and write it into `writer` in a streaming fashion
44/// until `reader` returns EOF.
45///
46/// On success, returns the total number of bytes copied.
47///
48/// # Examples
49///
50/// ```
51/// use futures_lite::io::{self, BufReader, BufWriter};
52///
53/// # spin_on::spin_on(async {
54/// let input: &[u8] = b"hello";
55/// let reader = BufReader::new(input);
56///
57/// let mut output = Vec::new();
58/// let writer = BufWriter::new(&mut output);
59///
60/// io::copy(reader, writer).await?;
61/// # std::io::Result::Ok(()) });
62/// ```
63pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
64where
65    R: AsyncRead,
66    W: AsyncWrite,
67{
68    pin_project! {
69        struct CopyFuture<R, W> {
70            #[pin]
71            reader: R,
72            #[pin]
73            writer: W,
74            amt: u64,
75        }
76    }
77
78    impl<R, W> Future for CopyFuture<R, W>
79    where
80        R: AsyncBufRead,
81        W: AsyncWrite,
82    {
83        type Output = Result<u64>;
84
85        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86            let mut this = self.project();
87            loop {
88                let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
89                if buffer.is_empty() {
90                    ready!(this.writer.as_mut().poll_flush(cx))?;
91                    return Poll::Ready(Ok(*this.amt));
92                }
93
94                let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
95                if i == 0 {
96                    return Poll::Ready(Err(ErrorKind::WriteZero.into()));
97                }
98                *this.amt += i as u64;
99                this.reader.as_mut().consume(i);
100            }
101        }
102    }
103
104    let future = CopyFuture {
105        reader: BufReader::new(reader),
106        writer,
107        amt: 0,
108    };
109    future.await
110}
111
/// Adapts a type implementing [`std::io`] traits so it can be used as an async type.
///
/// The wrapped handle is called directly from the `poll_*` methods, so it should never block nor
/// return the [`ErrorKind::WouldBlock`] error. This is usually the case for in-memory buffered
/// I/O.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AssertAsync, AsyncReadExt};
///
/// let reader: &[u8] = b"hello";
///
/// # spin_on::spin_on(async {
/// let mut async_reader = AssertAsync::new(reader);
/// let mut contents = String::new();
///
/// // This line works in async manner - note that there is await:
/// async_reader.read_to_string(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AssertAsync<T>(T);

// Declared `Unpin` for every `T`, regardless of whether the inner handle is `Unpin`.
impl<T> Unpin for AssertAsync<T> {}

impl<T> AssertAsync<T> {
    /// Creates a wrapper around an I/O handle implementing [`std::io`] traits.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// ```
    #[inline(always)]
    pub fn new(io: T) -> Self {
        Self(io)
    }

    /// Borrows the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_ref();
    /// ```
    #[inline(always)]
    pub fn get_ref(&self) -> &T {
        let Self(io) = self;
        io
    }

    /// Mutably borrows the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let mut async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_mut();
    /// ```
    #[inline(always)]
    pub fn get_mut(&mut self) -> &mut T {
        let Self(io) = self;
        io
    }

    /// Consumes the wrapper and returns the inner I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let inner = async_reader.into_inner();
    /// ```
    #[inline(always)]
    pub fn into_inner(self) -> T {
        let Self(io) = self;
        io
    }
}
205
/// Runs the synchronous operation `f` and reports its outcome as an immediately ready poll.
///
/// The call is repeated for as long as it fails with [`ErrorKind::Interrupted`]; any other
/// result, success or error, is returned as-is inside `Poll::Ready`.
fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
where
    F: FnMut() -> std::io::Result<T>,
{
    loop {
        let outcome = f();
        if let Err(err) = &outcome {
            if err.kind() == ErrorKind::Interrupted {
                // Interrupted calls are simply retried.
                continue;
            }
        }
        return Poll::Ready(outcome);
    }
}
217
218impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
219    #[inline]
220    fn poll_read(
221        mut self: Pin<&mut Self>,
222        _: &mut Context<'_>,
223        buf: &mut [u8],
224    ) -> Poll<Result<usize>> {
225        assert_async_wrapio(move || self.0.read(buf))
226    }
227
228    #[inline]
229    fn poll_read_vectored(
230        mut self: Pin<&mut Self>,
231        _: &mut Context<'_>,
232        bufs: &mut [IoSliceMut<'_>],
233    ) -> Poll<Result<usize>> {
234        assert_async_wrapio(move || self.0.read_vectored(bufs))
235    }
236}
237
238impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
239    #[inline]
240    fn poll_write(
241        mut self: Pin<&mut Self>,
242        _: &mut Context<'_>,
243        buf: &[u8],
244    ) -> Poll<Result<usize>> {
245        assert_async_wrapio(move || self.0.write(buf))
246    }
247
248    #[inline]
249    fn poll_write_vectored(
250        mut self: Pin<&mut Self>,
251        _: &mut Context<'_>,
252        bufs: &[IoSlice<'_>],
253    ) -> Poll<Result<usize>> {
254        assert_async_wrapio(move || self.0.write_vectored(bufs))
255    }
256
257    #[inline]
258    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
259        assert_async_wrapio(move || self.0.flush())
260    }
261
262    #[inline]
263    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
264        self.poll_flush(cx)
265    }
266}
267
268impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
269    #[inline]
270    fn poll_seek(
271        mut self: Pin<&mut Self>,
272        _: &mut Context<'_>,
273        pos: SeekFrom,
274    ) -> Poll<Result<u64>> {
275        assert_async_wrapio(move || self.0.seek(pos))
276    }
277}
278
279/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
280/// polls to `WouldBlock` errors.
281///
282/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
283/// that take `Read` as a parameter.
284///
285/// # Examples
286///
287/// ```
288/// use std::io::Read;
289/// use std::task::{Poll, Context};
290///
291/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
292///     // Assume we have a library that's built around `Read` and `Write` traits.
293///     use cooltls::Session;
294///
295///     // We want to use it with our writer that implements `AsyncWrite`.
296///     let writer = Stream::new();
297///
298///     // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
299///     use futures_lite::io::AsyncAsSync;
300///     let writer = AsyncAsSync::new(cx, writer);
301///
302///     // Now, we can use it with `cooltls`.
303///     let mut session = Session::new(writer);
304///
305///     // Match on the result of `read()` and translate it to poll.
306///     match session.read(&mut [0; 1024]) {
307///         Ok(n) => Poll::Ready(n),
308///         Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
309///         Err(err) => panic!("unexpected error: {}", err),
310///     }
311/// }
312///
313/// // Usually, poll-based functions are best wrapped using `poll_fn`.
314/// use futures_lite::future::poll_fn;
315/// # futures_lite::future::block_on(async {
316/// poll_fn(|cx| poll_for_io(cx)).await;
317/// # });
318/// # struct Stream;
319/// # impl Stream {
320/// #     fn new() -> Stream {
321/// #         Stream
322/// #     }
323/// # }
324/// # impl futures_lite::io::AsyncRead for Stream {
325/// #     fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
326/// #         Poll::Ready(Ok(0))
327/// #     }
328/// # }
329/// # mod cooltls {
330/// #     pub struct Session<W> {
331/// #         reader: W,
332/// #     }
333/// #     impl<W> Session<W> {
334/// #         pub fn new(reader: W) -> Session<W> {
335/// #             Session { reader }
336/// #         }
337/// #     }
338/// #     impl<W: std::io::Read> std::io::Read for Session<W> {
339/// #         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
340/// #             self.reader.read(buf)
341/// #         }
342/// #     }
343/// # }
344/// ```
345#[derive(Debug)]
346pub struct AsyncAsSync<'r, 'ctx, T> {
347    /// The context we are using to poll the future.
348    pub context: &'r mut Context<'ctx>,
349
350    /// The actual reader/writer we are wrapping.
351    pub inner: T,
352}
353
354impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
355    /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
356    ///
357    /// # Examples
358    ///
359    /// ```
360    /// use futures_lite::io::AsyncAsSync;
361    /// use std::task::Context;
362    /// use waker_fn::waker_fn;
363    ///
364    /// let reader: &[u8] = b"hello";
365    /// let waker = waker_fn(|| {});
366    /// let mut context = Context::from_waker(&waker);
367    ///
368    /// let async_reader = AsyncAsSync::new(&mut context, reader);
369    /// ```
370    #[inline]
371    pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
372        AsyncAsSync { context, inner }
373    }
374
375    /// Attempt to shutdown the I/O handle.
376    ///
377    /// # Examples
378    ///
379    /// ```
380    /// use futures_lite::io::AsyncAsSync;
381    /// use std::task::Context;
382    /// use waker_fn::waker_fn;
383    ///
384    /// let reader: Vec<u8> = b"hello".to_vec();
385    /// let waker = waker_fn(|| {});
386    /// let mut context = Context::from_waker(&waker);
387    ///
388    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
389    /// async_reader.close().unwrap();
390    /// ```
391    #[inline]
392    pub fn close(&mut self) -> Result<()>
393    where
394        T: AsyncWrite + Unpin,
395    {
396        self.poll_with(|io, cx| io.poll_close(cx))
397    }
398
399    /// Poll this `AsyncAsSync` for some function.
400    ///
401    /// # Examples
402    ///
403    /// ```
404    /// use futures_lite::io::{AsyncAsSync, AsyncRead};
405    /// use std::task::Context;
406    /// use waker_fn::waker_fn;
407    ///
408    /// let reader: &[u8] = b"hello";
409    /// let waker = waker_fn(|| {});
410    /// let mut context = Context::from_waker(&waker);
411    ///
412    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
413    /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
414    /// assert_eq!(r.unwrap(), 5);
415    /// ```
416    #[inline]
417    pub fn poll_with<R>(
418        &mut self,
419        f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
420    ) -> Result<R>
421    where
422        T: Unpin,
423    {
424        match f(Pin::new(&mut self.inner), self.context) {
425            Poll::Ready(res) => res,
426            Poll::Pending => Err(ErrorKind::WouldBlock.into()),
427        }
428    }
429}
430
431impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
432    #[inline]
433    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
434        self.poll_with(|io, cx| io.poll_read(cx, buf))
435    }
436
437    #[inline]
438    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
439        self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
440    }
441}
442
443impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
444    #[inline]
445    fn write(&mut self, buf: &[u8]) -> Result<usize> {
446        self.poll_with(|io, cx| io.poll_write(cx, buf))
447    }
448
449    #[inline]
450    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
451        self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
452    }
453
454    #[inline]
455    fn flush(&mut self) -> Result<()> {
456        self.poll_with(|io, cx| io.poll_flush(cx))
457    }
458}
459
460impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
461    #[inline]
462    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
463        self.poll_with(|io, cx| io.poll_seek(cx, pos))
464    }
465}
466
467impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
468    #[inline]
469    fn as_ref(&self) -> &T {
470        &self.inner
471    }
472}
473
474impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
475    #[inline]
476    fn as_mut(&mut self) -> &mut T {
477        &mut self.inner
478    }
479}
480
481impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
482    #[inline]
483    fn borrow(&self) -> &T {
484        &self.inner
485    }
486}
487
488impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
489    #[inline]
490    fn borrow_mut(&mut self) -> &mut T {
491        &mut self.inner
492    }
493}
494
/// Blocks on all async I/O operations and implements [`std::io`] traits.
///
/// Sometimes async I/O needs to be used in a blocking manner. When calling
/// [`future::block_on()`] by hand for every operation becomes too tedious, wrap the handle in
/// this type for more convenient blocking on async I/O operations.
///
/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
/// [`AsyncSeek`], respectively.
///
/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
/// dropping the [`BlockOn`] handle or some buffered data might get lost.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
/// use std::io::Read;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let mut blocking_reader = BlockOn::new(reader);
/// let mut contents = String::new();
///
/// // This line blocks - note that there is no await:
/// blocking_reader.read_to_string(&mut contents)?;
/// # std::io::Result::Ok(())
/// ```
#[derive(Debug)]
pub struct BlockOn<T>(T);

impl<T> BlockOn<T> {
    /// Creates a blocking interface around an async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// ```
    pub fn new(io: T) -> BlockOn<T> {
        Self(io)
    }

    /// Borrows the async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        let Self(io) = self;
        io
    }

    /// Mutably borrows the async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let mut blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        let Self(io) = self;
        io
    }

    /// Consumes the wrapper and returns the inner async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let inner = blocking_reader.into_inner();
    /// ```
    pub fn into_inner(self) -> T {
        let Self(io) = self;
        io
    }
}
600
601impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
602    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
603        future::block_on(self.0.read(buf))
604    }
605}
606
607impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
608    fn fill_buf(&mut self) -> Result<&[u8]> {
609        future::block_on(self.0.fill_buf())
610    }
611
612    fn consume(&mut self, amt: usize) {
613        Pin::new(&mut self.0).consume(amt)
614    }
615}
616
617impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
618    fn write(&mut self, buf: &[u8]) -> Result<usize> {
619        future::block_on(self.0.write(buf))
620    }
621
622    fn flush(&mut self) -> Result<()> {
623        future::block_on(self.0.flush())
624    }
625}
626
627impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
628    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
629        future::block_on(self.0.seek(pos))
630    }
631}
632
633pin_project! {
634    /// Adds buffering to a reader.
635    ///
636    /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
637    /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
638    /// maintains an in-memory buffer of the incoming byte stream.
639    ///
640    /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
641    /// the same file or networking socket. It does not help when reading very large amounts at
642    /// once, or reading just once or a few times. It also provides no advantage when reading from
643    /// a source that is already in memory, like a `Vec<u8>`.
644    ///
645    /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
646    /// multiple instances of [`BufReader`] on the same reader can cause data loss.
647    ///
648    /// # Examples
649    ///
650    /// ```
651    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
652    ///
653    /// # spin_on::spin_on(async {
654    /// let input: &[u8] = b"hello";
655    /// let mut reader = BufReader::new(input);
656    ///
657    /// let mut line = String::new();
658    /// reader.read_line(&mut line).await?;
659    /// # std::io::Result::Ok(()) });
660    /// ```
661    pub struct BufReader<R> {
662        #[pin]
663        inner: R,
664        buf: Box<[u8]>,
665        pos: usize,
666        cap: usize,
667    }
668}
669
670impl<R: AsyncRead> BufReader<R> {
671    /// Creates a buffered reader with the default buffer capacity.
672    ///
673    /// The default capacity is currently 8 KB, but that may change in the future.
674    ///
675    /// # Examples
676    ///
677    /// ```
678    /// use futures_lite::io::BufReader;
679    ///
680    /// let input: &[u8] = b"hello";
681    /// let reader = BufReader::new(input);
682    /// ```
683    pub fn new(inner: R) -> BufReader<R> {
684        BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
685    }
686
687    /// Creates a buffered reader with the specified capacity.
688    ///
689    /// # Examples
690    ///
691    /// ```
692    /// use futures_lite::io::BufReader;
693    ///
694    /// let input: &[u8] = b"hello";
695    /// let reader = BufReader::with_capacity(1024, input);
696    /// ```
697    pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
698        BufReader {
699            inner,
700            buf: vec![0; capacity].into_boxed_slice(),
701            pos: 0,
702            cap: 0,
703        }
704    }
705}
706
707impl<R> BufReader<R> {
708    /// Gets a reference to the underlying reader.
709    ///
710    /// It is not advisable to directly read from the underlying reader.
711    ///
712    /// # Examples
713    ///
714    /// ```
715    /// use futures_lite::io::BufReader;
716    ///
717    /// let input: &[u8] = b"hello";
718    /// let reader = BufReader::new(input);
719    ///
720    /// let r = reader.get_ref();
721    /// ```
722    pub fn get_ref(&self) -> &R {
723        &self.inner
724    }
725
726    /// Gets a mutable reference to the underlying reader.
727    ///
728    /// It is not advisable to directly read from the underlying reader.
729    ///
730    /// # Examples
731    ///
732    /// ```
733    /// use futures_lite::io::BufReader;
734    ///
735    /// let input: &[u8] = b"hello";
736    /// let mut reader = BufReader::new(input);
737    ///
738    /// let r = reader.get_mut();
739    /// ```
740    pub fn get_mut(&mut self) -> &mut R {
741        &mut self.inner
742    }
743
744    /// Gets a pinned mutable reference to the underlying reader.
745    ///
746    /// It is not advisable to directly read from the underlying reader.
747    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
748        self.project().inner
749    }
750
751    /// Returns a reference to the internal buffer.
752    ///
753    /// This method will not attempt to fill the buffer if it is empty.
754    ///
755    /// # Examples
756    ///
757    /// ```
758    /// use futures_lite::io::BufReader;
759    ///
760    /// let input: &[u8] = b"hello";
761    /// let reader = BufReader::new(input);
762    ///
763    /// // The internal buffer is empty until the first read request.
764    /// assert_eq!(reader.buffer(), &[]);
765    /// ```
766    pub fn buffer(&self) -> &[u8] {
767        &self.buf[self.pos..self.cap]
768    }
769
770    /// Unwraps the buffered reader, returning the underlying reader.
771    ///
772    /// Note that any leftover data in the internal buffer will be lost.
773    ///
774    /// # Examples
775    ///
776    /// ```
777    /// use futures_lite::io::BufReader;
778    ///
779    /// let input: &[u8] = b"hello";
780    /// let reader = BufReader::new(input);
781    ///
782    /// assert_eq!(reader.into_inner(), input);
783    /// ```
784    pub fn into_inner(self) -> R {
785        self.inner
786    }
787
788    /// Invalidates all data in the internal buffer.
789    #[inline]
790    fn discard_buffer(self: Pin<&mut Self>) {
791        let this = self.project();
792        *this.pos = 0;
793        *this.cap = 0;
794    }
795}
796
797impl<R: AsyncRead> AsyncRead for BufReader<R> {
798    fn poll_read(
799        mut self: Pin<&mut Self>,
800        cx: &mut Context<'_>,
801        buf: &mut [u8],
802    ) -> Poll<Result<usize>> {
803        // If we don't have any buffered data and we're doing a massive read
804        // (larger than our internal buffer), bypass our internal buffer
805        // entirely.
806        if self.pos == self.cap && buf.len() >= self.buf.len() {
807            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
808            self.discard_buffer();
809            return Poll::Ready(res);
810        }
811        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
812        let nread = std::io::Read::read(&mut rem, buf)?;
813        self.consume(nread);
814        Poll::Ready(Ok(nread))
815    }
816
817    fn poll_read_vectored(
818        mut self: Pin<&mut Self>,
819        cx: &mut Context<'_>,
820        bufs: &mut [IoSliceMut<'_>],
821    ) -> Poll<Result<usize>> {
822        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
823        if self.pos == self.cap && total_len >= self.buf.len() {
824            let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
825            self.discard_buffer();
826            return Poll::Ready(res);
827        }
828        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
829        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
830        self.consume(nread);
831        Poll::Ready(Ok(nread))
832    }
833}
834
835impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
836    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
837        let mut this = self.project();
838
839        // If we've reached the end of our internal buffer then we need to fetch
840        // some more data from the underlying reader.
841        // Branch using `>=` instead of the more correct `==`
842        // to tell the compiler that the pos..cap slice is always valid.
843        if *this.pos >= *this.cap {
844            debug_assert!(*this.pos == *this.cap);
845            *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
846            *this.pos = 0;
847        }
848        Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
849    }
850
851    fn consume(self: Pin<&mut Self>, amt: usize) {
852        let this = self.project();
853        *this.pos = cmp::min(*this.pos + amt, *this.cap);
854    }
855}
856
857impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
858    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
859        f.debug_struct("BufReader")
860            .field("reader", &self.inner)
861            .field(
862                "buffer",
863                &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
864            )
865            .finish()
866    }
867}
868
869impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
870    /// Seeks to an offset, in bytes, in the underlying reader.
871    ///
872    /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
873    /// reader would be at if the [`BufReader`] had no internal buffer.
874    ///
875    /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
876    /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
877    /// immediately after a seek yields the underlying reader at the same position.
878    ///
879    /// See [`AsyncSeek`] for more details.
880    ///
881    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
882    /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
883    /// the second seek returns `Err`, the underlying reader will be left at the same position it
884    /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
885    fn poll_seek(
886        mut self: Pin<&mut Self>,
887        cx: &mut Context<'_>,
888        pos: SeekFrom,
889    ) -> Poll<Result<u64>> {
890        let result: u64;
891        if let SeekFrom::Current(n) = pos {
892            let remainder = (self.cap - self.pos) as i64;
893            // it should be safe to assume that remainder fits within an i64 as the alternative
894            // means we managed to allocate 8 exbibytes and that's absurd.
895            // But it's not out of the realm of possibility for some weird underlying reader to
896            // support seeking by i64::min_value() so we need to handle underflow when subtracting
897            // remainder.
898            if let Some(offset) = n.checked_sub(remainder) {
899                result = ready!(self
900                    .as_mut()
901                    .get_pin_mut()
902                    .poll_seek(cx, SeekFrom::Current(offset)))?;
903            } else {
904                // seek backwards by our remainder, and then by the offset
905                ready!(self
906                    .as_mut()
907                    .get_pin_mut()
908                    .poll_seek(cx, SeekFrom::Current(-remainder)))?;
909                self.as_mut().discard_buffer();
910                result = ready!(self
911                    .as_mut()
912                    .get_pin_mut()
913                    .poll_seek(cx, SeekFrom::Current(n)))?;
914            }
915        } else {
916            // Seeking with Start/End doesn't care about our buffer length.
917            result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
918        }
919        self.discard_buffer();
920        Poll::Ready(Ok(result))
921    }
922}
923
924impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
925    fn poll_write(
926        mut self: Pin<&mut Self>,
927        cx: &mut Context<'_>,
928        buf: &[u8],
929    ) -> Poll<Result<usize>> {
930        self.as_mut().get_pin_mut().poll_write(cx, buf)
931    }
932
933    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
934        self.as_mut().get_pin_mut().poll_flush(cx)
935    }
936
937    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
938        self.as_mut().get_pin_mut().poll_close(cx)
939    }
940}
941
pin_project! {
    /// Adds buffering to a writer.
    ///
    /// It can be excessively inefficient to work directly with something that implements
    /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
    /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
    /// writes it to the underlying writer in large, infrequent batches.
    ///
    /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
    /// the same file or networking socket. It does not help when writing very large amounts at
    /// once, or writing just once or a few times. It also provides no advantage when writing to a
    /// destination that is in memory, like a `Vec<u8>`.
    ///
    /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
    /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
    /// dropping the [`BufWriter`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// writer.flush().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufWriter<W> {
        // The wrapped writer.
        #[pin]
        inner: W,
        // In-memory buffer; the constructors allocate it with the requested capacity.
        buf: Vec<u8>,
        // NOTE(review): starts at 0 in the constructors; presumably counts how many bytes of
        // `buf` have already been handed to `inner` during a flush - confirm against the flush
        // logic.
        written: usize,
    }
}
979
980impl<W: AsyncWrite> BufWriter<W> {
981    /// Creates a buffered writer with the default buffer capacity.
982    ///
983    /// The default capacity is currently 8 KB, but that may change in the future.
984    ///
985    /// # Examples
986    ///
987    /// ```
988    /// use futures_lite::io::BufWriter;
989    ///
990    /// let mut output = Vec::new();
991    /// let writer = BufWriter::new(&mut output);
992    /// ```
993    pub fn new(inner: W) -> BufWriter<W> {
994        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
995    }
996
997    /// Creates a buffered writer with the specified buffer capacity.
998    ///
999    /// # Examples
1000    ///
1001    /// ```
1002    /// use futures_lite::io::BufWriter;
1003    ///
1004    /// let mut output = Vec::new();
1005    /// let writer = BufWriter::with_capacity(100, &mut output);
1006    /// ```
1007    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
1008        BufWriter {
1009            inner,
1010            buf: Vec::with_capacity(capacity),
1011            written: 0,
1012        }
1013    }
1014
1015    /// Gets a reference to the underlying writer.
1016    ///
1017    /// # Examples
1018    ///
1019    /// ```
1020    /// use futures_lite::io::BufWriter;
1021    ///
1022    /// let mut output = Vec::new();
1023    /// let writer = BufWriter::new(&mut output);
1024    ///
1025    /// let r = writer.get_ref();
1026    /// ```
1027    pub fn get_ref(&self) -> &W {
1028        &self.inner
1029    }
1030
1031    /// Gets a mutable reference to the underlying writer.
1032    ///
1033    /// It is not advisable to directly write to the underlying writer.
1034    ///
1035    /// # Examples
1036    ///
1037    /// ```
1038    /// use futures_lite::io::BufWriter;
1039    ///
1040    /// let mut output = Vec::new();
1041    /// let mut writer = BufWriter::new(&mut output);
1042    ///
1043    /// let r = writer.get_mut();
1044    /// ```
1045    pub fn get_mut(&mut self) -> &mut W {
1046        &mut self.inner
1047    }
1048
1049    /// Gets a pinned mutable reference to the underlying writer.
1050    ///
1051    /// It is not not advisable to directly write to the underlying writer.
1052    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
1053        self.project().inner
1054    }
1055
1056    /// Unwraps the buffered writer, returning the underlying writer.
1057    ///
1058    /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
1059    /// that data, flush the buffered writer before unwrapping it.
1060    ///
1061    /// # Examples
1062    ///
1063    /// ```
1064    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
1065    ///
1066    /// # spin_on::spin_on(async {
1067    /// let mut output = vec![1, 2, 3];
1068    /// let mut writer = BufWriter::new(&mut output);
1069    ///
1070    /// writer.write_all(&[4]).await?;
1071    /// writer.flush().await?;
1072    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
1073    /// # std::io::Result::Ok(()) });
1074    /// ```
1075    pub fn into_inner(self) -> W {
1076        self.inner
1077    }
1078
1079    /// Returns a reference to the internal buffer.
1080    ///
1081    /// # Examples
1082    ///
1083    /// ```
1084    /// use futures_lite::io::BufWriter;
1085    ///
1086    /// let mut output = Vec::new();
1087    /// let writer = BufWriter::new(&mut output);
1088    ///
1089    /// // The internal buffer is empty until the first write request.
1090    /// assert_eq!(writer.buffer(), &[]);
1091    /// ```
1092    pub fn buffer(&self) -> &[u8] {
1093        &self.buf
1094    }
1095
1096    /// Flush the buffer.
1097    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1098        let mut this = self.project();
1099        let len = this.buf.len();
1100        let mut ret = Ok(());
1101
1102        while *this.written < len {
1103            match this
1104                .inner
1105                .as_mut()
1106                .poll_write(cx, &this.buf[*this.written..])
1107            {
1108                Poll::Ready(Ok(0)) => {
1109                    ret = Err(Error::new(
1110                        ErrorKind::WriteZero,
1111                        "Failed to write buffered data",
1112                    ));
1113                    break;
1114                }
1115                Poll::Ready(Ok(n)) => *this.written += n,
1116                Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
1117                Poll::Ready(Err(e)) => {
1118                    ret = Err(e);
1119                    break;
1120                }
1121                Poll::Pending => return Poll::Pending,
1122            }
1123        }
1124
1125        if *this.written > 0 {
1126            this.buf.drain(..*this.written);
1127        }
1128        *this.written = 0;
1129
1130        Poll::Ready(ret)
1131    }
1132}
1133
1134impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
1135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136        f.debug_struct("BufWriter")
1137            .field("writer", &self.inner)
1138            .field("buf", &self.buf)
1139            .finish()
1140    }
1141}
1142
1143impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
1144    fn poll_write(
1145        mut self: Pin<&mut Self>,
1146        cx: &mut Context<'_>,
1147        buf: &[u8],
1148    ) -> Poll<Result<usize>> {
1149        if self.buf.len() + buf.len() > self.buf.capacity() {
1150            ready!(self.as_mut().poll_flush_buf(cx))?;
1151        }
1152        if buf.len() >= self.buf.capacity() {
1153            self.get_pin_mut().poll_write(cx, buf)
1154        } else {
1155            Pin::new(&mut *self.project().buf).poll_write(cx, buf)
1156        }
1157    }
1158
1159    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1160        ready!(self.as_mut().poll_flush_buf(cx))?;
1161        self.get_pin_mut().poll_flush(cx)
1162    }
1163
1164    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1165        ready!(self.as_mut().poll_flush_buf(cx))?;
1166        self.get_pin_mut().poll_close(cx)
1167    }
1168}
1169
1170impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
1171    /// Seek to the offset, in bytes, in the underlying writer.
1172    ///
1173    /// Seeking always writes out the internal buffer before seeking.
1174    fn poll_seek(
1175        mut self: Pin<&mut Self>,
1176        cx: &mut Context<'_>,
1177        pos: SeekFrom,
1178    ) -> Poll<Result<u64>> {
1179        ready!(self.as_mut().poll_flush_buf(cx))?;
1180        self.get_pin_mut().poll_seek(cx, pos)
1181    }
1182}
1183
/// Gives an in-memory buffer a cursor for reading and writing.
///
/// This is a thin wrapper around [`std::io::Cursor`]; all operations complete immediately.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut bytes = b"hello".to_vec();
/// let mut cursor = Cursor::new(&mut bytes);
///
/// // Overwrite 'h' with 'H'.
/// cursor.write_all(b"H").await?;
///
/// // Move the cursor one byte forward.
/// cursor.seek(SeekFrom::Current(1)).await?;
///
/// // Read a byte.
/// let mut byte = [0];
/// cursor.read_exact(&mut byte).await?;
/// assert_eq!(&byte, b"l");
///
/// // Check the final buffer.
/// assert_eq!(bytes, b"Hello");
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Clone, Debug, Default)]
pub struct Cursor<T> {
    // The synchronous cursor that every method and trait impl delegates to.
    inner: std::io::Cursor<T>,
}
1214
1215impl<T> Cursor<T> {
1216    /// Creates a cursor for an in-memory buffer.
1217    ///
1218    /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
1219    /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
1220    /// the buffer using [`set_position()`][Cursor::set_position()`] or
1221    /// [`seek()`][`AsyncSeekExt::seek()`].
1222    ///
1223    /// # Examples
1224    ///
1225    /// ```
1226    /// use futures_lite::io::Cursor;
1227    ///
1228    /// let cursor = Cursor::new(Vec::<u8>::new());
1229    /// ```
1230    pub fn new(inner: T) -> Cursor<T> {
1231        Cursor {
1232            inner: std::io::Cursor::new(inner),
1233        }
1234    }
1235
1236    /// Gets a reference to the underlying buffer.
1237    ///
1238    /// # Examples
1239    ///
1240    /// ```
1241    /// use futures_lite::io::Cursor;
1242    ///
1243    /// let cursor = Cursor::new(Vec::<u8>::new());
1244    /// let r = cursor.get_ref();
1245    /// ```
1246    pub fn get_ref(&self) -> &T {
1247        self.inner.get_ref()
1248    }
1249
1250    /// Gets a mutable reference to the underlying buffer.
1251    ///
1252    /// # Examples
1253    ///
1254    /// ```
1255    /// use futures_lite::io::Cursor;
1256    ///
1257    /// let mut cursor = Cursor::new(Vec::<u8>::new());
1258    /// let r = cursor.get_mut();
1259    /// ```
1260    pub fn get_mut(&mut self) -> &mut T {
1261        self.inner.get_mut()
1262    }
1263
1264    /// Unwraps the cursor, returning the underlying buffer.
1265    ///
1266    /// # Examples
1267    ///
1268    /// ```
1269    /// use futures_lite::io::Cursor;
1270    ///
1271    /// let cursor = Cursor::new(vec![1, 2, 3]);
1272    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
1273    /// ```
1274    pub fn into_inner(self) -> T {
1275        self.inner.into_inner()
1276    }
1277
1278    /// Returns the current position of this cursor.
1279    ///
1280    /// # Examples
1281    ///
1282    /// ```
1283    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
1284    ///
1285    /// # spin_on::spin_on(async {
1286    /// let mut cursor = Cursor::new(b"hello");
1287    /// assert_eq!(cursor.position(), 0);
1288    ///
1289    /// cursor.seek(SeekFrom::Start(2)).await?;
1290    /// assert_eq!(cursor.position(), 2);
1291    /// # std::io::Result::Ok(()) });
1292    /// ```
1293    pub fn position(&self) -> u64 {
1294        self.inner.position()
1295    }
1296
1297    /// Sets the position of this cursor.
1298    ///
1299    /// # Examples
1300    ///
1301    /// ```
1302    /// use futures_lite::io::Cursor;
1303    ///
1304    /// let mut cursor = Cursor::new(b"hello");
1305    /// assert_eq!(cursor.position(), 0);
1306    ///
1307    /// cursor.set_position(2);
1308    /// assert_eq!(cursor.position(), 2);
1309    /// ```
1310    pub fn set_position(&mut self, pos: u64) {
1311        self.inner.set_position(pos)
1312    }
1313}
1314
1315impl<T> AsyncSeek for Cursor<T>
1316where
1317    T: AsRef<[u8]> + Unpin,
1318{
1319    fn poll_seek(
1320        mut self: Pin<&mut Self>,
1321        _: &mut Context<'_>,
1322        pos: SeekFrom,
1323    ) -> Poll<Result<u64>> {
1324        Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1325    }
1326}
1327
1328impl<T> AsyncRead for Cursor<T>
1329where
1330    T: AsRef<[u8]> + Unpin,
1331{
1332    fn poll_read(
1333        mut self: Pin<&mut Self>,
1334        _cx: &mut Context<'_>,
1335        buf: &mut [u8],
1336    ) -> Poll<Result<usize>> {
1337        Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1338    }
1339
1340    fn poll_read_vectored(
1341        mut self: Pin<&mut Self>,
1342        _: &mut Context<'_>,
1343        bufs: &mut [IoSliceMut<'_>],
1344    ) -> Poll<Result<usize>> {
1345        Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1346    }
1347}
1348
1349impl<T> AsyncBufRead for Cursor<T>
1350where
1351    T: AsRef<[u8]> + Unpin,
1352{
1353    fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1354        Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1355    }
1356
1357    fn consume(mut self: Pin<&mut Self>, amt: usize) {
1358        std::io::BufRead::consume(&mut self.inner, amt)
1359    }
1360}
1361
1362impl AsyncWrite for Cursor<&mut [u8]> {
1363    fn poll_write(
1364        mut self: Pin<&mut Self>,
1365        _: &mut Context<'_>,
1366        buf: &[u8],
1367    ) -> Poll<Result<usize>> {
1368        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1369    }
1370
1371    fn poll_write_vectored(
1372        mut self: Pin<&mut Self>,
1373        _: &mut Context<'_>,
1374        bufs: &[IoSlice<'_>],
1375    ) -> Poll<Result<usize>> {
1376        Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1377    }
1378
1379    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1380        Poll::Ready(std::io::Write::flush(&mut self.inner))
1381    }
1382
1383    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1384        self.poll_flush(cx)
1385    }
1386}
1387
1388impl AsyncWrite for Cursor<&mut Vec<u8>> {
1389    fn poll_write(
1390        mut self: Pin<&mut Self>,
1391        _: &mut Context<'_>,
1392        buf: &[u8],
1393    ) -> Poll<Result<usize>> {
1394        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1395    }
1396
1397    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1398        self.poll_flush(cx)
1399    }
1400
1401    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1402        Poll::Ready(std::io::Write::flush(&mut self.inner))
1403    }
1404}
1405
1406impl AsyncWrite for Cursor<Vec<u8>> {
1407    fn poll_write(
1408        mut self: Pin<&mut Self>,
1409        _: &mut Context<'_>,
1410        buf: &[u8],
1411    ) -> Poll<Result<usize>> {
1412        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1413    }
1414
1415    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1416        self.poll_flush(cx)
1417    }
1418
1419    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1420        Poll::Ready(std::io::Write::flush(&mut self.inner))
1421    }
1422}
1423
/// Creates an empty reader.
///
/// Every read from the returned reader completes immediately and reports end of file
/// (zero bytes read).
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::empty();
///
/// let mut contents = Vec::new();
/// reader.read_to_end(&mut contents).await?;
/// assert!(contents.is_empty());
/// # std::io::Result::Ok(()) });
/// ```
pub fn empty() -> Empty {
    Empty { _private: () }
}
1442
/// Reader for the [`empty()`] function.
pub struct Empty {
    // The private unit field keeps `Empty` from being built with a struct literal outside
    // this module.
    _private: (),
}
1447
impl fmt::Debug for Empty {
    /// Writes the fixed text `Empty { .. }`, so the private field is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Empty { .. }")
    }
}
1453
impl AsyncRead for Empty {
    /// Always completes immediately with `Ok(0)`, i.e. end of file; `buf` is left untouched.
    #[inline]
    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
        Poll::Ready(Ok(0))
    }
}
1460
impl AsyncBufRead for Empty {
    /// Always completes immediately with an empty slice.
    #[inline]
    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
        Poll::Ready(Ok(&[]))
    }

    /// Does nothing: there is never any buffered data to consume.
    #[inline]
    fn consume(self: Pin<&mut Self>, _: usize) {}
}
1470
/// Creates an infinite reader that reads the same byte repeatedly.
///
/// Every read fills the whole destination buffer with `byte`, so the reader never reports
/// end of file for a non-empty buffer.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::repeat(b'a');
///
/// let mut contents = vec![0; 5];
/// reader.read_exact(&mut contents).await?;
/// assert_eq!(contents, b"aaaaa");
/// # std::io::Result::Ok(()) });
/// ```
pub fn repeat(byte: u8) -> Repeat {
    Repeat { byte }
}
1489
/// Reader for the [`repeat()`] function.
#[derive(Debug)]
pub struct Repeat {
    // The byte that every read copies into the destination buffer.
    byte: u8,
}
1495
1496impl AsyncRead for Repeat {
1497    #[inline]
1498    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1499        for b in &mut *buf {
1500            *b = self.byte;
1501        }
1502        Poll::Ready(Ok(buf.len()))
1503    }
1504}
1505
/// Creates a writer that consumes and drops all data.
///
/// Every write to the returned writer completes immediately and reports that the whole
/// buffer was written.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncWriteExt};
///
/// # spin_on::spin_on(async {
/// let mut writer = io::sink();
/// writer.write_all(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn sink() -> Sink {
    Sink { _private: () }
}
1521
/// Writer for the [`sink()`] function.
#[derive(Debug)]
pub struct Sink {
    // The private unit field keeps `Sink` from being built with a struct literal outside
    // this module.
    _private: (),
}
1527
impl AsyncWrite for Sink {
    /// Discards `buf` and reports all of its bytes as written.
    #[inline]
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Poll::Ready(Ok(buf.len()))
    }

    /// Nothing is ever buffered, so flushing succeeds immediately.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// There is nothing to release, so closing succeeds immediately.
    #[inline]
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}
1544
/// Extension trait for [`AsyncBufRead`].
///
/// Every method only builds a future or stream (or forwards to [`AsyncBufRead`]); the actual
/// I/O happens when the returned value is polled.
pub trait AsyncBufReadExt: AsyncBufRead {
    /// Returns the contents of the internal buffer, filling it with more data if empty.
    ///
    /// If the stream has reached EOF, an empty buffer will be returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello world";
    /// let mut reader = BufReader::with_capacity(5, input);
    ///
    /// assert_eq!(reader.fill_buf().await?, b"hello");
    /// reader.consume(2);
    /// assert_eq!(reader.fill_buf().await?, b"llo");
    /// reader.consume(3);
    /// assert_eq!(reader.fill_buf().await?, b" worl");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
    where
        Self: Unpin,
    {
        FillBuf { reader: Some(self) }
    }

    /// Consumes `amt` buffered bytes.
    ///
    /// This method does not perform any I/O, it simply consumes some amount of bytes from the
    /// internal buffer.
    ///
    /// The `amt` must be <= the number of bytes in the buffer returned by
    /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::with_capacity(4, input);
    ///
    /// assert_eq!(reader.fill_buf().await?, b"hell");
    /// reader.consume(2);
    /// assert_eq!(reader.fill_buf().await?, b"ll");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn consume(&mut self, amt: usize)
    where
        Self: Unpin,
    {
        // Forward to the pinned trait method; `Self: Unpin` makes `Pin::new` available.
        AsyncBufRead::consume(Pin::new(self), amt);
    }

    /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
    ///
    /// This method will read bytes from the underlying stream until the delimiter or EOF is
    /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
    ///
    /// If successful, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut buf = Vec::new();
    /// let n = reader.read_until(b'\n', &mut buf).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadUntilFuture {
            reader: self,
            byte,
            buf,
            read: 0,
        }
    }

    /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
    ///
    /// This method will read bytes from the underlying stream until the newline delimiter (the
    /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
    /// will be appended to `buf`.
    ///
    /// If successful, returns the total number of bytes read.
    ///
    /// # Errors
    ///
    /// Besides I/O errors from the underlying reader, an error of kind
    /// [`ErrorKind::InvalidData`] is returned when the bytes read are not valid UTF-8.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut line = String::new();
    /// let n = reader.read_line(&mut line).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadLineFuture {
            reader: self,
            buf,
            // Scratch space for the raw bytes of the line before UTF-8 validation.
            bytes: Vec::new(),
            read: 0,
        }
    }

    /// Returns a stream over the lines of this byte stream.
    ///
    /// The stream returned from this method yields items of type
    /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
    /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
    /// at the end.
    ///
    /// # Errors
    ///
    /// An item is an error of kind [`ErrorKind::InvalidData`] when a line is not valid UTF-8.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello\nworld\n";
    /// let mut reader = BufReader::new(input);
    /// let mut lines = reader.lines();
    ///
    /// while let Some(line) = lines.next().await {
    ///     println!("{}", line?);
    /// }
    /// # std::io::Result::Ok(()) });
    /// ```
    fn lines(self) -> Lines<Self>
    where
        Self: Sized,
    {
        Lines {
            reader: self,
            buf: String::new(),
            bytes: Vec::new(),
            read: 0,
        }
    }

    /// Returns a stream over the contents of this reader split on the specified `byte`.
    ///
    /// The stream returned from this method yields items of type
    /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
    /// Each vector returned will *not* have the delimiter byte at the end.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, Cursor};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
    /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
    ///
    /// assert_eq!(items[0], b"lorem");
    /// assert_eq!(items[1], b"ipsum");
    /// assert_eq!(items[2], b"dolor");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn split(self, byte: u8) -> Split<Self>
    where
        Self: Sized,
    {
        Split {
            reader: self,
            buf: Vec::new(),
            delim: byte,
            read: 0,
        }
    }
}
1738
// Blanket impl: every `AsyncBufRead` type (sized or not) gets the extension methods.
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1740
/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBuf<'a, R: ?Sized> {
    // The borrowed reader; taken out on each poll and only put back when the poll is
    // `Pending`, so it is `None` once the future has completed.
    reader: Option<&'a mut R>,
}
1747
// The future only holds a `&mut R`, so it is declared `Unpin` for any `R`.
impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1749
impl<'a, R> Future for FillBuf<'a, R>
where
    R: AsyncBufRead + Unpin + ?Sized,
{
    type Output = Result<&'a [u8]>;

    /// Polls the reader's `poll_fill_buf()` and resolves to the buffered slice.
    ///
    /// # Panics
    ///
    /// Panics if polled again after completion, or if the reader reports a ready buffer and
    /// then fails to do so when asked again right away.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        // Take the `&'a mut R` out of the future; it is put back only in the `Pending` arm.
        let reader = this
            .reader
            .take()
            .expect("polled `FillBuf` after completion");

        // The first poll goes through a reborrow and its slice is discarded; the slice that is
        // returned comes from a second poll that consumes `reader` and therefore lives for `'a`.
        // NOTE(review): presumably this double poll exists because returning the slice from the
        // first call would keep `reader` borrowed in the `Pending` arm — confirm.
        match Pin::new(&mut *reader).poll_fill_buf(cx) {
            Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
                Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
                poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
            },
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => {
                // Not done yet: restore the reader so the next poll can try again.
                this.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
1776
/// Future for the [`AsyncBufReadExt::read_until()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
    // The reader that bytes are pulled from.
    reader: &'a mut R,
    // The delimiter byte that ends the read.
    byte: u8,
    // Destination that the bytes read are appended to.
    buf: &'a mut Vec<u8>,
    // Running count of bytes read so far, carried across polls.
    read: usize,
}
1786
// The future only holds references and plain values, so it is declared `Unpin`.
impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1788
1789impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1790    type Output = Result<usize>;
1791
1792    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1793        let Self {
1794            reader,
1795            byte,
1796            buf,
1797            read,
1798        } = &mut *self;
1799        read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1800    }
1801}
1802
1803fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1804    mut reader: Pin<&mut R>,
1805    cx: &mut Context<'_>,
1806    byte: u8,
1807    buf: &mut Vec<u8>,
1808    read: &mut usize,
1809) -> Poll<Result<usize>> {
1810    loop {
1811        let (done, used) = {
1812            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1813
1814            if let Some(i) = memchr(byte, available) {
1815                buf.extend_from_slice(&available[..=i]);
1816                (true, i + 1)
1817            } else {
1818                buf.extend_from_slice(available);
1819                (false, available.len())
1820            }
1821        };
1822
1823        reader.as_mut().consume(used);
1824        *read += used;
1825
1826        if done || used == 0 {
1827            return Poll::Ready(Ok(mem::replace(read, 0)));
1828        }
1829    }
1830}
1831
/// Future for the [`AsyncBufReadExt::read_line()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
    // The reader that bytes are pulled from.
    reader: &'a mut R,
    // The caller's string that receives the line.
    buf: &'a mut String,
    // Raw bytes of the line, collected before they are validated as UTF-8.
    bytes: Vec<u8>,
    // Running count of bytes read so far, carried across polls.
    read: usize,
}
1841
// The future only holds references and owned plain data, so it is declared `Unpin`.
impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1843
1844impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1845    type Output = Result<usize>;
1846
1847    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1848        let Self {
1849            reader,
1850            buf,
1851            bytes,
1852            read,
1853        } = &mut *self;
1854        read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1855    }
1856}
1857
pin_project! {
    /// Stream for the [`AsyncBufReadExt::lines()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Lines<R> {
        // The reader that lines are pulled from; structurally pinned.
        #[pin]
        reader: R,
        // Holds the decoded line until it is handed out as a stream item.
        buf: String,
        // Raw bytes of the current line, collected before UTF-8 validation.
        bytes: Vec<u8>,
        // Running count of bytes read for the current line, carried across polls.
        read: usize,
    }
}
1870
1871impl<R: AsyncBufRead> Stream for Lines<R> {
1872    type Item = Result<String>;
1873
1874    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1875        let this = self.project();
1876
1877        let n = ready!(read_line_internal(
1878            this.reader,
1879            cx,
1880            this.buf,
1881            this.bytes,
1882            this.read
1883        ))?;
1884        if n == 0 && this.buf.is_empty() {
1885            return Poll::Ready(None);
1886        }
1887
1888        if this.buf.ends_with('\n') {
1889            this.buf.pop();
1890            if this.buf.ends_with('\r') {
1891                this.buf.pop();
1892            }
1893        }
1894        Poll::Ready(Some(Ok(mem::take(this.buf))))
1895    }
1896}
1897
1898fn read_line_internal<R: AsyncBufRead + ?Sized>(
1899    reader: Pin<&mut R>,
1900    cx: &mut Context<'_>,
1901    buf: &mut String,
1902    bytes: &mut Vec<u8>,
1903    read: &mut usize,
1904) -> Poll<Result<usize>> {
1905    let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1906
1907    match String::from_utf8(mem::take(bytes)) {
1908        Ok(s) => {
1909            debug_assert!(buf.is_empty());
1910            debug_assert_eq!(*read, 0);
1911            *buf = s;
1912            Poll::Ready(ret)
1913        }
1914        Err(_) => Poll::Ready(ret.and_then(|_| {
1915            Err(Error::new(
1916                ErrorKind::InvalidData,
1917                "stream did not contain valid UTF-8",
1918            ))
1919        })),
1920    }
1921}
1922
pin_project! {
    /// Stream for the [`AsyncBufReadExt::split()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Split<R> {
        // The reader that segments are pulled from; structurally pinned.
        #[pin]
        reader: R,
        // Bytes of the current segment, collected until the delimiter or EOF is reached.
        buf: Vec<u8>,
        // Running count of bytes read for the current segment, carried across polls.
        read: usize,
        // The byte that separates segments.
        delim: u8,
    }
}
1935
1936impl<R: AsyncBufRead> Stream for Split<R> {
1937    type Item = Result<Vec<u8>>;
1938
1939    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1940        let this = self.project();
1941
1942        let n = ready!(read_until_internal(
1943            this.reader,
1944            cx,
1945            *this.delim,
1946            this.buf,
1947            this.read
1948        ))?;
1949        if n == 0 && this.buf.is_empty() {
1950            return Poll::Ready(None);
1951        }
1952
1953        if this.buf[this.buf.len() - 1] == *this.delim {
1954            this.buf.pop();
1955        }
1956        Poll::Ready(Some(Ok(mem::take(this.buf))))
1957    }
1958}
1959
1960/// Extension trait for [`AsyncRead`].
1961pub trait AsyncReadExt: AsyncRead {
1962    /// Reads some bytes from the byte stream.
1963    ///
1964    /// On success, returns the total number of bytes read.
1965    ///
1966    /// If the return value is `Ok(n)`, then it must be guaranteed that
1967    /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1968    /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1969    /// scenarios:
1970    ///
1971    /// 1. This reader has reached its "end of file" and will likely no longer be able to
1972    ///    produce bytes. Note that this does not mean that the reader will always no
1973    ///    longer be able to produce bytes.
1974    /// 2. The buffer specified was 0 bytes in length.
1975    ///
1976    /// # Examples
1977    ///
1978    /// ```
1979    /// use futures_lite::io::{AsyncReadExt, BufReader};
1980    ///
1981    /// # spin_on::spin_on(async {
1982    /// let input: &[u8] = b"hello";
1983    /// let mut reader = BufReader::new(input);
1984    ///
1985    /// let mut buf = vec![0; 1024];
1986    /// let n = reader.read(&mut buf).await?;
1987    /// # std::io::Result::Ok(()) });
1988    /// ```
1989    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1990    where
1991        Self: Unpin,
1992    {
1993        ReadFuture { reader: self, buf }
1994    }
1995
1996    /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
1997    ///
1998    /// Data is copied to fill each buffer in order, with the final buffer possibly being
1999    /// only partially filled. This method must behave same as a single call to
2000    /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
2001    fn read_vectored<'a>(
2002        &'a mut self,
2003        bufs: &'a mut [IoSliceMut<'a>],
2004    ) -> ReadVectoredFuture<'a, Self>
2005    where
2006        Self: Unpin,
2007    {
2008        ReadVectoredFuture { reader: self, bufs }
2009    }
2010
2011    /// Reads the entire contents and appends them to a [`Vec`].
2012    ///
2013    /// On success, returns the total number of bytes read.
2014    ///
2015    /// # Examples
2016    ///
2017    /// ```
2018    /// use futures_lite::io::{AsyncReadExt, Cursor};
2019    ///
2020    /// # spin_on::spin_on(async {
2021    /// let mut reader = Cursor::new(vec![1, 2, 3]);
2022    /// let mut contents = Vec::new();
2023    ///
2024    /// let n = reader.read_to_end(&mut contents).await?;
2025    /// assert_eq!(n, 3);
2026    /// assert_eq!(contents, [1, 2, 3]);
2027    /// # std::io::Result::Ok(()) });
2028    /// ```
2029    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
2030    where
2031        Self: Unpin,
2032    {
2033        let start_len = buf.len();
2034        ReadToEndFuture {
2035            reader: self,
2036            buf,
2037            start_len,
2038        }
2039    }
2040
2041    /// Reads the entire contents and appends them to a [`String`].
2042    ///
2043    /// On success, returns the total number of bytes read.
2044    ///
2045    /// # Examples
2046    ///
2047    /// ```
2048    /// use futures_lite::io::{AsyncReadExt, Cursor};
2049    ///
2050    /// # spin_on::spin_on(async {
2051    /// let mut reader = Cursor::new(&b"hello");
2052    /// let mut contents = String::new();
2053    ///
2054    /// let n = reader.read_to_string(&mut contents).await?;
2055    /// assert_eq!(n, 5);
2056    /// assert_eq!(contents, "hello");
2057    /// # std::io::Result::Ok(()) });
2058    /// ```
2059    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
2060    where
2061        Self: Unpin,
2062    {
2063        ReadToStringFuture {
2064            reader: self,
2065            buf,
2066            bytes: Vec::new(),
2067            start_len: 0,
2068        }
2069    }
2070
2071    /// Reads the exact number of bytes required to fill `buf`.
2072    ///
2073    /// # Examples
2074    ///
2075    /// ```
2076    /// use futures_lite::io::{AsyncReadExt, Cursor};
2077    ///
2078    /// # spin_on::spin_on(async {
2079    /// let mut reader = Cursor::new(&b"hello");
2080    /// let mut contents = vec![0; 3];
2081    ///
2082    /// reader.read_exact(&mut contents).await?;
2083    /// assert_eq!(contents, b"hel");
2084    /// # std::io::Result::Ok(()) });
2085    /// ```
2086    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
2087    where
2088        Self: Unpin,
2089    {
2090        ReadExactFuture { reader: self, buf }
2091    }
2092
2093    /// Creates an adapter which will read at most `limit` bytes from it.
2094    ///
2095    /// This method returns a new instance of [`AsyncRead`] which will read at most
2096    /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
2097    ///
2098    /// # Examples
2099    ///
2100    /// ```
2101    /// use futures_lite::io::{AsyncReadExt, Cursor};
2102    ///
2103    /// # spin_on::spin_on(async {
2104    /// let mut reader = Cursor::new(&b"hello");
2105    /// let mut contents = String::new();
2106    ///
2107    /// let n = reader.take(3).read_to_string(&mut contents).await?;
2108    /// assert_eq!(n, 3);
2109    /// assert_eq!(contents, "hel");
2110    /// # std::io::Result::Ok(()) });
2111    /// ```
2112    fn take(self, limit: u64) -> Take<Self>
2113    where
2114        Self: Sized,
2115    {
2116        Take { inner: self, limit }
2117    }
2118
2119    /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
2120    ///
2121    /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
2122    ///
2123    /// ```
2124    /// use futures_lite::io::{AsyncReadExt, Cursor};
2125    /// use futures_lite::stream::StreamExt;
2126    ///
2127    /// # spin_on::spin_on(async {
2128    /// let reader = Cursor::new(&b"hello");
2129    /// let mut bytes = reader.bytes();
2130    ///
2131    /// while let Some(byte) = bytes.next().await {
2132    ///     println!("byte: {}", byte?);
2133    /// }
2134    /// # std::io::Result::Ok(()) });
2135    /// ```
2136    fn bytes(self) -> Bytes<Self>
2137    where
2138        Self: Sized,
2139    {
2140        Bytes { inner: self }
2141    }
2142
2143    /// Creates an adapter which will chain this stream with another.
2144    ///
2145    /// The returned [`AsyncRead`] instance will first read all bytes from this reader
2146    /// until EOF is found, and then continue with `next`.
2147    ///
2148    /// # Examples
2149    ///
2150    /// ```
2151    /// use futures_lite::io::{AsyncReadExt, Cursor};
2152    ///
2153    /// # spin_on::spin_on(async {
2154    /// let r1 = Cursor::new(&b"hello");
2155    /// let r2 = Cursor::new(&b"world");
2156    /// let mut reader = r1.chain(r2);
2157    ///
2158    /// let mut contents = String::new();
2159    /// reader.read_to_string(&mut contents).await?;
2160    /// assert_eq!(contents, "helloworld");
2161    /// # std::io::Result::Ok(()) });
2162    /// ```
2163    fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
2164    where
2165        Self: Sized,
2166    {
2167        Chain {
2168            first: self,
2169            second: next,
2170            done_first: false,
2171        }
2172    }
2173
2174    /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
2175    ///
2176    /// # Examples
2177    ///
2178    /// ```
2179    /// use futures_lite::io::AsyncReadExt;
2180    ///
2181    /// let reader = [1, 2, 3].boxed_reader();
2182    /// ```
2183    #[cfg(feature = "alloc")]
2184    fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
2185    where
2186        Self: Sized + Send + 'a,
2187    {
2188        Box::pin(self)
2189    }
2190}
2191
// Blanket implementation: every `AsyncRead` type, sized or unsized, gets the extension
// methods of `AsyncReadExt` without having to implement anything.
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
2193
2194/// Future for the [`AsyncReadExt::read()`] method.
2195#[derive(Debug)]
2196#[must_use = "futures do nothing unless you `.await` or poll them"]
2197pub struct ReadFuture<'a, R: Unpin + ?Sized> {
2198    reader: &'a mut R,
2199    buf: &'a mut [u8],
2200}
2201
2202impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
2203
2204impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
2205    type Output = Result<usize>;
2206
2207    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2208        let Self { reader, buf } = &mut *self;
2209        Pin::new(reader).poll_read(cx, buf)
2210    }
2211}
2212
2213/// Future for the [`AsyncReadExt::read_vectored()`] method.
2214#[derive(Debug)]
2215#[must_use = "futures do nothing unless you `.await` or poll them"]
2216pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
2217    reader: &'a mut R,
2218    bufs: &'a mut [IoSliceMut<'a>],
2219}
2220
2221impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
2222
2223impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
2224    type Output = Result<usize>;
2225
2226    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2227        let Self { reader, bufs } = &mut *self;
2228        Pin::new(reader).poll_read_vectored(cx, bufs)
2229    }
2230}
2231
2232/// Future for the [`AsyncReadExt::read_to_end()`] method.
2233#[derive(Debug)]
2234#[must_use = "futures do nothing unless you `.await` or poll them"]
2235pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2236    reader: &'a mut R,
2237    buf: &'a mut Vec<u8>,
2238    start_len: usize,
2239}
2240
2241impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2242
2243impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2244    type Output = Result<usize>;
2245
2246    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2247        let Self {
2248            reader,
2249            buf,
2250            start_len,
2251        } = &mut *self;
2252        read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2253    }
2254}
2255
2256/// Future for the [`AsyncReadExt::read_to_string()`] method.
2257#[derive(Debug)]
2258#[must_use = "futures do nothing unless you `.await` or poll them"]
2259pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2260    reader: &'a mut R,
2261    buf: &'a mut String,
2262    bytes: Vec<u8>,
2263    start_len: usize,
2264}
2265
2266impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2267
2268impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2269    type Output = Result<usize>;
2270
2271    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2272        let Self {
2273            reader,
2274            buf,
2275            bytes,
2276            start_len,
2277        } = &mut *self;
2278        let reader = Pin::new(reader);
2279
2280        let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2281
2282        match String::from_utf8(mem::take(bytes)) {
2283            Ok(s) => {
2284                debug_assert!(buf.is_empty());
2285                **buf = s;
2286                Poll::Ready(ret)
2287            }
2288            Err(_) => Poll::Ready(ret.and_then(|_| {
2289                Err(Error::new(
2290                    ErrorKind::InvalidData,
2291                    "stream did not contain valid UTF-8",
2292                ))
2293            })),
2294        }
2295    }
2296}
2297
2298// This uses an adaptive system to extend the vector when it fills. We want to
2299// avoid paying to allocate and zero a huge chunk of memory if the reader only
2300// has 4 bytes while still making large reads if the reader does have a ton
2301// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2302// time is 4,500 times (!) slower than this if the reader has a very small
2303// amount of data to return.
2304//
2305// Because we're extending the buffer with uninitialized data for trusted
2306// readers, we need to make sure to truncate that if any of this panics.
2307fn read_to_end_internal<R: AsyncRead + ?Sized>(
2308    mut rd: Pin<&mut R>,
2309    cx: &mut Context<'_>,
2310    buf: &mut Vec<u8>,
2311    start_len: usize,
2312) -> Poll<Result<usize>> {
2313    struct Guard<'a> {
2314        buf: &'a mut Vec<u8>,
2315        len: usize,
2316    }
2317
2318    impl Drop for Guard<'_> {
2319        fn drop(&mut self) {
2320            self.buf.resize(self.len, 0);
2321        }
2322    }
2323
2324    let mut g = Guard {
2325        len: buf.len(),
2326        buf,
2327    };
2328    let ret;
2329    loop {
2330        if g.len == g.buf.len() {
2331            g.buf.reserve(32);
2332            let capacity = g.buf.capacity();
2333            g.buf.resize(capacity, 0);
2334        }
2335
2336        match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2337            Ok(0) => {
2338                ret = Poll::Ready(Ok(g.len - start_len));
2339                break;
2340            }
2341            Ok(n) => g.len += n,
2342            Err(e) => {
2343                ret = Poll::Ready(Err(e));
2344                break;
2345            }
2346        }
2347    }
2348
2349    ret
2350}
2351
2352/// Future for the [`AsyncReadExt::read_exact()`] method.
2353#[derive(Debug)]
2354#[must_use = "futures do nothing unless you `.await` or poll them"]
2355pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2356    reader: &'a mut R,
2357    buf: &'a mut [u8],
2358}
2359
2360impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2361
2362impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2363    type Output = Result<()>;
2364
2365    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2366        let Self { reader, buf } = &mut *self;
2367
2368        while !buf.is_empty() {
2369            let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2370            let (_, rest) = mem::take(buf).split_at_mut(n);
2371            *buf = rest;
2372
2373            if n == 0 {
2374                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2375            }
2376        }
2377
2378        Poll::Ready(Ok(()))
2379    }
2380}
2381
2382pin_project! {
2383    /// Reader for the [`AsyncReadExt::take()`] method.
2384    #[derive(Debug)]
2385    pub struct Take<R> {
2386        #[pin]
2387        inner: R,
2388        limit: u64,
2389    }
2390}
2391
2392impl<R> Take<R> {
2393    /// Returns the number of bytes before this adapter will return EOF.
2394    ///
2395    /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2396    ///
2397    /// # Examples
2398    ///
2399    /// ```
2400    /// use futures_lite::io::{AsyncReadExt, Cursor};
2401    ///
2402    /// let reader = Cursor::new("hello");
2403    ///
2404    /// let reader = reader.take(3);
2405    /// assert_eq!(reader.limit(), 3);
2406    /// ```
2407    pub fn limit(&self) -> u64 {
2408        self.limit
2409    }
2410
2411    /// Puts a limit on the number of bytes.
2412    ///
2413    /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2414    ///
2415    /// # Examples
2416    ///
2417    /// ```
2418    /// use futures_lite::io::{AsyncReadExt, Cursor};
2419    ///
2420    /// let reader = Cursor::new("hello");
2421    ///
2422    /// let mut reader = reader.take(10);
2423    /// assert_eq!(reader.limit(), 10);
2424    ///
2425    /// reader.set_limit(3);
2426    /// assert_eq!(reader.limit(), 3);
2427    /// ```
2428    pub fn set_limit(&mut self, limit: u64) {
2429        self.limit = limit;
2430    }
2431
2432    /// Gets a reference to the underlying reader.
2433    ///
2434    /// # Examples
2435    ///
2436    /// ```
2437    /// use futures_lite::io::{AsyncReadExt, Cursor};
2438    ///
2439    /// let reader = Cursor::new("hello");
2440    ///
2441    /// let reader = reader.take(3);
2442    /// let r = reader.get_ref();
2443    /// ```
2444    pub fn get_ref(&self) -> &R {
2445        &self.inner
2446    }
2447
2448    /// Gets a mutable reference to the underlying reader.
2449    ///
2450    /// # Examples
2451    ///
2452    /// ```
2453    /// use futures_lite::io::{AsyncReadExt, Cursor};
2454    ///
2455    /// let reader = Cursor::new("hello");
2456    ///
2457    /// let mut reader = reader.take(3);
2458    /// let r = reader.get_mut();
2459    /// ```
2460    pub fn get_mut(&mut self) -> &mut R {
2461        &mut self.inner
2462    }
2463
2464    /// Unwraps the adapter, returning the underlying reader.
2465    ///
2466    /// # Examples
2467    ///
2468    /// ```
2469    /// use futures_lite::io::{AsyncReadExt, Cursor};
2470    ///
2471    /// let reader = Cursor::new("hello");
2472    ///
2473    /// let reader = reader.take(3);
2474    /// let reader = reader.into_inner();
2475    /// ```
2476    pub fn into_inner(self) -> R {
2477        self.inner
2478    }
2479}
2480
2481impl<R: AsyncRead> AsyncRead for Take<R> {
2482    fn poll_read(
2483        self: Pin<&mut Self>,
2484        cx: &mut Context<'_>,
2485        buf: &mut [u8],
2486    ) -> Poll<Result<usize>> {
2487        let this = self.project();
2488        take_read_internal(this.inner, cx, buf, this.limit)
2489    }
2490}
2491
2492fn take_read_internal<R: AsyncRead + ?Sized>(
2493    mut rd: Pin<&mut R>,
2494    cx: &mut Context<'_>,
2495    buf: &mut [u8],
2496    limit: &mut u64,
2497) -> Poll<Result<usize>> {
2498    // Don't call into inner reader at all at EOF because it may still block
2499    if *limit == 0 {
2500        return Poll::Ready(Ok(0));
2501    }
2502
2503    let max = cmp::min(buf.len() as u64, *limit) as usize;
2504
2505    match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2506        Ok(n) => {
2507            *limit -= n as u64;
2508            Poll::Ready(Ok(n))
2509        }
2510        Err(e) => Poll::Ready(Err(e)),
2511    }
2512}
2513
2514impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2515    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2516        let this = self.project();
2517
2518        if *this.limit == 0 {
2519            return Poll::Ready(Ok(&[]));
2520        }
2521
2522        match ready!(this.inner.poll_fill_buf(cx)) {
2523            Ok(buf) => {
2524                let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2525                Poll::Ready(Ok(&buf[..cap]))
2526            }
2527            Err(e) => Poll::Ready(Err(e)),
2528        }
2529    }
2530
2531    fn consume(self: Pin<&mut Self>, amt: usize) {
2532        let this = self.project();
2533        // Don't let callers reset the limit by passing an overlarge value
2534        let amt = cmp::min(amt as u64, *this.limit) as usize;
2535        *this.limit -= amt as u64;
2536
2537        this.inner.consume(amt);
2538    }
2539}
2540
2541pin_project! {
2542    /// Reader for the [`AsyncReadExt::bytes()`] method.
2543    #[derive(Debug)]
2544    pub struct Bytes<R> {
2545        #[pin]
2546        inner: R,
2547    }
2548}
2549
2550impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2551    type Item = Result<u8>;
2552
2553    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2554        let mut byte = 0;
2555
2556        let rd = Pin::new(&mut self.inner);
2557
2558        match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2559            Ok(0) => Poll::Ready(None),
2560            Ok(..) => Poll::Ready(Some(Ok(byte))),
2561            Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2562            Err(e) => Poll::Ready(Some(Err(e))),
2563        }
2564    }
2565}
2566
2567impl<R: AsyncRead> AsyncRead for Bytes<R> {
2568    fn poll_read(
2569        self: Pin<&mut Self>,
2570        cx: &mut Context<'_>,
2571        buf: &mut [u8],
2572    ) -> Poll<Result<usize>> {
2573        self.project().inner.poll_read(cx, buf)
2574    }
2575
2576    fn poll_read_vectored(
2577        self: Pin<&mut Self>,
2578        cx: &mut Context<'_>,
2579        bufs: &mut [IoSliceMut<'_>],
2580    ) -> Poll<Result<usize>> {
2581        self.project().inner.poll_read_vectored(cx, bufs)
2582    }
2583}
2584
2585pin_project! {
2586    /// Reader for the [`AsyncReadExt::chain()`] method.
2587    pub struct Chain<R1, R2> {
2588        #[pin]
2589        first: R1,
2590        #[pin]
2591        second: R2,
2592        done_first: bool,
2593    }
2594}
2595
2596impl<R1, R2> Chain<R1, R2> {
2597    /// Gets references to the underlying readers.
2598    ///
2599    /// # Examples
2600    ///
2601    /// ```
2602    /// use futures_lite::io::{AsyncReadExt, Cursor};
2603    ///
2604    /// let r1 = Cursor::new(b"hello");
2605    /// let r2 = Cursor::new(b"world");
2606    ///
2607    /// let reader = r1.chain(r2);
2608    /// let (r1, r2) = reader.get_ref();
2609    /// ```
2610    pub fn get_ref(&self) -> (&R1, &R2) {
2611        (&self.first, &self.second)
2612    }
2613
2614    /// Gets mutable references to the underlying readers.
2615    ///
2616    /// # Examples
2617    ///
2618    /// ```
2619    /// use futures_lite::io::{AsyncReadExt, Cursor};
2620    ///
2621    /// let r1 = Cursor::new(b"hello");
2622    /// let r2 = Cursor::new(b"world");
2623    ///
2624    /// let mut reader = r1.chain(r2);
2625    /// let (r1, r2) = reader.get_mut();
2626    /// ```
2627    pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2628        (&mut self.first, &mut self.second)
2629    }
2630
2631    /// Unwraps the adapter, returning the underlying readers.
2632    ///
2633    /// # Examples
2634    ///
2635    /// ```
2636    /// use futures_lite::io::{AsyncReadExt, Cursor};
2637    ///
2638    /// let r1 = Cursor::new(b"hello");
2639    /// let r2 = Cursor::new(b"world");
2640    ///
2641    /// let reader = r1.chain(r2);
2642    /// let (r1, r2) = reader.into_inner();
2643    /// ```
2644    pub fn into_inner(self) -> (R1, R2) {
2645        (self.first, self.second)
2646    }
2647}
2648
2649impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2650    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2651        f.debug_struct("Chain")
2652            .field("r1", &self.first)
2653            .field("r2", &self.second)
2654            .finish()
2655    }
2656}
2657
2658impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2659    fn poll_read(
2660        self: Pin<&mut Self>,
2661        cx: &mut Context<'_>,
2662        buf: &mut [u8],
2663    ) -> Poll<Result<usize>> {
2664        let this = self.project();
2665        if !*this.done_first {
2666            match ready!(this.first.poll_read(cx, buf)) {
2667                Ok(0) if !buf.is_empty() => *this.done_first = true,
2668                Ok(n) => return Poll::Ready(Ok(n)),
2669                Err(err) => return Poll::Ready(Err(err)),
2670            }
2671        }
2672
2673        this.second.poll_read(cx, buf)
2674    }
2675
2676    fn poll_read_vectored(
2677        self: Pin<&mut Self>,
2678        cx: &mut Context<'_>,
2679        bufs: &mut [IoSliceMut<'_>],
2680    ) -> Poll<Result<usize>> {
2681        let this = self.project();
2682        if !*this.done_first {
2683            match ready!(this.first.poll_read_vectored(cx, bufs)) {
2684                Ok(0) if !bufs.is_empty() => *this.done_first = true,
2685                Ok(n) => return Poll::Ready(Ok(n)),
2686                Err(err) => return Poll::Ready(Err(err)),
2687            }
2688        }
2689
2690        this.second.poll_read_vectored(cx, bufs)
2691    }
2692}
2693
2694impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2695    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2696        let this = self.project();
2697        if !*this.done_first {
2698            match ready!(this.first.poll_fill_buf(cx)) {
2699                Ok([]) => *this.done_first = true,
2700                Ok(buf) => return Poll::Ready(Ok(buf)),
2701                Err(err) => return Poll::Ready(Err(err)),
2702            }
2703        }
2704
2705        this.second.poll_fill_buf(cx)
2706    }
2707
2708    fn consume(self: Pin<&mut Self>, amt: usize) {
2709        let this = self.project();
2710        if !*this.done_first {
2711            this.first.consume(amt)
2712        } else {
2713            this.second.consume(amt)
2714        }
2715    }
2716}
2717
2718/// Extension trait for [`AsyncSeek`].
2719pub trait AsyncSeekExt: AsyncSeek {
2720    /// Seeks to a new position in a byte stream.
2721    ///
2722    /// Returns the new position in the byte stream.
2723    ///
2724    /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2725    ///
2726    /// # Examples
2727    ///
2728    /// ```
2729    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2730    ///
2731    /// # spin_on::spin_on(async {
2732    /// let mut cursor = Cursor::new("hello");
2733    ///
2734    /// // Move the cursor to the end.
2735    /// cursor.seek(SeekFrom::End(0)).await?;
2736    ///
2737    /// // Check the current position.
2738    /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2739    /// # std::io::Result::Ok(()) });
2740    /// ```
2741    fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2742    where
2743        Self: Unpin,
2744    {
2745        SeekFuture { seeker: self, pos }
2746    }
2747}
2748
// Blanket implementation: every `AsyncSeek` type, sized or unsized, gets the extension
// methods of `AsyncSeekExt` without having to implement anything.
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2750
2751/// Future for the [`AsyncSeekExt::seek()`] method.
2752#[derive(Debug)]
2753#[must_use = "futures do nothing unless you `.await` or poll them"]
2754pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2755    seeker: &'a mut S,
2756    pos: SeekFrom,
2757}
2758
2759impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2760
2761impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2762    type Output = Result<u64>;
2763
2764    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2765        let pos = self.pos;
2766        Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2767    }
2768}
2769
2770/// Extension trait for [`AsyncWrite`].
2771pub trait AsyncWriteExt: AsyncWrite {
2772    /// Writes some bytes into the byte stream.
2773    ///
2774    /// Returns the number of bytes written from the start of the buffer.
2775    ///
2776    /// If the return value is `Ok(n)` then it must be guaranteed that
2777    /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2778    /// object is no longer able to accept bytes and will likely not be able to in the
2779    /// future as well, or that the provided buffer is empty.
2780    ///
2781    /// # Examples
2782    ///
2783    /// ```
2784    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2785    ///
2786    /// # spin_on::spin_on(async {
2787    /// let mut output = Vec::new();
2788    /// let mut writer = BufWriter::new(&mut output);
2789    ///
2790    /// let n = writer.write(b"hello").await?;
2791    /// # std::io::Result::Ok(()) });
2792    /// ```
2793    fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2794    where
2795        Self: Unpin,
2796    {
2797        WriteFuture { writer: self, buf }
2798    }
2799
2800    /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2801    ///
2802    /// Data is copied from each buffer in order, with the final buffer possibly being only
2803    /// partially consumed. This method must behave same as a call to
2804    /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2805    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2806    where
2807        Self: Unpin,
2808    {
2809        WriteVectoredFuture { writer: self, bufs }
2810    }
2811
2812    /// Writes an entire buffer into the byte stream.
2813    ///
2814    /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2815    /// data to be written or an error occurs. It will not return before the entire buffer is
2816    /// successfully written or an error occurs.
2817    ///
2818    /// # Examples
2819    ///
2820    /// ```
2821    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2822    ///
2823    /// # spin_on::spin_on(async {
2824    /// let mut output = Vec::new();
2825    /// let mut writer = BufWriter::new(&mut output);
2826    ///
2827    /// let n = writer.write_all(b"hello").await?;
2828    /// # std::io::Result::Ok(()) });
2829    /// ```
2830    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2831    where
2832        Self: Unpin,
2833    {
2834        WriteAllFuture { writer: self, buf }
2835    }
2836
2837    /// Flushes the stream to ensure that all buffered contents reach their destination.
2838    ///
2839    /// # Examples
2840    ///
2841    /// ```
2842    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2843    ///
2844    /// # spin_on::spin_on(async {
2845    /// let mut output = Vec::new();
2846    /// let mut writer = BufWriter::new(&mut output);
2847    ///
2848    /// writer.write_all(b"hello").await?;
2849    /// writer.flush().await?;
2850    /// # std::io::Result::Ok(()) });
2851    /// ```
2852    fn flush(&mut self) -> FlushFuture<'_, Self>
2853    where
2854        Self: Unpin,
2855    {
2856        FlushFuture { writer: self }
2857    }
2858
2859    /// Closes the writer.
2860    ///
2861    /// # Examples
2862    ///
2863    /// ```
2864    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2865    ///
2866    /// # spin_on::spin_on(async {
2867    /// let mut output = Vec::new();
2868    /// let mut writer = BufWriter::new(&mut output);
2869    ///
2870    /// writer.close().await?;
2871    /// # std::io::Result::Ok(()) });
2872    /// ```
2873    fn close(&mut self) -> CloseFuture<'_, Self>
2874    where
2875        Self: Unpin,
2876    {
2877        CloseFuture { writer: self }
2878    }
2879
2880    /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2881    ///
2882    /// # Examples
2883    ///
2884    /// ```
2885    /// use futures_lite::io::AsyncWriteExt;
2886    ///
2887    /// let writer = Vec::<u8>::new().boxed_writer();
2888    /// ```
2889    #[cfg(feature = "alloc")]
2890    fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2891    where
2892        Self: Sized + Send + 'a,
2893    {
2894        Box::pin(self)
2895    }
2896}
2897
// Blanket implementation: every `AsyncWrite` type, sized or unsized, gets the extension
// methods of `AsyncWriteExt` without having to implement anything.
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2899
2900/// Future for the [`AsyncWriteExt::write()`] method.
2901#[derive(Debug)]
2902#[must_use = "futures do nothing unless you `.await` or poll them"]
2903pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2904    writer: &'a mut W,
2905    buf: &'a [u8],
2906}
2907
2908impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2909
2910impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2911    type Output = Result<usize>;
2912
2913    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2914        let buf = self.buf;
2915        Pin::new(&mut *self.writer).poll_write(cx, buf)
2916    }
2917}
2918
2919/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2920#[derive(Debug)]
2921#[must_use = "futures do nothing unless you `.await` or poll them"]
2922pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2923    writer: &'a mut W,
2924    bufs: &'a [IoSlice<'a>],
2925}
2926
2927impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2928
2929impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2930    type Output = Result<usize>;
2931
2932    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2933        let bufs = self.bufs;
2934        Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2935    }
2936}
2937
2938/// Future for the [`AsyncWriteExt::write_all()`] method.
2939#[derive(Debug)]
2940#[must_use = "futures do nothing unless you `.await` or poll them"]
2941pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2942    writer: &'a mut W,
2943    buf: &'a [u8],
2944}
2945
2946impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2947
2948impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2949    type Output = Result<()>;
2950
2951    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2952        let Self { writer, buf } = &mut *self;
2953
2954        while !buf.is_empty() {
2955            let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2956            let (_, rest) = mem::take(buf).split_at(n);
2957            *buf = rest;
2958
2959            if n == 0 {
2960                return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2961            }
2962        }
2963
2964        Poll::Ready(Ok(()))
2965    }
2966}
2967
2968/// Future for the [`AsyncWriteExt::flush()`] method.
2969#[derive(Debug)]
2970#[must_use = "futures do nothing unless you `.await` or poll them"]
2971pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2972    writer: &'a mut W,
2973}
2974
2975impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2976
2977impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2978    type Output = Result<()>;
2979
2980    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2981        Pin::new(&mut *self.writer).poll_flush(cx)
2982    }
2983}
2984
/// Future for the [`AsyncWriteExt::close()`] method.
///
/// Polling this future forwards to the writer's `poll_close`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CloseFuture<'a, W: Unpin + ?Sized> {
    // Writer to be closed.
    writer: &'a mut W,
}

// Manual `Unpin` impl: `poll` relies on it to get plain mutable access to `writer`
// through `Pin<&mut Self>`.
impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2993
2994impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2995    type Output = Result<()>;
2996
2997    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2998        Pin::new(&mut *self.writer).poll_close(cx)
2999    }
3000}
3001
/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
///
/// This is a pinned, boxed [`AsyncRead`] trait object that is `Send` and `'static`.
/// The alias is only compiled when the `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
3013
/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
///
/// This is a pinned, boxed [`AsyncWrite`] trait object that is `Send` and `'static`.
/// The alias is only compiled when the `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
3025
3026/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
3027///
3028/// # Examples
3029///
3030/// ```
3031/// use futures_lite::io::{self, Cursor};
3032///
3033/// # spin_on::spin_on(async {
3034/// let stream = Cursor::new(vec![]);
3035/// let (mut reader, mut writer) = io::split(stream);
3036/// # std::io::Result::Ok(()) });
3037/// ```
3038pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
3039where
3040    T: AsyncRead + AsyncWrite + Unpin,
3041{
3042    let inner = Arc::new(Mutex::new(stream));
3043    (ReadHalf(inner.clone()), WriteHalf(inner))
3044}
3045
/// The read half returned by [`split()`].
///
/// Wraps a reference-counted, mutex-guarded handle to the stream of type `T`.
#[derive(Debug)]
pub struct ReadHalf<T>(Arc<Mutex<T>>);
3049
/// The write half returned by [`split()`].
///
/// Wraps a reference-counted, mutex-guarded handle to the stream of type `T`.
#[derive(Debug)]
pub struct WriteHalf<T>(Arc<Mutex<T>>);
3053
3054impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
3055    fn poll_read(
3056        self: Pin<&mut Self>,
3057        cx: &mut Context<'_>,
3058        buf: &mut [u8],
3059    ) -> Poll<Result<usize>> {
3060        let mut inner = self.0.lock().unwrap();
3061        Pin::new(&mut *inner).poll_read(cx, buf)
3062    }
3063
3064    fn poll_read_vectored(
3065        self: Pin<&mut Self>,
3066        cx: &mut Context<'_>,
3067        bufs: &mut [IoSliceMut<'_>],
3068    ) -> Poll<Result<usize>> {
3069        let mut inner = self.0.lock().unwrap();
3070        Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
3071    }
3072}
3073
3074impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
3075    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
3076        let mut inner = self.0.lock().unwrap();
3077        Pin::new(&mut *inner).poll_write(cx, buf)
3078    }
3079
3080    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3081        let mut inner = self.0.lock().unwrap();
3082        Pin::new(&mut *inner).poll_flush(cx)
3083    }
3084
3085    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3086        let mut inner = self.0.lock().unwrap();
3087        Pin::new(&mut *inner).poll_close(cx)
3088    }
3089}
3090
3091#[cfg(feature = "memchr")]
3092use memchr::memchr;
3093
/// Unoptimized memchr fallback.
///
/// Returns the index of the first byte in `haystack` equal to `needle`, or `None` when no
/// byte matches.
#[cfg(not(feature = "memchr"))]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
    haystack
        .iter()
        .enumerate()
        .find(|&(_, &byte)| byte == needle)
        .map(|(index, _)| index)
}