futures_io/
lib.rs

1//! Asynchronous I/O
2//!
3//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
4//! `AsyncBufRead` traits, the asynchronous analogs to
5//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
6//! that these traits integrate with the asynchronous task system.
7//!
8//! All items of this library are only available when the `std` feature of this
9//! library is activated, and it is activated by default.
10
11#![no_std]
12#![doc(test(
13    no_crate_inject,
14    attr(
15        deny(warnings, rust_2018_idioms, single_use_lifetimes),
16        allow(dead_code, unused_assignments, unused_variables)
17    )
18))]
19#![warn(missing_docs, /* unsafe_op_in_unsafe_fn */)] // unsafe_op_in_unsafe_fn requires Rust 1.52
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22#[cfg(feature = "std")]
23extern crate std;
24
25#[cfg(feature = "std")]
26mod if_std {
27    use std::boxed::Box;
28    use std::io;
29    use std::ops::DerefMut;
30    use std::pin::Pin;
31    use std::task::{Context, Poll};
32    use std::vec::Vec;
33
34    // Re-export some types from `std::io` so that users don't have to deal
35    // with conflicts when `use`ing `futures::io` and `std::io`.
36    #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37    #[doc(no_inline)]
38    pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
39
40    /// Read bytes asynchronously.
41    ///
42    /// This trait is analogous to the `std::io::Read` trait, but integrates
43    /// with the asynchronous task system. In particular, the `poll_read`
44    /// method, unlike `Read::read`, will automatically queue the current task
45    /// for wakeup and return if data is not yet available, rather than blocking
46    /// the calling thread.
47    pub trait AsyncRead {
48        /// Attempt to read from the `AsyncRead` into `buf`.
49        ///
50        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
51        ///
52        /// If no data is available for reading, the method returns
53        /// `Poll::Pending` and arranges for the current task (via
54        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
55        /// readable or is closed.
56        ///
57        /// # Implementation
58        ///
59        /// This function may not return errors of kind `WouldBlock` or
60        /// `Interrupted`.  Implementations must convert `WouldBlock` into
61        /// `Poll::Pending` and either internally retry or convert
62        /// `Interrupted` into another error kind.
63        fn poll_read(
64            self: Pin<&mut Self>,
65            cx: &mut Context<'_>,
66            buf: &mut [u8],
67        ) -> Poll<Result<usize>>;
68
69        /// Attempt to read from the `AsyncRead` into `bufs` using vectored
70        /// IO operations.
71        ///
72        /// This method is similar to `poll_read`, but allows data to be read
73        /// into multiple buffers using a single operation.
74        ///
75        /// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
76        ///
77        /// If no data is available for reading, the method returns
78        /// `Poll::Pending` and arranges for the current task (via
79        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
80        /// readable or is closed.
81        /// By default, this method delegates to using `poll_read` on the first
82        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
83        /// support vectored IO should override this method.
84        ///
85        /// # Implementation
86        ///
87        /// This function may not return errors of kind `WouldBlock` or
88        /// `Interrupted`.  Implementations must convert `WouldBlock` into
89        /// `Poll::Pending` and either internally retry or convert
90        /// `Interrupted` into another error kind.
91        fn poll_read_vectored(
92            self: Pin<&mut Self>,
93            cx: &mut Context<'_>,
94            bufs: &mut [IoSliceMut<'_>],
95        ) -> Poll<Result<usize>> {
96            for b in bufs {
97                if !b.is_empty() {
98                    return self.poll_read(cx, b);
99                }
100            }
101
102            self.poll_read(cx, &mut [])
103        }
104    }
105
106    /// Write bytes asynchronously.
107    ///
108    /// This trait is analogous to the `std::io::Write` trait, but integrates
109    /// with the asynchronous task system. In particular, the `poll_write`
110    /// method, unlike `Write::write`, will automatically queue the current task
111    /// for wakeup and return if the writer cannot take more data, rather than blocking
112    /// the calling thread.
113    pub trait AsyncWrite {
114        /// Attempt to write bytes from `buf` into the object.
115        ///
116        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
117        ///
118        /// If the object is not ready for writing, the method returns
119        /// `Poll::Pending` and arranges for the current task (via
120        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
121        /// writable or is closed.
122        ///
123        /// # Implementation
124        ///
125        /// This function may not return errors of kind `WouldBlock` or
126        /// `Interrupted`.  Implementations must convert `WouldBlock` into
127        /// `Poll::Pending` and either internally retry or convert
128        /// `Interrupted` into another error kind.
129        ///
130        /// `poll_write` must try to make progress by flushing the underlying object if
131        /// that is the only way the underlying object can become writable again.
132        fn poll_write(
133            self: Pin<&mut Self>,
134            cx: &mut Context<'_>,
135            buf: &[u8],
136        ) -> Poll<Result<usize>>;
137
138        /// Attempt to write bytes from `bufs` into the object using vectored
139        /// IO operations.
140        ///
141        /// This method is similar to `poll_write`, but allows data from multiple buffers to be written
142        /// using a single operation.
143        ///
144        /// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
145        ///
146        /// If the object is not ready for writing, the method returns
147        /// `Poll::Pending` and arranges for the current task (via
148        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
149        /// writable or is closed.
150        ///
151        /// By default, this method delegates to using `poll_write` on the first
152        /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which
153        /// support vectored IO should override this method.
154        ///
155        /// # Implementation
156        ///
157        /// This function may not return errors of kind `WouldBlock` or
158        /// `Interrupted`.  Implementations must convert `WouldBlock` into
159        /// `Poll::Pending` and either internally retry or convert
160        /// `Interrupted` into another error kind.
161        fn poll_write_vectored(
162            self: Pin<&mut Self>,
163            cx: &mut Context<'_>,
164            bufs: &[IoSlice<'_>],
165        ) -> Poll<Result<usize>> {
166            for b in bufs {
167                if !b.is_empty() {
168                    return self.poll_write(cx, b);
169                }
170            }
171
172            self.poll_write(cx, &[])
173        }
174
175        /// Attempt to flush the object, ensuring that any buffered data reach
176        /// their destination.
177        ///
178        /// On success, returns `Poll::Ready(Ok(()))`.
179        ///
180        /// If flushing cannot immediately complete, this method returns
181        /// `Poll::Pending` and arranges for the current task (via
182        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
183        /// progress towards flushing.
184        ///
185        /// # Implementation
186        ///
187        /// This function may not return errors of kind `WouldBlock` or
188        /// `Interrupted`.  Implementations must convert `WouldBlock` into
189        /// `Poll::Pending` and either internally retry or convert
190        /// `Interrupted` into another error kind.
191        ///
192        /// It only makes sense to do anything here if you actually buffer data.
193        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
194
195        /// Attempt to close the object.
196        ///
197        /// On success, returns `Poll::Ready(Ok(()))`.
198        ///
199        /// If closing cannot immediately complete, this function returns
200        /// `Poll::Pending` and arranges for the current task (via
201        /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
202        /// progress towards closing.
203        ///
204        /// # Implementation
205        ///
206        /// This function may not return errors of kind `WouldBlock` or
207        /// `Interrupted`.  Implementations must convert `WouldBlock` into
208        /// `Poll::Pending` and either internally retry or convert
209        /// `Interrupted` into another error kind.
210        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
211    }
212
213    /// Seek bytes asynchronously.
214    ///
215    /// This trait is analogous to the `std::io::Seek` trait, but integrates
216    /// with the asynchronous task system. In particular, the `poll_seek`
217    /// method, unlike `Seek::seek`, will automatically queue the current task
218    /// for wakeup and return if data is not yet available, rather than blocking
219    /// the calling thread.
220    pub trait AsyncSeek {
221        /// Attempt to seek to an offset, in bytes, in a stream.
222        ///
223        /// A seek beyond the end of a stream is allowed, but behavior is defined
224        /// by the implementation.
225        ///
226        /// If the seek operation completed successfully,
227        /// this method returns the new position from the start of the stream.
228        /// That position can be used later with [`SeekFrom::Start`].
229        ///
230        /// # Errors
231        ///
232        /// Seeking to a negative offset is considered an error.
233        ///
234        /// # Implementation
235        ///
236        /// This function may not return errors of kind `WouldBlock` or
237        /// `Interrupted`.  Implementations must convert `WouldBlock` into
238        /// `Poll::Pending` and either internally retry or convert
239        /// `Interrupted` into another error kind.
240        fn poll_seek(
241            self: Pin<&mut Self>,
242            cx: &mut Context<'_>,
243            pos: SeekFrom,
244        ) -> Poll<Result<u64>>;
245    }
246
247    /// Read bytes asynchronously.
248    ///
249    /// This trait is analogous to the `std::io::BufRead` trait, but integrates
250    /// with the asynchronous task system. In particular, the `poll_fill_buf`
251    /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
252    /// for wakeup and return if data is not yet available, rather than blocking
253    /// the calling thread.
254    pub trait AsyncBufRead: AsyncRead {
255        /// Attempt to return the contents of the internal buffer, filling it with more data
256        /// from the inner reader if it is empty.
257        ///
258        /// On success, returns `Poll::Ready(Ok(buf))`.
259        ///
260        /// If no data is available for reading, the method returns
261        /// `Poll::Pending` and arranges for the current task (via
262        /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
263        /// readable or is closed.
264        ///
265        /// This function is a lower-level call. It needs to be paired with the
266        /// [`consume`] method to function properly. When calling this
267        /// method, none of the contents will be "read" in the sense that later
268        /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
269        /// be called with the number of bytes that are consumed from this buffer to
270        /// ensure that the bytes are never returned twice.
271        ///
272        /// [`poll_read`]: AsyncRead::poll_read
273        /// [`consume`]: AsyncBufRead::consume
274        ///
275        /// An empty buffer returned indicates that the stream has reached EOF.
276        ///
277        /// # Implementation
278        ///
279        /// This function may not return errors of kind `WouldBlock` or
280        /// `Interrupted`.  Implementations must convert `WouldBlock` into
281        /// `Poll::Pending` and either internally retry or convert
282        /// `Interrupted` into another error kind.
283        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
284
285        /// Tells this buffer that `amt` bytes have been consumed from the buffer,
286        /// so they should no longer be returned in calls to [`poll_read`].
287        ///
288        /// This function is a lower-level call. It needs to be paired with the
289        /// [`poll_fill_buf`] method to function properly. This function does
290        /// not perform any I/O, it simply informs this object that some amount of
291        /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
292        /// no longer be returned. As such, this function may do odd things if
293        /// [`poll_fill_buf`] isn't called before calling it.
294        ///
295        /// The `amt` must be `<=` the number of bytes in the buffer returned by
296        /// [`poll_fill_buf`].
297        ///
298        /// [`poll_read`]: AsyncRead::poll_read
299        /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
300        fn consume(self: Pin<&mut Self>, amt: usize);
301    }
302
303    macro_rules! deref_async_read {
304        () => {
305            fn poll_read(
306                mut self: Pin<&mut Self>,
307                cx: &mut Context<'_>,
308                buf: &mut [u8],
309            ) -> Poll<Result<usize>> {
310                Pin::new(&mut **self).poll_read(cx, buf)
311            }
312
313            fn poll_read_vectored(
314                mut self: Pin<&mut Self>,
315                cx: &mut Context<'_>,
316                bufs: &mut [IoSliceMut<'_>],
317            ) -> Poll<Result<usize>> {
318                Pin::new(&mut **self).poll_read_vectored(cx, bufs)
319            }
320        };
321    }
322
323    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
324        deref_async_read!();
325    }
326
327    impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
328        deref_async_read!();
329    }
330
331    impl<P> AsyncRead for Pin<P>
332    where
333        P: DerefMut + Unpin,
334        P::Target: AsyncRead,
335    {
336        fn poll_read(
337            self: Pin<&mut Self>,
338            cx: &mut Context<'_>,
339            buf: &mut [u8],
340        ) -> Poll<Result<usize>> {
341            self.get_mut().as_mut().poll_read(cx, buf)
342        }
343
344        fn poll_read_vectored(
345            self: Pin<&mut Self>,
346            cx: &mut Context<'_>,
347            bufs: &mut [IoSliceMut<'_>],
348        ) -> Poll<Result<usize>> {
349            self.get_mut().as_mut().poll_read_vectored(cx, bufs)
350        }
351    }
352
353    macro_rules! delegate_async_read_to_stdio {
354        () => {
355            fn poll_read(
356                mut self: Pin<&mut Self>,
357                _: &mut Context<'_>,
358                buf: &mut [u8],
359            ) -> Poll<Result<usize>> {
360                Poll::Ready(io::Read::read(&mut *self, buf))
361            }
362
363            fn poll_read_vectored(
364                mut self: Pin<&mut Self>,
365                _: &mut Context<'_>,
366                bufs: &mut [IoSliceMut<'_>],
367            ) -> Poll<Result<usize>> {
368                Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
369            }
370        };
371    }
372
    // Byte slices are readable directly: the generated methods call the
    // slice's `std::io::Read` implementation and wrap the result in
    // `Poll::Ready`.
    impl AsyncRead for &[u8] {
        delegate_async_read_to_stdio!();
    }
376
377    macro_rules! deref_async_write {
378        () => {
379            fn poll_write(
380                mut self: Pin<&mut Self>,
381                cx: &mut Context<'_>,
382                buf: &[u8],
383            ) -> Poll<Result<usize>> {
384                Pin::new(&mut **self).poll_write(cx, buf)
385            }
386
387            fn poll_write_vectored(
388                mut self: Pin<&mut Self>,
389                cx: &mut Context<'_>,
390                bufs: &[IoSlice<'_>],
391            ) -> Poll<Result<usize>> {
392                Pin::new(&mut **self).poll_write_vectored(cx, bufs)
393            }
394
395            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
396                Pin::new(&mut **self).poll_flush(cx)
397            }
398
399            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
400                Pin::new(&mut **self).poll_close(cx)
401            }
402        };
403    }
404
405    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
406        deref_async_write!();
407    }
408
409    impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
410        deref_async_write!();
411    }
412
413    impl<P> AsyncWrite for Pin<P>
414    where
415        P: DerefMut + Unpin,
416        P::Target: AsyncWrite,
417    {
418        fn poll_write(
419            self: Pin<&mut Self>,
420            cx: &mut Context<'_>,
421            buf: &[u8],
422        ) -> Poll<Result<usize>> {
423            self.get_mut().as_mut().poll_write(cx, buf)
424        }
425
426        fn poll_write_vectored(
427            self: Pin<&mut Self>,
428            cx: &mut Context<'_>,
429            bufs: &[IoSlice<'_>],
430        ) -> Poll<Result<usize>> {
431            self.get_mut().as_mut().poll_write_vectored(cx, bufs)
432        }
433
434        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
435            self.get_mut().as_mut().poll_flush(cx)
436        }
437
438        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
439            self.get_mut().as_mut().poll_close(cx)
440        }
441    }
442
443    macro_rules! delegate_async_write_to_stdio {
444        () => {
445            fn poll_write(
446                mut self: Pin<&mut Self>,
447                _: &mut Context<'_>,
448                buf: &[u8],
449            ) -> Poll<Result<usize>> {
450                Poll::Ready(io::Write::write(&mut *self, buf))
451            }
452
453            fn poll_write_vectored(
454                mut self: Pin<&mut Self>,
455                _: &mut Context<'_>,
456                bufs: &[IoSlice<'_>],
457            ) -> Poll<Result<usize>> {
458                Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
459            }
460
461            fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
462                Poll::Ready(io::Write::flush(&mut *self))
463            }
464
465            fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
466                self.poll_flush(cx)
467            }
468        };
469    }
470
    // A byte vector is writable directly: the generated methods call the
    // vector's `std::io::Write` implementation and wrap the result in
    // `Poll::Ready`.
    impl AsyncWrite for Vec<u8> {
        delegate_async_write_to_stdio!();
    }
474
475    macro_rules! deref_async_seek {
476        () => {
477            fn poll_seek(
478                mut self: Pin<&mut Self>,
479                cx: &mut Context<'_>,
480                pos: SeekFrom,
481            ) -> Poll<Result<u64>> {
482                Pin::new(&mut **self).poll_seek(cx, pos)
483            }
484        };
485    }
486
487    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
488        deref_async_seek!();
489    }
490
491    impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
492        deref_async_seek!();
493    }
494
495    impl<P> AsyncSeek for Pin<P>
496    where
497        P: DerefMut + Unpin,
498        P::Target: AsyncSeek,
499    {
500        fn poll_seek(
501            self: Pin<&mut Self>,
502            cx: &mut Context<'_>,
503            pos: SeekFrom,
504        ) -> Poll<Result<u64>> {
505            self.get_mut().as_mut().poll_seek(cx, pos)
506        }
507    }
508
509    macro_rules! deref_async_buf_read {
510        () => {
511            fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
512                Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
513            }
514
515            fn consume(mut self: Pin<&mut Self>, amt: usize) {
516                Pin::new(&mut **self).consume(amt)
517            }
518        };
519    }
520
521    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
522        deref_async_buf_read!();
523    }
524
525    impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
526        deref_async_buf_read!();
527    }
528
529    impl<P> AsyncBufRead for Pin<P>
530    where
531        P: DerefMut + Unpin,
532        P::Target: AsyncBufRead,
533    {
534        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
535            self.get_mut().as_mut().poll_fill_buf(cx)
536        }
537
538        fn consume(self: Pin<&mut Self>, amt: usize) {
539            self.get_mut().as_mut().consume(amt)
540        }
541    }
542
543    macro_rules! delegate_async_buf_read_to_stdio {
544        () => {
545            fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
546                Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
547            }
548
549            fn consume(self: Pin<&mut Self>, amt: usize) {
550                io::BufRead::consume(self.get_mut(), amt)
551            }
552        };
553    }
554
    // Byte slices act as their own buffer: the generated methods call the
    // slice's `std::io::BufRead` implementation, and `poll_fill_buf` wraps
    // the result in `Poll::Ready`.
    impl AsyncBufRead for &[u8] {
        delegate_async_buf_read_to_stdio!();
    }
558}
559
560#[cfg(feature = "std")]
561pub use self::if_std::*;