aws_smithy_async/future/pagination_stream/fn_stream.rs
1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Module to define utility to drive a stream with an async function and a channel.
7
8use crate::future::pagination_stream::collect::sealed::Collectable;
9use crate::future::rendezvous;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::future::poll_fn;
13use std::future::Future;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17pin_project! {
18 /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages
19 /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function
20 /// will not proceed until the stream is read.
21 ///
22 /// This utility is used by generated paginators to generate a stream of paginated results.
23 ///
24 /// If `tx.send` returns an error, the function MUST return immediately.
25 ///
26 /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that
27 /// is `Send` and returns `()` as output when it is done.
28 ///
29 /// # Examples
30 /// ```no_run
31 /// # async fn docs() {
32 /// use aws_smithy_async::future::pagination_stream::fn_stream::FnStream;
33 /// let mut stream = FnStream::new(|tx| Box::pin(async move {
34 /// if let Err(_) = tx.send("Hello!").await {
35 /// return;
36 /// }
37 /// if let Err(_) = tx.send("Goodbye!").await {
38 /// return;
39 /// }
40 /// }));
41 /// assert_eq!(stream.collect::<Vec<_>>().await, vec!["Hello!", "Goodbye!"]);
42 /// # }
43 pub struct FnStream<Item> {
44 #[pin]
45 rx: rendezvous::Receiver<Item>,
46 generator: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
47 }
48}
49
50impl<Item> fmt::Debug for FnStream<Item> {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 let item_typename = std::any::type_name::<Item>();
53 write!(f, "FnStream<{item_typename}>")
54 }
55}
56
57impl<Item> FnStream<Item> {
58 /// Creates a new function based stream driven by `generator`.
59 ///
60 /// For examples, see the documentation for [`FnStream`]
61 pub fn new<T>(generator: T) -> Self
62 where
63 T: FnOnce(rendezvous::Sender<Item>) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
64 {
65 let (tx, rx) = rendezvous::channel::<Item>();
66 Self {
67 rx,
68 generator: Some(Box::pin(generator(tx))),
69 }
70 }
71
72 /// Consumes and returns the next `Item` from this stream.
73 pub async fn next(&mut self) -> Option<Item>
74 where
75 Self: Unpin,
76 {
77 let mut me = Pin::new(self);
78 poll_fn(|cx| me.as_mut().poll_next(cx)).await
79 }
80
81 /// Attempts to pull out the next value of this stream, returning `None` if the stream is
82 /// exhausted.
83 pub(crate) fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Item>> {
84 let mut me = self.project();
85 match me.rx.poll_recv(cx) {
86 Poll::Ready(item) => Poll::Ready(item),
87 Poll::Pending => {
88 if let Some(generator) = me.generator {
89 if generator.as_mut().poll(cx).is_ready() {
90 // `generator` keeps writing items to `tx` and will not be `Poll::Ready`
91 // until it is done writing to `tx`. Once it is done, it returns `()`
92 // as output and is `Poll::Ready`, at which point we MUST NOT poll it again
93 // since doing so will cause a panic.
94 *me.generator = None;
95 }
96 }
97 Poll::Pending
98 }
99 }
100 }
101
102 /// Consumes this stream and gathers elements into a collection.
103 pub async fn collect<T: Collectable<Item>>(mut self) -> T {
104 let mut collection = T::initialize();
105 while let Some(item) = self.next().await {
106 if !T::extend(&mut collection, item) {
107 break;
108 }
109 }
110 T::finalize(collection)
111 }
112}
113
114impl<T, E> FnStream<Result<T, E>> {
115 /// Yields the next item in the stream or returns an error if an error is encountered.
116 pub async fn try_next(&mut self) -> Result<Option<T>, E> {
117 self.next().await.transpose()
118 }
119
120 /// Convenience method for `.collect::<Result<Vec<_>, _>()`.
121 pub async fn try_collect(self) -> Result<Vec<T>, E> {
122 self.collect::<Result<Vec<T>, E>>().await
123 }
124}