futures_core/stream.rs
1//! Asynchronous streams.
2
3use core::ops::DerefMut;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7/// An owned dynamically typed [`Stream`] for use in cases where you can't
8/// statically type your result or need to add some indirection.
9///
10/// This type is often created by the [`boxed`] method on [`StreamExt`]. See its documentation for more.
11///
12/// [`boxed`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.boxed
13/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
14#[cfg(feature = "alloc")]
15pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>;
16
17/// `BoxStream`, but without the `Send` requirement.
18///
19/// This type is often created by the [`boxed_local`] method on [`StreamExt`]. See its documentation for more.
20///
21/// [`boxed_local`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.boxed_local
22/// [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
23#[cfg(feature = "alloc")]
24pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>;
25
26/// A stream of values produced asynchronously.
27///
28/// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item
29/// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream
30/// represents a sequence of value-producing events that occur asynchronously to
31/// the caller.
32///
33/// The trait is modeled after `Future`, but allows `poll_next` to be called
34/// even after a value has been produced, yielding `None` once the stream has
35/// been fully exhausted.
36#[must_use = "streams do nothing unless polled"]
37pub trait Stream {
38 /// Values yielded by the stream.
39 type Item;
40
41 /// Attempt to pull out the next value of this stream, registering the
42 /// current task for wakeup if the value is not yet available, and returning
43 /// `None` if the stream is exhausted.
44 ///
45 /// # Return value
46 ///
47 /// There are several possible return values, each indicating a distinct
48 /// stream state:
49 ///
50 /// - `Poll::Pending` means that this stream's next value is not ready
51 /// yet. Implementations will ensure that the current task will be notified
52 /// when the next value may be ready.
53 ///
54 /// - `Poll::Ready(Some(val))` means that the stream has successfully
55 /// produced a value, `val`, and may produce further values on subsequent
56 /// `poll_next` calls.
57 ///
58 /// - `Poll::Ready(None)` means that the stream has terminated, and
59 /// `poll_next` should not be invoked again.
60 ///
61 /// # Panics
62 ///
63 /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
64 /// `poll_next` method again may panic, block forever, or cause other kinds of
65 /// problems; the `Stream` trait places no requirements on the effects of
66 /// such a call. However, as the `poll_next` method is not marked `unsafe`,
67 /// Rust's usual rules apply: calls must never cause undefined behavior
68 /// (memory corruption, incorrect use of `unsafe` functions, or the like),
69 /// regardless of the stream's state.
70 ///
71 /// If this is difficult to guard against then the [`fuse`] adapter can be used
72 /// to ensure that `poll_next` always returns `Ready(None)` in subsequent
73 /// calls.
74 ///
75 /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
76 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
77
78 /// Returns the bounds on the remaining length of the stream.
79 ///
80 /// Specifically, `size_hint()` returns a tuple where the first element
81 /// is the lower bound, and the second element is the upper bound.
82 ///
83 /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
84 /// A [`None`] here means that either there is no known upper bound, or the
85 /// upper bound is larger than [`usize`].
86 ///
87 /// # Implementation notes
88 ///
89 /// It is not enforced that a stream implementation yields the declared
90 /// number of elements. A buggy stream may yield less than the lower bound
91 /// or more than the upper bound of elements.
92 ///
93 /// `size_hint()` is primarily intended to be used for optimizations such as
94 /// reserving space for the elements of the stream, but must not be
95 /// trusted to e.g., omit bounds checks in unsafe code. An incorrect
96 /// implementation of `size_hint()` should not lead to memory safety
97 /// violations.
98 ///
99 /// That said, the implementation should provide a correct estimation,
100 /// because otherwise it would be a violation of the trait's protocol.
101 ///
102 /// The default implementation returns `(0, `[`None`]`)` which is correct for any
103 /// stream.
104 #[inline]
105 fn size_hint(&self) -> (usize, Option<usize>) {
106 (0, None)
107 }
108}
109
110impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
111 type Item = S::Item;
112
113 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114 S::poll_next(Pin::new(&mut **self), cx)
115 }
116
117 fn size_hint(&self) -> (usize, Option<usize>) {
118 (**self).size_hint()
119 }
120}
121
122impl<P> Stream for Pin<P>
123where
124 P: DerefMut + Unpin,
125 P::Target: Stream,
126{
127 type Item = <P::Target as Stream>::Item;
128
129 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130 self.get_mut().as_mut().poll_next(cx)
131 }
132
133 fn size_hint(&self) -> (usize, Option<usize>) {
134 (**self).size_hint()
135 }
136}
137
138/// A stream which tracks whether or not the underlying stream
139/// should no longer be polled.
140///
141/// `is_terminated` will return `true` if a future should no longer be polled.
142/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
143/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
144/// stream has become inactive and can no longer make progress and should be
145/// ignored or dropped rather than being polled again.
146pub trait FusedStream: Stream {
147 /// Returns `true` if the stream should no longer be polled.
148 fn is_terminated(&self) -> bool;
149}
150
151impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
152 fn is_terminated(&self) -> bool {
153 <F as FusedStream>::is_terminated(&**self)
154 }
155}
156
157impl<P> FusedStream for Pin<P>
158where
159 P: DerefMut + Unpin,
160 P::Target: FusedStream,
161{
162 fn is_terminated(&self) -> bool {
163 <P::Target as FusedStream>::is_terminated(&**self)
164 }
165}
166
167mod private_try_stream {
168 use super::Stream;
169
170 pub trait Sealed {}
171
172 impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
173}
174
175/// A convenience for streams that return `Result` values that includes
176/// a variety of adapters tailored to such futures.
177pub trait TryStream: Stream + private_try_stream::Sealed {
178 /// The type of successful values yielded by this future
179 type Ok;
180
181 /// The type of failures yielded by this future
182 type Error;
183
184 /// Poll this `TryStream` as if it were a `Stream`.
185 ///
186 /// This method is a stopgap for a compiler limitation that prevents us from
187 /// directly inheriting from the `Stream` trait; in the future it won't be
188 /// needed.
189 fn try_poll_next(
190 self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
193}
194
195impl<S, T, E> TryStream for S
196where
197 S: ?Sized + Stream<Item = Result<T, E>>,
198{
199 type Ok = T;
200 type Error = E;
201
202 fn try_poll_next(
203 self: Pin<&mut Self>,
204 cx: &mut Context<'_>,
205 ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
206 self.poll_next(cx)
207 }
208}
209
210#[cfg(feature = "alloc")]
211mod if_alloc {
212 use super::*;
213 use alloc::boxed::Box;
214
215 impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
216 type Item = S::Item;
217
218 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
219 Pin::new(&mut **self).poll_next(cx)
220 }
221
222 fn size_hint(&self) -> (usize, Option<usize>) {
223 (**self).size_hint()
224 }
225 }
226
227 #[cfg(feature = "std")]
228 impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
229 type Item = S::Item;
230
231 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
232 unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
233 }
234
235 fn size_hint(&self) -> (usize, Option<usize>) {
236 self.0.size_hint()
237 }
238 }
239
240 impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
241 fn is_terminated(&self) -> bool {
242 <S as FusedStream>::is_terminated(&**self)
243 }
244 }
245}