futures_lite/
stream.rs

1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(all(not(feature = "std"), feature = "alloc"))]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34#[cfg(feature = "race")]
35use fastrand::Rng;
36
37use pin_project_lite::pin_project;
38
39use crate::ready;
40
41/// Converts a stream into a blocking iterator.
42///
43/// # Examples
44///
45/// ```
46/// use futures_lite::{pin, stream};
47///
48/// let stream = stream::once(7);
49/// pin!(stream);
50///
51/// let mut iter = stream::block_on(stream);
52/// assert_eq!(iter.next(), Some(7));
53/// assert_eq!(iter.next(), None);
54/// ```
55#[cfg(feature = "std")]
56pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
57    BlockOn(stream)
58}
59
60/// Iterator for the [`block_on()`] function.
61#[derive(Debug)]
62pub struct BlockOn<S>(S);
63
64#[cfg(feature = "std")]
65impl<S: Stream + Unpin> Iterator for BlockOn<S> {
66    type Item = S::Item;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        crate::future::block_on(self.0.next())
70    }
71
72    fn size_hint(&self) -> (usize, Option<usize>) {
73        self.0.size_hint()
74    }
75
76    fn count(self) -> usize {
77        crate::future::block_on(self.0.count())
78    }
79
80    fn last(self) -> Option<Self::Item> {
81        crate::future::block_on(self.0.last())
82    }
83
84    fn nth(&mut self, n: usize) -> Option<Self::Item> {
85        crate::future::block_on(self.0.nth(n))
86    }
87
88    fn fold<B, F>(self, init: B, f: F) -> B
89    where
90        F: FnMut(B, Self::Item) -> B,
91    {
92        crate::future::block_on(self.0.fold(init, f))
93    }
94
95    fn for_each<F>(self, f: F) -> F::Output
96    where
97        F: FnMut(Self::Item),
98    {
99        crate::future::block_on(self.0.for_each(f))
100    }
101
102    fn all<F>(&mut self, f: F) -> bool
103    where
104        F: FnMut(Self::Item) -> bool,
105    {
106        crate::future::block_on(self.0.all(f))
107    }
108
109    fn any<F>(&mut self, f: F) -> bool
110    where
111        F: FnMut(Self::Item) -> bool,
112    {
113        crate::future::block_on(self.0.any(f))
114    }
115
116    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
117    where
118        P: FnMut(&Self::Item) -> bool,
119    {
120        crate::future::block_on(self.0.find(predicate))
121    }
122
123    fn find_map<B, F>(&mut self, f: F) -> Option<B>
124    where
125        F: FnMut(Self::Item) -> Option<B>,
126    {
127        crate::future::block_on(self.0.find_map(f))
128    }
129
130    fn position<P>(&mut self, predicate: P) -> Option<usize>
131    where
132        P: FnMut(Self::Item) -> bool,
133    {
134        crate::future::block_on(self.0.position(predicate))
135    }
136}
137
138/// Creates an empty stream.
139///
140/// # Examples
141///
142/// ```
143/// use futures_lite::stream::{self, StreamExt};
144///
145/// # spin_on::spin_on(async {
146/// let mut s = stream::empty::<i32>();
147/// assert_eq!(s.next().await, None);
148/// # })
149/// ```
150pub fn empty<T>() -> Empty<T> {
151    Empty {
152        _marker: PhantomData,
153    }
154}
155
156/// Stream for the [`empty()`] function.
157#[derive(Clone, Debug)]
158#[must_use = "streams do nothing unless polled"]
159pub struct Empty<T> {
160    _marker: PhantomData<T>,
161}
162
163impl<T> Unpin for Empty<T> {}
164
165impl<T> Stream for Empty<T> {
166    type Item = T;
167
168    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169        Poll::Ready(None)
170    }
171
172    fn size_hint(&self) -> (usize, Option<usize>) {
173        (0, Some(0))
174    }
175}
176
177/// Creates a stream from an iterator.
178///
179/// # Examples
180///
181/// ```
182/// use futures_lite::stream::{self, StreamExt};
183///
184/// # spin_on::spin_on(async {
185/// let mut s = stream::iter(vec![1, 2]);
186///
187/// assert_eq!(s.next().await, Some(1));
188/// assert_eq!(s.next().await, Some(2));
189/// assert_eq!(s.next().await, None);
190/// # })
191/// ```
192pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
193    Iter {
194        iter: iter.into_iter(),
195    }
196}
197
198/// Stream for the [`iter()`] function.
199#[derive(Clone, Debug)]
200#[must_use = "streams do nothing unless polled"]
201pub struct Iter<I> {
202    iter: I,
203}
204
205impl<I> Unpin for Iter<I> {}
206
207impl<I: Iterator> Stream for Iter<I> {
208    type Item = I::Item;
209
210    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211        Poll::Ready(self.iter.next())
212    }
213
214    fn size_hint(&self) -> (usize, Option<usize>) {
215        self.iter.size_hint()
216    }
217}
218
219/// Creates a stream that yields a single item.
220///
221/// # Examples
222///
223/// ```
224/// use futures_lite::stream::{self, StreamExt};
225///
226/// # spin_on::spin_on(async {
227/// let mut s = stream::once(7);
228///
229/// assert_eq!(s.next().await, Some(7));
230/// assert_eq!(s.next().await, None);
231/// # })
232/// ```
233pub fn once<T>(t: T) -> Once<T> {
234    Once { value: Some(t) }
235}
236
237pin_project! {
238    /// Stream for the [`once()`] function.
239    #[derive(Clone, Debug)]
240    #[must_use = "streams do nothing unless polled"]
241    pub struct Once<T> {
242        value: Option<T>,
243    }
244}
245
246impl<T> Stream for Once<T> {
247    type Item = T;
248
249    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
250        Poll::Ready(self.project().value.take())
251    }
252
253    fn size_hint(&self) -> (usize, Option<usize>) {
254        if self.value.is_some() {
255            (1, Some(1))
256        } else {
257            (0, Some(0))
258        }
259    }
260}
261
262/// Creates a stream that is always pending.
263///
264/// # Examples
265///
266/// ```no_run
267/// use futures_lite::stream::{self, StreamExt};
268///
269/// # spin_on::spin_on(async {
270/// let mut s = stream::pending::<i32>();
271/// s.next().await;
272/// unreachable!();
273/// # })
274/// ```
275pub fn pending<T>() -> Pending<T> {
276    Pending {
277        _marker: PhantomData,
278    }
279}
280
281/// Stream for the [`pending()`] function.
282#[derive(Clone, Debug)]
283#[must_use = "streams do nothing unless polled"]
284pub struct Pending<T> {
285    _marker: PhantomData<T>,
286}
287
288impl<T> Unpin for Pending<T> {}
289
290impl<T> Stream for Pending<T> {
291    type Item = T;
292
293    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
294        Poll::Pending
295    }
296
297    fn size_hint(&self) -> (usize, Option<usize>) {
298        (0, Some(0))
299    }
300}
301
302/// Creates a stream from a function returning [`Poll`].
303///
304/// # Examples
305///
306/// ```
307/// use futures_lite::stream::{self, StreamExt};
308/// use std::task::{Context, Poll};
309///
310/// # spin_on::spin_on(async {
311/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
312///     Poll::Ready(Some(7))
313/// }
314///
315/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
316/// # })
317/// ```
318pub fn poll_fn<T, F>(f: F) -> PollFn<F>
319where
320    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
321{
322    PollFn { f }
323}
324
325/// Stream for the [`poll_fn()`] function.
326#[derive(Clone)]
327#[must_use = "streams do nothing unless polled"]
328pub struct PollFn<F> {
329    f: F,
330}
331
332impl<F> Unpin for PollFn<F> {}
333
334impl<F> fmt::Debug for PollFn<F> {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        f.debug_struct("PollFn").finish()
337    }
338}
339
340impl<T, F> Stream for PollFn<F>
341where
342    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
343{
344    type Item = T;
345
346    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
347        (&mut self.f)(cx)
348    }
349}
350
351/// Creates an infinite stream that yields the same item repeatedly.
352///
353/// # Examples
354///
355/// ```
356/// use futures_lite::stream::{self, StreamExt};
357///
358/// # spin_on::spin_on(async {
359/// let mut s = stream::repeat(7);
360///
361/// assert_eq!(s.next().await, Some(7));
362/// assert_eq!(s.next().await, Some(7));
363/// # })
364/// ```
365pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
366    Repeat { item }
367}
368
369/// Stream for the [`repeat()`] function.
370#[derive(Clone, Debug)]
371#[must_use = "streams do nothing unless polled"]
372pub struct Repeat<T> {
373    item: T,
374}
375
376impl<T> Unpin for Repeat<T> {}
377
378impl<T: Clone> Stream for Repeat<T> {
379    type Item = T;
380
381    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
382        Poll::Ready(Some(self.item.clone()))
383    }
384
385    fn size_hint(&self) -> (usize, Option<usize>) {
386        (usize::MAX, None)
387    }
388}
389
390/// Creates an infinite stream from a closure that generates items.
391///
392/// # Examples
393///
394/// ```
395/// use futures_lite::stream::{self, StreamExt};
396///
397/// # spin_on::spin_on(async {
398/// let mut s = stream::repeat_with(|| 7);
399///
400/// assert_eq!(s.next().await, Some(7));
401/// assert_eq!(s.next().await, Some(7));
402/// # })
403/// ```
404pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
405where
406    F: FnMut() -> T,
407{
408    RepeatWith { f: repeater }
409}
410
411/// Stream for the [`repeat_with()`] function.
412#[derive(Clone, Debug)]
413#[must_use = "streams do nothing unless polled"]
414pub struct RepeatWith<F> {
415    f: F,
416}
417
418impl<F> Unpin for RepeatWith<F> {}
419
420impl<T, F> Stream for RepeatWith<F>
421where
422    F: FnMut() -> T,
423{
424    type Item = T;
425
426    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427        let item = (&mut self.f)();
428        Poll::Ready(Some(item))
429    }
430
431    fn size_hint(&self) -> (usize, Option<usize>) {
432        (usize::MAX, None)
433    }
434}
435
436/// Creates a stream from a seed value and an async closure operating on it.
437///
438/// # Examples
439///
440/// ```
441/// use futures_lite::stream::{self, StreamExt};
442///
443/// # spin_on::spin_on(async {
444/// let s = stream::unfold(0, |mut n| async move {
445///     if n < 2 {
446///         let m = n + 1;
447///         Some((n, m))
448///     } else {
449///         None
450///     }
451/// });
452///
453/// let v: Vec<i32> = s.collect().await;
454/// assert_eq!(v, [0, 1]);
455/// # })
456/// ```
457pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
458where
459    F: FnMut(T) -> Fut,
460    Fut: Future<Output = Option<(Item, T)>>,
461{
462    Unfold {
463        f,
464        state: Some(seed),
465        fut: None,
466    }
467}
468
469pin_project! {
470    /// Stream for the [`unfold()`] function.
471    #[derive(Clone)]
472    #[must_use = "streams do nothing unless polled"]
473    pub struct Unfold<T, F, Fut> {
474        f: F,
475        state: Option<T>,
476        #[pin]
477        fut: Option<Fut>,
478    }
479}
480
481impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
482where
483    T: fmt::Debug,
484    Fut: fmt::Debug,
485{
486    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487        f.debug_struct("Unfold")
488            .field("state", &self.state)
489            .field("fut", &self.fut)
490            .finish()
491    }
492}
493
494impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
495where
496    F: FnMut(T) -> Fut,
497    Fut: Future<Output = Option<(Item, T)>>,
498{
499    type Item = Item;
500
501    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502        let mut this = self.project();
503
504        if let Some(state) = this.state.take() {
505            this.fut.set(Some((this.f)(state)));
506        }
507
508        let step = ready!(this
509            .fut
510            .as_mut()
511            .as_pin_mut()
512            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
513            .poll(cx));
514        this.fut.set(None);
515
516        if let Some((item, next_state)) = step {
517            *this.state = Some(next_state);
518            Poll::Ready(Some(item))
519        } else {
520            Poll::Ready(None)
521        }
522    }
523}
524
525/// Creates a stream from a seed value and a fallible async closure operating on it.
526///
527/// # Examples
528///
529/// ```
530/// use futures_lite::stream::{self, StreamExt};
531///
532/// # spin_on::spin_on(async {
533/// let s = stream::try_unfold(0, |mut n| async move {
534///     if n < 2 {
535///         let m = n + 1;
536///         Ok(Some((n, m)))
537///     } else {
538///         std::io::Result::Ok(None)
539///     }
540/// });
541///
542/// let v: Vec<i32> = s.try_collect().await?;
543/// assert_eq!(v, [0, 1]);
544/// # std::io::Result::Ok(()) });
545/// ```
546pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
547where
548    F: FnMut(T) -> Fut,
549    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
550{
551    TryUnfold {
552        f,
553        state: Some(init),
554        fut: None,
555    }
556}
557
558pin_project! {
559    /// Stream for the [`try_unfold()`] function.
560    #[derive(Clone)]
561    #[must_use = "streams do nothing unless polled"]
562    pub struct TryUnfold<T, F, Fut> {
563        f: F,
564        state: Option<T>,
565        #[pin]
566        fut: Option<Fut>,
567    }
568}
569
570impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
571where
572    T: fmt::Debug,
573    Fut: fmt::Debug,
574{
575    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576        f.debug_struct("TryUnfold")
577            .field("state", &self.state)
578            .field("fut", &self.fut)
579            .finish()
580    }
581}
582
583impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
584where
585    F: FnMut(T) -> Fut,
586    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
587{
588    type Item = Result<Item, E>;
589
590    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591        let mut this = self.project();
592
593        if let Some(state) = this.state.take() {
594            this.fut.set(Some((this.f)(state)));
595        }
596
597        match this.fut.as_mut().as_pin_mut() {
598            None => {
599                // The future previously errored
600                Poll::Ready(None)
601            }
602            Some(future) => {
603                let step = ready!(future.poll(cx));
604                this.fut.set(None);
605
606                match step {
607                    Ok(Some((item, next_state))) => {
608                        *this.state = Some(next_state);
609                        Poll::Ready(Some(Ok(item)))
610                    }
611                    Ok(None) => Poll::Ready(None),
612                    Err(e) => Poll::Ready(Some(Err(e))),
613                }
614            }
615        }
616    }
617}
618
619/// Creates a stream that invokes the given future as its first item, and then
620/// produces no more items.
621///
622/// # Example
623///
624/// ```
625/// use futures_lite::{stream, prelude::*};
626///
627/// # spin_on::spin_on(async {
628/// let mut stream = Box::pin(stream::once_future(async { 1 }));
629/// assert_eq!(stream.next().await, Some(1));
630/// assert_eq!(stream.next().await, None);
631/// # });
632/// ```
633pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
634    OnceFuture {
635        future: Some(future),
636    }
637}
638
639pin_project! {
640    /// Stream for the [`once_future()`] method.
641    #[derive(Debug)]
642    #[must_use = "futures do nothing unless you `.await` or poll them"]
643    pub struct OnceFuture<F> {
644        #[pin]
645        future: Option<F>,
646    }
647}
648
649impl<F: Future> Stream for OnceFuture<F> {
650    type Item = F::Output;
651
652    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
653        let mut this = self.project();
654
655        match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
656            Some(Poll::Ready(t)) => {
657                this.future.set(None);
658                Poll::Ready(Some(t))
659            }
660            Some(Poll::Pending) => Poll::Pending,
661            None => Poll::Ready(None),
662        }
663    }
664}
665
666/// Take elements from this stream until the provided future resolves.
667///
668/// This function will take elements from the stream until the provided
669/// stopping future `fut` resolves. Once the `fut` future becomes ready,
670/// this stream combinator will always return that the stream is done.
671///
672/// The stopping future may return any type. Once the stream is stopped
673/// the result of the stopping future may be accessed with `StopAfterFuture::take_result()`.
674/// The stream may also be resumed with `StopAfterFuture::take_future()`.
675/// See the documentation of [`StopAfterFuture`] for more information.
676///
677/// ```
678/// use futures_lite::stream::{self, StreamExt, stop_after_future};
679/// use futures_lite::future;
680/// use std::task::Poll;
681///
682/// let stream = stream::iter(1..=10);
683///
684/// # spin_on::spin_on(async {
685/// let mut i = 0;
686/// let stop_fut = future::poll_fn(|_cx| {
687///     i += 1;
688///     if i <= 5 {
689///         Poll::Pending
690///     } else {
691///         Poll::Ready(())
692///     }
693/// });
694///
695/// let stream = stop_after_future(stream, stop_fut);
696///
697/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
698/// # });
699pub fn stop_after_future<S, F>(stream: S, future: F) -> StopAfterFuture<S, F>
700where
701    S: Sized + Stream,
702    F: Future,
703{
704    StopAfterFuture {
705        stream,
706        fut: Some(future),
707        fut_result: None,
708        free: false,
709    }
710}
711
712pin_project! {
713    /// Stream for the [`StreamExt::stop_after_future()`] method.
714    #[derive(Clone, Debug)]
715    #[must_use = "streams do nothing unless polled"]
716    pub struct StopAfterFuture<S: Stream, Fut: Future> {
717        #[pin]
718        stream: S,
719        // Contains the inner Future on start and None once the inner Future is resolved
720        // or taken out by the user.
721        #[pin]
722        fut: Option<Fut>,
723        // Contains fut's return value once fut is resolved
724        fut_result: Option<Fut::Output>,
725        // Whether the future was taken out by the user.
726        free: bool,
727    }
728}
729
730impl<St, Fut> StopAfterFuture<St, Fut>
731where
732    St: Stream,
733    Fut: Future,
734{
735    /// Extract the stopping future out of the combinator.
736    ///
737    /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
738    /// Taking out the future means the combinator will be yielding
739    /// elements from the wrapped stream without ever stopping it.
740    pub fn take_future(&mut self) -> Option<Fut> {
741        if self.fut.is_some() {
742            self.free = true;
743        }
744
745        self.fut.take()
746    }
747
748    /// Once the stopping future is resolved, this method can be used
749    /// to extract the value returned by the stopping future.
750    ///
751    /// This may be used to retrieve arbitrary data from the stopping
752    /// future, for example a reason why the stream was stopped.
753    ///
754    /// This method will return `None` if the future isn't resolved yet,
755    /// or if the result was already taken out.
756    ///
757    /// # Examples
758    ///
759    /// ```
760    /// # spin_on::spin_on(async {
761    /// use futures_lite::stream::{self, StreamExt, stop_after_future};
762    /// use futures_lite::future;
763    /// use std::task::Poll;
764    ///
765    /// let stream = stream::iter(1..=10);
766    ///
767    /// let mut i = 0;
768    /// let stop_fut = future::poll_fn(|_cx| {
769    ///     i += 1;
770    ///     if i <= 5 {
771    ///         Poll::Pending
772    ///     } else {
773    ///         Poll::Ready("reason")
774    ///     }
775    /// });
776    ///
777    /// let mut stream = stop_after_future(stream, stop_fut);
778    /// let _ = (&mut stream).collect::<Vec<_>>().await;
779    ///
780    /// let result = stream.take_result().unwrap();
781    /// assert_eq!(result, "reason");
782    /// # });
783    /// ```
784    pub fn take_result(&mut self) -> Option<Fut::Output> {
785        self.fut_result.take()
786    }
787
788    /// Whether the stream was stopped yet by the stopping future
789    /// being resolved.
790    pub fn is_stopped(&self) -> bool {
791        !self.free && self.fut.is_none()
792    }
793}
794
795impl<St, Fut> Stream for StopAfterFuture<St, Fut>
796where
797    St: Stream,
798    Fut: Future,
799{
800    type Item = St::Item;
801
802    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
803        let mut this = self.project();
804
805        if let Some(f) = this.fut.as_mut().as_pin_mut() {
806            if let Poll::Ready(result) = f.poll(cx) {
807                this.fut.set(None);
808                *this.fut_result = Some(result);
809            }
810        }
811
812        if !*this.free && this.fut.is_none() {
813            // Future resolved, inner stream stopped
814            Poll::Ready(None)
815        } else {
816            // Future either not resolved yet or taken out by the user
817            let item = ready!(this.stream.poll_next(cx));
818            if item.is_none() {
819                this.fut.set(None);
820            }
821            Poll::Ready(item)
822        }
823    }
824
825    fn size_hint(&self) -> (usize, Option<usize>) {
826        if self.is_stopped() {
827            return (0, Some(0));
828        }
829
830        // Original stream can be truncated at any moment, so the lower bound isn't reliable.
831        let (_, upper_bound) = self.stream.size_hint();
832        (0, upper_bound)
833    }
834}
835
836/// Extension trait for [`Stream`].
837pub trait StreamExt: Stream {
838    /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
839    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
840    where
841        Self: Unpin,
842    {
843        Stream::poll_next(Pin::new(self), cx)
844    }
845
846    /// Retrieves the next item in the stream.
847    ///
848    /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
849    /// resume iteration after that.
850    ///
851    /// # Examples
852    ///
853    /// ```
854    /// use futures_lite::stream::{self, StreamExt};
855    ///
856    /// # spin_on::spin_on(async {
857    /// let mut s = stream::iter(1..=3);
858    ///
859    /// assert_eq!(s.next().await, Some(1));
860    /// assert_eq!(s.next().await, Some(2));
861    /// assert_eq!(s.next().await, Some(3));
862    /// assert_eq!(s.next().await, None);
863    /// # });
864    /// ```
865    fn next(&mut self) -> NextFuture<'_, Self>
866    where
867        Self: Unpin,
868    {
869        NextFuture { stream: self }
870    }
871
872    /// Retrieves the next item in the stream.
873    ///
874    /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
875    /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
876    ///
877    /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
878    ///
879    /// # Examples
880    ///
881    /// ```
882    /// use futures_lite::stream::{self, StreamExt};
883    ///
884    /// # spin_on::spin_on(async {
885    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
886    ///
887    /// assert_eq!(s.try_next().await, Ok(Some(1)));
888    /// assert_eq!(s.try_next().await, Ok(Some(2)));
889    /// assert_eq!(s.try_next().await, Err("error"));
890    /// assert_eq!(s.try_next().await, Ok(None));
891    /// # });
892    /// ```
893    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
894    where
895        Self: Stream<Item = Result<T, E>> + Unpin,
896    {
897        TryNextFuture { stream: self }
898    }
899
900    /// Counts the number of items in the stream.
901    ///
902    /// # Examples
903    ///
904    /// ```
905    /// use futures_lite::stream::{self, StreamExt};
906    ///
907    /// # spin_on::spin_on(async {
908    /// let s1 = stream::iter(vec![0]);
909    /// let s2 = stream::iter(vec![1, 2, 3]);
910    ///
911    /// assert_eq!(s1.count().await, 1);
912    /// assert_eq!(s2.count().await, 3);
913    /// # });
914    /// ```
915    fn count(self) -> CountFuture<Self>
916    where
917        Self: Sized,
918    {
919        CountFuture {
920            stream: self,
921            count: 0,
922        }
923    }
924
925    /// Maps items of the stream to new values using a closure.
926    ///
927    /// # Examples
928    ///
929    /// ```
930    /// use futures_lite::stream::{self, StreamExt};
931    ///
932    /// # spin_on::spin_on(async {
933    /// let s = stream::iter(vec![1, 2, 3]);
934    /// let mut s = s.map(|x| 2 * x);
935    ///
936    /// assert_eq!(s.next().await, Some(2));
937    /// assert_eq!(s.next().await, Some(4));
938    /// assert_eq!(s.next().await, Some(6));
939    /// assert_eq!(s.next().await, None);
940    /// # });
941    /// ```
942    fn map<T, F>(self, f: F) -> Map<Self, F>
943    where
944        Self: Sized,
945        F: FnMut(Self::Item) -> T,
946    {
947        Map { stream: self, f }
948    }
949
950    /// Maps items to streams and then concatenates them.
951    ///
952    /// # Examples
953    ///
954    /// ```
955    /// use futures_lite::stream::{self, StreamExt};
956    ///
957    /// # spin_on::spin_on(async {
958    /// let words = stream::iter(vec!["one", "two"]);
959    ///
960    /// let s: String = words
961    ///     .flat_map(|s| stream::iter(s.chars()))
962    ///     .collect()
963    ///     .await;
964    ///
965    /// assert_eq!(s, "onetwo");
966    /// # });
967    /// ```
968    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
969    where
970        Self: Sized,
971        U: Stream,
972        F: FnMut(Self::Item) -> U,
973    {
974        FlatMap {
975            stream: self.map(f),
976            inner_stream: None,
977        }
978    }
979
980    /// Concatenates inner streams.
981    ///
982    /// # Examples
983    ///
984    /// ```
985    /// use futures_lite::stream::{self, StreamExt};
986    ///
987    /// # spin_on::spin_on(async {
988    /// let s1 = stream::iter(vec![1, 2, 3]);
989    /// let s2 = stream::iter(vec![4, 5]);
990    ///
991    /// let s = stream::iter(vec![s1, s2]);
992    /// let v: Vec<_> = s.flatten().collect().await;
993    /// assert_eq!(v, [1, 2, 3, 4, 5]);
994    /// # });
995    /// ```
996    fn flatten(self) -> Flatten<Self>
997    where
998        Self: Sized,
999        Self::Item: Stream,
1000    {
1001        Flatten {
1002            stream: self,
1003            inner_stream: None,
1004        }
1005    }
1006
1007    /// Maps items of the stream to new values using an async closure.
1008    ///
1009    /// # Examples
1010    ///
1011    /// ```
1012    /// use futures_lite::pin;
1013    /// use futures_lite::stream::{self, StreamExt};
1014    ///
1015    /// # spin_on::spin_on(async {
1016    /// let s = stream::iter(vec![1, 2, 3]);
1017    /// let mut s = s.then(|x| async move { 2 * x });
1018    ///
1019    /// pin!(s);
1020    /// assert_eq!(s.next().await, Some(2));
1021    /// assert_eq!(s.next().await, Some(4));
1022    /// assert_eq!(s.next().await, Some(6));
1023    /// assert_eq!(s.next().await, None);
1024    /// # });
1025    /// ```
1026    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
1027    where
1028        Self: Sized,
1029        F: FnMut(Self::Item) -> Fut,
1030        Fut: Future,
1031    {
1032        Then {
1033            stream: self,
1034            future: None,
1035            f,
1036        }
1037    }
1038
1039    /// Keeps items of the stream for which `predicate` returns `true`.
1040    ///
1041    /// # Examples
1042    ///
1043    /// ```
1044    /// use futures_lite::stream::{self, StreamExt};
1045    ///
1046    /// # spin_on::spin_on(async {
1047    /// let s = stream::iter(vec![1, 2, 3, 4]);
1048    /// let mut s = s.filter(|i| i % 2 == 0);
1049    ///
1050    /// assert_eq!(s.next().await, Some(2));
1051    /// assert_eq!(s.next().await, Some(4));
1052    /// assert_eq!(s.next().await, None);
1053    /// # });
1054    /// ```
1055    fn filter<P>(self, predicate: P) -> Filter<Self, P>
1056    where
1057        Self: Sized,
1058        P: FnMut(&Self::Item) -> bool,
1059    {
1060        Filter {
1061            stream: self,
1062            predicate,
1063        }
1064    }
1065
1066    /// Filters and maps items of the stream using a closure.
1067    ///
1068    /// # Examples
1069    ///
1070    /// ```
1071    /// use futures_lite::stream::{self, StreamExt};
1072    ///
1073    /// # spin_on::spin_on(async {
1074    /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
1075    /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
1076    ///
1077    /// assert_eq!(s.next().await, Some(1));
1078    /// assert_eq!(s.next().await, Some(3));
1079    /// assert_eq!(s.next().await, Some(5));
1080    /// assert_eq!(s.next().await, None);
1081    /// # });
1082    /// ```
1083    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
1084    where
1085        Self: Sized,
1086        F: FnMut(Self::Item) -> Option<T>,
1087    {
1088        FilterMap { stream: self, f }
1089    }
1090
1091    /// Takes only the first `n` items of the stream.
1092    ///
1093    /// # Examples
1094    ///
1095    /// ```
1096    /// use futures_lite::stream::{self, StreamExt};
1097    ///
1098    /// # spin_on::spin_on(async {
1099    /// let mut s = stream::repeat(7).take(2);
1100    ///
1101    /// assert_eq!(s.next().await, Some(7));
1102    /// assert_eq!(s.next().await, Some(7));
1103    /// assert_eq!(s.next().await, None);
1104    /// # });
1105    /// ```
1106    fn take(self, n: usize) -> Take<Self>
1107    where
1108        Self: Sized,
1109    {
1110        Take { stream: self, n }
1111    }
1112
1113    /// Takes items while `predicate` returns `true`.
1114    ///
1115    /// # Examples
1116    ///
1117    /// ```
1118    /// use futures_lite::stream::{self, StreamExt};
1119    ///
1120    /// # spin_on::spin_on(async {
1121    /// let s = stream::iter(vec![1, 2, 3, 4]);
1122    /// let mut s = s.take_while(|x| *x < 3);
1123    ///
1124    /// assert_eq!(s.next().await, Some(1));
1125    /// assert_eq!(s.next().await, Some(2));
1126    /// assert_eq!(s.next().await, None);
1127    /// # });
1128    /// ```
1129    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1130    where
1131        Self: Sized,
1132        P: FnMut(&Self::Item) -> bool,
1133    {
1134        TakeWhile {
1135            stream: self,
1136            predicate,
1137        }
1138    }
1139
1140    /// Maps items while `predicate` returns [`Some`].
1141    ///
1142    /// This stream is not fused. After the predicate returns [`None`] the stream still
1143    /// contains remaining items that can be obtained by subsequent `next` calls.
1144    /// You can [`fuse`](StreamExt::fuse) the stream if this behavior is undesirable.
1145    ///
1146    /// # Examples
1147    ///
1148    /// ```
1149    /// use futures_lite::stream::{self, StreamExt};
1150    ///
1151    /// # spin_on::spin_on(async {
1152    /// let s = stream::iter(vec![1, 2, 0, 3]);
1153    /// let mut s = s.map_while(|x: u32| x.checked_sub(1));
1154    ///
1155    /// assert_eq!(s.next().await, Some(0));
1156    /// assert_eq!(s.next().await, Some(1));
1157    /// assert_eq!(s.next().await, None);
1158    ///
1159    /// // Continue to iterate the stream.
1160    /// assert_eq!(s.next().await, Some(2));
1161    /// assert_eq!(s.next().await, None);
1162    /// # });
1163    /// ```
1164    fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1165    where
1166        Self: Sized,
1167        P: FnMut(Self::Item) -> Option<B>,
1168    {
1169        MapWhile {
1170            stream: self,
1171            predicate,
1172        }
1173    }
1174
1175    /// Skips the first `n` items of the stream.
1176    ///
1177    /// # Examples
1178    ///
1179    /// ```
1180    /// use futures_lite::stream::{self, StreamExt};
1181    ///
1182    /// # spin_on::spin_on(async {
1183    /// let s = stream::iter(vec![1, 2, 3]);
1184    /// let mut s = s.skip(2);
1185    ///
1186    /// assert_eq!(s.next().await, Some(3));
1187    /// assert_eq!(s.next().await, None);
1188    /// # });
1189    /// ```
1190    fn skip(self, n: usize) -> Skip<Self>
1191    where
1192        Self: Sized,
1193    {
1194        Skip { stream: self, n }
1195    }
1196
1197    /// Skips items while `predicate` returns `true`.
1198    ///
1199    /// # Examples
1200    ///
1201    /// ```
1202    /// use futures_lite::stream::{self, StreamExt};
1203    ///
1204    /// # spin_on::spin_on(async {
1205    /// let s = stream::iter(vec![-1i32, 0, 1]);
1206    /// let mut s = s.skip_while(|x| x.is_negative());
1207    ///
1208    /// assert_eq!(s.next().await, Some(0));
1209    /// assert_eq!(s.next().await, Some(1));
1210    /// assert_eq!(s.next().await, None);
1211    /// # });
1212    /// ```
1213    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1214    where
1215        Self: Sized,
1216        P: FnMut(&Self::Item) -> bool,
1217    {
1218        SkipWhile {
1219            stream: self,
1220            predicate: Some(predicate),
1221        }
1222    }
1223
1224    /// Yields every `step`th item.
1225    ///
1226    /// # Panics
1227    ///
1228    /// This method will panic if the `step` is 0.
1229    ///
1230    /// # Examples
1231    ///
1232    /// ```
1233    /// use futures_lite::stream::{self, StreamExt};
1234    ///
1235    /// # spin_on::spin_on(async {
1236    /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1237    /// let mut s = s.step_by(2);
1238    ///
1239    /// assert_eq!(s.next().await, Some(0));
1240    /// assert_eq!(s.next().await, Some(2));
1241    /// assert_eq!(s.next().await, Some(4));
1242    /// assert_eq!(s.next().await, None);
1243    /// # });
1244    /// ```
1245    fn step_by(self, step: usize) -> StepBy<Self>
1246    where
1247        Self: Sized,
1248    {
1249        assert!(step > 0, "`step` must be greater than zero");
1250        StepBy {
1251            stream: self,
1252            step,
1253            i: 0,
1254        }
1255    }
1256
1257    /// Appends another stream to the end of this one.
1258    ///
1259    /// # Examples
1260    ///
1261    /// ```
1262    /// use futures_lite::stream::{self, StreamExt};
1263    ///
1264    /// # spin_on::spin_on(async {
1265    /// let s1 = stream::iter(vec![1, 2]);
1266    /// let s2 = stream::iter(vec![7, 8]);
1267    /// let mut s = s1.chain(s2);
1268    ///
1269    /// assert_eq!(s.next().await, Some(1));
1270    /// assert_eq!(s.next().await, Some(2));
1271    /// assert_eq!(s.next().await, Some(7));
1272    /// assert_eq!(s.next().await, Some(8));
1273    /// assert_eq!(s.next().await, None);
1274    /// # });
1275    /// ```
1276    fn chain<U>(self, other: U) -> Chain<Self, U>
1277    where
1278        Self: Sized,
1279        U: Stream<Item = Self::Item> + Sized,
1280    {
1281        Chain {
1282            first: self.fuse(),
1283            second: other.fuse(),
1284        }
1285    }
1286
1287    /// Clones all items.
1288    ///
1289    /// # Examples
1290    ///
1291    /// ```
1292    /// use futures_lite::stream::{self, StreamExt};
1293    ///
1294    /// # spin_on::spin_on(async {
1295    /// let s = stream::iter(vec![&1, &2]);
1296    /// let mut s = s.cloned();
1297    ///
1298    /// assert_eq!(s.next().await, Some(1));
1299    /// assert_eq!(s.next().await, Some(2));
1300    /// assert_eq!(s.next().await, None);
1301    /// # });
1302    /// ```
1303    fn cloned<'a, T>(self) -> Cloned<Self>
1304    where
1305        Self: Stream<Item = &'a T> + Sized,
1306        T: Clone + 'a,
1307    {
1308        Cloned { stream: self }
1309    }
1310
1311    /// Copies all items.
1312    ///
1313    /// # Examples
1314    ///
1315    /// ```
1316    /// use futures_lite::stream::{self, StreamExt};
1317    ///
1318    /// # spin_on::spin_on(async {
1319    /// let s = stream::iter(vec![&1, &2]);
1320    /// let mut s = s.copied();
1321    ///
1322    /// assert_eq!(s.next().await, Some(1));
1323    /// assert_eq!(s.next().await, Some(2));
1324    /// assert_eq!(s.next().await, None);
1325    /// # });
1326    /// ```
1327    fn copied<'a, T>(self) -> Copied<Self>
1328    where
1329        Self: Stream<Item = &'a T> + Sized,
1330        T: Copy + 'a,
1331    {
1332        Copied { stream: self }
1333    }
1334
1335    /// Collects all items in the stream into a collection.
1336    ///
1337    /// # Examples
1338    ///
1339    /// ```
1340    /// use futures_lite::stream::{self, StreamExt};
1341    ///
1342    /// # spin_on::spin_on(async {
1343    /// let mut s = stream::iter(1..=3);
1344    ///
1345    /// let items: Vec<_> = s.collect().await;
1346    /// assert_eq!(items, [1, 2, 3]);
1347    /// # });
1348    /// ```
1349    fn collect<C>(self) -> CollectFuture<Self, C>
1350    where
1351        Self: Sized,
1352        C: Default + Extend<Self::Item>,
1353    {
1354        CollectFuture {
1355            stream: self,
1356            collection: Default::default(),
1357        }
1358    }
1359
1360    /// Collects all items in the fallible stream into a collection.
1361    ///
1362    /// ```
1363    /// use futures_lite::stream::{self, StreamExt};
1364    ///
1365    /// # spin_on::spin_on(async {
1366    /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1367    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1368    /// assert_eq!(res, Err(2));
1369    ///
1370    /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1371    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1372    /// assert_eq!(res, Ok(vec![1, 2, 3]));
1373    /// # })
1374    /// ```
1375    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1376    where
1377        Self: Stream<Item = Result<T, E>> + Sized,
1378        C: Default + Extend<T>,
1379    {
1380        TryCollectFuture {
1381            stream: self,
1382            items: Default::default(),
1383        }
1384    }
1385
1386    /// Partitions items into those for which `predicate` is `true` and those for which it is
1387    /// `false`, and then collects them into two collections.
1388    ///
1389    /// # Examples
1390    ///
1391    /// ```
1392    /// use futures_lite::stream::{self, StreamExt};
1393    ///
1394    /// # spin_on::spin_on(async {
1395    /// let s = stream::iter(vec![1, 2, 3]);
1396    /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1397    ///
1398    /// assert_eq!(even, &[2]);
1399    /// assert_eq!(odd, &[1, 3]);
1400    /// # })
1401    /// ```
1402    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1403    where
1404        Self: Sized,
1405        B: Default + Extend<Self::Item>,
1406        P: FnMut(&Self::Item) -> bool,
1407    {
1408        PartitionFuture {
1409            stream: self,
1410            predicate,
1411            res: Some(Default::default()),
1412        }
1413    }
1414
1415    /// Accumulates a computation over the stream.
1416    ///
1417    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1418    /// the accumulator and each item in the stream. The final accumulator value is returned.
1419    ///
1420    /// # Examples
1421    ///
1422    /// ```
1423    /// use futures_lite::stream::{self, StreamExt};
1424    ///
1425    /// # spin_on::spin_on(async {
1426    /// let s = stream::iter(vec![1, 2, 3]);
1427    /// let sum = s.fold(0, |acc, x| acc + x).await;
1428    ///
1429    /// assert_eq!(sum, 6);
1430    /// # })
1431    /// ```
1432    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1433    where
1434        Self: Sized,
1435        F: FnMut(T, Self::Item) -> T,
1436    {
1437        FoldFuture {
1438            stream: self,
1439            f,
1440            acc: Some(init),
1441        }
1442    }
1443
1444    /// Accumulates a fallible computation over the stream.
1445    ///
1446    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1447    /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1448    /// error if `f` failed the computation.
1449    ///
1450    /// # Examples
1451    ///
1452    /// ```
1453    /// use futures_lite::stream::{self, StreamExt};
1454    ///
1455    /// # spin_on::spin_on(async {
1456    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1457    ///
1458    /// let sum = s.try_fold(0, |acc, v| {
1459    ///     if (acc + v) % 2 == 1 {
1460    ///         Ok(acc + v)
1461    ///     } else {
1462    ///         Err("fail")
1463    ///     }
1464    /// })
1465    /// .await;
1466    ///
1467    /// assert_eq!(sum, Err("fail"));
1468    /// # })
1469    /// ```
1470    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1471    where
1472        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1473        F: FnMut(B, T) -> Result<B, E>,
1474    {
1475        TryFoldFuture {
1476            stream: self,
1477            f,
1478            acc: Some(init),
1479        }
1480    }
1481
1482    /// Maps items of the stream to new values using a state value and a closure.
1483    ///
1484    /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1485    /// state and each item in the stream. The stream stops when `f` returns `None`.
1486    ///
1487    /// # Examples
1488    ///
1489    /// ```
1490    /// use futures_lite::stream::{self, StreamExt};
1491    ///
1492    /// # spin_on::spin_on(async {
1493    /// let s = stream::iter(vec![1, 2, 3]);
1494    /// let mut s = s.scan(1, |state, x| {
1495    ///     *state = *state * x;
1496    ///     Some(-*state)
1497    /// });
1498    ///
1499    /// assert_eq!(s.next().await, Some(-1));
1500    /// assert_eq!(s.next().await, Some(-2));
1501    /// assert_eq!(s.next().await, Some(-6));
1502    /// assert_eq!(s.next().await, None);
1503    /// # })
1504    /// ```
1505    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1506    where
1507        Self: Sized,
1508        F: FnMut(&mut St, Self::Item) -> Option<B>,
1509    {
1510        Scan {
1511            stream: self,
1512            state_f: (initial_state, f),
1513        }
1514    }
1515
1516    /// Fuses the stream so that it stops yielding items after the first [`None`].
1517    ///
1518    /// # Examples
1519    ///
1520    /// ```
1521    /// use futures_lite::stream::{self, StreamExt};
1522    ///
1523    /// # spin_on::spin_on(async {
1524    /// let mut s = stream::once(1).fuse();
1525    ///
1526    /// assert_eq!(s.next().await, Some(1));
1527    /// assert_eq!(s.next().await, None);
1528    /// assert_eq!(s.next().await, None);
1529    /// # })
1530    /// ```
1531    fn fuse(self) -> Fuse<Self>
1532    where
1533        Self: Sized,
1534    {
1535        Fuse {
1536            stream: self,
1537            done: false,
1538        }
1539    }
1540
1541    /// Repeats the stream from beginning to end, forever.
1542    ///
1543    /// # Examples
1544    ///
1545    /// ```
1546    /// use futures_lite::stream::{self, StreamExt};
1547    ///
1548    /// # spin_on::spin_on(async {
1549    /// let mut s = stream::iter(vec![1, 2]).cycle();
1550    ///
1551    /// assert_eq!(s.next().await, Some(1));
1552    /// assert_eq!(s.next().await, Some(2));
1553    /// assert_eq!(s.next().await, Some(1));
1554    /// assert_eq!(s.next().await, Some(2));
1555    /// # });
1556    /// ```
1557    fn cycle(self) -> Cycle<Self>
1558    where
1559        Self: Clone + Sized,
1560    {
1561        Cycle {
1562            orig: self.clone(),
1563            stream: self,
1564        }
1565    }
1566
1567    /// Enumerates items, mapping them to `(index, item)`.
1568    ///
1569    /// # Examples
1570    ///
1571    /// ```
1572    /// use futures_lite::stream::{self, StreamExt};
1573    ///
1574    /// # spin_on::spin_on(async {
1575    /// let s = stream::iter(vec!['a', 'b', 'c']);
1576    /// let mut s = s.enumerate();
1577    ///
1578    /// assert_eq!(s.next().await, Some((0, 'a')));
1579    /// assert_eq!(s.next().await, Some((1, 'b')));
1580    /// assert_eq!(s.next().await, Some((2, 'c')));
1581    /// assert_eq!(s.next().await, None);
1582    /// # });
1583    /// ```
1584    fn enumerate(self) -> Enumerate<Self>
1585    where
1586        Self: Sized,
1587    {
1588        Enumerate { stream: self, i: 0 }
1589    }
1590
1591    /// Calls a closure on each item and passes it on.
1592    ///
1593    /// # Examples
1594    ///
1595    /// ```
1596    /// use futures_lite::stream::{self, StreamExt};
1597    ///
1598    /// # spin_on::spin_on(async {
1599    /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1600    ///
1601    /// let sum = s
1602    ///    .inspect(|x| println!("about to filter {}", x))
1603    ///    .filter(|x| x % 2 == 0)
1604    ///    .inspect(|x| println!("made it through filter: {}", x))
1605    ///    .fold(0, |sum, i| sum + i)
1606    ///    .await;
1607    /// # });
1608    /// ```
1609    fn inspect<F>(self, f: F) -> Inspect<Self, F>
1610    where
1611        Self: Sized,
1612        F: FnMut(&Self::Item),
1613    {
1614        Inspect { stream: self, f }
1615    }
1616
1617    /// Gets the `n`th item of the stream.
1618    ///
1619    /// In the end, `n+1` items of the stream will be consumed.
1620    ///
1621    /// # Examples
1622    ///
1623    /// ```
1624    /// use futures_lite::stream::{self, StreamExt};
1625    ///
1626    /// # spin_on::spin_on(async {
1627    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1628    ///
1629    /// assert_eq!(s.nth(2).await, Some(2));
1630    /// assert_eq!(s.nth(2).await, Some(5));
1631    /// assert_eq!(s.nth(2).await, None);
1632    /// # });
1633    /// ```
1634    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1635    where
1636        Self: Unpin,
1637    {
1638        NthFuture { stream: self, n }
1639    }
1640
1641    /// Returns the last item in the stream.
1642    ///
1643    /// # Examples
1644    ///
1645    /// ```
1646    /// use futures_lite::stream::{self, StreamExt};
1647    ///
1648    /// # spin_on::spin_on(async {
1649    /// let s = stream::iter(vec![1, 2, 3, 4]);
1650    /// assert_eq!(s.last().await, Some(4));
1651    ///
1652    /// let s = stream::empty::<i32>();
1653    /// assert_eq!(s.last().await, None);
1654    /// # });
1655    /// ```
1656    fn last(self) -> LastFuture<Self>
1657    where
1658        Self: Sized,
1659    {
1660        LastFuture {
1661            stream: self,
1662            last: None,
1663        }
1664    }
1665
1666    /// Finds the first item of the stream for which `predicate` returns `true`.
1667    ///
1668    /// # Examples
1669    ///
1670    /// ```
1671    /// use futures_lite::stream::{self, StreamExt};
1672    ///
1673    /// # spin_on::spin_on(async {
1674    /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1675    ///
1676    /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1677    /// assert_eq!(s.next().await, Some(13));
1678    /// # });
1679    /// ```
1680    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1681    where
1682        Self: Unpin,
1683        P: FnMut(&Self::Item) -> bool,
1684    {
1685        FindFuture {
1686            stream: self,
1687            predicate,
1688        }
1689    }
1690
1691    /// Applies a closure to items in the stream and returns the first [`Some`] result.
1692    ///
1693    /// # Examples
1694    ///
1695    /// ```
1696    /// use futures_lite::stream::{self, StreamExt};
1697    ///
1698    /// # spin_on::spin_on(async {
1699    /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1700    /// let number = s.find_map(|s| s.parse().ok()).await;
1701    ///
1702    /// assert_eq!(number, Some(2));
1703    /// # });
1704    /// ```
1705    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1706    where
1707        Self: Unpin,
1708        F: FnMut(Self::Item) -> Option<B>,
1709    {
1710        FindMapFuture { stream: self, f }
1711    }
1712
1713    /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1714    ///
1715    /// # Examples
1716    ///
1717    /// ```
1718    /// use futures_lite::stream::{self, StreamExt};
1719    ///
1720    /// # spin_on::spin_on(async {
1721    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1722    ///
1723    /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1724    /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1725    /// assert_eq!(s.position(|x| x == 9).await, None);
1726    /// # });
1727    /// ```
1728    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1729    where
1730        Self: Unpin,
1731        P: FnMut(Self::Item) -> bool,
1732    {
1733        PositionFuture {
1734            stream: self,
1735            predicate,
1736            index: 0,
1737        }
1738    }
1739
1740    /// Tests if `predicate` returns `true` for all items in the stream.
1741    ///
1742    /// The result is `true` for an empty stream.
1743    ///
1744    /// # Examples
1745    ///
1746    /// ```
1747    /// use futures_lite::stream::{self, StreamExt};
1748    ///
1749    /// # spin_on::spin_on(async {
1750    /// let mut s = stream::iter(vec![1, 2, 3]);
1751    /// assert!(!s.all(|x| x % 2 == 0).await);
1752    ///
1753    /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1754    /// assert!(s.all(|x| x % 2 == 0).await);
1755    ///
1756    /// let mut s = stream::empty::<i32>();
1757    /// assert!(s.all(|x| x % 2 == 0).await);
1758    /// # });
1759    /// ```
1760    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1761    where
1762        Self: Unpin,
1763        P: FnMut(Self::Item) -> bool,
1764    {
1765        AllFuture {
1766            stream: self,
1767            predicate,
1768        }
1769    }
1770
1771    /// Tests if `predicate` returns `true` for any item in the stream.
1772    ///
1773    /// The result is `false` for an empty stream.
1774    ///
1775    /// # Examples
1776    ///
1777    /// ```
1778    /// use futures_lite::stream::{self, StreamExt};
1779    ///
1780    /// # spin_on::spin_on(async {
1781    /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1782    /// assert!(!s.any(|x| x % 2 == 0).await);
1783    ///
1784    /// let mut s = stream::iter(vec![1, 2, 3]);
1785    /// assert!(s.any(|x| x % 2 == 0).await);
1786    ///
1787    /// let mut s = stream::empty::<i32>();
1788    /// assert!(!s.any(|x| x % 2 == 0).await);
1789    /// # });
1790    /// ```
1791    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1792    where
1793        Self: Unpin,
1794        P: FnMut(Self::Item) -> bool,
1795    {
1796        AnyFuture {
1797            stream: self,
1798            predicate,
1799        }
1800    }
1801
1802    /// Calls a closure on each item of the stream.
1803    ///
1804    /// # Examples
1805    ///
1806    /// ```
1807    /// use futures_lite::stream::{self, StreamExt};
1808    ///
1809    /// # spin_on::spin_on(async {
1810    /// let mut s = stream::iter(vec![1, 2, 3]);
1811    /// s.for_each(|s| println!("{}", s)).await;
1812    /// # });
1813    /// ```
1814    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1815    where
1816        Self: Sized,
1817        F: FnMut(Self::Item),
1818    {
1819        ForEachFuture { stream: self, f }
1820    }
1821
1822    /// Calls a fallible closure on each item of the stream, stopping on first error.
1823    ///
1824    /// # Examples
1825    ///
1826    /// ```
1827    /// use futures_lite::stream::{self, StreamExt};
1828    ///
1829    /// # spin_on::spin_on(async {
1830    /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1831    ///
1832    /// let mut v = vec![];
1833    /// let res = s
1834    ///     .try_for_each(|n| {
1835    ///         if n < 2 {
1836    ///             v.push(n);
1837    ///             Ok(())
1838    ///         } else {
1839    ///             Err("too big")
1840    ///         }
1841    ///     })
1842    ///     .await;
1843    ///
1844    /// assert_eq!(v, &[0, 1]);
1845    /// assert_eq!(res, Err("too big"));
1846    /// # });
1847    /// ```
1848    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1849    where
1850        Self: Unpin,
1851        F: FnMut(Self::Item) -> Result<(), E>,
1852    {
1853        TryForEachFuture { stream: self, f }
1854    }
1855
1856    /// Zips up two streams into a single stream of pairs.
1857    ///
1858    /// The stream of pairs stops when either of the original two streams is exhausted.
1859    ///
1860    /// # Examples
1861    ///
1862    /// ```
1863    /// use futures_lite::stream::{self, StreamExt};
1864    ///
1865    /// # spin_on::spin_on(async {
1866    /// let l = stream::iter(vec![1, 2, 3]);
1867    /// let r = stream::iter(vec![4, 5, 6, 7]);
1868    /// let mut s = l.zip(r);
1869    ///
1870    /// assert_eq!(s.next().await, Some((1, 4)));
1871    /// assert_eq!(s.next().await, Some((2, 5)));
1872    /// assert_eq!(s.next().await, Some((3, 6)));
1873    /// assert_eq!(s.next().await, None);
1874    /// # });
1875    /// ```
1876    fn zip<U>(self, other: U) -> Zip<Self, U>
1877    where
1878        Self: Sized,
1879        U: Stream,
1880    {
1881        Zip {
1882            item_slot: None,
1883            first: self,
1884            second: other,
1885        }
1886    }
1887
1888    /// Collects a stream of pairs into a pair of collections.
1889    ///
1890    /// # Examples
1891    ///
1892    /// ```
1893    /// use futures_lite::stream::{self, StreamExt};
1894    ///
1895    /// # spin_on::spin_on(async {
1896    /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1897    /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1898    ///
1899    /// assert_eq!(left, [1, 3]);
1900    /// assert_eq!(right, [2, 4]);
1901    /// # });
1902    /// ```
1903    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1904    where
1905        FromA: Default + Extend<A>,
1906        FromB: Default + Extend<B>,
1907        Self: Stream<Item = (A, B)> + Sized,
1908    {
1909        UnzipFuture {
1910            stream: self,
1911            res: Some(Default::default()),
1912        }
1913    }
1914
1915    /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1916    ///
1917    /// # Examples
1918    ///
1919    /// ```
1920    /// use futures_lite::stream::{self, StreamExt};
1921    /// use futures_lite::stream::{once, pending};
1922    ///
1923    /// # spin_on::spin_on(async {
1924    /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1925    /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1926    ///
1927    /// // The first future wins.
1928    /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1929    /// # })
1930    /// ```
1931    fn or<S>(self, other: S) -> Or<Self, S>
1932    where
1933        Self: Sized,
1934        S: Stream<Item = Self::Item>,
1935    {
1936        Or {
1937            stream1: self,
1938            stream2: other,
1939        }
1940    }
1941
1942    /// Merges with `other` stream, with no preference for either stream when both are ready.
1943    ///
1944    /// # Examples
1945    ///
1946    /// ```
1947    /// use futures_lite::stream::{self, StreamExt};
1948    /// use futures_lite::stream::{once, pending};
1949    ///
1950    /// # spin_on::spin_on(async {
1951    /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1952    /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1953    ///
1954    /// // One of the two stream is randomly chosen as the winner.
1955    /// let res = once(1).race(once(2)).next().await;
1956    /// # })
1957    /// ```
1958    #[cfg(all(feature = "std", feature = "race"))]
1959    fn race<S>(self, other: S) -> Race<Self, S>
1960    where
1961        Self: Sized,
1962        S: Stream<Item = Self::Item>,
1963    {
1964        Race {
1965            stream1: self,
1966            stream2: other,
1967            rng: Rng::new(),
1968        }
1969    }
1970
1971    /// Yields all immediately available values from a stream.
1972    ///
1973    /// This is intended to be used as a way of polling a stream without waiting, similar to the
1974    /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1975    /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1976    /// channel, but will not wait for new messages.
1977    ///
1978    /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1979    /// polling context in order to poll the underlying stream. Since this stream will never return
1980    /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1981    /// [`Iterator`].
1982    ///
1983    /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1984    /// the future if it is polled again.
1985    ///
1986    /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1987    /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1988    /// [`Stream`]: crate::stream::Stream
1989    /// [`Iterator`]: std::iter::Iterator
1990    ///
1991    /// # Examples
1992    ///
1993    /// ```
1994    /// use futures_lite::{future, pin};
1995    /// use futures_lite::stream::{self, StreamExt};
1996    ///
1997    /// # #[cfg(feature = "std")] {
1998    /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1999    /// let pend_once = stream::once_future(async {
2000    ///     future::yield_now().await;
2001    ///     3
2002    /// });
2003    /// let s = stream::iter(vec![1, 2]).chain(pend_once);
2004    /// pin!(s);
2005    ///
2006    /// // This will return the first two values, and then `None` because the stream returns
2007    /// // `Pending` after that.
2008    /// let mut iter = stream::block_on(s.drain());
2009    /// assert_eq!(iter.next(), Some(1));
2010    /// assert_eq!(iter.next(), Some(2));
2011    /// assert_eq!(iter.next(), None);
2012    ///
2013    /// // This will return the last value, because the stream returns `Ready` when polled.
2014    /// assert_eq!(iter.next(), Some(3));
2015    /// assert_eq!(iter.next(), None);
2016    /// # }
2017    /// ```
2018    fn drain(&mut self) -> Drain<'_, Self> {
2019        Drain { stream: self }
2020    }
2021
2022    /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
2023    ///
2024    /// # Examples
2025    ///
2026    /// ```
2027    /// use futures_lite::stream::{self, StreamExt};
2028    ///
2029    /// # spin_on::spin_on(async {
2030    /// let a = stream::once(1);
2031    /// let b = stream::empty();
2032    ///
2033    /// // Streams of different types can be stored in
2034    /// // the same collection when they are boxed:
2035    /// let streams = vec![a.boxed(), b.boxed()];
2036    /// # })
2037    /// ```
2038    #[cfg(feature = "alloc")]
2039    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
2040    where
2041        Self: Send + Sized + 'a,
2042    {
2043        Box::pin(self)
2044    }
2045
2046    /// Boxes the stream and changes its type to `dyn Stream + 'a`.
2047    ///
2048    /// # Examples
2049    ///
2050    /// ```
2051    /// use futures_lite::stream::{self, StreamExt};
2052    ///
2053    /// # spin_on::spin_on(async {
2054    /// let a = stream::once(1);
2055    /// let b = stream::empty();
2056    ///
2057    /// // Streams of different types can be stored in
2058    /// // the same collection when they are boxed:
2059    /// let streams = vec![a.boxed_local(), b.boxed_local()];
2060    /// # })
2061    /// ```
2062    #[cfg(feature = "alloc")]
2063    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
2064    where
2065        Self: Sized + 'a,
2066    {
2067        Box::pin(self)
2068    }
2069}
2070
2071impl<S: Stream + ?Sized> StreamExt for S {}
2072
2073/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
2074///
2075/// # Examples
2076///
2077/// ```
2078/// use futures_lite::stream::{self, StreamExt};
2079///
2080/// // These two lines are equivalent:
2081/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
2082/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
2083/// ```
2084#[cfg(feature = "alloc")]
2085pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
2086
2087/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
2088///
2089/// # Examples
2090///
2091/// ```
2092/// use futures_lite::stream::{self, StreamExt};
2093///
2094/// // These two lines are equivalent:
2095/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
2096/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
2097/// ```
2098#[cfg(feature = "alloc")]
2099pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
2100
2101/// Future for the [`StreamExt::next()`] method.
2102#[derive(Debug)]
2103#[must_use = "futures do nothing unless you `.await` or poll them"]
2104pub struct NextFuture<'a, S: ?Sized> {
2105    stream: &'a mut S,
2106}
2107
2108impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
2109
2110impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
2111    type Output = Option<S::Item>;
2112
2113    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2114        self.stream.poll_next(cx)
2115    }
2116}
2117
2118/// Future for the [`StreamExt::try_next()`] method.
2119#[derive(Debug)]
2120#[must_use = "futures do nothing unless you `.await` or poll them"]
2121pub struct TryNextFuture<'a, S: ?Sized> {
2122    stream: &'a mut S,
2123}
2124
2125impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
2126
2127impl<T, E, S> Future for TryNextFuture<'_, S>
2128where
2129    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
2130{
2131    type Output = Result<Option<T>, E>;
2132
2133    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2134        let res = ready!(self.stream.poll_next(cx));
2135        Poll::Ready(res.transpose())
2136    }
2137}
2138
2139pin_project! {
2140    /// Future for the [`StreamExt::count()`] method.
2141    #[derive(Debug)]
2142    #[must_use = "futures do nothing unless you `.await` or poll them"]
2143    pub struct CountFuture<S: ?Sized> {
2144        count: usize,
2145        #[pin]
2146        stream: S,
2147    }
2148}
2149
2150impl<S: Stream + ?Sized> Future for CountFuture<S> {
2151    type Output = usize;
2152
2153    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2154        loop {
2155            match ready!(self.as_mut().project().stream.poll_next(cx)) {
2156                None => return Poll::Ready(self.count),
2157                Some(_) => *self.as_mut().project().count += 1,
2158            }
2159        }
2160    }
2161}
2162
2163pin_project! {
2164    /// Future for the [`StreamExt::collect()`] method.
2165    #[derive(Debug)]
2166    #[must_use = "futures do nothing unless you `.await` or poll them"]
2167    pub struct CollectFuture<S, C> {
2168        #[pin]
2169        stream: S,
2170        collection: C,
2171    }
2172}
2173
2174impl<S, C> Future for CollectFuture<S, C>
2175where
2176    S: Stream,
2177    C: Default + Extend<S::Item>,
2178{
2179    type Output = C;
2180
2181    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
2182        let mut this = self.as_mut().project();
2183        loop {
2184            match ready!(this.stream.as_mut().poll_next(cx)) {
2185                Some(e) => this.collection.extend(Some(e)),
2186                None => return Poll::Ready(mem::take(self.project().collection)),
2187            }
2188        }
2189    }
2190}
2191
2192pin_project! {
2193    /// Future for the [`StreamExt::try_collect()`] method.
2194    #[derive(Debug)]
2195    #[must_use = "futures do nothing unless you `.await` or poll them"]
2196    pub struct TryCollectFuture<S, C> {
2197        #[pin]
2198        stream: S,
2199        items: C,
2200    }
2201}
2202
2203impl<T, E, S, C> Future for TryCollectFuture<S, C>
2204where
2205    S: Stream<Item = Result<T, E>>,
2206    C: Default + Extend<T>,
2207{
2208    type Output = Result<C, E>;
2209
2210    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2211        let mut this = self.project();
2212        Poll::Ready(Ok(loop {
2213            match ready!(this.stream.as_mut().poll_next(cx)?) {
2214                Some(x) => this.items.extend(Some(x)),
2215                None => break mem::take(this.items),
2216            }
2217        }))
2218    }
2219}
2220
2221pin_project! {
2222    /// Future for the [`StreamExt::partition()`] method.
2223    #[derive(Debug)]
2224    #[must_use = "futures do nothing unless you `.await` or poll them"]
2225    pub struct PartitionFuture<S, P, B> {
2226        #[pin]
2227        stream: S,
2228        predicate: P,
2229        res: Option<(B, B)>,
2230    }
2231}
2232
2233impl<S, P, B> Future for PartitionFuture<S, P, B>
2234where
2235    S: Stream + Sized,
2236    P: FnMut(&S::Item) -> bool,
2237    B: Default + Extend<S::Item>,
2238{
2239    type Output = (B, B);
2240
2241    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2242        let mut this = self.project();
2243        loop {
2244            match ready!(this.stream.as_mut().poll_next(cx)) {
2245                Some(v) => {
2246                    let res = this.res.as_mut().unwrap();
2247                    if (this.predicate)(&v) {
2248                        res.0.extend(Some(v))
2249                    } else {
2250                        res.1.extend(Some(v))
2251                    }
2252                }
2253                None => return Poll::Ready(this.res.take().unwrap()),
2254            }
2255        }
2256    }
2257}
2258
2259pin_project! {
2260    /// Future for the [`StreamExt::fold()`] method.
2261    #[derive(Debug)]
2262    #[must_use = "futures do nothing unless you `.await` or poll them"]
2263    pub struct FoldFuture<S, F, T> {
2264        #[pin]
2265        stream: S,
2266        f: F,
2267        acc: Option<T>,
2268    }
2269}
2270
2271impl<S, F, T> Future for FoldFuture<S, F, T>
2272where
2273    S: Stream,
2274    F: FnMut(T, S::Item) -> T,
2275{
2276    type Output = T;
2277
2278    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2279        let mut this = self.project();
2280        loop {
2281            match ready!(this.stream.as_mut().poll_next(cx)) {
2282                Some(v) => {
2283                    let old = this.acc.take().unwrap();
2284                    let new = (this.f)(old, v);
2285                    *this.acc = Some(new);
2286                }
2287                None => return Poll::Ready(this.acc.take().unwrap()),
2288            }
2289        }
2290    }
2291}
2292
2293/// Future for the [`StreamExt::try_fold()`] method.
2294#[derive(Debug)]
2295#[must_use = "futures do nothing unless you `.await` or poll them"]
2296pub struct TryFoldFuture<'a, S, F, B> {
2297    stream: &'a mut S,
2298    f: F,
2299    acc: Option<B>,
2300}
2301
2302impl<S, F, B> Unpin for TryFoldFuture<'_, S, F, B> {}
2303
2304impl<T, E, S, F, B> Future for TryFoldFuture<'_, S, F, B>
2305where
2306    S: Stream<Item = Result<T, E>> + Unpin,
2307    F: FnMut(B, T) -> Result<B, E>,
2308{
2309    type Output = Result<B, E>;
2310
2311    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2312        loop {
2313            match ready!(self.stream.poll_next(cx)) {
2314                Some(Err(e)) => return Poll::Ready(Err(e)),
2315                Some(Ok(t)) => {
2316                    let old = self.acc.take().unwrap();
2317                    let new = (&mut self.f)(old, t);
2318
2319                    match new {
2320                        Ok(t) => self.acc = Some(t),
2321                        Err(e) => return Poll::Ready(Err(e)),
2322                    }
2323                }
2324                None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2325            }
2326        }
2327    }
2328}
2329
2330pin_project! {
2331    /// Stream for the [`StreamExt::scan()`] method.
2332    #[derive(Clone, Debug)]
2333    #[must_use = "streams do nothing unless polled"]
2334    pub struct Scan<S, St, F> {
2335        #[pin]
2336        stream: S,
2337        state_f: (St, F),
2338    }
2339}
2340
2341impl<S, St, F, B> Stream for Scan<S, St, F>
2342where
2343    S: Stream,
2344    F: FnMut(&mut St, S::Item) -> Option<B>,
2345{
2346    type Item = B;
2347
2348    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2349        let mut this = self.project();
2350        this.stream.as_mut().poll_next(cx).map(|item| {
2351            item.and_then(|item| {
2352                let (state, f) = this.state_f;
2353                f(state, item)
2354            })
2355        })
2356    }
2357}
2358
2359pin_project! {
2360    /// Stream for the [`StreamExt::fuse()`] method.
2361    #[derive(Clone, Debug)]
2362    #[must_use = "streams do nothing unless polled"]
2363    pub struct Fuse<S> {
2364        #[pin]
2365        stream: S,
2366        done: bool,
2367    }
2368}
2369
2370impl<S: Stream> Stream for Fuse<S> {
2371    type Item = S::Item;
2372
2373    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2374        let this = self.project();
2375
2376        if *this.done {
2377            Poll::Ready(None)
2378        } else {
2379            let next = ready!(this.stream.poll_next(cx));
2380            if next.is_none() {
2381                *this.done = true;
2382            }
2383            Poll::Ready(next)
2384        }
2385    }
2386}
2387
2388pin_project! {
2389    /// Stream for the [`StreamExt::map()`] method.
2390    #[derive(Clone, Debug)]
2391    #[must_use = "streams do nothing unless polled"]
2392    pub struct Map<S, F> {
2393        #[pin]
2394        stream: S,
2395        f: F,
2396    }
2397}
2398
2399impl<S, F, T> Stream for Map<S, F>
2400where
2401    S: Stream,
2402    F: FnMut(S::Item) -> T,
2403{
2404    type Item = T;
2405
2406    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2407        let this = self.project();
2408        let next = ready!(this.stream.poll_next(cx));
2409        Poll::Ready(next.map(this.f))
2410    }
2411
2412    fn size_hint(&self) -> (usize, Option<usize>) {
2413        self.stream.size_hint()
2414    }
2415}
2416
2417pin_project! {
2418    /// Stream for the [`StreamExt::flat_map()`] method.
2419    #[derive(Clone, Debug)]
2420    #[must_use = "streams do nothing unless polled"]
2421    pub struct FlatMap<S, U, F> {
2422        #[pin]
2423        stream: Map<S, F>,
2424        #[pin]
2425        inner_stream: Option<U>,
2426    }
2427}
2428
2429impl<S, U, F> Stream for FlatMap<S, U, F>
2430where
2431    S: Stream,
2432    U: Stream,
2433    F: FnMut(S::Item) -> U,
2434{
2435    type Item = U::Item;
2436
2437    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2438        let mut this = self.project();
2439        loop {
2440            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2441                match ready!(inner.poll_next(cx)) {
2442                    Some(item) => return Poll::Ready(Some(item)),
2443                    None => this.inner_stream.set(None),
2444                }
2445            }
2446
2447            match ready!(this.stream.as_mut().poll_next(cx)) {
2448                Some(stream) => this.inner_stream.set(Some(stream)),
2449                None => return Poll::Ready(None),
2450            }
2451        }
2452    }
2453}
2454
2455pin_project! {
2456    /// Stream for the [`StreamExt::flatten()`] method.
2457    #[derive(Clone, Debug)]
2458    #[must_use = "streams do nothing unless polled"]
2459    pub struct Flatten<S: Stream> {
2460        #[pin]
2461        stream: S,
2462        #[pin]
2463        inner_stream: Option<S::Item>,
2464    }
2465}
2466
2467impl<S, U> Stream for Flatten<S>
2468where
2469    S: Stream<Item = U>,
2470    U: Stream,
2471{
2472    type Item = U::Item;
2473
2474    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2475        let mut this = self.project();
2476        loop {
2477            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2478                match ready!(inner.poll_next(cx)) {
2479                    Some(item) => return Poll::Ready(Some(item)),
2480                    None => this.inner_stream.set(None),
2481                }
2482            }
2483
2484            match ready!(this.stream.as_mut().poll_next(cx)) {
2485                Some(inner) => this.inner_stream.set(Some(inner)),
2486                None => return Poll::Ready(None),
2487            }
2488        }
2489    }
2490}
2491
2492pin_project! {
2493    /// Stream for the [`StreamExt::then()`] method.
2494    #[derive(Clone, Debug)]
2495    #[must_use = "streams do nothing unless polled"]
2496    pub struct Then<S, F, Fut> {
2497        #[pin]
2498        stream: S,
2499        #[pin]
2500        future: Option<Fut>,
2501        f: F,
2502    }
2503}
2504
2505impl<S, F, Fut> Stream for Then<S, F, Fut>
2506where
2507    S: Stream,
2508    F: FnMut(S::Item) -> Fut,
2509    Fut: Future,
2510{
2511    type Item = Fut::Output;
2512
2513    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2514        let mut this = self.project();
2515
2516        loop {
2517            if let Some(fut) = this.future.as_mut().as_pin_mut() {
2518                let item = ready!(fut.poll(cx));
2519                this.future.set(None);
2520                return Poll::Ready(Some(item));
2521            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2522                this.future.set(Some((this.f)(item)));
2523            } else {
2524                return Poll::Ready(None);
2525            }
2526        }
2527    }
2528
2529    fn size_hint(&self) -> (usize, Option<usize>) {
2530        let future_len = self.future.is_some() as usize;
2531        let (lower, upper) = self.stream.size_hint();
2532        let lower = lower.saturating_add(future_len);
2533        let upper = upper.and_then(|u| u.checked_add(future_len));
2534        (lower, upper)
2535    }
2536}
2537
2538pin_project! {
2539    /// Stream for the [`StreamExt::filter()`] method.
2540    #[derive(Clone, Debug)]
2541    #[must_use = "streams do nothing unless polled"]
2542    pub struct Filter<S, P> {
2543        #[pin]
2544        stream: S,
2545        predicate: P,
2546    }
2547}
2548
2549impl<S, P> Stream for Filter<S, P>
2550where
2551    S: Stream,
2552    P: FnMut(&S::Item) -> bool,
2553{
2554    type Item = S::Item;
2555
2556    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2557        let mut this = self.project();
2558        loop {
2559            match ready!(this.stream.as_mut().poll_next(cx)) {
2560                None => return Poll::Ready(None),
2561                Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2562                Some(_) => {}
2563            }
2564        }
2565    }
2566
2567    fn size_hint(&self) -> (usize, Option<usize>) {
2568        let (_, hi) = self.stream.size_hint();
2569
2570        // If the filter matches all of the elements, it will match the stream's upper bound.
2571        // If the filter matches none of the elements, there will be zero returned values.
2572        (0, hi)
2573    }
2574}
2575
2576/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2577///
2578/// # Examples
2579///
2580/// ```
2581/// use futures_lite::stream::{self, once, pending, StreamExt};
2582///
2583/// # spin_on::spin_on(async {
2584/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2585/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2586///
2587/// // The first stream wins.
2588/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2589/// # })
2590/// ```
2591pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2592where
2593    S1: Stream<Item = T>,
2594    S2: Stream<Item = T>,
2595{
2596    Or { stream1, stream2 }
2597}
2598
2599pin_project! {
2600    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2601    #[derive(Clone, Debug)]
2602    #[must_use = "streams do nothing unless polled"]
2603    pub struct Or<S1, S2> {
2604        #[pin]
2605        stream1: S1,
2606        #[pin]
2607        stream2: S2,
2608    }
2609}
2610
2611impl<T, S1, S2> Stream for Or<S1, S2>
2612where
2613    S1: Stream<Item = T>,
2614    S2: Stream<Item = T>,
2615{
2616    type Item = T;
2617
2618    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2619        let mut this = self.project();
2620
2621        if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2622            return Poll::Ready(Some(t));
2623        }
2624        this.stream2.as_mut().poll_next(cx)
2625    }
2626}
2627
2628/// Merges two streams, with no preference for either stream when both are ready.
2629///
2630/// # Examples
2631///
2632/// ```
2633/// use futures_lite::stream::{self, once, pending, StreamExt};
2634///
2635/// # spin_on::spin_on(async {
2636/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2637/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2638///
2639/// // One of the two stream is randomly chosen as the winner.
2640/// let res = stream::race(once(1), once(2)).next().await;
2641/// # })
2642/// ```
2643#[cfg(all(feature = "std", feature = "race"))]
2644pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2645where
2646    S1: Stream<Item = T>,
2647    S2: Stream<Item = T>,
2648{
2649    Race {
2650        stream1,
2651        stream2,
2652        rng: Rng::new(),
2653    }
2654}
2655
2656/// Races two streams, but with a user-provided seed for randomness.
2657///
2658/// # Examples
2659///
2660/// ```
2661/// use futures_lite::stream::{self, once, pending, StreamExt};
2662///
2663/// // A fixed seed is used for reproducibility.
2664/// const SEED: u64 = 123;
2665///
2666/// # spin_on::spin_on(async {
2667/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
2668/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
2669///
2670/// // One of the two stream is randomly chosen as the winner.
2671/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
2672/// # })
2673/// ```
2674#[cfg(feature = "race")]
2675pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
2676where
2677    S1: Stream<Item = T>,
2678    S2: Stream<Item = T>,
2679{
2680    Race {
2681        stream1,
2682        stream2,
2683        rng: Rng::with_seed(seed),
2684    }
2685}
2686
2687#[cfg(feature = "race")]
2688pin_project! {
2689    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2690    #[derive(Clone, Debug)]
2691    #[must_use = "streams do nothing unless polled"]
2692    pub struct Race<S1, S2> {
2693        #[pin]
2694        stream1: S1,
2695        #[pin]
2696        stream2: S2,
2697        rng: Rng,
2698    }
2699}
2700
2701#[cfg(feature = "race")]
2702impl<T, S1, S2> Stream for Race<S1, S2>
2703where
2704    S1: Stream<Item = T>,
2705    S2: Stream<Item = T>,
2706{
2707    type Item = T;
2708
2709    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2710        let mut this = self.project();
2711
2712        if this.rng.bool() {
2713            if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2714                return Poll::Ready(Some(t));
2715            }
2716            if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2717                return Poll::Ready(Some(t));
2718            }
2719        } else {
2720            if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2721                return Poll::Ready(Some(t));
2722            }
2723            if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2724                return Poll::Ready(Some(t));
2725            }
2726        }
2727        Poll::Pending
2728    }
2729}
2730
2731pin_project! {
2732    /// Stream for the [`StreamExt::filter_map()`] method.
2733    #[derive(Clone, Debug)]
2734    #[must_use = "streams do nothing unless polled"]
2735    pub struct FilterMap<S, F> {
2736        #[pin]
2737        stream: S,
2738        f: F,
2739    }
2740}
2741
2742impl<S, F, T> Stream for FilterMap<S, F>
2743where
2744    S: Stream,
2745    F: FnMut(S::Item) -> Option<T>,
2746{
2747    type Item = T;
2748
2749    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2750        let mut this = self.project();
2751        loop {
2752            match ready!(this.stream.as_mut().poll_next(cx)) {
2753                None => return Poll::Ready(None),
2754                Some(v) => {
2755                    if let Some(t) = (this.f)(v) {
2756                        return Poll::Ready(Some(t));
2757                    }
2758                }
2759            }
2760        }
2761    }
2762}
2763
2764pin_project! {
2765    /// Stream for the [`StreamExt::take()`] method.
2766    #[derive(Clone, Debug)]
2767    #[must_use = "streams do nothing unless polled"]
2768    pub struct Take<S> {
2769        #[pin]
2770        stream: S,
2771        n: usize,
2772    }
2773}
2774
2775impl<S: Stream> Stream for Take<S> {
2776    type Item = S::Item;
2777
2778    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2779        let this = self.project();
2780
2781        if *this.n == 0 {
2782            Poll::Ready(None)
2783        } else {
2784            let next = ready!(this.stream.poll_next(cx));
2785            match next {
2786                Some(_) => *this.n -= 1,
2787                None => *this.n = 0,
2788            }
2789            Poll::Ready(next)
2790        }
2791    }
2792}
2793
2794pin_project! {
2795    /// Stream for the [`StreamExt::take_while()`] method.
2796    #[derive(Clone, Debug)]
2797    #[must_use = "streams do nothing unless polled"]
2798    pub struct TakeWhile<S, P> {
2799        #[pin]
2800        stream: S,
2801        predicate: P,
2802    }
2803}
2804
2805impl<S, P> Stream for TakeWhile<S, P>
2806where
2807    S: Stream,
2808    P: FnMut(&S::Item) -> bool,
2809{
2810    type Item = S::Item;
2811
2812    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2813        let this = self.project();
2814
2815        match ready!(this.stream.poll_next(cx)) {
2816            Some(v) => {
2817                if (this.predicate)(&v) {
2818                    Poll::Ready(Some(v))
2819                } else {
2820                    Poll::Ready(None)
2821                }
2822            }
2823            None => Poll::Ready(None),
2824        }
2825    }
2826}
2827
2828pin_project! {
2829    /// Stream for the [`StreamExt::map_while()`] method.
2830    #[derive(Clone, Debug)]
2831    #[must_use = "streams do nothing unless polled"]
2832    pub struct MapWhile<S, P> {
2833        #[pin]
2834        stream: S,
2835        predicate: P,
2836    }
2837}
2838
2839impl<B, S, P> Stream for MapWhile<S, P>
2840where
2841    S: Stream,
2842    P: FnMut(S::Item) -> Option<B>,
2843{
2844    type Item = B;
2845
2846    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2847        let this = self.project();
2848
2849        match ready!(this.stream.poll_next(cx)) {
2850            Some(v) => Poll::Ready((this.predicate)(v)),
2851            None => Poll::Ready(None),
2852        }
2853    }
2854}
2855
2856pin_project! {
2857    /// Stream for the [`StreamExt::skip()`] method.
2858    #[derive(Clone, Debug)]
2859    #[must_use = "streams do nothing unless polled"]
2860    pub struct Skip<S> {
2861        #[pin]
2862        stream: S,
2863        n: usize,
2864    }
2865}
2866
2867impl<S: Stream> Stream for Skip<S> {
2868    type Item = S::Item;
2869
2870    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2871        let mut this = self.project();
2872        loop {
2873            match ready!(this.stream.as_mut().poll_next(cx)) {
2874                Some(v) => match *this.n {
2875                    0 => return Poll::Ready(Some(v)),
2876                    _ => *this.n -= 1,
2877                },
2878                None => return Poll::Ready(None),
2879            }
2880        }
2881    }
2882}
2883
2884pin_project! {
2885    /// Stream for the [`StreamExt::skip_while()`] method.
2886    #[derive(Clone, Debug)]
2887    #[must_use = "streams do nothing unless polled"]
2888    pub struct SkipWhile<S, P> {
2889        #[pin]
2890        stream: S,
2891        predicate: Option<P>,
2892    }
2893}
2894
2895impl<S, P> Stream for SkipWhile<S, P>
2896where
2897    S: Stream,
2898    P: FnMut(&S::Item) -> bool,
2899{
2900    type Item = S::Item;
2901
2902    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2903        let mut this = self.project();
2904        loop {
2905            match ready!(this.stream.as_mut().poll_next(cx)) {
2906                Some(v) => match this.predicate {
2907                    Some(p) => {
2908                        if !p(&v) {
2909                            *this.predicate = None;
2910                            return Poll::Ready(Some(v));
2911                        }
2912                    }
2913                    None => return Poll::Ready(Some(v)),
2914                },
2915                None => return Poll::Ready(None),
2916            }
2917        }
2918    }
2919}
2920
2921pin_project! {
2922    /// Stream for the [`StreamExt::step_by()`] method.
2923    #[derive(Clone, Debug)]
2924    #[must_use = "streams do nothing unless polled"]
2925    pub struct StepBy<S> {
2926        #[pin]
2927        stream: S,
2928        step: usize,
2929        i: usize,
2930    }
2931}
2932
2933impl<S: Stream> Stream for StepBy<S> {
2934    type Item = S::Item;
2935
2936    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2937        let mut this = self.project();
2938        loop {
2939            match ready!(this.stream.as_mut().poll_next(cx)) {
2940                Some(v) => {
2941                    if *this.i == 0 {
2942                        *this.i = *this.step - 1;
2943                        return Poll::Ready(Some(v));
2944                    } else {
2945                        *this.i -= 1;
2946                    }
2947                }
2948                None => return Poll::Ready(None),
2949            }
2950        }
2951    }
2952}
2953
2954pin_project! {
2955    /// Stream for the [`StreamExt::chain()`] method.
2956    #[derive(Clone, Debug)]
2957    #[must_use = "streams do nothing unless polled"]
2958    pub struct Chain<S, U> {
2959        #[pin]
2960        first: Fuse<S>,
2961        #[pin]
2962        second: Fuse<U>,
2963    }
2964}
2965
2966impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2967    type Item = S::Item;
2968
2969    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2970        let mut this = self.project();
2971
2972        if !this.first.done {
2973            let next = ready!(this.first.as_mut().poll_next(cx));
2974            if let Some(next) = next {
2975                return Poll::Ready(Some(next));
2976            }
2977        }
2978
2979        if !this.second.done {
2980            let next = ready!(this.second.as_mut().poll_next(cx));
2981            if let Some(next) = next {
2982                return Poll::Ready(Some(next));
2983            }
2984        }
2985
2986        if this.first.done && this.second.done {
2987            Poll::Ready(None)
2988        } else {
2989            Poll::Pending
2990        }
2991    }
2992}
2993
2994pin_project! {
2995    /// Stream for the [`StreamExt::cloned()`] method.
2996    #[derive(Clone, Debug)]
2997    #[must_use = "streams do nothing unless polled"]
2998    pub struct Cloned<S> {
2999        #[pin]
3000        stream: S,
3001    }
3002}
3003
3004impl<'a, S, T: 'a> Stream for Cloned<S>
3005where
3006    S: Stream<Item = &'a T>,
3007    T: Clone,
3008{
3009    type Item = T;
3010
3011    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3012        let this = self.project();
3013        let next = ready!(this.stream.poll_next(cx));
3014        Poll::Ready(next.cloned())
3015    }
3016}
3017
3018pin_project! {
3019    /// Stream for the [`StreamExt::copied()`] method.
3020    #[derive(Clone, Debug)]
3021    #[must_use = "streams do nothing unless polled"]
3022    pub struct Copied<S> {
3023        #[pin]
3024        stream: S,
3025    }
3026}
3027
3028impl<'a, S, T: 'a> Stream for Copied<S>
3029where
3030    S: Stream<Item = &'a T>,
3031    T: Copy,
3032{
3033    type Item = T;
3034
3035    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3036        let this = self.project();
3037        let next = ready!(this.stream.poll_next(cx));
3038        Poll::Ready(next.copied())
3039    }
3040}
3041
3042pin_project! {
3043    /// Stream for the [`StreamExt::cycle()`] method.
3044    #[derive(Clone, Debug)]
3045    #[must_use = "streams do nothing unless polled"]
3046    pub struct Cycle<S> {
3047        orig: S,
3048        #[pin]
3049        stream: S,
3050    }
3051}
3052
3053impl<S> Stream for Cycle<S>
3054where
3055    S: Stream + Clone,
3056{
3057    type Item = S::Item;
3058
3059    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3060        match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
3061            Some(item) => Poll::Ready(Some(item)),
3062            None => {
3063                let new = self.as_mut().orig.clone();
3064                self.as_mut().project().stream.set(new);
3065                self.project().stream.poll_next(cx)
3066            }
3067        }
3068    }
3069}
3070
3071pin_project! {
3072    /// Stream for the [`StreamExt::enumerate()`] method.
3073    #[derive(Clone, Debug)]
3074    #[must_use = "streams do nothing unless polled"]
3075    pub struct Enumerate<S> {
3076        #[pin]
3077        stream: S,
3078        i: usize,
3079    }
3080}
3081
3082impl<S> Stream for Enumerate<S>
3083where
3084    S: Stream,
3085{
3086    type Item = (usize, S::Item);
3087
3088    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3089        let this = self.project();
3090
3091        match ready!(this.stream.poll_next(cx)) {
3092            Some(v) => {
3093                let ret = (*this.i, v);
3094                *this.i += 1;
3095                Poll::Ready(Some(ret))
3096            }
3097            None => Poll::Ready(None),
3098        }
3099    }
3100}
3101
3102pin_project! {
3103    /// Stream for the [`StreamExt::inspect()`] method.
3104    #[derive(Clone, Debug)]
3105    #[must_use = "streams do nothing unless polled"]
3106    pub struct Inspect<S, F> {
3107        #[pin]
3108        stream: S,
3109        f: F,
3110    }
3111}
3112
3113impl<S, F> Stream for Inspect<S, F>
3114where
3115    S: Stream,
3116    F: FnMut(&S::Item),
3117{
3118    type Item = S::Item;
3119
3120    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3121        let mut this = self.project();
3122        let next = ready!(this.stream.as_mut().poll_next(cx));
3123        if let Some(x) = &next {
3124            (this.f)(x);
3125        }
3126        Poll::Ready(next)
3127    }
3128}
3129
3130/// Future for the [`StreamExt::nth()`] method.
3131#[derive(Debug)]
3132#[must_use = "futures do nothing unless you `.await` or poll them"]
3133pub struct NthFuture<'a, S: ?Sized> {
3134    stream: &'a mut S,
3135    n: usize,
3136}
3137
3138impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
3139
3140impl<S> Future for NthFuture<'_, S>
3141where
3142    S: Stream + Unpin + ?Sized,
3143{
3144    type Output = Option<S::Item>;
3145
3146    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3147        loop {
3148            match ready!(self.stream.poll_next(cx)) {
3149                Some(v) => match self.n {
3150                    0 => return Poll::Ready(Some(v)),
3151                    _ => self.n -= 1,
3152                },
3153                None => return Poll::Ready(None),
3154            }
3155        }
3156    }
3157}
3158
3159pin_project! {
3160    /// Future for the [`StreamExt::last()`] method.
3161    #[derive(Debug)]
3162    #[must_use = "futures do nothing unless you `.await` or poll them"]
3163    pub struct LastFuture<S: Stream> {
3164        #[pin]
3165        stream: S,
3166        last: Option<S::Item>,
3167    }
3168}
3169
3170impl<S: Stream> Future for LastFuture<S> {
3171    type Output = Option<S::Item>;
3172
3173    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3174        let mut this = self.project();
3175        loop {
3176            match ready!(this.stream.as_mut().poll_next(cx)) {
3177                Some(new) => *this.last = Some(new),
3178                None => return Poll::Ready(this.last.take()),
3179            }
3180        }
3181    }
3182}
3183
3184/// Future for the [`StreamExt::find()`] method.
3185#[derive(Debug)]
3186#[must_use = "futures do nothing unless you `.await` or poll them"]
3187pub struct FindFuture<'a, S: ?Sized, P> {
3188    stream: &'a mut S,
3189    predicate: P,
3190}
3191
3192impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
3193
3194impl<S, P> Future for FindFuture<'_, S, P>
3195where
3196    S: Stream + Unpin + ?Sized,
3197    P: FnMut(&S::Item) -> bool,
3198{
3199    type Output = Option<S::Item>;
3200
3201    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3202        loop {
3203            match ready!(self.stream.poll_next(cx)) {
3204                Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
3205                Some(_) => {}
3206                None => return Poll::Ready(None),
3207            }
3208        }
3209    }
3210}
3211
3212/// Future for the [`StreamExt::find_map()`] method.
3213#[derive(Debug)]
3214#[must_use = "futures do nothing unless you `.await` or poll them"]
3215pub struct FindMapFuture<'a, S: ?Sized, F> {
3216    stream: &'a mut S,
3217    f: F,
3218}
3219
3220impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
3221
3222impl<S, B, F> Future for FindMapFuture<'_, S, F>
3223where
3224    S: Stream + Unpin + ?Sized,
3225    F: FnMut(S::Item) -> Option<B>,
3226{
3227    type Output = Option<B>;
3228
3229    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3230        loop {
3231            match ready!(self.stream.poll_next(cx)) {
3232                Some(v) => {
3233                    if let Some(v) = (&mut self.f)(v) {
3234                        return Poll::Ready(Some(v));
3235                    }
3236                }
3237                None => return Poll::Ready(None),
3238            }
3239        }
3240    }
3241}
3242
3243/// Future for the [`StreamExt::position()`] method.
3244#[derive(Debug)]
3245#[must_use = "futures do nothing unless you `.await` or poll them"]
3246pub struct PositionFuture<'a, S: ?Sized, P> {
3247    stream: &'a mut S,
3248    predicate: P,
3249    index: usize,
3250}
3251
3252impl<S: Unpin + ?Sized, P> Unpin for PositionFuture<'_, S, P> {}
3253
3254impl<S, P> Future for PositionFuture<'_, S, P>
3255where
3256    S: Stream + Unpin + ?Sized,
3257    P: FnMut(S::Item) -> bool,
3258{
3259    type Output = Option<usize>;
3260
3261    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3262        loop {
3263            match ready!(self.stream.poll_next(cx)) {
3264                Some(v) => {
3265                    if (&mut self.predicate)(v) {
3266                        return Poll::Ready(Some(self.index));
3267                    } else {
3268                        self.index += 1;
3269                    }
3270                }
3271                None => return Poll::Ready(None),
3272            }
3273        }
3274    }
3275}
3276
3277/// Future for the [`StreamExt::all()`] method.
3278#[derive(Debug)]
3279#[must_use = "futures do nothing unless you `.await` or poll them"]
3280pub struct AllFuture<'a, S: ?Sized, P> {
3281    stream: &'a mut S,
3282    predicate: P,
3283}
3284
3285impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3286
3287impl<S, P> Future for AllFuture<'_, S, P>
3288where
3289    S: Stream + Unpin + ?Sized,
3290    P: FnMut(S::Item) -> bool,
3291{
3292    type Output = bool;
3293
3294    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3295        loop {
3296            match ready!(self.stream.poll_next(cx)) {
3297                Some(v) => {
3298                    if !(&mut self.predicate)(v) {
3299                        return Poll::Ready(false);
3300                    }
3301                }
3302                None => return Poll::Ready(true),
3303            }
3304        }
3305    }
3306}
3307
3308/// Future for the [`StreamExt::any()`] method.
3309#[derive(Debug)]
3310#[must_use = "futures do nothing unless you `.await` or poll them"]
3311pub struct AnyFuture<'a, S: ?Sized, P> {
3312    stream: &'a mut S,
3313    predicate: P,
3314}
3315
3316impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3317
3318impl<S, P> Future for AnyFuture<'_, S, P>
3319where
3320    S: Stream + Unpin + ?Sized,
3321    P: FnMut(S::Item) -> bool,
3322{
3323    type Output = bool;
3324
3325    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3326        loop {
3327            match ready!(self.stream.poll_next(cx)) {
3328                Some(v) => {
3329                    if (&mut self.predicate)(v) {
3330                        return Poll::Ready(true);
3331                    }
3332                }
3333                None => return Poll::Ready(false),
3334            }
3335        }
3336    }
3337}
3338
3339pin_project! {
3340    /// Future for the [`StreamExt::for_each()`] method.
3341    #[derive(Debug)]
3342    #[must_use = "futures do nothing unless you `.await` or poll them"]
3343    pub struct ForEachFuture<S, F> {
3344        #[pin]
3345        stream: S,
3346        f: F,
3347    }
3348}
3349
3350impl<S, F> Future for ForEachFuture<S, F>
3351where
3352    S: Stream,
3353    F: FnMut(S::Item),
3354{
3355    type Output = ();
3356
3357    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3358        let mut this = self.project();
3359        loop {
3360            match ready!(this.stream.as_mut().poll_next(cx)) {
3361                Some(v) => (this.f)(v),
3362                None => return Poll::Ready(()),
3363            }
3364        }
3365    }
3366}
3367
3368/// Future for the [`StreamExt::try_for_each()`] method.
3369#[derive(Debug)]
3370#[must_use = "futures do nothing unless you `.await` or poll them"]
3371pub struct TryForEachFuture<'a, S: ?Sized, F> {
3372    stream: &'a mut S,
3373    f: F,
3374}
3375
3376impl<S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'_, S, F> {}
3377
3378impl<S, F, E> Future for TryForEachFuture<'_, S, F>
3379where
3380    S: Stream + Unpin + ?Sized,
3381    F: FnMut(S::Item) -> Result<(), E>,
3382{
3383    type Output = Result<(), E>;
3384
3385    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3386        loop {
3387            match ready!(self.stream.poll_next(cx)) {
3388                None => return Poll::Ready(Ok(())),
3389                Some(v) => (&mut self.f)(v)?,
3390            }
3391        }
3392    }
3393}
3394
3395pin_project! {
3396    /// Stream for the [`StreamExt::zip()`] method.
3397    #[derive(Clone, Debug)]
3398    #[must_use = "streams do nothing unless polled"]
3399    pub struct Zip<A: Stream, B> {
3400        item_slot: Option<A::Item>,
3401        #[pin]
3402        first: A,
3403        #[pin]
3404        second: B,
3405    }
3406}
3407
3408impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3409    type Item = (A::Item, B::Item);
3410
3411    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3412        let this = self.project();
3413
3414        if this.item_slot.is_none() {
3415            match this.first.poll_next(cx) {
3416                Poll::Pending => return Poll::Pending,
3417                Poll::Ready(None) => return Poll::Ready(None),
3418                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3419            }
3420        }
3421
3422        let second_item = ready!(this.second.poll_next(cx));
3423        let first_item = this.item_slot.take().unwrap();
3424        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3425    }
3426}
3427
3428pin_project! {
3429    /// Future for the [`StreamExt::unzip()`] method.
3430    #[derive(Debug)]
3431    #[must_use = "futures do nothing unless you `.await` or poll them"]
3432    pub struct UnzipFuture<S, FromA, FromB> {
3433        #[pin]
3434        stream: S,
3435        res: Option<(FromA, FromB)>,
3436    }
3437}
3438
3439impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3440where
3441    S: Stream<Item = (A, B)>,
3442    FromA: Default + Extend<A>,
3443    FromB: Default + Extend<B>,
3444{
3445    type Output = (FromA, FromB);
3446
3447    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3448        let mut this = self.project();
3449
3450        loop {
3451            match ready!(this.stream.as_mut().poll_next(cx)) {
3452                Some((a, b)) => {
3453                    let res = this.res.as_mut().unwrap();
3454                    res.0.extend(Some(a));
3455                    res.1.extend(Some(b));
3456                }
3457                None => return Poll::Ready(this.res.take().unwrap()),
3458            }
3459        }
3460    }
3461}
3462
3463/// Stream for the [`StreamExt::drain()`] method.
3464#[derive(Debug)]
3465#[must_use = "streams do nothing unless polled"]
3466pub struct Drain<'a, S: ?Sized> {
3467    stream: &'a mut S,
3468}
3469
3470impl<S: Unpin + ?Sized> Unpin for Drain<'_, S> {}
3471
3472impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3473    /// Get a reference to the underlying stream.
3474    ///
3475    /// ## Examples
3476    ///
3477    /// ```
3478    /// use futures_lite::{prelude::*, stream};
3479    ///
3480    /// # spin_on::spin_on(async {
3481    /// let mut s = stream::iter(vec![1, 2, 3]);
3482    /// let s2 = s.drain();
3483    ///
3484    /// let inner = s2.get_ref();
3485    /// // s and inner are the same.
3486    /// # });
3487    /// ```
3488    pub fn get_ref(&self) -> &S {
3489        &self.stream
3490    }
3491
3492    /// Get a mutable reference to the underlying stream.
3493    ///
3494    /// ## Examples
3495    ///
3496    /// ```
3497    /// use futures_lite::{prelude::*, stream};
3498    ///
3499    /// # spin_on::spin_on(async {
3500    /// let mut s = stream::iter(vec![1, 2, 3]);
3501    /// let mut s2 = s.drain();
3502    ///
3503    /// let inner = s2.get_mut();
3504    /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3505    /// # });
3506    /// ```
3507    pub fn get_mut(&mut self) -> &mut S {
3508        &mut self.stream
3509    }
3510
3511    /// Consume this stream and get the underlying stream.
3512    ///
3513    /// ## Examples
3514    ///
3515    /// ```
3516    /// use futures_lite::{prelude::*, stream};
3517    ///
3518    /// # spin_on::spin_on(async {
3519    /// let mut s = stream::iter(vec![1, 2, 3]);
3520    /// let mut s2 = s.drain();
3521    ///
3522    /// let inner = s2.into_inner();
3523    /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3524    /// # });
3525    /// ```
3526    pub fn into_inner(self) -> &'a mut S {
3527        self.stream
3528    }
3529}
3530
3531impl<S: Stream + Unpin + ?Sized> Stream for Drain<'_, S> {
3532    type Item = S::Item;
3533
3534    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3535        match self.stream.poll_next(cx) {
3536            Poll::Ready(x) => Poll::Ready(x),
3537            Poll::Pending => Poll::Ready(None),
3538        }
3539    }
3540
3541    fn size_hint(&self) -> (usize, Option<usize>) {
3542        let (_, hi) = self.stream.size_hint();
3543        (0, hi)
3544    }
3545}