futures_lite/stream.rs
1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(all(not(feature = "std"), feature = "alloc"))]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34#[cfg(feature = "race")]
35use fastrand::Rng;
36
37use pin_project_lite::pin_project;
38
39use crate::ready;
40
41/// Converts a stream into a blocking iterator.
42///
43/// # Examples
44///
45/// ```
46/// use futures_lite::{pin, stream};
47///
48/// let stream = stream::once(7);
49/// pin!(stream);
50///
51/// let mut iter = stream::block_on(stream);
52/// assert_eq!(iter.next(), Some(7));
53/// assert_eq!(iter.next(), None);
54/// ```
55#[cfg(feature = "std")]
56pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
57 BlockOn(stream)
58}
59
60/// Iterator for the [`block_on()`] function.
61#[derive(Debug)]
62pub struct BlockOn<S>(S);
63
64#[cfg(feature = "std")]
65impl<S: Stream + Unpin> Iterator for BlockOn<S> {
66 type Item = S::Item;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 crate::future::block_on(self.0.next())
70 }
71
72 fn size_hint(&self) -> (usize, Option<usize>) {
73 self.0.size_hint()
74 }
75
76 fn count(self) -> usize {
77 crate::future::block_on(self.0.count())
78 }
79
80 fn last(self) -> Option<Self::Item> {
81 crate::future::block_on(self.0.last())
82 }
83
84 fn nth(&mut self, n: usize) -> Option<Self::Item> {
85 crate::future::block_on(self.0.nth(n))
86 }
87
88 fn fold<B, F>(self, init: B, f: F) -> B
89 where
90 F: FnMut(B, Self::Item) -> B,
91 {
92 crate::future::block_on(self.0.fold(init, f))
93 }
94
95 fn for_each<F>(self, f: F) -> F::Output
96 where
97 F: FnMut(Self::Item),
98 {
99 crate::future::block_on(self.0.for_each(f))
100 }
101
102 fn all<F>(&mut self, f: F) -> bool
103 where
104 F: FnMut(Self::Item) -> bool,
105 {
106 crate::future::block_on(self.0.all(f))
107 }
108
109 fn any<F>(&mut self, f: F) -> bool
110 where
111 F: FnMut(Self::Item) -> bool,
112 {
113 crate::future::block_on(self.0.any(f))
114 }
115
116 fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
117 where
118 P: FnMut(&Self::Item) -> bool,
119 {
120 crate::future::block_on(self.0.find(predicate))
121 }
122
123 fn find_map<B, F>(&mut self, f: F) -> Option<B>
124 where
125 F: FnMut(Self::Item) -> Option<B>,
126 {
127 crate::future::block_on(self.0.find_map(f))
128 }
129
130 fn position<P>(&mut self, predicate: P) -> Option<usize>
131 where
132 P: FnMut(Self::Item) -> bool,
133 {
134 crate::future::block_on(self.0.position(predicate))
135 }
136}
137
138/// Creates an empty stream.
139///
140/// # Examples
141///
142/// ```
143/// use futures_lite::stream::{self, StreamExt};
144///
145/// # spin_on::spin_on(async {
146/// let mut s = stream::empty::<i32>();
147/// assert_eq!(s.next().await, None);
148/// # })
149/// ```
150pub fn empty<T>() -> Empty<T> {
151 Empty {
152 _marker: PhantomData,
153 }
154}
155
156/// Stream for the [`empty()`] function.
157#[derive(Clone, Debug)]
158#[must_use = "streams do nothing unless polled"]
159pub struct Empty<T> {
160 _marker: PhantomData<T>,
161}
162
163impl<T> Unpin for Empty<T> {}
164
165impl<T> Stream for Empty<T> {
166 type Item = T;
167
168 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169 Poll::Ready(None)
170 }
171
172 fn size_hint(&self) -> (usize, Option<usize>) {
173 (0, Some(0))
174 }
175}
176
177/// Creates a stream from an iterator.
178///
179/// # Examples
180///
181/// ```
182/// use futures_lite::stream::{self, StreamExt};
183///
184/// # spin_on::spin_on(async {
185/// let mut s = stream::iter(vec![1, 2]);
186///
187/// assert_eq!(s.next().await, Some(1));
188/// assert_eq!(s.next().await, Some(2));
189/// assert_eq!(s.next().await, None);
190/// # })
191/// ```
192pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
193 Iter {
194 iter: iter.into_iter(),
195 }
196}
197
198/// Stream for the [`iter()`] function.
199#[derive(Clone, Debug)]
200#[must_use = "streams do nothing unless polled"]
201pub struct Iter<I> {
202 iter: I,
203}
204
205impl<I> Unpin for Iter<I> {}
206
207impl<I: Iterator> Stream for Iter<I> {
208 type Item = I::Item;
209
210 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211 Poll::Ready(self.iter.next())
212 }
213
214 fn size_hint(&self) -> (usize, Option<usize>) {
215 self.iter.size_hint()
216 }
217}
218
219/// Creates a stream that yields a single item.
220///
221/// # Examples
222///
223/// ```
224/// use futures_lite::stream::{self, StreamExt};
225///
226/// # spin_on::spin_on(async {
227/// let mut s = stream::once(7);
228///
229/// assert_eq!(s.next().await, Some(7));
230/// assert_eq!(s.next().await, None);
231/// # })
232/// ```
233pub fn once<T>(t: T) -> Once<T> {
234 Once { value: Some(t) }
235}
236
237pin_project! {
238 /// Stream for the [`once()`] function.
239 #[derive(Clone, Debug)]
240 #[must_use = "streams do nothing unless polled"]
241 pub struct Once<T> {
242 value: Option<T>,
243 }
244}
245
246impl<T> Stream for Once<T> {
247 type Item = T;
248
249 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
250 Poll::Ready(self.project().value.take())
251 }
252
253 fn size_hint(&self) -> (usize, Option<usize>) {
254 if self.value.is_some() {
255 (1, Some(1))
256 } else {
257 (0, Some(0))
258 }
259 }
260}
261
262/// Creates a stream that is always pending.
263///
264/// # Examples
265///
266/// ```no_run
267/// use futures_lite::stream::{self, StreamExt};
268///
269/// # spin_on::spin_on(async {
270/// let mut s = stream::pending::<i32>();
271/// s.next().await;
272/// unreachable!();
273/// # })
274/// ```
275pub fn pending<T>() -> Pending<T> {
276 Pending {
277 _marker: PhantomData,
278 }
279}
280
281/// Stream for the [`pending()`] function.
282#[derive(Clone, Debug)]
283#[must_use = "streams do nothing unless polled"]
284pub struct Pending<T> {
285 _marker: PhantomData<T>,
286}
287
288impl<T> Unpin for Pending<T> {}
289
290impl<T> Stream for Pending<T> {
291 type Item = T;
292
293 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
294 Poll::Pending
295 }
296
297 fn size_hint(&self) -> (usize, Option<usize>) {
298 (0, Some(0))
299 }
300}
301
302/// Creates a stream from a function returning [`Poll`].
303///
304/// # Examples
305///
306/// ```
307/// use futures_lite::stream::{self, StreamExt};
308/// use std::task::{Context, Poll};
309///
310/// # spin_on::spin_on(async {
311/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
312/// Poll::Ready(Some(7))
313/// }
314///
315/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
316/// # })
317/// ```
318pub fn poll_fn<T, F>(f: F) -> PollFn<F>
319where
320 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
321{
322 PollFn { f }
323}
324
325/// Stream for the [`poll_fn()`] function.
326#[derive(Clone)]
327#[must_use = "streams do nothing unless polled"]
328pub struct PollFn<F> {
329 f: F,
330}
331
332impl<F> Unpin for PollFn<F> {}
333
334impl<F> fmt::Debug for PollFn<F> {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 f.debug_struct("PollFn").finish()
337 }
338}
339
340impl<T, F> Stream for PollFn<F>
341where
342 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
343{
344 type Item = T;
345
346 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
347 (&mut self.f)(cx)
348 }
349}
350
351/// Creates an infinite stream that yields the same item repeatedly.
352///
353/// # Examples
354///
355/// ```
356/// use futures_lite::stream::{self, StreamExt};
357///
358/// # spin_on::spin_on(async {
359/// let mut s = stream::repeat(7);
360///
361/// assert_eq!(s.next().await, Some(7));
362/// assert_eq!(s.next().await, Some(7));
363/// # })
364/// ```
365pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
366 Repeat { item }
367}
368
369/// Stream for the [`repeat()`] function.
370#[derive(Clone, Debug)]
371#[must_use = "streams do nothing unless polled"]
372pub struct Repeat<T> {
373 item: T,
374}
375
376impl<T> Unpin for Repeat<T> {}
377
378impl<T: Clone> Stream for Repeat<T> {
379 type Item = T;
380
381 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
382 Poll::Ready(Some(self.item.clone()))
383 }
384
385 fn size_hint(&self) -> (usize, Option<usize>) {
386 (usize::MAX, None)
387 }
388}
389
390/// Creates an infinite stream from a closure that generates items.
391///
392/// # Examples
393///
394/// ```
395/// use futures_lite::stream::{self, StreamExt};
396///
397/// # spin_on::spin_on(async {
398/// let mut s = stream::repeat_with(|| 7);
399///
400/// assert_eq!(s.next().await, Some(7));
401/// assert_eq!(s.next().await, Some(7));
402/// # })
403/// ```
404pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
405where
406 F: FnMut() -> T,
407{
408 RepeatWith { f: repeater }
409}
410
411/// Stream for the [`repeat_with()`] function.
412#[derive(Clone, Debug)]
413#[must_use = "streams do nothing unless polled"]
414pub struct RepeatWith<F> {
415 f: F,
416}
417
418impl<F> Unpin for RepeatWith<F> {}
419
420impl<T, F> Stream for RepeatWith<F>
421where
422 F: FnMut() -> T,
423{
424 type Item = T;
425
426 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427 let item = (&mut self.f)();
428 Poll::Ready(Some(item))
429 }
430
431 fn size_hint(&self) -> (usize, Option<usize>) {
432 (usize::MAX, None)
433 }
434}
435
436/// Creates a stream from a seed value and an async closure operating on it.
437///
438/// # Examples
439///
440/// ```
441/// use futures_lite::stream::{self, StreamExt};
442///
443/// # spin_on::spin_on(async {
444/// let s = stream::unfold(0, |mut n| async move {
445/// if n < 2 {
446/// let m = n + 1;
447/// Some((n, m))
448/// } else {
449/// None
450/// }
451/// });
452///
453/// let v: Vec<i32> = s.collect().await;
454/// assert_eq!(v, [0, 1]);
455/// # })
456/// ```
457pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
458where
459 F: FnMut(T) -> Fut,
460 Fut: Future<Output = Option<(Item, T)>>,
461{
462 Unfold {
463 f,
464 state: Some(seed),
465 fut: None,
466 }
467}
468
469pin_project! {
470 /// Stream for the [`unfold()`] function.
471 #[derive(Clone)]
472 #[must_use = "streams do nothing unless polled"]
473 pub struct Unfold<T, F, Fut> {
474 f: F,
475 state: Option<T>,
476 #[pin]
477 fut: Option<Fut>,
478 }
479}
480
481impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
482where
483 T: fmt::Debug,
484 Fut: fmt::Debug,
485{
486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487 f.debug_struct("Unfold")
488 .field("state", &self.state)
489 .field("fut", &self.fut)
490 .finish()
491 }
492}
493
494impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
495where
496 F: FnMut(T) -> Fut,
497 Fut: Future<Output = Option<(Item, T)>>,
498{
499 type Item = Item;
500
501 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502 let mut this = self.project();
503
504 if let Some(state) = this.state.take() {
505 this.fut.set(Some((this.f)(state)));
506 }
507
508 let step = ready!(this
509 .fut
510 .as_mut()
511 .as_pin_mut()
512 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
513 .poll(cx));
514 this.fut.set(None);
515
516 if let Some((item, next_state)) = step {
517 *this.state = Some(next_state);
518 Poll::Ready(Some(item))
519 } else {
520 Poll::Ready(None)
521 }
522 }
523}
524
525/// Creates a stream from a seed value and a fallible async closure operating on it.
526///
527/// # Examples
528///
529/// ```
530/// use futures_lite::stream::{self, StreamExt};
531///
532/// # spin_on::spin_on(async {
533/// let s = stream::try_unfold(0, |mut n| async move {
534/// if n < 2 {
535/// let m = n + 1;
536/// Ok(Some((n, m)))
537/// } else {
538/// std::io::Result::Ok(None)
539/// }
540/// });
541///
542/// let v: Vec<i32> = s.try_collect().await?;
543/// assert_eq!(v, [0, 1]);
544/// # std::io::Result::Ok(()) });
545/// ```
546pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
547where
548 F: FnMut(T) -> Fut,
549 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
550{
551 TryUnfold {
552 f,
553 state: Some(init),
554 fut: None,
555 }
556}
557
558pin_project! {
559 /// Stream for the [`try_unfold()`] function.
560 #[derive(Clone)]
561 #[must_use = "streams do nothing unless polled"]
562 pub struct TryUnfold<T, F, Fut> {
563 f: F,
564 state: Option<T>,
565 #[pin]
566 fut: Option<Fut>,
567 }
568}
569
570impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
571where
572 T: fmt::Debug,
573 Fut: fmt::Debug,
574{
575 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576 f.debug_struct("TryUnfold")
577 .field("state", &self.state)
578 .field("fut", &self.fut)
579 .finish()
580 }
581}
582
583impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
584where
585 F: FnMut(T) -> Fut,
586 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
587{
588 type Item = Result<Item, E>;
589
590 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591 let mut this = self.project();
592
593 if let Some(state) = this.state.take() {
594 this.fut.set(Some((this.f)(state)));
595 }
596
597 match this.fut.as_mut().as_pin_mut() {
598 None => {
599 // The future previously errored
600 Poll::Ready(None)
601 }
602 Some(future) => {
603 let step = ready!(future.poll(cx));
604 this.fut.set(None);
605
606 match step {
607 Ok(Some((item, next_state))) => {
608 *this.state = Some(next_state);
609 Poll::Ready(Some(Ok(item)))
610 }
611 Ok(None) => Poll::Ready(None),
612 Err(e) => Poll::Ready(Some(Err(e))),
613 }
614 }
615 }
616 }
617}
618
619/// Creates a stream that invokes the given future as its first item, and then
620/// produces no more items.
621///
622/// # Example
623///
624/// ```
625/// use futures_lite::{stream, prelude::*};
626///
627/// # spin_on::spin_on(async {
628/// let mut stream = Box::pin(stream::once_future(async { 1 }));
629/// assert_eq!(stream.next().await, Some(1));
630/// assert_eq!(stream.next().await, None);
631/// # });
632/// ```
633pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
634 OnceFuture {
635 future: Some(future),
636 }
637}
638
639pin_project! {
640 /// Stream for the [`once_future()`] method.
641 #[derive(Debug)]
642 #[must_use = "futures do nothing unless you `.await` or poll them"]
643 pub struct OnceFuture<F> {
644 #[pin]
645 future: Option<F>,
646 }
647}
648
649impl<F: Future> Stream for OnceFuture<F> {
650 type Item = F::Output;
651
652 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
653 let mut this = self.project();
654
655 match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
656 Some(Poll::Ready(t)) => {
657 this.future.set(None);
658 Poll::Ready(Some(t))
659 }
660 Some(Poll::Pending) => Poll::Pending,
661 None => Poll::Ready(None),
662 }
663 }
664}
665
666/// Take elements from this stream until the provided future resolves.
667///
668/// This function will take elements from the stream until the provided
669/// stopping future `fut` resolves. Once the `fut` future becomes ready,
670/// this stream combinator will always return that the stream is done.
671///
672/// The stopping future may return any type. Once the stream is stopped
673/// the result of the stopping future may be accessed with `StopAfterFuture::take_result()`.
674/// The stream may also be resumed with `StopAfterFuture::take_future()`.
675/// See the documentation of [`StopAfterFuture`] for more information.
676///
677/// ```
678/// use futures_lite::stream::{self, StreamExt, stop_after_future};
679/// use futures_lite::future;
680/// use std::task::Poll;
681///
682/// let stream = stream::iter(1..=10);
683///
684/// # spin_on::spin_on(async {
685/// let mut i = 0;
686/// let stop_fut = future::poll_fn(|_cx| {
687/// i += 1;
688/// if i <= 5 {
689/// Poll::Pending
690/// } else {
691/// Poll::Ready(())
692/// }
693/// });
694///
695/// let stream = stop_after_future(stream, stop_fut);
696///
697/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
698/// # });
699pub fn stop_after_future<S, F>(stream: S, future: F) -> StopAfterFuture<S, F>
700where
701 S: Sized + Stream,
702 F: Future,
703{
704 StopAfterFuture {
705 stream,
706 fut: Some(future),
707 fut_result: None,
708 free: false,
709 }
710}
711
712pin_project! {
713 /// Stream for the [`StreamExt::stop_after_future()`] method.
714 #[derive(Clone, Debug)]
715 #[must_use = "streams do nothing unless polled"]
716 pub struct StopAfterFuture<S: Stream, Fut: Future> {
717 #[pin]
718 stream: S,
719 // Contains the inner Future on start and None once the inner Future is resolved
720 // or taken out by the user.
721 #[pin]
722 fut: Option<Fut>,
723 // Contains fut's return value once fut is resolved
724 fut_result: Option<Fut::Output>,
725 // Whether the future was taken out by the user.
726 free: bool,
727 }
728}
729
730impl<St, Fut> StopAfterFuture<St, Fut>
731where
732 St: Stream,
733 Fut: Future,
734{
735 /// Extract the stopping future out of the combinator.
736 ///
737 /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
738 /// Taking out the future means the combinator will be yielding
739 /// elements from the wrapped stream without ever stopping it.
740 pub fn take_future(&mut self) -> Option<Fut> {
741 if self.fut.is_some() {
742 self.free = true;
743 }
744
745 self.fut.take()
746 }
747
748 /// Once the stopping future is resolved, this method can be used
749 /// to extract the value returned by the stopping future.
750 ///
751 /// This may be used to retrieve arbitrary data from the stopping
752 /// future, for example a reason why the stream was stopped.
753 ///
754 /// This method will return `None` if the future isn't resolved yet,
755 /// or if the result was already taken out.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// # spin_on::spin_on(async {
761 /// use futures_lite::stream::{self, StreamExt, stop_after_future};
762 /// use futures_lite::future;
763 /// use std::task::Poll;
764 ///
765 /// let stream = stream::iter(1..=10);
766 ///
767 /// let mut i = 0;
768 /// let stop_fut = future::poll_fn(|_cx| {
769 /// i += 1;
770 /// if i <= 5 {
771 /// Poll::Pending
772 /// } else {
773 /// Poll::Ready("reason")
774 /// }
775 /// });
776 ///
777 /// let mut stream = stop_after_future(stream, stop_fut);
778 /// let _ = (&mut stream).collect::<Vec<_>>().await;
779 ///
780 /// let result = stream.take_result().unwrap();
781 /// assert_eq!(result, "reason");
782 /// # });
783 /// ```
784 pub fn take_result(&mut self) -> Option<Fut::Output> {
785 self.fut_result.take()
786 }
787
788 /// Whether the stream was stopped yet by the stopping future
789 /// being resolved.
790 pub fn is_stopped(&self) -> bool {
791 !self.free && self.fut.is_none()
792 }
793}
794
795impl<St, Fut> Stream for StopAfterFuture<St, Fut>
796where
797 St: Stream,
798 Fut: Future,
799{
800 type Item = St::Item;
801
802 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
803 let mut this = self.project();
804
805 if let Some(f) = this.fut.as_mut().as_pin_mut() {
806 if let Poll::Ready(result) = f.poll(cx) {
807 this.fut.set(None);
808 *this.fut_result = Some(result);
809 }
810 }
811
812 if !*this.free && this.fut.is_none() {
813 // Future resolved, inner stream stopped
814 Poll::Ready(None)
815 } else {
816 // Future either not resolved yet or taken out by the user
817 let item = ready!(this.stream.poll_next(cx));
818 if item.is_none() {
819 this.fut.set(None);
820 }
821 Poll::Ready(item)
822 }
823 }
824
825 fn size_hint(&self) -> (usize, Option<usize>) {
826 if self.is_stopped() {
827 return (0, Some(0));
828 }
829
830 // Original stream can be truncated at any moment, so the lower bound isn't reliable.
831 let (_, upper_bound) = self.stream.size_hint();
832 (0, upper_bound)
833 }
834}
835
836/// Extension trait for [`Stream`].
837pub trait StreamExt: Stream {
838 /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
839 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
840 where
841 Self: Unpin,
842 {
843 Stream::poll_next(Pin::new(self), cx)
844 }
845
846 /// Retrieves the next item in the stream.
847 ///
848 /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
849 /// resume iteration after that.
850 ///
851 /// # Examples
852 ///
853 /// ```
854 /// use futures_lite::stream::{self, StreamExt};
855 ///
856 /// # spin_on::spin_on(async {
857 /// let mut s = stream::iter(1..=3);
858 ///
859 /// assert_eq!(s.next().await, Some(1));
860 /// assert_eq!(s.next().await, Some(2));
861 /// assert_eq!(s.next().await, Some(3));
862 /// assert_eq!(s.next().await, None);
863 /// # });
864 /// ```
865 fn next(&mut self) -> NextFuture<'_, Self>
866 where
867 Self: Unpin,
868 {
869 NextFuture { stream: self }
870 }
871
872 /// Retrieves the next item in the stream.
873 ///
874 /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
875 /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
876 ///
877 /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use futures_lite::stream::{self, StreamExt};
883 ///
884 /// # spin_on::spin_on(async {
885 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
886 ///
887 /// assert_eq!(s.try_next().await, Ok(Some(1)));
888 /// assert_eq!(s.try_next().await, Ok(Some(2)));
889 /// assert_eq!(s.try_next().await, Err("error"));
890 /// assert_eq!(s.try_next().await, Ok(None));
891 /// # });
892 /// ```
893 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
894 where
895 Self: Stream<Item = Result<T, E>> + Unpin,
896 {
897 TryNextFuture { stream: self }
898 }
899
900 /// Counts the number of items in the stream.
901 ///
902 /// # Examples
903 ///
904 /// ```
905 /// use futures_lite::stream::{self, StreamExt};
906 ///
907 /// # spin_on::spin_on(async {
908 /// let s1 = stream::iter(vec![0]);
909 /// let s2 = stream::iter(vec![1, 2, 3]);
910 ///
911 /// assert_eq!(s1.count().await, 1);
912 /// assert_eq!(s2.count().await, 3);
913 /// # });
914 /// ```
915 fn count(self) -> CountFuture<Self>
916 where
917 Self: Sized,
918 {
919 CountFuture {
920 stream: self,
921 count: 0,
922 }
923 }
924
925 /// Maps items of the stream to new values using a closure.
926 ///
927 /// # Examples
928 ///
929 /// ```
930 /// use futures_lite::stream::{self, StreamExt};
931 ///
932 /// # spin_on::spin_on(async {
933 /// let s = stream::iter(vec![1, 2, 3]);
934 /// let mut s = s.map(|x| 2 * x);
935 ///
936 /// assert_eq!(s.next().await, Some(2));
937 /// assert_eq!(s.next().await, Some(4));
938 /// assert_eq!(s.next().await, Some(6));
939 /// assert_eq!(s.next().await, None);
940 /// # });
941 /// ```
942 fn map<T, F>(self, f: F) -> Map<Self, F>
943 where
944 Self: Sized,
945 F: FnMut(Self::Item) -> T,
946 {
947 Map { stream: self, f }
948 }
949
950 /// Maps items to streams and then concatenates them.
951 ///
952 /// # Examples
953 ///
954 /// ```
955 /// use futures_lite::stream::{self, StreamExt};
956 ///
957 /// # spin_on::spin_on(async {
958 /// let words = stream::iter(vec!["one", "two"]);
959 ///
960 /// let s: String = words
961 /// .flat_map(|s| stream::iter(s.chars()))
962 /// .collect()
963 /// .await;
964 ///
965 /// assert_eq!(s, "onetwo");
966 /// # });
967 /// ```
968 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
969 where
970 Self: Sized,
971 U: Stream,
972 F: FnMut(Self::Item) -> U,
973 {
974 FlatMap {
975 stream: self.map(f),
976 inner_stream: None,
977 }
978 }
979
980 /// Concatenates inner streams.
981 ///
982 /// # Examples
983 ///
984 /// ```
985 /// use futures_lite::stream::{self, StreamExt};
986 ///
987 /// # spin_on::spin_on(async {
988 /// let s1 = stream::iter(vec![1, 2, 3]);
989 /// let s2 = stream::iter(vec![4, 5]);
990 ///
991 /// let s = stream::iter(vec![s1, s2]);
992 /// let v: Vec<_> = s.flatten().collect().await;
993 /// assert_eq!(v, [1, 2, 3, 4, 5]);
994 /// # });
995 /// ```
996 fn flatten(self) -> Flatten<Self>
997 where
998 Self: Sized,
999 Self::Item: Stream,
1000 {
1001 Flatten {
1002 stream: self,
1003 inner_stream: None,
1004 }
1005 }
1006
1007 /// Maps items of the stream to new values using an async closure.
1008 ///
1009 /// # Examples
1010 ///
1011 /// ```
1012 /// use futures_lite::pin;
1013 /// use futures_lite::stream::{self, StreamExt};
1014 ///
1015 /// # spin_on::spin_on(async {
1016 /// let s = stream::iter(vec![1, 2, 3]);
1017 /// let mut s = s.then(|x| async move { 2 * x });
1018 ///
1019 /// pin!(s);
1020 /// assert_eq!(s.next().await, Some(2));
1021 /// assert_eq!(s.next().await, Some(4));
1022 /// assert_eq!(s.next().await, Some(6));
1023 /// assert_eq!(s.next().await, None);
1024 /// # });
1025 /// ```
1026 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
1027 where
1028 Self: Sized,
1029 F: FnMut(Self::Item) -> Fut,
1030 Fut: Future,
1031 {
1032 Then {
1033 stream: self,
1034 future: None,
1035 f,
1036 }
1037 }
1038
1039 /// Keeps items of the stream for which `predicate` returns `true`.
1040 ///
1041 /// # Examples
1042 ///
1043 /// ```
1044 /// use futures_lite::stream::{self, StreamExt};
1045 ///
1046 /// # spin_on::spin_on(async {
1047 /// let s = stream::iter(vec![1, 2, 3, 4]);
1048 /// let mut s = s.filter(|i| i % 2 == 0);
1049 ///
1050 /// assert_eq!(s.next().await, Some(2));
1051 /// assert_eq!(s.next().await, Some(4));
1052 /// assert_eq!(s.next().await, None);
1053 /// # });
1054 /// ```
1055 fn filter<P>(self, predicate: P) -> Filter<Self, P>
1056 where
1057 Self: Sized,
1058 P: FnMut(&Self::Item) -> bool,
1059 {
1060 Filter {
1061 stream: self,
1062 predicate,
1063 }
1064 }
1065
1066 /// Filters and maps items of the stream using a closure.
1067 ///
1068 /// # Examples
1069 ///
1070 /// ```
1071 /// use futures_lite::stream::{self, StreamExt};
1072 ///
1073 /// # spin_on::spin_on(async {
1074 /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
1075 /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
1076 ///
1077 /// assert_eq!(s.next().await, Some(1));
1078 /// assert_eq!(s.next().await, Some(3));
1079 /// assert_eq!(s.next().await, Some(5));
1080 /// assert_eq!(s.next().await, None);
1081 /// # });
1082 /// ```
1083 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
1084 where
1085 Self: Sized,
1086 F: FnMut(Self::Item) -> Option<T>,
1087 {
1088 FilterMap { stream: self, f }
1089 }
1090
1091 /// Takes only the first `n` items of the stream.
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```
1096 /// use futures_lite::stream::{self, StreamExt};
1097 ///
1098 /// # spin_on::spin_on(async {
1099 /// let mut s = stream::repeat(7).take(2);
1100 ///
1101 /// assert_eq!(s.next().await, Some(7));
1102 /// assert_eq!(s.next().await, Some(7));
1103 /// assert_eq!(s.next().await, None);
1104 /// # });
1105 /// ```
1106 fn take(self, n: usize) -> Take<Self>
1107 where
1108 Self: Sized,
1109 {
1110 Take { stream: self, n }
1111 }
1112
1113 /// Takes items while `predicate` returns `true`.
1114 ///
1115 /// # Examples
1116 ///
1117 /// ```
1118 /// use futures_lite::stream::{self, StreamExt};
1119 ///
1120 /// # spin_on::spin_on(async {
1121 /// let s = stream::iter(vec![1, 2, 3, 4]);
1122 /// let mut s = s.take_while(|x| *x < 3);
1123 ///
1124 /// assert_eq!(s.next().await, Some(1));
1125 /// assert_eq!(s.next().await, Some(2));
1126 /// assert_eq!(s.next().await, None);
1127 /// # });
1128 /// ```
1129 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1130 where
1131 Self: Sized,
1132 P: FnMut(&Self::Item) -> bool,
1133 {
1134 TakeWhile {
1135 stream: self,
1136 predicate,
1137 }
1138 }
1139
1140 /// Maps items while `predicate` returns [`Some`].
1141 ///
1142 /// This stream is not fused. After the predicate returns [`None`] the stream still
1143 /// contains remaining items that can be obtained by subsequent `next` calls.
1144 /// You can [`fuse`](StreamExt::fuse) the stream if this behavior is undesirable.
1145 ///
1146 /// # Examples
1147 ///
1148 /// ```
1149 /// use futures_lite::stream::{self, StreamExt};
1150 ///
1151 /// # spin_on::spin_on(async {
1152 /// let s = stream::iter(vec![1, 2, 0, 3]);
1153 /// let mut s = s.map_while(|x: u32| x.checked_sub(1));
1154 ///
1155 /// assert_eq!(s.next().await, Some(0));
1156 /// assert_eq!(s.next().await, Some(1));
1157 /// assert_eq!(s.next().await, None);
1158 ///
1159 /// // Continue to iterate the stream.
1160 /// assert_eq!(s.next().await, Some(2));
1161 /// assert_eq!(s.next().await, None);
1162 /// # });
1163 /// ```
1164 fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1165 where
1166 Self: Sized,
1167 P: FnMut(Self::Item) -> Option<B>,
1168 {
1169 MapWhile {
1170 stream: self,
1171 predicate,
1172 }
1173 }
1174
1175 /// Skips the first `n` items of the stream.
1176 ///
1177 /// # Examples
1178 ///
1179 /// ```
1180 /// use futures_lite::stream::{self, StreamExt};
1181 ///
1182 /// # spin_on::spin_on(async {
1183 /// let s = stream::iter(vec![1, 2, 3]);
1184 /// let mut s = s.skip(2);
1185 ///
1186 /// assert_eq!(s.next().await, Some(3));
1187 /// assert_eq!(s.next().await, None);
1188 /// # });
1189 /// ```
1190 fn skip(self, n: usize) -> Skip<Self>
1191 where
1192 Self: Sized,
1193 {
1194 Skip { stream: self, n }
1195 }
1196
1197 /// Skips items while `predicate` returns `true`.
1198 ///
1199 /// # Examples
1200 ///
1201 /// ```
1202 /// use futures_lite::stream::{self, StreamExt};
1203 ///
1204 /// # spin_on::spin_on(async {
1205 /// let s = stream::iter(vec![-1i32, 0, 1]);
1206 /// let mut s = s.skip_while(|x| x.is_negative());
1207 ///
1208 /// assert_eq!(s.next().await, Some(0));
1209 /// assert_eq!(s.next().await, Some(1));
1210 /// assert_eq!(s.next().await, None);
1211 /// # });
1212 /// ```
1213 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1214 where
1215 Self: Sized,
1216 P: FnMut(&Self::Item) -> bool,
1217 {
1218 SkipWhile {
1219 stream: self,
1220 predicate: Some(predicate),
1221 }
1222 }
1223
1224 /// Yields every `step`th item.
1225 ///
1226 /// # Panics
1227 ///
1228 /// This method will panic if the `step` is 0.
1229 ///
1230 /// # Examples
1231 ///
1232 /// ```
1233 /// use futures_lite::stream::{self, StreamExt};
1234 ///
1235 /// # spin_on::spin_on(async {
1236 /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1237 /// let mut s = s.step_by(2);
1238 ///
1239 /// assert_eq!(s.next().await, Some(0));
1240 /// assert_eq!(s.next().await, Some(2));
1241 /// assert_eq!(s.next().await, Some(4));
1242 /// assert_eq!(s.next().await, None);
1243 /// # });
1244 /// ```
1245 fn step_by(self, step: usize) -> StepBy<Self>
1246 where
1247 Self: Sized,
1248 {
1249 assert!(step > 0, "`step` must be greater than zero");
1250 StepBy {
1251 stream: self,
1252 step,
1253 i: 0,
1254 }
1255 }
1256
1257 /// Appends another stream to the end of this one.
1258 ///
1259 /// # Examples
1260 ///
1261 /// ```
1262 /// use futures_lite::stream::{self, StreamExt};
1263 ///
1264 /// # spin_on::spin_on(async {
1265 /// let s1 = stream::iter(vec![1, 2]);
1266 /// let s2 = stream::iter(vec![7, 8]);
1267 /// let mut s = s1.chain(s2);
1268 ///
1269 /// assert_eq!(s.next().await, Some(1));
1270 /// assert_eq!(s.next().await, Some(2));
1271 /// assert_eq!(s.next().await, Some(7));
1272 /// assert_eq!(s.next().await, Some(8));
1273 /// assert_eq!(s.next().await, None);
1274 /// # });
1275 /// ```
1276 fn chain<U>(self, other: U) -> Chain<Self, U>
1277 where
1278 Self: Sized,
1279 U: Stream<Item = Self::Item> + Sized,
1280 {
1281 Chain {
1282 first: self.fuse(),
1283 second: other.fuse(),
1284 }
1285 }
1286
1287 /// Clones all items.
1288 ///
1289 /// # Examples
1290 ///
1291 /// ```
1292 /// use futures_lite::stream::{self, StreamExt};
1293 ///
1294 /// # spin_on::spin_on(async {
1295 /// let s = stream::iter(vec![&1, &2]);
1296 /// let mut s = s.cloned();
1297 ///
1298 /// assert_eq!(s.next().await, Some(1));
1299 /// assert_eq!(s.next().await, Some(2));
1300 /// assert_eq!(s.next().await, None);
1301 /// # });
1302 /// ```
1303 fn cloned<'a, T>(self) -> Cloned<Self>
1304 where
1305 Self: Stream<Item = &'a T> + Sized,
1306 T: Clone + 'a,
1307 {
1308 Cloned { stream: self }
1309 }
1310
1311 /// Copies all items.
1312 ///
1313 /// # Examples
1314 ///
1315 /// ```
1316 /// use futures_lite::stream::{self, StreamExt};
1317 ///
1318 /// # spin_on::spin_on(async {
1319 /// let s = stream::iter(vec![&1, &2]);
1320 /// let mut s = s.copied();
1321 ///
1322 /// assert_eq!(s.next().await, Some(1));
1323 /// assert_eq!(s.next().await, Some(2));
1324 /// assert_eq!(s.next().await, None);
1325 /// # });
1326 /// ```
1327 fn copied<'a, T>(self) -> Copied<Self>
1328 where
1329 Self: Stream<Item = &'a T> + Sized,
1330 T: Copy + 'a,
1331 {
1332 Copied { stream: self }
1333 }
1334
1335 /// Collects all items in the stream into a collection.
1336 ///
1337 /// # Examples
1338 ///
1339 /// ```
1340 /// use futures_lite::stream::{self, StreamExt};
1341 ///
1342 /// # spin_on::spin_on(async {
1343 /// let mut s = stream::iter(1..=3);
1344 ///
1345 /// let items: Vec<_> = s.collect().await;
1346 /// assert_eq!(items, [1, 2, 3]);
1347 /// # });
1348 /// ```
1349 fn collect<C>(self) -> CollectFuture<Self, C>
1350 where
1351 Self: Sized,
1352 C: Default + Extend<Self::Item>,
1353 {
1354 CollectFuture {
1355 stream: self,
1356 collection: Default::default(),
1357 }
1358 }
1359
1360 /// Collects all items in the fallible stream into a collection.
1361 ///
1362 /// ```
1363 /// use futures_lite::stream::{self, StreamExt};
1364 ///
1365 /// # spin_on::spin_on(async {
1366 /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1367 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1368 /// assert_eq!(res, Err(2));
1369 ///
1370 /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1371 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1372 /// assert_eq!(res, Ok(vec![1, 2, 3]));
1373 /// # })
1374 /// ```
1375 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1376 where
1377 Self: Stream<Item = Result<T, E>> + Sized,
1378 C: Default + Extend<T>,
1379 {
1380 TryCollectFuture {
1381 stream: self,
1382 items: Default::default(),
1383 }
1384 }
1385
1386 /// Partitions items into those for which `predicate` is `true` and those for which it is
1387 /// `false`, and then collects them into two collections.
1388 ///
1389 /// # Examples
1390 ///
1391 /// ```
1392 /// use futures_lite::stream::{self, StreamExt};
1393 ///
1394 /// # spin_on::spin_on(async {
1395 /// let s = stream::iter(vec![1, 2, 3]);
1396 /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1397 ///
1398 /// assert_eq!(even, &[2]);
1399 /// assert_eq!(odd, &[1, 3]);
1400 /// # })
1401 /// ```
1402 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1403 where
1404 Self: Sized,
1405 B: Default + Extend<Self::Item>,
1406 P: FnMut(&Self::Item) -> bool,
1407 {
1408 PartitionFuture {
1409 stream: self,
1410 predicate,
1411 res: Some(Default::default()),
1412 }
1413 }
1414
1415 /// Accumulates a computation over the stream.
1416 ///
1417 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1418 /// the accumulator and each item in the stream. The final accumulator value is returned.
1419 ///
1420 /// # Examples
1421 ///
1422 /// ```
1423 /// use futures_lite::stream::{self, StreamExt};
1424 ///
1425 /// # spin_on::spin_on(async {
1426 /// let s = stream::iter(vec![1, 2, 3]);
1427 /// let sum = s.fold(0, |acc, x| acc + x).await;
1428 ///
1429 /// assert_eq!(sum, 6);
1430 /// # })
1431 /// ```
1432 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1433 where
1434 Self: Sized,
1435 F: FnMut(T, Self::Item) -> T,
1436 {
1437 FoldFuture {
1438 stream: self,
1439 f,
1440 acc: Some(init),
1441 }
1442 }
1443
1444 /// Accumulates a fallible computation over the stream.
1445 ///
1446 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1447 /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1448 /// error if `f` failed the computation.
1449 ///
1450 /// # Examples
1451 ///
1452 /// ```
1453 /// use futures_lite::stream::{self, StreamExt};
1454 ///
1455 /// # spin_on::spin_on(async {
1456 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1457 ///
1458 /// let sum = s.try_fold(0, |acc, v| {
1459 /// if (acc + v) % 2 == 1 {
1460 /// Ok(acc + v)
1461 /// } else {
1462 /// Err("fail")
1463 /// }
1464 /// })
1465 /// .await;
1466 ///
1467 /// assert_eq!(sum, Err("fail"));
1468 /// # })
1469 /// ```
1470 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1471 where
1472 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1473 F: FnMut(B, T) -> Result<B, E>,
1474 {
1475 TryFoldFuture {
1476 stream: self,
1477 f,
1478 acc: Some(init),
1479 }
1480 }
1481
1482 /// Maps items of the stream to new values using a state value and a closure.
1483 ///
1484 /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1485 /// state and each item in the stream. The stream stops when `f` returns `None`.
1486 ///
1487 /// # Examples
1488 ///
1489 /// ```
1490 /// use futures_lite::stream::{self, StreamExt};
1491 ///
1492 /// # spin_on::spin_on(async {
1493 /// let s = stream::iter(vec![1, 2, 3]);
1494 /// let mut s = s.scan(1, |state, x| {
1495 /// *state = *state * x;
1496 /// Some(-*state)
1497 /// });
1498 ///
1499 /// assert_eq!(s.next().await, Some(-1));
1500 /// assert_eq!(s.next().await, Some(-2));
1501 /// assert_eq!(s.next().await, Some(-6));
1502 /// assert_eq!(s.next().await, None);
1503 /// # })
1504 /// ```
1505 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1506 where
1507 Self: Sized,
1508 F: FnMut(&mut St, Self::Item) -> Option<B>,
1509 {
1510 Scan {
1511 stream: self,
1512 state_f: (initial_state, f),
1513 }
1514 }
1515
1516 /// Fuses the stream so that it stops yielding items after the first [`None`].
1517 ///
1518 /// # Examples
1519 ///
1520 /// ```
1521 /// use futures_lite::stream::{self, StreamExt};
1522 ///
1523 /// # spin_on::spin_on(async {
1524 /// let mut s = stream::once(1).fuse();
1525 ///
1526 /// assert_eq!(s.next().await, Some(1));
1527 /// assert_eq!(s.next().await, None);
1528 /// assert_eq!(s.next().await, None);
1529 /// # })
1530 /// ```
1531 fn fuse(self) -> Fuse<Self>
1532 where
1533 Self: Sized,
1534 {
1535 Fuse {
1536 stream: self,
1537 done: false,
1538 }
1539 }
1540
1541 /// Repeats the stream from beginning to end, forever.
1542 ///
1543 /// # Examples
1544 ///
1545 /// ```
1546 /// use futures_lite::stream::{self, StreamExt};
1547 ///
1548 /// # spin_on::spin_on(async {
1549 /// let mut s = stream::iter(vec![1, 2]).cycle();
1550 ///
1551 /// assert_eq!(s.next().await, Some(1));
1552 /// assert_eq!(s.next().await, Some(2));
1553 /// assert_eq!(s.next().await, Some(1));
1554 /// assert_eq!(s.next().await, Some(2));
1555 /// # });
1556 /// ```
1557 fn cycle(self) -> Cycle<Self>
1558 where
1559 Self: Clone + Sized,
1560 {
1561 Cycle {
1562 orig: self.clone(),
1563 stream: self,
1564 }
1565 }
1566
1567 /// Enumerates items, mapping them to `(index, item)`.
1568 ///
1569 /// # Examples
1570 ///
1571 /// ```
1572 /// use futures_lite::stream::{self, StreamExt};
1573 ///
1574 /// # spin_on::spin_on(async {
1575 /// let s = stream::iter(vec!['a', 'b', 'c']);
1576 /// let mut s = s.enumerate();
1577 ///
1578 /// assert_eq!(s.next().await, Some((0, 'a')));
1579 /// assert_eq!(s.next().await, Some((1, 'b')));
1580 /// assert_eq!(s.next().await, Some((2, 'c')));
1581 /// assert_eq!(s.next().await, None);
1582 /// # });
1583 /// ```
1584 fn enumerate(self) -> Enumerate<Self>
1585 where
1586 Self: Sized,
1587 {
1588 Enumerate { stream: self, i: 0 }
1589 }
1590
1591 /// Calls a closure on each item and passes it on.
1592 ///
1593 /// # Examples
1594 ///
1595 /// ```
1596 /// use futures_lite::stream::{self, StreamExt};
1597 ///
1598 /// # spin_on::spin_on(async {
1599 /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1600 ///
1601 /// let sum = s
1602 /// .inspect(|x| println!("about to filter {}", x))
1603 /// .filter(|x| x % 2 == 0)
1604 /// .inspect(|x| println!("made it through filter: {}", x))
1605 /// .fold(0, |sum, i| sum + i)
1606 /// .await;
1607 /// # });
1608 /// ```
1609 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1610 where
1611 Self: Sized,
1612 F: FnMut(&Self::Item),
1613 {
1614 Inspect { stream: self, f }
1615 }
1616
1617 /// Gets the `n`th item of the stream.
1618 ///
1619 /// In the end, `n+1` items of the stream will be consumed.
1620 ///
1621 /// # Examples
1622 ///
1623 /// ```
1624 /// use futures_lite::stream::{self, StreamExt};
1625 ///
1626 /// # spin_on::spin_on(async {
1627 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1628 ///
1629 /// assert_eq!(s.nth(2).await, Some(2));
1630 /// assert_eq!(s.nth(2).await, Some(5));
1631 /// assert_eq!(s.nth(2).await, None);
1632 /// # });
1633 /// ```
1634 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1635 where
1636 Self: Unpin,
1637 {
1638 NthFuture { stream: self, n }
1639 }
1640
1641 /// Returns the last item in the stream.
1642 ///
1643 /// # Examples
1644 ///
1645 /// ```
1646 /// use futures_lite::stream::{self, StreamExt};
1647 ///
1648 /// # spin_on::spin_on(async {
1649 /// let s = stream::iter(vec![1, 2, 3, 4]);
1650 /// assert_eq!(s.last().await, Some(4));
1651 ///
1652 /// let s = stream::empty::<i32>();
1653 /// assert_eq!(s.last().await, None);
1654 /// # });
1655 /// ```
1656 fn last(self) -> LastFuture<Self>
1657 where
1658 Self: Sized,
1659 {
1660 LastFuture {
1661 stream: self,
1662 last: None,
1663 }
1664 }
1665
1666 /// Finds the first item of the stream for which `predicate` returns `true`.
1667 ///
1668 /// # Examples
1669 ///
1670 /// ```
1671 /// use futures_lite::stream::{self, StreamExt};
1672 ///
1673 /// # spin_on::spin_on(async {
1674 /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1675 ///
1676 /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1677 /// assert_eq!(s.next().await, Some(13));
1678 /// # });
1679 /// ```
1680 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1681 where
1682 Self: Unpin,
1683 P: FnMut(&Self::Item) -> bool,
1684 {
1685 FindFuture {
1686 stream: self,
1687 predicate,
1688 }
1689 }
1690
1691 /// Applies a closure to items in the stream and returns the first [`Some`] result.
1692 ///
1693 /// # Examples
1694 ///
1695 /// ```
1696 /// use futures_lite::stream::{self, StreamExt};
1697 ///
1698 /// # spin_on::spin_on(async {
1699 /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1700 /// let number = s.find_map(|s| s.parse().ok()).await;
1701 ///
1702 /// assert_eq!(number, Some(2));
1703 /// # });
1704 /// ```
1705 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1706 where
1707 Self: Unpin,
1708 F: FnMut(Self::Item) -> Option<B>,
1709 {
1710 FindMapFuture { stream: self, f }
1711 }
1712
1713 /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1714 ///
1715 /// # Examples
1716 ///
1717 /// ```
1718 /// use futures_lite::stream::{self, StreamExt};
1719 ///
1720 /// # spin_on::spin_on(async {
1721 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1722 ///
1723 /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1724 /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1725 /// assert_eq!(s.position(|x| x == 9).await, None);
1726 /// # });
1727 /// ```
1728 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1729 where
1730 Self: Unpin,
1731 P: FnMut(Self::Item) -> bool,
1732 {
1733 PositionFuture {
1734 stream: self,
1735 predicate,
1736 index: 0,
1737 }
1738 }
1739
1740 /// Tests if `predicate` returns `true` for all items in the stream.
1741 ///
1742 /// The result is `true` for an empty stream.
1743 ///
1744 /// # Examples
1745 ///
1746 /// ```
1747 /// use futures_lite::stream::{self, StreamExt};
1748 ///
1749 /// # spin_on::spin_on(async {
1750 /// let mut s = stream::iter(vec![1, 2, 3]);
1751 /// assert!(!s.all(|x| x % 2 == 0).await);
1752 ///
1753 /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1754 /// assert!(s.all(|x| x % 2 == 0).await);
1755 ///
1756 /// let mut s = stream::empty::<i32>();
1757 /// assert!(s.all(|x| x % 2 == 0).await);
1758 /// # });
1759 /// ```
1760 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1761 where
1762 Self: Unpin,
1763 P: FnMut(Self::Item) -> bool,
1764 {
1765 AllFuture {
1766 stream: self,
1767 predicate,
1768 }
1769 }
1770
1771 /// Tests if `predicate` returns `true` for any item in the stream.
1772 ///
1773 /// The result is `false` for an empty stream.
1774 ///
1775 /// # Examples
1776 ///
1777 /// ```
1778 /// use futures_lite::stream::{self, StreamExt};
1779 ///
1780 /// # spin_on::spin_on(async {
1781 /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1782 /// assert!(!s.any(|x| x % 2 == 0).await);
1783 ///
1784 /// let mut s = stream::iter(vec![1, 2, 3]);
1785 /// assert!(s.any(|x| x % 2 == 0).await);
1786 ///
1787 /// let mut s = stream::empty::<i32>();
1788 /// assert!(!s.any(|x| x % 2 == 0).await);
1789 /// # });
1790 /// ```
1791 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1792 where
1793 Self: Unpin,
1794 P: FnMut(Self::Item) -> bool,
1795 {
1796 AnyFuture {
1797 stream: self,
1798 predicate,
1799 }
1800 }
1801
1802 /// Calls a closure on each item of the stream.
1803 ///
1804 /// # Examples
1805 ///
1806 /// ```
1807 /// use futures_lite::stream::{self, StreamExt};
1808 ///
1809 /// # spin_on::spin_on(async {
1810 /// let mut s = stream::iter(vec![1, 2, 3]);
1811 /// s.for_each(|s| println!("{}", s)).await;
1812 /// # });
1813 /// ```
1814 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1815 where
1816 Self: Sized,
1817 F: FnMut(Self::Item),
1818 {
1819 ForEachFuture { stream: self, f }
1820 }
1821
1822 /// Calls a fallible closure on each item of the stream, stopping on first error.
1823 ///
1824 /// # Examples
1825 ///
1826 /// ```
1827 /// use futures_lite::stream::{self, StreamExt};
1828 ///
1829 /// # spin_on::spin_on(async {
1830 /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1831 ///
1832 /// let mut v = vec![];
1833 /// let res = s
1834 /// .try_for_each(|n| {
1835 /// if n < 2 {
1836 /// v.push(n);
1837 /// Ok(())
1838 /// } else {
1839 /// Err("too big")
1840 /// }
1841 /// })
1842 /// .await;
1843 ///
1844 /// assert_eq!(v, &[0, 1]);
1845 /// assert_eq!(res, Err("too big"));
1846 /// # });
1847 /// ```
1848 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1849 where
1850 Self: Unpin,
1851 F: FnMut(Self::Item) -> Result<(), E>,
1852 {
1853 TryForEachFuture { stream: self, f }
1854 }
1855
1856 /// Zips up two streams into a single stream of pairs.
1857 ///
1858 /// The stream of pairs stops when either of the original two streams is exhausted.
1859 ///
1860 /// # Examples
1861 ///
1862 /// ```
1863 /// use futures_lite::stream::{self, StreamExt};
1864 ///
1865 /// # spin_on::spin_on(async {
1866 /// let l = stream::iter(vec![1, 2, 3]);
1867 /// let r = stream::iter(vec![4, 5, 6, 7]);
1868 /// let mut s = l.zip(r);
1869 ///
1870 /// assert_eq!(s.next().await, Some((1, 4)));
1871 /// assert_eq!(s.next().await, Some((2, 5)));
1872 /// assert_eq!(s.next().await, Some((3, 6)));
1873 /// assert_eq!(s.next().await, None);
1874 /// # });
1875 /// ```
1876 fn zip<U>(self, other: U) -> Zip<Self, U>
1877 where
1878 Self: Sized,
1879 U: Stream,
1880 {
1881 Zip {
1882 item_slot: None,
1883 first: self,
1884 second: other,
1885 }
1886 }
1887
1888 /// Collects a stream of pairs into a pair of collections.
1889 ///
1890 /// # Examples
1891 ///
1892 /// ```
1893 /// use futures_lite::stream::{self, StreamExt};
1894 ///
1895 /// # spin_on::spin_on(async {
1896 /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1897 /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1898 ///
1899 /// assert_eq!(left, [1, 3]);
1900 /// assert_eq!(right, [2, 4]);
1901 /// # });
1902 /// ```
1903 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1904 where
1905 FromA: Default + Extend<A>,
1906 FromB: Default + Extend<B>,
1907 Self: Stream<Item = (A, B)> + Sized,
1908 {
1909 UnzipFuture {
1910 stream: self,
1911 res: Some(Default::default()),
1912 }
1913 }
1914
1915 /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1916 ///
1917 /// # Examples
1918 ///
1919 /// ```
1920 /// use futures_lite::stream::{self, StreamExt};
1921 /// use futures_lite::stream::{once, pending};
1922 ///
1923 /// # spin_on::spin_on(async {
1924 /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1925 /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1926 ///
1927 /// // The first future wins.
1928 /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1929 /// # })
1930 /// ```
1931 fn or<S>(self, other: S) -> Or<Self, S>
1932 where
1933 Self: Sized,
1934 S: Stream<Item = Self::Item>,
1935 {
1936 Or {
1937 stream1: self,
1938 stream2: other,
1939 }
1940 }
1941
1942 /// Merges with `other` stream, with no preference for either stream when both are ready.
1943 ///
1944 /// # Examples
1945 ///
1946 /// ```
1947 /// use futures_lite::stream::{self, StreamExt};
1948 /// use futures_lite::stream::{once, pending};
1949 ///
1950 /// # spin_on::spin_on(async {
1951 /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1952 /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1953 ///
1954 /// // One of the two stream is randomly chosen as the winner.
1955 /// let res = once(1).race(once(2)).next().await;
1956 /// # })
1957 /// ```
1958 #[cfg(all(feature = "std", feature = "race"))]
1959 fn race<S>(self, other: S) -> Race<Self, S>
1960 where
1961 Self: Sized,
1962 S: Stream<Item = Self::Item>,
1963 {
1964 Race {
1965 stream1: self,
1966 stream2: other,
1967 rng: Rng::new(),
1968 }
1969 }
1970
1971 /// Yields all immediately available values from a stream.
1972 ///
1973 /// This is intended to be used as a way of polling a stream without waiting, similar to the
1974 /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1975 /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1976 /// channel, but will not wait for new messages.
1977 ///
1978 /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1979 /// polling context in order to poll the underlying stream. Since this stream will never return
1980 /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1981 /// [`Iterator`].
1982 ///
1983 /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1984 /// the future if it is polled again.
1985 ///
1986 /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1987 /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1988 /// [`Stream`]: crate::stream::Stream
1989 /// [`Iterator`]: std::iter::Iterator
1990 ///
1991 /// # Examples
1992 ///
1993 /// ```
1994 /// use futures_lite::{future, pin};
1995 /// use futures_lite::stream::{self, StreamExt};
1996 ///
1997 /// # #[cfg(feature = "std")] {
1998 /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1999 /// let pend_once = stream::once_future(async {
2000 /// future::yield_now().await;
2001 /// 3
2002 /// });
2003 /// let s = stream::iter(vec![1, 2]).chain(pend_once);
2004 /// pin!(s);
2005 ///
2006 /// // This will return the first two values, and then `None` because the stream returns
2007 /// // `Pending` after that.
2008 /// let mut iter = stream::block_on(s.drain());
2009 /// assert_eq!(iter.next(), Some(1));
2010 /// assert_eq!(iter.next(), Some(2));
2011 /// assert_eq!(iter.next(), None);
2012 ///
2013 /// // This will return the last value, because the stream returns `Ready` when polled.
2014 /// assert_eq!(iter.next(), Some(3));
2015 /// assert_eq!(iter.next(), None);
2016 /// # }
2017 /// ```
2018 fn drain(&mut self) -> Drain<'_, Self> {
2019 Drain { stream: self }
2020 }
2021
2022 /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
2023 ///
2024 /// # Examples
2025 ///
2026 /// ```
2027 /// use futures_lite::stream::{self, StreamExt};
2028 ///
2029 /// # spin_on::spin_on(async {
2030 /// let a = stream::once(1);
2031 /// let b = stream::empty();
2032 ///
2033 /// // Streams of different types can be stored in
2034 /// // the same collection when they are boxed:
2035 /// let streams = vec![a.boxed(), b.boxed()];
2036 /// # })
2037 /// ```
2038 #[cfg(feature = "alloc")]
2039 fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
2040 where
2041 Self: Send + Sized + 'a,
2042 {
2043 Box::pin(self)
2044 }
2045
2046 /// Boxes the stream and changes its type to `dyn Stream + 'a`.
2047 ///
2048 /// # Examples
2049 ///
2050 /// ```
2051 /// use futures_lite::stream::{self, StreamExt};
2052 ///
2053 /// # spin_on::spin_on(async {
2054 /// let a = stream::once(1);
2055 /// let b = stream::empty();
2056 ///
2057 /// // Streams of different types can be stored in
2058 /// // the same collection when they are boxed:
2059 /// let streams = vec![a.boxed_local(), b.boxed_local()];
2060 /// # })
2061 /// ```
2062 #[cfg(feature = "alloc")]
2063 fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
2064 where
2065 Self: Sized + 'a,
2066 {
2067 Box::pin(self)
2068 }
2069}
2070
2071impl<S: Stream + ?Sized> StreamExt for S {}
2072
2073/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
2074///
2075/// # Examples
2076///
2077/// ```
2078/// use futures_lite::stream::{self, StreamExt};
2079///
2080/// // These two lines are equivalent:
2081/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
2082/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
2083/// ```
2084#[cfg(feature = "alloc")]
2085pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
2086
2087/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
2088///
2089/// # Examples
2090///
2091/// ```
2092/// use futures_lite::stream::{self, StreamExt};
2093///
2094/// // These two lines are equivalent:
2095/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
2096/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
2097/// ```
2098#[cfg(feature = "alloc")]
2099pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
2100
2101/// Future for the [`StreamExt::next()`] method.
2102#[derive(Debug)]
2103#[must_use = "futures do nothing unless you `.await` or poll them"]
2104pub struct NextFuture<'a, S: ?Sized> {
2105 stream: &'a mut S,
2106}
2107
2108impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
2109
2110impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
2111 type Output = Option<S::Item>;
2112
2113 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2114 self.stream.poll_next(cx)
2115 }
2116}
2117
2118/// Future for the [`StreamExt::try_next()`] method.
2119#[derive(Debug)]
2120#[must_use = "futures do nothing unless you `.await` or poll them"]
2121pub struct TryNextFuture<'a, S: ?Sized> {
2122 stream: &'a mut S,
2123}
2124
2125impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
2126
2127impl<T, E, S> Future for TryNextFuture<'_, S>
2128where
2129 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
2130{
2131 type Output = Result<Option<T>, E>;
2132
2133 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2134 let res = ready!(self.stream.poll_next(cx));
2135 Poll::Ready(res.transpose())
2136 }
2137}
2138
2139pin_project! {
2140 /// Future for the [`StreamExt::count()`] method.
2141 #[derive(Debug)]
2142 #[must_use = "futures do nothing unless you `.await` or poll them"]
2143 pub struct CountFuture<S: ?Sized> {
2144 count: usize,
2145 #[pin]
2146 stream: S,
2147 }
2148}
2149
2150impl<S: Stream + ?Sized> Future for CountFuture<S> {
2151 type Output = usize;
2152
2153 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2154 loop {
2155 match ready!(self.as_mut().project().stream.poll_next(cx)) {
2156 None => return Poll::Ready(self.count),
2157 Some(_) => *self.as_mut().project().count += 1,
2158 }
2159 }
2160 }
2161}
2162
2163pin_project! {
2164 /// Future for the [`StreamExt::collect()`] method.
2165 #[derive(Debug)]
2166 #[must_use = "futures do nothing unless you `.await` or poll them"]
2167 pub struct CollectFuture<S, C> {
2168 #[pin]
2169 stream: S,
2170 collection: C,
2171 }
2172}
2173
2174impl<S, C> Future for CollectFuture<S, C>
2175where
2176 S: Stream,
2177 C: Default + Extend<S::Item>,
2178{
2179 type Output = C;
2180
2181 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
2182 let mut this = self.as_mut().project();
2183 loop {
2184 match ready!(this.stream.as_mut().poll_next(cx)) {
2185 Some(e) => this.collection.extend(Some(e)),
2186 None => return Poll::Ready(mem::take(self.project().collection)),
2187 }
2188 }
2189 }
2190}
2191
2192pin_project! {
2193 /// Future for the [`StreamExt::try_collect()`] method.
2194 #[derive(Debug)]
2195 #[must_use = "futures do nothing unless you `.await` or poll them"]
2196 pub struct TryCollectFuture<S, C> {
2197 #[pin]
2198 stream: S,
2199 items: C,
2200 }
2201}
2202
2203impl<T, E, S, C> Future for TryCollectFuture<S, C>
2204where
2205 S: Stream<Item = Result<T, E>>,
2206 C: Default + Extend<T>,
2207{
2208 type Output = Result<C, E>;
2209
2210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2211 let mut this = self.project();
2212 Poll::Ready(Ok(loop {
2213 match ready!(this.stream.as_mut().poll_next(cx)?) {
2214 Some(x) => this.items.extend(Some(x)),
2215 None => break mem::take(this.items),
2216 }
2217 }))
2218 }
2219}
2220
2221pin_project! {
2222 /// Future for the [`StreamExt::partition()`] method.
2223 #[derive(Debug)]
2224 #[must_use = "futures do nothing unless you `.await` or poll them"]
2225 pub struct PartitionFuture<S, P, B> {
2226 #[pin]
2227 stream: S,
2228 predicate: P,
2229 res: Option<(B, B)>,
2230 }
2231}
2232
2233impl<S, P, B> Future for PartitionFuture<S, P, B>
2234where
2235 S: Stream + Sized,
2236 P: FnMut(&S::Item) -> bool,
2237 B: Default + Extend<S::Item>,
2238{
2239 type Output = (B, B);
2240
2241 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2242 let mut this = self.project();
2243 loop {
2244 match ready!(this.stream.as_mut().poll_next(cx)) {
2245 Some(v) => {
2246 let res = this.res.as_mut().unwrap();
2247 if (this.predicate)(&v) {
2248 res.0.extend(Some(v))
2249 } else {
2250 res.1.extend(Some(v))
2251 }
2252 }
2253 None => return Poll::Ready(this.res.take().unwrap()),
2254 }
2255 }
2256 }
2257}
2258
2259pin_project! {
2260 /// Future for the [`StreamExt::fold()`] method.
2261 #[derive(Debug)]
2262 #[must_use = "futures do nothing unless you `.await` or poll them"]
2263 pub struct FoldFuture<S, F, T> {
2264 #[pin]
2265 stream: S,
2266 f: F,
2267 acc: Option<T>,
2268 }
2269}
2270
2271impl<S, F, T> Future for FoldFuture<S, F, T>
2272where
2273 S: Stream,
2274 F: FnMut(T, S::Item) -> T,
2275{
2276 type Output = T;
2277
2278 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2279 let mut this = self.project();
2280 loop {
2281 match ready!(this.stream.as_mut().poll_next(cx)) {
2282 Some(v) => {
2283 let old = this.acc.take().unwrap();
2284 let new = (this.f)(old, v);
2285 *this.acc = Some(new);
2286 }
2287 None => return Poll::Ready(this.acc.take().unwrap()),
2288 }
2289 }
2290 }
2291}
2292
2293/// Future for the [`StreamExt::try_fold()`] method.
2294#[derive(Debug)]
2295#[must_use = "futures do nothing unless you `.await` or poll them"]
2296pub struct TryFoldFuture<'a, S, F, B> {
2297 stream: &'a mut S,
2298 f: F,
2299 acc: Option<B>,
2300}
2301
2302impl<S, F, B> Unpin for TryFoldFuture<'_, S, F, B> {}
2303
2304impl<T, E, S, F, B> Future for TryFoldFuture<'_, S, F, B>
2305where
2306 S: Stream<Item = Result<T, E>> + Unpin,
2307 F: FnMut(B, T) -> Result<B, E>,
2308{
2309 type Output = Result<B, E>;
2310
2311 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2312 loop {
2313 match ready!(self.stream.poll_next(cx)) {
2314 Some(Err(e)) => return Poll::Ready(Err(e)),
2315 Some(Ok(t)) => {
2316 let old = self.acc.take().unwrap();
2317 let new = (&mut self.f)(old, t);
2318
2319 match new {
2320 Ok(t) => self.acc = Some(t),
2321 Err(e) => return Poll::Ready(Err(e)),
2322 }
2323 }
2324 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2325 }
2326 }
2327 }
2328}
2329
2330pin_project! {
2331 /// Stream for the [`StreamExt::scan()`] method.
2332 #[derive(Clone, Debug)]
2333 #[must_use = "streams do nothing unless polled"]
2334 pub struct Scan<S, St, F> {
2335 #[pin]
2336 stream: S,
2337 state_f: (St, F),
2338 }
2339}
2340
2341impl<S, St, F, B> Stream for Scan<S, St, F>
2342where
2343 S: Stream,
2344 F: FnMut(&mut St, S::Item) -> Option<B>,
2345{
2346 type Item = B;
2347
2348 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2349 let mut this = self.project();
2350 this.stream.as_mut().poll_next(cx).map(|item| {
2351 item.and_then(|item| {
2352 let (state, f) = this.state_f;
2353 f(state, item)
2354 })
2355 })
2356 }
2357}
2358
2359pin_project! {
2360 /// Stream for the [`StreamExt::fuse()`] method.
2361 #[derive(Clone, Debug)]
2362 #[must_use = "streams do nothing unless polled"]
2363 pub struct Fuse<S> {
2364 #[pin]
2365 stream: S,
2366 done: bool,
2367 }
2368}
2369
2370impl<S: Stream> Stream for Fuse<S> {
2371 type Item = S::Item;
2372
2373 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2374 let this = self.project();
2375
2376 if *this.done {
2377 Poll::Ready(None)
2378 } else {
2379 let next = ready!(this.stream.poll_next(cx));
2380 if next.is_none() {
2381 *this.done = true;
2382 }
2383 Poll::Ready(next)
2384 }
2385 }
2386}
2387
2388pin_project! {
2389 /// Stream for the [`StreamExt::map()`] method.
2390 #[derive(Clone, Debug)]
2391 #[must_use = "streams do nothing unless polled"]
2392 pub struct Map<S, F> {
2393 #[pin]
2394 stream: S,
2395 f: F,
2396 }
2397}
2398
2399impl<S, F, T> Stream for Map<S, F>
2400where
2401 S: Stream,
2402 F: FnMut(S::Item) -> T,
2403{
2404 type Item = T;
2405
2406 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2407 let this = self.project();
2408 let next = ready!(this.stream.poll_next(cx));
2409 Poll::Ready(next.map(this.f))
2410 }
2411
2412 fn size_hint(&self) -> (usize, Option<usize>) {
2413 self.stream.size_hint()
2414 }
2415}
2416
2417pin_project! {
2418 /// Stream for the [`StreamExt::flat_map()`] method.
2419 #[derive(Clone, Debug)]
2420 #[must_use = "streams do nothing unless polled"]
2421 pub struct FlatMap<S, U, F> {
2422 #[pin]
2423 stream: Map<S, F>,
2424 #[pin]
2425 inner_stream: Option<U>,
2426 }
2427}
2428
2429impl<S, U, F> Stream for FlatMap<S, U, F>
2430where
2431 S: Stream,
2432 U: Stream,
2433 F: FnMut(S::Item) -> U,
2434{
2435 type Item = U::Item;
2436
2437 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2438 let mut this = self.project();
2439 loop {
2440 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2441 match ready!(inner.poll_next(cx)) {
2442 Some(item) => return Poll::Ready(Some(item)),
2443 None => this.inner_stream.set(None),
2444 }
2445 }
2446
2447 match ready!(this.stream.as_mut().poll_next(cx)) {
2448 Some(stream) => this.inner_stream.set(Some(stream)),
2449 None => return Poll::Ready(None),
2450 }
2451 }
2452 }
2453}
2454
2455pin_project! {
2456 /// Stream for the [`StreamExt::flatten()`] method.
2457 #[derive(Clone, Debug)]
2458 #[must_use = "streams do nothing unless polled"]
2459 pub struct Flatten<S: Stream> {
2460 #[pin]
2461 stream: S,
2462 #[pin]
2463 inner_stream: Option<S::Item>,
2464 }
2465}
2466
2467impl<S, U> Stream for Flatten<S>
2468where
2469 S: Stream<Item = U>,
2470 U: Stream,
2471{
2472 type Item = U::Item;
2473
2474 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2475 let mut this = self.project();
2476 loop {
2477 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2478 match ready!(inner.poll_next(cx)) {
2479 Some(item) => return Poll::Ready(Some(item)),
2480 None => this.inner_stream.set(None),
2481 }
2482 }
2483
2484 match ready!(this.stream.as_mut().poll_next(cx)) {
2485 Some(inner) => this.inner_stream.set(Some(inner)),
2486 None => return Poll::Ready(None),
2487 }
2488 }
2489 }
2490}
2491
2492pin_project! {
2493 /// Stream for the [`StreamExt::then()`] method.
2494 #[derive(Clone, Debug)]
2495 #[must_use = "streams do nothing unless polled"]
2496 pub struct Then<S, F, Fut> {
2497 #[pin]
2498 stream: S,
2499 #[pin]
2500 future: Option<Fut>,
2501 f: F,
2502 }
2503}
2504
2505impl<S, F, Fut> Stream for Then<S, F, Fut>
2506where
2507 S: Stream,
2508 F: FnMut(S::Item) -> Fut,
2509 Fut: Future,
2510{
2511 type Item = Fut::Output;
2512
2513 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2514 let mut this = self.project();
2515
2516 loop {
2517 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2518 let item = ready!(fut.poll(cx));
2519 this.future.set(None);
2520 return Poll::Ready(Some(item));
2521 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2522 this.future.set(Some((this.f)(item)));
2523 } else {
2524 return Poll::Ready(None);
2525 }
2526 }
2527 }
2528
2529 fn size_hint(&self) -> (usize, Option<usize>) {
2530 let future_len = self.future.is_some() as usize;
2531 let (lower, upper) = self.stream.size_hint();
2532 let lower = lower.saturating_add(future_len);
2533 let upper = upper.and_then(|u| u.checked_add(future_len));
2534 (lower, upper)
2535 }
2536}
2537
2538pin_project! {
2539 /// Stream for the [`StreamExt::filter()`] method.
2540 #[derive(Clone, Debug)]
2541 #[must_use = "streams do nothing unless polled"]
2542 pub struct Filter<S, P> {
2543 #[pin]
2544 stream: S,
2545 predicate: P,
2546 }
2547}
2548
2549impl<S, P> Stream for Filter<S, P>
2550where
2551 S: Stream,
2552 P: FnMut(&S::Item) -> bool,
2553{
2554 type Item = S::Item;
2555
2556 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2557 let mut this = self.project();
2558 loop {
2559 match ready!(this.stream.as_mut().poll_next(cx)) {
2560 None => return Poll::Ready(None),
2561 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2562 Some(_) => {}
2563 }
2564 }
2565 }
2566
2567 fn size_hint(&self) -> (usize, Option<usize>) {
2568 let (_, hi) = self.stream.size_hint();
2569
2570 // If the filter matches all of the elements, it will match the stream's upper bound.
2571 // If the filter matches none of the elements, there will be zero returned values.
2572 (0, hi)
2573 }
2574}
2575
2576/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2577///
2578/// # Examples
2579///
2580/// ```
2581/// use futures_lite::stream::{self, once, pending, StreamExt};
2582///
2583/// # spin_on::spin_on(async {
2584/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2585/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2586///
2587/// // The first stream wins.
2588/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2589/// # })
2590/// ```
2591pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2592where
2593 S1: Stream<Item = T>,
2594 S2: Stream<Item = T>,
2595{
2596 Or { stream1, stream2 }
2597}
2598
2599pin_project! {
2600 /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2601 #[derive(Clone, Debug)]
2602 #[must_use = "streams do nothing unless polled"]
2603 pub struct Or<S1, S2> {
2604 #[pin]
2605 stream1: S1,
2606 #[pin]
2607 stream2: S2,
2608 }
2609}
2610
2611impl<T, S1, S2> Stream for Or<S1, S2>
2612where
2613 S1: Stream<Item = T>,
2614 S2: Stream<Item = T>,
2615{
2616 type Item = T;
2617
2618 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2619 let mut this = self.project();
2620
2621 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2622 return Poll::Ready(Some(t));
2623 }
2624 this.stream2.as_mut().poll_next(cx)
2625 }
2626}
2627
2628/// Merges two streams, with no preference for either stream when both are ready.
2629///
2630/// # Examples
2631///
2632/// ```
2633/// use futures_lite::stream::{self, once, pending, StreamExt};
2634///
2635/// # spin_on::spin_on(async {
2636/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2637/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2638///
2639/// // One of the two stream is randomly chosen as the winner.
2640/// let res = stream::race(once(1), once(2)).next().await;
2641/// # })
2642/// ```
2643#[cfg(all(feature = "std", feature = "race"))]
2644pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2645where
2646 S1: Stream<Item = T>,
2647 S2: Stream<Item = T>,
2648{
2649 Race {
2650 stream1,
2651 stream2,
2652 rng: Rng::new(),
2653 }
2654}
2655
2656/// Races two streams, but with a user-provided seed for randomness.
2657///
2658/// # Examples
2659///
2660/// ```
2661/// use futures_lite::stream::{self, once, pending, StreamExt};
2662///
2663/// // A fixed seed is used for reproducibility.
2664/// const SEED: u64 = 123;
2665///
2666/// # spin_on::spin_on(async {
2667/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
2668/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
2669///
2670/// // One of the two stream is randomly chosen as the winner.
2671/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
2672/// # })
2673/// ```
2674#[cfg(feature = "race")]
2675pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
2676where
2677 S1: Stream<Item = T>,
2678 S2: Stream<Item = T>,
2679{
2680 Race {
2681 stream1,
2682 stream2,
2683 rng: Rng::with_seed(seed),
2684 }
2685}
2686
2687#[cfg(feature = "race")]
2688pin_project! {
2689 /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2690 #[derive(Clone, Debug)]
2691 #[must_use = "streams do nothing unless polled"]
2692 pub struct Race<S1, S2> {
2693 #[pin]
2694 stream1: S1,
2695 #[pin]
2696 stream2: S2,
2697 rng: Rng,
2698 }
2699}
2700
2701#[cfg(feature = "race")]
2702impl<T, S1, S2> Stream for Race<S1, S2>
2703where
2704 S1: Stream<Item = T>,
2705 S2: Stream<Item = T>,
2706{
2707 type Item = T;
2708
2709 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2710 let mut this = self.project();
2711
2712 if this.rng.bool() {
2713 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2714 return Poll::Ready(Some(t));
2715 }
2716 if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2717 return Poll::Ready(Some(t));
2718 }
2719 } else {
2720 if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2721 return Poll::Ready(Some(t));
2722 }
2723 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2724 return Poll::Ready(Some(t));
2725 }
2726 }
2727 Poll::Pending
2728 }
2729}
2730
2731pin_project! {
2732 /// Stream for the [`StreamExt::filter_map()`] method.
2733 #[derive(Clone, Debug)]
2734 #[must_use = "streams do nothing unless polled"]
2735 pub struct FilterMap<S, F> {
2736 #[pin]
2737 stream: S,
2738 f: F,
2739 }
2740}
2741
2742impl<S, F, T> Stream for FilterMap<S, F>
2743where
2744 S: Stream,
2745 F: FnMut(S::Item) -> Option<T>,
2746{
2747 type Item = T;
2748
2749 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2750 let mut this = self.project();
2751 loop {
2752 match ready!(this.stream.as_mut().poll_next(cx)) {
2753 None => return Poll::Ready(None),
2754 Some(v) => {
2755 if let Some(t) = (this.f)(v) {
2756 return Poll::Ready(Some(t));
2757 }
2758 }
2759 }
2760 }
2761 }
2762}
2763
2764pin_project! {
2765 /// Stream for the [`StreamExt::take()`] method.
2766 #[derive(Clone, Debug)]
2767 #[must_use = "streams do nothing unless polled"]
2768 pub struct Take<S> {
2769 #[pin]
2770 stream: S,
2771 n: usize,
2772 }
2773}
2774
2775impl<S: Stream> Stream for Take<S> {
2776 type Item = S::Item;
2777
2778 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2779 let this = self.project();
2780
2781 if *this.n == 0 {
2782 Poll::Ready(None)
2783 } else {
2784 let next = ready!(this.stream.poll_next(cx));
2785 match next {
2786 Some(_) => *this.n -= 1,
2787 None => *this.n = 0,
2788 }
2789 Poll::Ready(next)
2790 }
2791 }
2792}
2793
2794pin_project! {
2795 /// Stream for the [`StreamExt::take_while()`] method.
2796 #[derive(Clone, Debug)]
2797 #[must_use = "streams do nothing unless polled"]
2798 pub struct TakeWhile<S, P> {
2799 #[pin]
2800 stream: S,
2801 predicate: P,
2802 }
2803}
2804
2805impl<S, P> Stream for TakeWhile<S, P>
2806where
2807 S: Stream,
2808 P: FnMut(&S::Item) -> bool,
2809{
2810 type Item = S::Item;
2811
2812 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2813 let this = self.project();
2814
2815 match ready!(this.stream.poll_next(cx)) {
2816 Some(v) => {
2817 if (this.predicate)(&v) {
2818 Poll::Ready(Some(v))
2819 } else {
2820 Poll::Ready(None)
2821 }
2822 }
2823 None => Poll::Ready(None),
2824 }
2825 }
2826}
2827
2828pin_project! {
2829 /// Stream for the [`StreamExt::map_while()`] method.
2830 #[derive(Clone, Debug)]
2831 #[must_use = "streams do nothing unless polled"]
2832 pub struct MapWhile<S, P> {
2833 #[pin]
2834 stream: S,
2835 predicate: P,
2836 }
2837}
2838
2839impl<B, S, P> Stream for MapWhile<S, P>
2840where
2841 S: Stream,
2842 P: FnMut(S::Item) -> Option<B>,
2843{
2844 type Item = B;
2845
2846 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2847 let this = self.project();
2848
2849 match ready!(this.stream.poll_next(cx)) {
2850 Some(v) => Poll::Ready((this.predicate)(v)),
2851 None => Poll::Ready(None),
2852 }
2853 }
2854}
2855
2856pin_project! {
2857 /// Stream for the [`StreamExt::skip()`] method.
2858 #[derive(Clone, Debug)]
2859 #[must_use = "streams do nothing unless polled"]
2860 pub struct Skip<S> {
2861 #[pin]
2862 stream: S,
2863 n: usize,
2864 }
2865}
2866
2867impl<S: Stream> Stream for Skip<S> {
2868 type Item = S::Item;
2869
2870 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2871 let mut this = self.project();
2872 loop {
2873 match ready!(this.stream.as_mut().poll_next(cx)) {
2874 Some(v) => match *this.n {
2875 0 => return Poll::Ready(Some(v)),
2876 _ => *this.n -= 1,
2877 },
2878 None => return Poll::Ready(None),
2879 }
2880 }
2881 }
2882}
2883
2884pin_project! {
2885 /// Stream for the [`StreamExt::skip_while()`] method.
2886 #[derive(Clone, Debug)]
2887 #[must_use = "streams do nothing unless polled"]
2888 pub struct SkipWhile<S, P> {
2889 #[pin]
2890 stream: S,
2891 predicate: Option<P>,
2892 }
2893}
2894
2895impl<S, P> Stream for SkipWhile<S, P>
2896where
2897 S: Stream,
2898 P: FnMut(&S::Item) -> bool,
2899{
2900 type Item = S::Item;
2901
2902 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2903 let mut this = self.project();
2904 loop {
2905 match ready!(this.stream.as_mut().poll_next(cx)) {
2906 Some(v) => match this.predicate {
2907 Some(p) => {
2908 if !p(&v) {
2909 *this.predicate = None;
2910 return Poll::Ready(Some(v));
2911 }
2912 }
2913 None => return Poll::Ready(Some(v)),
2914 },
2915 None => return Poll::Ready(None),
2916 }
2917 }
2918 }
2919}
2920
2921pin_project! {
2922 /// Stream for the [`StreamExt::step_by()`] method.
2923 #[derive(Clone, Debug)]
2924 #[must_use = "streams do nothing unless polled"]
2925 pub struct StepBy<S> {
2926 #[pin]
2927 stream: S,
2928 step: usize,
2929 i: usize,
2930 }
2931}
2932
2933impl<S: Stream> Stream for StepBy<S> {
2934 type Item = S::Item;
2935
2936 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2937 let mut this = self.project();
2938 loop {
2939 match ready!(this.stream.as_mut().poll_next(cx)) {
2940 Some(v) => {
2941 if *this.i == 0 {
2942 *this.i = *this.step - 1;
2943 return Poll::Ready(Some(v));
2944 } else {
2945 *this.i -= 1;
2946 }
2947 }
2948 None => return Poll::Ready(None),
2949 }
2950 }
2951 }
2952}
2953
2954pin_project! {
2955 /// Stream for the [`StreamExt::chain()`] method.
2956 #[derive(Clone, Debug)]
2957 #[must_use = "streams do nothing unless polled"]
2958 pub struct Chain<S, U> {
2959 #[pin]
2960 first: Fuse<S>,
2961 #[pin]
2962 second: Fuse<U>,
2963 }
2964}
2965
2966impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2967 type Item = S::Item;
2968
2969 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2970 let mut this = self.project();
2971
2972 if !this.first.done {
2973 let next = ready!(this.first.as_mut().poll_next(cx));
2974 if let Some(next) = next {
2975 return Poll::Ready(Some(next));
2976 }
2977 }
2978
2979 if !this.second.done {
2980 let next = ready!(this.second.as_mut().poll_next(cx));
2981 if let Some(next) = next {
2982 return Poll::Ready(Some(next));
2983 }
2984 }
2985
2986 if this.first.done && this.second.done {
2987 Poll::Ready(None)
2988 } else {
2989 Poll::Pending
2990 }
2991 }
2992}
2993
2994pin_project! {
2995 /// Stream for the [`StreamExt::cloned()`] method.
2996 #[derive(Clone, Debug)]
2997 #[must_use = "streams do nothing unless polled"]
2998 pub struct Cloned<S> {
2999 #[pin]
3000 stream: S,
3001 }
3002}
3003
3004impl<'a, S, T: 'a> Stream for Cloned<S>
3005where
3006 S: Stream<Item = &'a T>,
3007 T: Clone,
3008{
3009 type Item = T;
3010
3011 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3012 let this = self.project();
3013 let next = ready!(this.stream.poll_next(cx));
3014 Poll::Ready(next.cloned())
3015 }
3016}
3017
3018pin_project! {
3019 /// Stream for the [`StreamExt::copied()`] method.
3020 #[derive(Clone, Debug)]
3021 #[must_use = "streams do nothing unless polled"]
3022 pub struct Copied<S> {
3023 #[pin]
3024 stream: S,
3025 }
3026}
3027
3028impl<'a, S, T: 'a> Stream for Copied<S>
3029where
3030 S: Stream<Item = &'a T>,
3031 T: Copy,
3032{
3033 type Item = T;
3034
3035 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3036 let this = self.project();
3037 let next = ready!(this.stream.poll_next(cx));
3038 Poll::Ready(next.copied())
3039 }
3040}
3041
3042pin_project! {
3043 /// Stream for the [`StreamExt::cycle()`] method.
3044 #[derive(Clone, Debug)]
3045 #[must_use = "streams do nothing unless polled"]
3046 pub struct Cycle<S> {
3047 orig: S,
3048 #[pin]
3049 stream: S,
3050 }
3051}
3052
3053impl<S> Stream for Cycle<S>
3054where
3055 S: Stream + Clone,
3056{
3057 type Item = S::Item;
3058
3059 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3060 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
3061 Some(item) => Poll::Ready(Some(item)),
3062 None => {
3063 let new = self.as_mut().orig.clone();
3064 self.as_mut().project().stream.set(new);
3065 self.project().stream.poll_next(cx)
3066 }
3067 }
3068 }
3069}
3070
3071pin_project! {
3072 /// Stream for the [`StreamExt::enumerate()`] method.
3073 #[derive(Clone, Debug)]
3074 #[must_use = "streams do nothing unless polled"]
3075 pub struct Enumerate<S> {
3076 #[pin]
3077 stream: S,
3078 i: usize,
3079 }
3080}
3081
3082impl<S> Stream for Enumerate<S>
3083where
3084 S: Stream,
3085{
3086 type Item = (usize, S::Item);
3087
3088 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3089 let this = self.project();
3090
3091 match ready!(this.stream.poll_next(cx)) {
3092 Some(v) => {
3093 let ret = (*this.i, v);
3094 *this.i += 1;
3095 Poll::Ready(Some(ret))
3096 }
3097 None => Poll::Ready(None),
3098 }
3099 }
3100}
3101
3102pin_project! {
3103 /// Stream for the [`StreamExt::inspect()`] method.
3104 #[derive(Clone, Debug)]
3105 #[must_use = "streams do nothing unless polled"]
3106 pub struct Inspect<S, F> {
3107 #[pin]
3108 stream: S,
3109 f: F,
3110 }
3111}
3112
3113impl<S, F> Stream for Inspect<S, F>
3114where
3115 S: Stream,
3116 F: FnMut(&S::Item),
3117{
3118 type Item = S::Item;
3119
3120 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3121 let mut this = self.project();
3122 let next = ready!(this.stream.as_mut().poll_next(cx));
3123 if let Some(x) = &next {
3124 (this.f)(x);
3125 }
3126 Poll::Ready(next)
3127 }
3128}
3129
3130/// Future for the [`StreamExt::nth()`] method.
3131#[derive(Debug)]
3132#[must_use = "futures do nothing unless you `.await` or poll them"]
3133pub struct NthFuture<'a, S: ?Sized> {
3134 stream: &'a mut S,
3135 n: usize,
3136}
3137
3138impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
3139
3140impl<S> Future for NthFuture<'_, S>
3141where
3142 S: Stream + Unpin + ?Sized,
3143{
3144 type Output = Option<S::Item>;
3145
3146 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3147 loop {
3148 match ready!(self.stream.poll_next(cx)) {
3149 Some(v) => match self.n {
3150 0 => return Poll::Ready(Some(v)),
3151 _ => self.n -= 1,
3152 },
3153 None => return Poll::Ready(None),
3154 }
3155 }
3156 }
3157}
3158
3159pin_project! {
3160 /// Future for the [`StreamExt::last()`] method.
3161 #[derive(Debug)]
3162 #[must_use = "futures do nothing unless you `.await` or poll them"]
3163 pub struct LastFuture<S: Stream> {
3164 #[pin]
3165 stream: S,
3166 last: Option<S::Item>,
3167 }
3168}
3169
3170impl<S: Stream> Future for LastFuture<S> {
3171 type Output = Option<S::Item>;
3172
3173 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3174 let mut this = self.project();
3175 loop {
3176 match ready!(this.stream.as_mut().poll_next(cx)) {
3177 Some(new) => *this.last = Some(new),
3178 None => return Poll::Ready(this.last.take()),
3179 }
3180 }
3181 }
3182}
3183
3184/// Future for the [`StreamExt::find()`] method.
3185#[derive(Debug)]
3186#[must_use = "futures do nothing unless you `.await` or poll them"]
3187pub struct FindFuture<'a, S: ?Sized, P> {
3188 stream: &'a mut S,
3189 predicate: P,
3190}
3191
3192impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
3193
3194impl<S, P> Future for FindFuture<'_, S, P>
3195where
3196 S: Stream + Unpin + ?Sized,
3197 P: FnMut(&S::Item) -> bool,
3198{
3199 type Output = Option<S::Item>;
3200
3201 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3202 loop {
3203 match ready!(self.stream.poll_next(cx)) {
3204 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
3205 Some(_) => {}
3206 None => return Poll::Ready(None),
3207 }
3208 }
3209 }
3210}
3211
3212/// Future for the [`StreamExt::find_map()`] method.
3213#[derive(Debug)]
3214#[must_use = "futures do nothing unless you `.await` or poll them"]
3215pub struct FindMapFuture<'a, S: ?Sized, F> {
3216 stream: &'a mut S,
3217 f: F,
3218}
3219
3220impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
3221
3222impl<S, B, F> Future for FindMapFuture<'_, S, F>
3223where
3224 S: Stream + Unpin + ?Sized,
3225 F: FnMut(S::Item) -> Option<B>,
3226{
3227 type Output = Option<B>;
3228
3229 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3230 loop {
3231 match ready!(self.stream.poll_next(cx)) {
3232 Some(v) => {
3233 if let Some(v) = (&mut self.f)(v) {
3234 return Poll::Ready(Some(v));
3235 }
3236 }
3237 None => return Poll::Ready(None),
3238 }
3239 }
3240 }
3241}
3242
3243/// Future for the [`StreamExt::position()`] method.
3244#[derive(Debug)]
3245#[must_use = "futures do nothing unless you `.await` or poll them"]
3246pub struct PositionFuture<'a, S: ?Sized, P> {
3247 stream: &'a mut S,
3248 predicate: P,
3249 index: usize,
3250}
3251
3252impl<S: Unpin + ?Sized, P> Unpin for PositionFuture<'_, S, P> {}
3253
3254impl<S, P> Future for PositionFuture<'_, S, P>
3255where
3256 S: Stream + Unpin + ?Sized,
3257 P: FnMut(S::Item) -> bool,
3258{
3259 type Output = Option<usize>;
3260
3261 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3262 loop {
3263 match ready!(self.stream.poll_next(cx)) {
3264 Some(v) => {
3265 if (&mut self.predicate)(v) {
3266 return Poll::Ready(Some(self.index));
3267 } else {
3268 self.index += 1;
3269 }
3270 }
3271 None => return Poll::Ready(None),
3272 }
3273 }
3274 }
3275}
3276
3277/// Future for the [`StreamExt::all()`] method.
3278#[derive(Debug)]
3279#[must_use = "futures do nothing unless you `.await` or poll them"]
3280pub struct AllFuture<'a, S: ?Sized, P> {
3281 stream: &'a mut S,
3282 predicate: P,
3283}
3284
3285impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3286
3287impl<S, P> Future for AllFuture<'_, S, P>
3288where
3289 S: Stream + Unpin + ?Sized,
3290 P: FnMut(S::Item) -> bool,
3291{
3292 type Output = bool;
3293
3294 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3295 loop {
3296 match ready!(self.stream.poll_next(cx)) {
3297 Some(v) => {
3298 if !(&mut self.predicate)(v) {
3299 return Poll::Ready(false);
3300 }
3301 }
3302 None => return Poll::Ready(true),
3303 }
3304 }
3305 }
3306}
3307
3308/// Future for the [`StreamExt::any()`] method.
3309#[derive(Debug)]
3310#[must_use = "futures do nothing unless you `.await` or poll them"]
3311pub struct AnyFuture<'a, S: ?Sized, P> {
3312 stream: &'a mut S,
3313 predicate: P,
3314}
3315
3316impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3317
3318impl<S, P> Future for AnyFuture<'_, S, P>
3319where
3320 S: Stream + Unpin + ?Sized,
3321 P: FnMut(S::Item) -> bool,
3322{
3323 type Output = bool;
3324
3325 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3326 loop {
3327 match ready!(self.stream.poll_next(cx)) {
3328 Some(v) => {
3329 if (&mut self.predicate)(v) {
3330 return Poll::Ready(true);
3331 }
3332 }
3333 None => return Poll::Ready(false),
3334 }
3335 }
3336 }
3337}
3338
3339pin_project! {
3340 /// Future for the [`StreamExt::for_each()`] method.
3341 #[derive(Debug)]
3342 #[must_use = "futures do nothing unless you `.await` or poll them"]
3343 pub struct ForEachFuture<S, F> {
3344 #[pin]
3345 stream: S,
3346 f: F,
3347 }
3348}
3349
3350impl<S, F> Future for ForEachFuture<S, F>
3351where
3352 S: Stream,
3353 F: FnMut(S::Item),
3354{
3355 type Output = ();
3356
3357 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3358 let mut this = self.project();
3359 loop {
3360 match ready!(this.stream.as_mut().poll_next(cx)) {
3361 Some(v) => (this.f)(v),
3362 None => return Poll::Ready(()),
3363 }
3364 }
3365 }
3366}
3367
3368/// Future for the [`StreamExt::try_for_each()`] method.
3369#[derive(Debug)]
3370#[must_use = "futures do nothing unless you `.await` or poll them"]
3371pub struct TryForEachFuture<'a, S: ?Sized, F> {
3372 stream: &'a mut S,
3373 f: F,
3374}
3375
3376impl<S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'_, S, F> {}
3377
3378impl<S, F, E> Future for TryForEachFuture<'_, S, F>
3379where
3380 S: Stream + Unpin + ?Sized,
3381 F: FnMut(S::Item) -> Result<(), E>,
3382{
3383 type Output = Result<(), E>;
3384
3385 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3386 loop {
3387 match ready!(self.stream.poll_next(cx)) {
3388 None => return Poll::Ready(Ok(())),
3389 Some(v) => (&mut self.f)(v)?,
3390 }
3391 }
3392 }
3393}
3394
3395pin_project! {
3396 /// Stream for the [`StreamExt::zip()`] method.
3397 #[derive(Clone, Debug)]
3398 #[must_use = "streams do nothing unless polled"]
3399 pub struct Zip<A: Stream, B> {
3400 item_slot: Option<A::Item>,
3401 #[pin]
3402 first: A,
3403 #[pin]
3404 second: B,
3405 }
3406}
3407
3408impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3409 type Item = (A::Item, B::Item);
3410
3411 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3412 let this = self.project();
3413
3414 if this.item_slot.is_none() {
3415 match this.first.poll_next(cx) {
3416 Poll::Pending => return Poll::Pending,
3417 Poll::Ready(None) => return Poll::Ready(None),
3418 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3419 }
3420 }
3421
3422 let second_item = ready!(this.second.poll_next(cx));
3423 let first_item = this.item_slot.take().unwrap();
3424 Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3425 }
3426}
3427
3428pin_project! {
3429 /// Future for the [`StreamExt::unzip()`] method.
3430 #[derive(Debug)]
3431 #[must_use = "futures do nothing unless you `.await` or poll them"]
3432 pub struct UnzipFuture<S, FromA, FromB> {
3433 #[pin]
3434 stream: S,
3435 res: Option<(FromA, FromB)>,
3436 }
3437}
3438
3439impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3440where
3441 S: Stream<Item = (A, B)>,
3442 FromA: Default + Extend<A>,
3443 FromB: Default + Extend<B>,
3444{
3445 type Output = (FromA, FromB);
3446
3447 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3448 let mut this = self.project();
3449
3450 loop {
3451 match ready!(this.stream.as_mut().poll_next(cx)) {
3452 Some((a, b)) => {
3453 let res = this.res.as_mut().unwrap();
3454 res.0.extend(Some(a));
3455 res.1.extend(Some(b));
3456 }
3457 None => return Poll::Ready(this.res.take().unwrap()),
3458 }
3459 }
3460 }
3461}
3462
3463/// Stream for the [`StreamExt::drain()`] method.
3464#[derive(Debug)]
3465#[must_use = "streams do nothing unless polled"]
3466pub struct Drain<'a, S: ?Sized> {
3467 stream: &'a mut S,
3468}
3469
3470impl<S: Unpin + ?Sized> Unpin for Drain<'_, S> {}
3471
3472impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3473 /// Get a reference to the underlying stream.
3474 ///
3475 /// ## Examples
3476 ///
3477 /// ```
3478 /// use futures_lite::{prelude::*, stream};
3479 ///
3480 /// # spin_on::spin_on(async {
3481 /// let mut s = stream::iter(vec![1, 2, 3]);
3482 /// let s2 = s.drain();
3483 ///
3484 /// let inner = s2.get_ref();
3485 /// // s and inner are the same.
3486 /// # });
3487 /// ```
3488 pub fn get_ref(&self) -> &S {
3489 &self.stream
3490 }
3491
3492 /// Get a mutable reference to the underlying stream.
3493 ///
3494 /// ## Examples
3495 ///
3496 /// ```
3497 /// use futures_lite::{prelude::*, stream};
3498 ///
3499 /// # spin_on::spin_on(async {
3500 /// let mut s = stream::iter(vec![1, 2, 3]);
3501 /// let mut s2 = s.drain();
3502 ///
3503 /// let inner = s2.get_mut();
3504 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3505 /// # });
3506 /// ```
3507 pub fn get_mut(&mut self) -> &mut S {
3508 &mut self.stream
3509 }
3510
3511 /// Consume this stream and get the underlying stream.
3512 ///
3513 /// ## Examples
3514 ///
3515 /// ```
3516 /// use futures_lite::{prelude::*, stream};
3517 ///
3518 /// # spin_on::spin_on(async {
3519 /// let mut s = stream::iter(vec![1, 2, 3]);
3520 /// let mut s2 = s.drain();
3521 ///
3522 /// let inner = s2.into_inner();
3523 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3524 /// # });
3525 /// ```
3526 pub fn into_inner(self) -> &'a mut S {
3527 self.stream
3528 }
3529}
3530
3531impl<S: Stream + Unpin + ?Sized> Stream for Drain<'_, S> {
3532 type Item = S::Item;
3533
3534 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3535 match self.stream.poll_next(cx) {
3536 Poll::Ready(x) => Poll::Ready(x),
3537 Poll::Pending => Poll::Ready(None),
3538 }
3539 }
3540
3541 fn size_hint(&self) -> (usize, Option<usize>) {
3542 let (_, hi) = self.stream.size_hint();
3543 (0, hi)
3544 }
3545}