futures_io/lib.rs
1//! Asynchronous I/O
2//!
3//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and
4//! `AsyncBufRead` traits, the asynchronous analogs to
5//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is
6//! that these traits integrate with the asynchronous task system.
7//!
8//! All items of this library are only available when the `std` feature of this
9//! library is activated, and it is activated by default.
10
11#![no_std]
12#![doc(test(
13 no_crate_inject,
14 attr(
15 deny(warnings, rust_2018_idioms, single_use_lifetimes),
16 allow(dead_code, unused_assignments, unused_variables)
17 )
18))]
19#![warn(missing_docs, /* unsafe_op_in_unsafe_fn */)] // unsafe_op_in_unsafe_fn requires Rust 1.52
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22#[cfg(feature = "std")]
23extern crate std;
24
25#[cfg(feature = "std")]
26mod if_std {
27 use std::boxed::Box;
28 use std::io;
29 use std::ops::DerefMut;
30 use std::pin::Pin;
31 use std::task::{Context, Poll};
32 use std::vec::Vec;
33
34 // Re-export some types from `std::io` so that users don't have to deal
35 // with conflicts when `use`ing `futures::io` and `std::io`.
36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37 #[doc(no_inline)]
38 pub use io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
39
/// Asynchronous byte source.
///
/// This is the counterpart of the `std::io::Read` trait for the
/// asynchronous task system. Where `Read::read` would block the calling
/// thread until data shows up, `poll_read` instead queues the current task
/// for wakeup and returns right away.
pub trait AsyncRead {
    /// Tries to read bytes from this `AsyncRead` into `buf`.
    ///
    /// A successful read yields `Poll::Ready(Ok(num_bytes_read))`.
    ///
    /// When there is nothing to read yet, `Poll::Pending` is returned and the
    /// current task is scheduled (via `cx.waker().wake_by_ref()`) to be
    /// notified once the object becomes readable or is closed.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>>;

    /// Tries to read bytes from this `AsyncRead` into `bufs` with a single
    /// vectored IO operation.
    ///
    /// Behaves like `poll_read`, except that the data may be scattered over
    /// several buffers in one call.
    ///
    /// A successful read yields `Poll::Ready(Ok(num_bytes_read))`.
    ///
    /// When there is nothing to read yet, `Poll::Pending` is returned and the
    /// current task is scheduled (via `cx.waker().wake_by_ref()`) to be
    /// notified once the object becomes readable or is closed.
    ///
    /// The provided implementation forwards to `poll_read` with the first
    /// nonempty buffer of `bufs`, or with an empty buffer when there is no
    /// such entry. Objects with real vectored IO support should override it.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> Poll<Result<usize>> {
        // Only the first buffer able to hold data is handed to `poll_read`.
        match bufs.iter_mut().find(|slice| !slice.is_empty()) {
            Some(target) => self.poll_read(cx, target),
            None => self.poll_read(cx, &mut []),
        }
    }
}
105
/// Asynchronous byte sink.
///
/// This is the counterpart of the `std::io::Write` trait for the
/// asynchronous task system. Where `Write::write` would block the calling
/// thread until the writer accepts more data, `poll_write` instead queues
/// the current task for wakeup and returns right away.
pub trait AsyncWrite {
    /// Tries to write the bytes of `buf` into the object.
    ///
    /// A successful write yields `Poll::Ready(Ok(num_bytes_written))`.
    ///
    /// When the object cannot accept data yet, `Poll::Pending` is returned
    /// and the current task is scheduled (via `cx.waker().wake_by_ref()`) to
    /// be notified once the object becomes writable or is closed.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    ///
    /// If flushing the underlying object is the only way for it to become
    /// writable again, `poll_write` must attempt that flush in order to make
    /// progress.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize>>;

    /// Tries to write the bytes of `bufs` into the object with a single
    /// vectored IO operation.
    ///
    /// Behaves like `poll_write`, except that the data may be gathered from
    /// several buffers in one call.
    ///
    /// A successful write yields `Poll::Ready(Ok(num_bytes_written))`.
    ///
    /// When the object cannot accept data yet, `Poll::Pending` is returned
    /// and the current task is scheduled (via `cx.waker().wake_by_ref()`) to
    /// be notified once the object becomes writable or is closed.
    ///
    /// The provided implementation forwards to `poll_write` with the first
    /// nonempty buffer of `bufs`, or with an empty buffer when there is no
    /// such entry. Objects with real vectored IO support should override it.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> Poll<Result<usize>> {
        // Only the first buffer that actually carries data is written.
        match bufs.iter().find(|slice| !slice.is_empty()) {
            Some(chunk) => self.poll_write(cx, chunk),
            None => self.poll_write(cx, &[]),
        }
    }

    /// Tries to flush the object so that all buffered data reaches its
    /// destination.
    ///
    /// A successful flush yields `Poll::Ready(Ok(()))`.
    ///
    /// When the flush cannot finish immediately, `Poll::Pending` is returned
    /// and the current task is scheduled (via `cx.waker().wake_by_ref()`) to
    /// be notified once the object can make progress towards flushing.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    ///
    /// It only makes sense to do anything here if you actually buffer data.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;

    /// Tries to close the object.
    ///
    /// A successful close yields `Poll::Ready(Ok(()))`.
    ///
    /// When closing cannot finish immediately, `Poll::Pending` is returned
    /// and the current task is scheduled (via `cx.waker().wake_by_ref()`) to
    /// be notified once the object can make progress towards closing.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;
}
212
/// Asynchronous cursor movement within a stream.
///
/// This is the counterpart of the `std::io::Seek` trait for the
/// asynchronous task system. Where `Seek::seek` would block the calling
/// thread until data is available, `poll_seek` instead queues the current
/// task for wakeup and returns right away.
pub trait AsyncSeek {
    /// Tries to move to the byte offset described by `pos` within a stream.
    ///
    /// Seeking past the end of a stream is permitted; what happens in that
    /// case is up to the implementation.
    ///
    /// When the seek completes successfully, the returned value is the new
    /// position measured from the start of the stream. It can be passed to
    /// [`SeekFrom::Start`] later on.
    ///
    /// # Errors
    ///
    /// Seeking to a negative offset is considered an error.
    ///
    /// # Implementation
    ///
    /// Errors of kind `WouldBlock` or `Interrupted` must never escape from
    /// this function. Implementations have to turn `WouldBlock` into
    /// `Poll::Pending`, and must either retry internally on `Interrupted` or
    /// map it to a different error kind.
    fn poll_seek(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        pos: SeekFrom,
    ) -> Poll<Result<u64>>;
}
246
247 /// Read bytes asynchronously.
248 ///
249 /// This trait is analogous to the `std::io::BufRead` trait, but integrates
250 /// with the asynchronous task system. In particular, the `poll_fill_buf`
251 /// method, unlike `BufRead::fill_buf`, will automatically queue the current task
252 /// for wakeup and return if data is not yet available, rather than blocking
253 /// the calling thread.
254 pub trait AsyncBufRead: AsyncRead {
255 /// Attempt to return the contents of the internal buffer, filling it with more data
256 /// from the inner reader if it is empty.
257 ///
258 /// On success, returns `Poll::Ready(Ok(buf))`.
259 ///
260 /// If no data is available for reading, the method returns
261 /// `Poll::Pending` and arranges for the current task (via
262 /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
263 /// readable or is closed.
264 ///
265 /// This function is a lower-level call. It needs to be paired with the
266 /// [`consume`] method to function properly. When calling this
267 /// method, none of the contents will be "read" in the sense that later
268 /// calling [`poll_read`] may return the same contents. As such, [`consume`] must
269 /// be called with the number of bytes that are consumed from this buffer to
270 /// ensure that the bytes are never returned twice.
271 ///
272 /// [`poll_read`]: AsyncRead::poll_read
273 /// [`consume`]: AsyncBufRead::consume
274 ///
275 /// An empty buffer returned indicates that the stream has reached EOF.
276 ///
277 /// # Implementation
278 ///
279 /// This function may not return errors of kind `WouldBlock` or
280 /// `Interrupted`. Implementations must convert `WouldBlock` into
281 /// `Poll::Pending` and either internally retry or convert
282 /// `Interrupted` into another error kind.
283 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>>;
284
285 /// Tells this buffer that `amt` bytes have been consumed from the buffer,
286 /// so they should no longer be returned in calls to [`poll_read`].
287 ///
288 /// This function is a lower-level call. It needs to be paired with the
289 /// [`poll_fill_buf`] method to function properly. This function does
290 /// not perform any I/O, it simply informs this object that some amount of
291 /// its buffer, returned from [`poll_fill_buf`], has been consumed and should
292 /// no longer be returned. As such, this function may do odd things if
293 /// [`poll_fill_buf`] isn't called before calling it.
294 ///
295 /// The `amt` must be `<=` the number of bytes in the buffer returned by
296 /// [`poll_fill_buf`].
297 ///
298 /// [`poll_read`]: AsyncRead::poll_read
299 /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
300 fn consume(self: Pin<&mut Self>, amt: usize);
301 }
302
303 macro_rules! deref_async_read {
304 () => {
305 fn poll_read(
306 mut self: Pin<&mut Self>,
307 cx: &mut Context<'_>,
308 buf: &mut [u8],
309 ) -> Poll<Result<usize>> {
310 Pin::new(&mut **self).poll_read(cx, buf)
311 }
312
313 fn poll_read_vectored(
314 mut self: Pin<&mut Self>,
315 cx: &mut Context<'_>,
316 bufs: &mut [IoSliceMut<'_>],
317 ) -> Poll<Result<usize>> {
318 Pin::new(&mut **self).poll_read_vectored(cx, bufs)
319 }
320 };
321 }
322
323 impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
324 deref_async_read!();
325 }
326
327 impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for &mut T {
328 deref_async_read!();
329 }
330
331 impl<P> AsyncRead for Pin<P>
332 where
333 P: DerefMut + Unpin,
334 P::Target: AsyncRead,
335 {
336 fn poll_read(
337 self: Pin<&mut Self>,
338 cx: &mut Context<'_>,
339 buf: &mut [u8],
340 ) -> Poll<Result<usize>> {
341 self.get_mut().as_mut().poll_read(cx, buf)
342 }
343
344 fn poll_read_vectored(
345 self: Pin<&mut Self>,
346 cx: &mut Context<'_>,
347 bufs: &mut [IoSliceMut<'_>],
348 ) -> Poll<Result<usize>> {
349 self.get_mut().as_mut().poll_read_vectored(cx, bufs)
350 }
351 }
352
353 macro_rules! delegate_async_read_to_stdio {
354 () => {
355 fn poll_read(
356 mut self: Pin<&mut Self>,
357 _: &mut Context<'_>,
358 buf: &mut [u8],
359 ) -> Poll<Result<usize>> {
360 Poll::Ready(io::Read::read(&mut *self, buf))
361 }
362
363 fn poll_read_vectored(
364 mut self: Pin<&mut Self>,
365 _: &mut Context<'_>,
366 bufs: &mut [IoSliceMut<'_>],
367 ) -> Poll<Result<usize>> {
368 Poll::Ready(io::Read::read_vectored(&mut *self, bufs))
369 }
370 };
371 }
372
373 impl AsyncRead for &[u8] {
374 delegate_async_read_to_stdio!();
375 }
376
377 macro_rules! deref_async_write {
378 () => {
379 fn poll_write(
380 mut self: Pin<&mut Self>,
381 cx: &mut Context<'_>,
382 buf: &[u8],
383 ) -> Poll<Result<usize>> {
384 Pin::new(&mut **self).poll_write(cx, buf)
385 }
386
387 fn poll_write_vectored(
388 mut self: Pin<&mut Self>,
389 cx: &mut Context<'_>,
390 bufs: &[IoSlice<'_>],
391 ) -> Poll<Result<usize>> {
392 Pin::new(&mut **self).poll_write_vectored(cx, bufs)
393 }
394
395 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
396 Pin::new(&mut **self).poll_flush(cx)
397 }
398
399 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
400 Pin::new(&mut **self).poll_close(cx)
401 }
402 };
403 }
404
405 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
406 deref_async_write!();
407 }
408
409 impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &mut T {
410 deref_async_write!();
411 }
412
413 impl<P> AsyncWrite for Pin<P>
414 where
415 P: DerefMut + Unpin,
416 P::Target: AsyncWrite,
417 {
418 fn poll_write(
419 self: Pin<&mut Self>,
420 cx: &mut Context<'_>,
421 buf: &[u8],
422 ) -> Poll<Result<usize>> {
423 self.get_mut().as_mut().poll_write(cx, buf)
424 }
425
426 fn poll_write_vectored(
427 self: Pin<&mut Self>,
428 cx: &mut Context<'_>,
429 bufs: &[IoSlice<'_>],
430 ) -> Poll<Result<usize>> {
431 self.get_mut().as_mut().poll_write_vectored(cx, bufs)
432 }
433
434 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
435 self.get_mut().as_mut().poll_flush(cx)
436 }
437
438 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
439 self.get_mut().as_mut().poll_close(cx)
440 }
441 }
442
443 macro_rules! delegate_async_write_to_stdio {
444 () => {
445 fn poll_write(
446 mut self: Pin<&mut Self>,
447 _: &mut Context<'_>,
448 buf: &[u8],
449 ) -> Poll<Result<usize>> {
450 Poll::Ready(io::Write::write(&mut *self, buf))
451 }
452
453 fn poll_write_vectored(
454 mut self: Pin<&mut Self>,
455 _: &mut Context<'_>,
456 bufs: &[IoSlice<'_>],
457 ) -> Poll<Result<usize>> {
458 Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
459 }
460
461 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
462 Poll::Ready(io::Write::flush(&mut *self))
463 }
464
465 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
466 self.poll_flush(cx)
467 }
468 };
469 }
470
471 impl AsyncWrite for Vec<u8> {
472 delegate_async_write_to_stdio!();
473 }
474
475 macro_rules! deref_async_seek {
476 () => {
477 fn poll_seek(
478 mut self: Pin<&mut Self>,
479 cx: &mut Context<'_>,
480 pos: SeekFrom,
481 ) -> Poll<Result<u64>> {
482 Pin::new(&mut **self).poll_seek(cx, pos)
483 }
484 };
485 }
486
487 impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
488 deref_async_seek!();
489 }
490
491 impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
492 deref_async_seek!();
493 }
494
495 impl<P> AsyncSeek for Pin<P>
496 where
497 P: DerefMut + Unpin,
498 P::Target: AsyncSeek,
499 {
500 fn poll_seek(
501 self: Pin<&mut Self>,
502 cx: &mut Context<'_>,
503 pos: SeekFrom,
504 ) -> Poll<Result<u64>> {
505 self.get_mut().as_mut().poll_seek(cx, pos)
506 }
507 }
508
509 macro_rules! deref_async_buf_read {
510 () => {
511 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
512 Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
513 }
514
515 fn consume(mut self: Pin<&mut Self>, amt: usize) {
516 Pin::new(&mut **self).consume(amt)
517 }
518 };
519 }
520
521 impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
522 deref_async_buf_read!();
523 }
524
525 impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
526 deref_async_buf_read!();
527 }
528
529 impl<P> AsyncBufRead for Pin<P>
530 where
531 P: DerefMut + Unpin,
532 P::Target: AsyncBufRead,
533 {
534 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
535 self.get_mut().as_mut().poll_fill_buf(cx)
536 }
537
538 fn consume(self: Pin<&mut Self>, amt: usize) {
539 self.get_mut().as_mut().consume(amt)
540 }
541 }
542
543 macro_rules! delegate_async_buf_read_to_stdio {
544 () => {
545 fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
546 Poll::Ready(io::BufRead::fill_buf(self.get_mut()))
547 }
548
549 fn consume(self: Pin<&mut Self>, amt: usize) {
550 io::BufRead::consume(self.get_mut(), amt)
551 }
552 };
553 }
554
555 impl AsyncBufRead for &[u8] {
556 delegate_async_buf_read_to_stdio!();
557 }
558}
559
560#[cfg(feature = "std")]
561pub use self::if_std::*;