aws_smithy_async/future/pagination_stream/
fn_stream.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Module to define utility to drive a stream with an async function and a channel.
7
8use crate::future::pagination_stream::collect::sealed::Collectable;
9use crate::future::rendezvous;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::future::poll_fn;
13use std::future::Future;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17pin_project! {
18    /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages
19    /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function
20    /// will not proceed until the stream is read.
21    ///
22    /// This utility is used by generated paginators to generate a stream of paginated results.
23    ///
24    /// If `tx.send` returns an error, the function MUST return immediately.
25    ///
26    /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that
27    /// is `Send` and returns `()` as output when it is done.
28    ///
29    /// # Examples
30    /// ```no_run
31    /// # async fn docs() {
32    /// use aws_smithy_async::future::pagination_stream::fn_stream::FnStream;
33    /// let mut stream = FnStream::new(|tx| Box::pin(async move {
34    ///     if let Err(_) = tx.send("Hello!").await {
35    ///         return;
36    ///     }
37    ///     if let Err(_) = tx.send("Goodbye!").await {
38    ///         return;
39    ///     }
40    /// }));
41    /// assert_eq!(stream.collect::<Vec<_>>().await, vec!["Hello!", "Goodbye!"]);
42    /// # }
43    pub struct FnStream<Item> {
44        #[pin]
45        rx: rendezvous::Receiver<Item>,
46        generator: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
47    }
48}
49
50impl<Item> fmt::Debug for FnStream<Item> {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        let item_typename = std::any::type_name::<Item>();
53        write!(f, "FnStream<{item_typename}>")
54    }
55}
56
57impl<Item> FnStream<Item> {
58    /// Creates a new function based stream driven by `generator`.
59    ///
60    /// For examples, see the documentation for [`FnStream`]
61    pub fn new<T>(generator: T) -> Self
62    where
63        T: FnOnce(rendezvous::Sender<Item>) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
64    {
65        let (tx, rx) = rendezvous::channel::<Item>();
66        Self {
67            rx,
68            generator: Some(Box::pin(generator(tx))),
69        }
70    }
71
72    /// Consumes and returns the next `Item` from this stream.
73    pub async fn next(&mut self) -> Option<Item>
74    where
75        Self: Unpin,
76    {
77        let mut me = Pin::new(self);
78        poll_fn(|cx| me.as_mut().poll_next(cx)).await
79    }
80
81    /// Attempts to pull out the next value of this stream, returning `None` if the stream is
82    /// exhausted.
83    pub(crate) fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Item>> {
84        let mut me = self.project();
85        match me.rx.poll_recv(cx) {
86            Poll::Ready(item) => Poll::Ready(item),
87            Poll::Pending => {
88                if let Some(generator) = me.generator {
89                    if generator.as_mut().poll(cx).is_ready() {
90                        // `generator` keeps writing items to `tx` and will not be `Poll::Ready`
91                        // until it is done writing to `tx`. Once it is done, it returns `()`
92                        // as output and is `Poll::Ready`, at which point we MUST NOT poll it again
93                        // since doing so will cause a panic.
94                        *me.generator = None;
95                    }
96                }
97                Poll::Pending
98            }
99        }
100    }
101
102    /// Consumes this stream and gathers elements into a collection.
103    pub async fn collect<T: Collectable<Item>>(mut self) -> T {
104        let mut collection = T::initialize();
105        while let Some(item) = self.next().await {
106            if !T::extend(&mut collection, item) {
107                break;
108            }
109        }
110        T::finalize(collection)
111    }
112}
113
114impl<T, E> FnStream<Result<T, E>> {
115    /// Yields the next item in the stream or returns an error if an error is encountered.
116    pub async fn try_next(&mut self) -> Result<Option<T>, E> {
117        self.next().await.transpose()
118    }
119
120    /// Convenience method for `.collect::<Result<Vec<_>, _>()`.
121    pub async fn try_collect(self) -> Result<Vec<T>, E> {
122        self.collect::<Result<Vec<T>, E>>().await
123    }
124}