alloy_provider/
heart.rs

1//! Block heartbeat and pending transaction watcher.
2
3use crate::{blocks::Paused, Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8    map::{B256HashMap, B256HashSet},
9    TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{future::pending, stream::StreamExt, FutureExt, Stream};
13use std::{
14    collections::{BTreeMap, VecDeque},
15    fmt,
16    future::Future,
17    sync::Arc,
18    time::Duration,
19};
20use tokio::{
21    select,
22    sync::{mpsc, oneshot},
23};
24
25#[cfg(target_family = "wasm")]
26use wasmtimer::{
27    std::Instant,
28    tokio::{interval, sleep_until},
29};
30
31#[cfg(not(target_family = "wasm"))]
32use {
33    std::time::Instant,
34    tokio::time::{interval, sleep_until},
35};
36
/// Errors which may occur when watching a pending transaction.
///
/// This is the error type produced by awaiting a [`PendingTransaction`] and by the
/// [`PendingTransactionBuilder`] methods `register`, `watch` and `get_receipt`.
#[derive(Debug, thiserror::Error)]
pub enum PendingTransactionError {
    /// Failed to register pending transaction in heartbeat.
    // NOTE(review): this variant is constructed outside this file; presumably when
    // `HeartbeatHandle::watch_tx` hands the config back because the heartbeat is gone.
    #[error("failed to register pending transaction to watch")]
    FailedToRegister,

    /// Underlying transport error.
    ///
    /// `get_receipt` also reports a receipt that is still missing after the transaction was
    /// confirmed through this variant (as `RpcError::NullResp`).
    #[error(transparent)]
    TransportError(#[from] TransportError),

    /// Error occurred while getting response from the heartbeat.
    ///
    /// Produced when the sending half of the notification channel is dropped before a result was
    /// sent.
    #[error(transparent)]
    Recv(#[from] oneshot::error::RecvError),

    /// Errors that may occur when watching a transaction, as reported by the heartbeat (for
    /// example [`WatchTxError::Timeout`]).
    #[error(transparent)]
    TxWatcher(#[from] WatchTxError),
}
56
/// A builder for configuring a pending transaction watcher.
///
/// The builder pairs a [`PendingTransactionConfig`] (transaction hash, required confirmations and
/// optional timeout) with the [`RootProvider`] that will be used to watch the transaction.
///
/// # Examples
///
/// Send and wait for a transaction to be confirmed 2 times, with a timeout of 60 seconds:
///
/// ```no_run
/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
/// // Send a transaction, and configure the pending transaction.
/// let builder = provider.send_transaction(tx)
///     .await?
///     .with_required_confirmations(2)
///     .with_timeout(Some(std::time::Duration::from_secs(60)));
/// // Register the pending transaction with the provider.
/// let pending_tx = builder.register().await?;
/// // Wait for the transaction to be confirmed 2 times.
/// let tx_hash = pending_tx.await?;
/// # Ok(())
/// # }
/// ```
///
/// This can also be more concisely written using `watch`:
/// ```no_run
/// # async fn example<N: alloy_network::Network>(provider: impl alloy_provider::Provider, tx: alloy_rpc_types_eth::transaction::TransactionRequest) -> Result<(), Box<dyn std::error::Error>> {
/// let tx_hash = provider.send_transaction(tx)
///     .await?
///     .with_required_confirmations(2)
///     .with_timeout(Some(std::time::Duration::from_secs(60)))
///     .watch()
///     .await?;
/// # Ok(())
/// # }
/// ```
#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
#[derive(Debug)]
#[doc(alias = "PendingTxBuilder")]
pub struct PendingTransactionBuilder<N: Network> {
    /// What to watch for: transaction hash, required confirmations and optional timeout.
    config: PendingTransactionConfig,
    /// The provider used to register the watcher and, in `get_receipt`, to fetch the receipt.
    provider: RootProvider<N>,
}
97
98impl<N: Network> PendingTransactionBuilder<N> {
99    /// Creates a new pending transaction builder.
100    pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
101        Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
102    }
103
104    /// Creates a new pending transaction builder from the given configuration.
105    pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
106        Self { config, provider }
107    }
108
109    /// Returns the inner configuration.
110    pub const fn inner(&self) -> &PendingTransactionConfig {
111        &self.config
112    }
113
114    /// Consumes this builder, returning the inner configuration.
115    pub fn into_inner(self) -> PendingTransactionConfig {
116        self.config
117    }
118
119    /// Returns the provider.
120    pub const fn provider(&self) -> &RootProvider<N> {
121        &self.provider
122    }
123
124    /// Consumes this builder, returning the provider and the configuration.
125    pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
126        (self.provider, self.config)
127    }
128
129    /// Calls a function with a reference to the value.
130    pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
131        f(&self);
132        self
133    }
134
135    /// Returns the transaction hash.
136    #[doc(alias = "transaction_hash")]
137    pub const fn tx_hash(&self) -> &TxHash {
138        self.config.tx_hash()
139    }
140
141    /// Sets the transaction hash.
142    #[doc(alias = "set_transaction_hash")]
143    pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
144        self.config.set_tx_hash(tx_hash);
145    }
146
147    /// Sets the transaction hash.
148    #[doc(alias = "with_transaction_hash")]
149    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
150        self.config.tx_hash = tx_hash;
151        self
152    }
153
154    /// Returns the number of confirmations to wait for.
155    #[doc(alias = "confirmations")]
156    pub const fn required_confirmations(&self) -> u64 {
157        self.config.required_confirmations()
158    }
159
160    /// Sets the number of confirmations to wait for.
161    #[doc(alias = "set_confirmations")]
162    pub const fn set_required_confirmations(&mut self, confirmations: u64) {
163        self.config.set_required_confirmations(confirmations);
164    }
165
166    /// Sets the number of confirmations to wait for.
167    #[doc(alias = "with_confirmations")]
168    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
169        self.config.required_confirmations = confirmations;
170        self
171    }
172
173    /// Returns the timeout.
174    pub const fn timeout(&self) -> Option<Duration> {
175        self.config.timeout()
176    }
177
178    /// Sets the timeout.
179    pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
180        self.config.set_timeout(timeout);
181    }
182
183    /// Sets the timeout.
184    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
185        self.config.timeout = timeout;
186        self
187    }
188
189    /// Registers the watching configuration with the provider.
190    ///
191    /// This does not wait for the transaction to be confirmed, but returns a [`PendingTransaction`]
192    /// that can be awaited at a later moment.
193    ///
194    /// See:
195    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
196    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
197    ///   confirmed.
198    #[doc(alias = "build")]
199    pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
200        self.provider.watch_pending_transaction(self.config).await
201    }
202
203    /// Waits for the transaction to confirm with the given number of confirmations.
204    ///
205    /// See:
206    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
207    ///   confirmed.
208    /// - [`get_receipt`](Self::get_receipt) for fetching the receipt after the transaction has been
209    ///   confirmed.
210    pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
211        self.register().await?.await
212    }
213
214    /// Waits for the transaction to confirm with the given number of confirmations, and
215    /// then fetches its receipt.
216    ///
217    /// Note that this method will call `eth_getTransactionReceipt` on the [**root
218    /// provider**](RootProvider), and not on a specific network provider. This means that any
219    /// overrides or customizations made to the network provider will not be used.
220    ///
221    /// See:
222    /// - [`register`](Self::register): for registering the transaction without waiting for it to be
223    ///   confirmed.
224    /// - [`watch`](Self::watch) for watching the transaction without fetching the receipt.
225    pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
226        let hash = self.config.tx_hash;
227        let required_confirmations = self.config.required_confirmations;
228        let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
229
230        // FIXME: this is a hotfix to prevent a race condition where the heartbeat would miss the
231        // block the tx was mined in. Only apply this for single confirmation to respect the
232        // confirmation setting.
233        let mut interval = interval(self.provider.client().poll_interval());
234
235        loop {
236            let mut confirmed = false;
237
238            // If more than 1 block confirmations is specified then we can rely on the regular
239            // watch_pending_transaction and dont need this workaround for the above mentioned race
240            // condition
241            let tick_fut = if required_confirmations > 1 {
242                pending::<()>().right_future()
243            } else {
244                interval.tick().map(|_| ()).left_future()
245            };
246
247            select! {
248                _ = tick_fut => {},
249                res = &mut pending_tx => {
250                    let _ = res?;
251                    confirmed = true;
252                }
253            }
254
255            // try to fetch the receipt
256            let receipt = self.provider.get_transaction_receipt(hash).await?;
257            if let Some(receipt) = receipt {
258                return Ok(receipt);
259            }
260
261            if confirmed {
262                return Err(RpcError::NullResp.into());
263            }
264        }
265    }
266}
267
/// Configuration for watching a pending transaction.
///
/// This type can be used to create a [`PendingTransactionBuilder`], but in general it is only used
/// internally.
///
/// A configuration created with [`PendingTransactionConfig::new`] requires 1 confirmation and has
/// no timeout.
#[must_use = "this type does nothing unless you call `with_provider`"]
#[derive(Clone, Debug)]
#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
pub struct PendingTransactionConfig {
    /// The transaction hash to watch for.
    #[doc(alias = "transaction_hash")]
    tx_hash: TxHash,

    /// Require a number of confirmations.
    ///
    /// The block that includes the transaction counts as the first confirmation.
    required_confirmations: u64,

    /// Optional timeout for the transaction. `None` means the watcher never times out.
    timeout: Option<Duration>,
}
286
287impl PendingTransactionConfig {
288    /// Create a new watch for a transaction.
289    pub const fn new(tx_hash: TxHash) -> Self {
290        Self { tx_hash, required_confirmations: 1, timeout: None }
291    }
292
293    /// Returns the transaction hash.
294    #[doc(alias = "transaction_hash")]
295    pub const fn tx_hash(&self) -> &TxHash {
296        &self.tx_hash
297    }
298
299    /// Sets the transaction hash.
300    #[doc(alias = "set_transaction_hash")]
301    pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
302        self.tx_hash = tx_hash;
303    }
304
305    /// Sets the transaction hash.
306    #[doc(alias = "with_transaction_hash")]
307    pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
308        self.tx_hash = tx_hash;
309        self
310    }
311
312    /// Returns the number of confirmations to wait for.
313    #[doc(alias = "confirmations")]
314    pub const fn required_confirmations(&self) -> u64 {
315        self.required_confirmations
316    }
317
318    /// Sets the number of confirmations to wait for.
319    #[doc(alias = "set_confirmations")]
320    pub const fn set_required_confirmations(&mut self, confirmations: u64) {
321        self.required_confirmations = confirmations;
322    }
323
324    /// Sets the number of confirmations to wait for.
325    #[doc(alias = "with_confirmations")]
326    pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
327        self.required_confirmations = confirmations;
328        self
329    }
330
331    /// Returns the timeout.
332    pub const fn timeout(&self) -> Option<Duration> {
333        self.timeout
334    }
335
336    /// Sets the timeout.
337    pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
338        self.timeout = timeout;
339    }
340
341    /// Sets the timeout.
342    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
343        self.timeout = timeout;
344        self
345    }
346
347    /// Wraps this configuration with a provider to expose watching methods.
348    pub const fn with_provider<N: Network>(
349        self,
350        provider: RootProvider<N>,
351    ) -> PendingTransactionBuilder<N> {
352        PendingTransactionBuilder::from_config(provider, self)
353    }
354}
355
356impl From<TxHash> for PendingTransactionConfig {
357    fn from(tx_hash: TxHash) -> Self {
358        Self::new(tx_hash)
359    }
360}
361
/// Errors which may occur in heartbeat when watching a transaction.
///
/// These are sent by the heartbeat task to the waiting [`PendingTransaction`] through its
/// notification channel.
#[derive(Debug, thiserror::Error)]
pub enum WatchTxError {
    /// Transaction was not confirmed after configured timeout.
    #[error("transaction was not confirmed within the timeout")]
    Timeout,
}
369
/// The type sent by the [`HeartbeatHandle`] to the [`Heartbeat`] background task.
///
/// It bundles what to watch for with the channel used to report the outcome back to the
/// corresponding [`PendingTransaction`].
#[doc(alias = "TransactionWatcher")]
struct TxWatcher {
    /// The transaction hash, required confirmations and optional timeout to watch with.
    config: PendingTransactionConfig,
    /// The block at which the transaction was received. To be filled once known.
    /// Invariant: any confirmed transaction in `Heart` has this value set.
    received_at_block: Option<u64>,
    /// One-shot channel on which the outcome (confirmed or failed) is sent to the waiter.
    tx: oneshot::Sender<Result<(), WatchTxError>>,
}
379
380impl TxWatcher {
381    /// Notify the waiter.
382    fn notify(self, result: Result<(), WatchTxError>) {
383        debug!(tx=%self.config.tx_hash, "notifying");
384        let _ = self.tx.send(result);
385    }
386}
387
/// Represents a transaction that is yet to be confirmed a specified number of times.
///
/// This struct is a future created by [`PendingTransactionBuilder`] that resolves to the
/// transaction hash once the underlying transaction has been confirmed the specified number of
/// times in the network.
///
/// The future resolves to an error if the watcher reports one (for example a timeout) or if the
/// sending half of the notification channel is dropped without sending a result.
#[doc(alias = "PendingTx", alias = "TxPending")]
pub struct PendingTransaction {
    /// The transaction hash.
    #[doc(alias = "transaction_hash")]
    pub(crate) tx_hash: TxHash,
    /// The receiver for the notification.
    ///
    /// Yields `Ok(())` on confirmation or the [`WatchTxError`] reported by the heartbeat.
    // TODO: send a receipt?
    pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
}
402
403impl fmt::Debug for PendingTransaction {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
406    }
407}
408
409impl PendingTransaction {
410    /// Creates a ready pending transaction.
411    pub fn ready(tx_hash: TxHash) -> Self {
412        let (tx, rx) = oneshot::channel();
413        tx.send(Ok(())).ok(); // Make sure that the receiver is notified already.
414        Self { tx_hash, rx }
415    }
416
417    /// Returns this transaction's hash.
418    #[doc(alias = "transaction_hash")]
419    pub const fn tx_hash(&self) -> &TxHash {
420        &self.tx_hash
421    }
422}
423
424impl Future for PendingTransaction {
425    type Output = Result<TxHash, PendingTransactionError>;
426
427    fn poll(
428        mut self: std::pin::Pin<&mut Self>,
429        cx: &mut std::task::Context<'_>,
430    ) -> std::task::Poll<Self::Output> {
431        self.rx.poll_unpin(cx).map(|res| {
432            res??;
433            Ok(self.tx_hash)
434        })
435    }
436}
437
/// A handle to the heartbeat task.
///
/// Cloning the handle clones the underlying channel sender. The heartbeat task stops once its
/// instruction channel is closed.
#[derive(Clone, Debug)]
pub(crate) struct HeartbeatHandle {
    /// Channel on which new watch instructions are sent to the heartbeat task.
    tx: mpsc::Sender<TxWatcher>,
}
443
444impl HeartbeatHandle {
445    /// Watch for a transaction to be confirmed with the given config.
446    #[doc(alias = "watch_transaction")]
447    pub(crate) async fn watch_tx(
448        &self,
449        config: PendingTransactionConfig,
450        received_at_block: Option<u64>,
451    ) -> Result<PendingTransaction, PendingTransactionConfig> {
452        let (tx, rx) = oneshot::channel();
453        let tx_hash = config.tx_hash;
454        match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
455            Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
456            Err(e) => Err(e.0.config),
457        }
458    }
459}
460
/// A heartbeat task that receives blocks and watches for transactions.
///
/// Watchers move between three places: `unconfirmed` (transaction not yet seen in a block),
/// `waiting_confs` (seen, waiting for more confirmations) and notified (removed).
pub(crate) struct Heartbeat<N, S> {
    /// The stream of incoming blocks to watch.
    stream: futures::stream::Fuse<S>,

    /// Lookbehind blocks, oldest first, as pairs of block number and the set of transaction
    /// hashes included in that block.
    past_blocks: VecDeque<(u64, B256HashSet)>,

    /// Transactions to watch for, keyed by transaction hash.
    unconfirmed: B256HashMap<TxWatcher>,

    /// Ordered map of transactions waiting for confirmations, keyed by the block height at which
    /// they reach their required number of confirmations.
    waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,

    /// Ordered map of transactions to reap at a certain time, keyed by the timeout deadline.
    reap_at: BTreeMap<Instant, B256>,

    /// Whether the heartbeat is currently paused.
    ///
    /// The heartbeat sets this to paused whenever it has no pending transactions.
    paused: Arc<Paused>,

    /// Marker tying the heartbeat to a network type without storing a value of it.
    _network: std::marker::PhantomData<N>,
}
483
484impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
485    /// Create a new heartbeat task.
486    pub(crate) fn new(stream: S, is_paused: Arc<Paused>) -> Self {
487        Self {
488            stream: stream.fuse(),
489            past_blocks: Default::default(),
490            unconfirmed: Default::default(),
491            waiting_confs: Default::default(),
492            reap_at: Default::default(),
493            paused: is_paused,
494            _network: Default::default(),
495        }
496    }
497
498    /// Check if any transactions have enough confirmations to notify.
499    fn check_confirmations(&mut self, current_height: u64) {
500        let to_keep = self.waiting_confs.split_off(&(current_height + 1));
501        let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
502        for watcher in to_notify.into_values().flatten() {
503            watcher.notify(Ok(()));
504        }
505    }
506
507    /// Get the next time to reap a transaction. If no reaps, this is a very
508    /// long time from now (i.e. will not be woken).
509    fn next_reap(&self) -> Instant {
510        self.reap_at
511            .first_key_value()
512            .map(|(k, _)| *k)
513            .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
514    }
515
516    /// Reap any timeout
517    fn reap_timeouts(&mut self) {
518        let now = Instant::now();
519        let to_keep = self.reap_at.split_off(&now);
520        let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
521
522        for tx_hash in to_reap.values() {
523            if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
524                debug!(tx=%tx_hash, "reaped");
525                watcher.notify(Err(WatchTxError::Timeout));
526            }
527        }
528    }
529
530    /// Reap transactions overridden by the reorg.
531    /// Accepts new chain height as an argument, and drops any subscriptions
532    /// that were received in blocks affected by the reorg (e.g. >= new_height).
533    fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
534        for waiters in self.waiting_confs.values_mut() {
535            *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
536                if let Some(received_at_block) = watcher.received_at_block {
537                    // All blocks after and _including_ the new height are reaped.
538                    if received_at_block >= new_height {
539                        let hash = watcher.config.tx_hash;
540                        debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed due to reorg");
541                        self.unconfirmed.insert(hash, watcher);
542                        return None;
543                    }
544                }
545                Some(watcher)
546            }).collect();
547        }
548    }
549
550    /// Check if we have any pending transactions.
551    fn has_pending_transactions(&self) -> bool {
552        !self.unconfirmed.is_empty() || !self.waiting_confs.is_empty()
553    }
554
555    /// Update the pause state based on whether we have pending transactions.
556    fn update_pause_state(&mut self) {
557        let should_pause = !self.has_pending_transactions();
558        if self.paused.is_paused() != should_pause {
559            debug!(paused = should_pause, "updating heartbeat pause state");
560            self.paused.set_paused(should_pause);
561        }
562    }
563
564    /// Handle a watch instruction by adding it to the watch list, and
565    /// potentially adding it to our `reap_at` list.
566    fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
567        // Start watching for the transaction.
568        debug!(tx=%to_watch.config.tx_hash, "watching");
569        trace!(?to_watch.config, ?to_watch.received_at_block);
570        if let Some(received_at_block) = to_watch.received_at_block {
571            // Transaction is already confirmed, we just need to wait for the required
572            // confirmations.
573            let confirmations = to_watch.config.required_confirmations;
574            let confirmed_at = received_at_block + confirmations - 1;
575            let current_height =
576                self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
577
578            if confirmed_at <= current_height {
579                to_watch.notify(Ok(()));
580            } else {
581                self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
582            }
583            return;
584        }
585
586        if let Some(timeout) = to_watch.config.timeout {
587            self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
588        }
589        // Transaction may be confirmed already, check the lookbehind history first.
590        // If so, insert it into the waiting list.
591        for (block_height, txs) in self.past_blocks.iter().rev() {
592            if txs.contains(&to_watch.config.tx_hash) {
593                let confirmations = to_watch.config.required_confirmations;
594                let confirmed_at = *block_height + confirmations - 1;
595                let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
596
597                if confirmed_at <= current_height {
598                    to_watch.notify(Ok(()));
599                } else {
600                    debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
601                    // Ensure reorg handling can move this watcher back if needed.
602                    let mut to_watch = to_watch;
603                    if to_watch.received_at_block.is_none() {
604                        to_watch.received_at_block = Some(*block_height);
605                    }
606                    self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
607                }
608                return;
609            }
610        }
611
612        self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
613    }
614
615    fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
616        let confirmations = watcher.config.required_confirmations;
617        debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
618        self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
619    }
620
621    /// Handle a new block by checking if any of the transactions we're
622    /// watching are in it, and if so, notifying the watcher. Also updates
623    /// the latest block.
624    fn handle_new_block(&mut self, block: N::BlockResponse) {
625        let block_height = block.header().as_ref().number();
626        debug!(%block_height, "handling block");
627
628        // Add the block the lookbehind.
629        // The value is chosen arbitrarily to not have a huge memory footprint but still
630        // catch most cases where user subscribes for an already mined transaction.
631        // Note that we expect provider to check whether transaction is already mined
632        // before subscribing, so here we only need to consider time before sending a notification
633        // and processing it.
634        const MAX_BLOCKS_TO_RETAIN: usize = 10;
635        if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
636            self.past_blocks.pop_front();
637        }
638        if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
639            // Check that the chain is continuous.
640            if *last_height + 1 != block_height {
641                // Move all the transactions that were reset by the reorg to the unconfirmed list.
642                // This can also happen if we unpaused the heartbeat after some time.
643                debug!(block_height, last_height, "reorg/unpause detected");
644                self.move_reorg_to_unconfirmed(block_height);
645                // Remove past blocks that are now invalid.
646                self.past_blocks.retain(|(h, _)| *h < block_height);
647            }
648        }
649        self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
650
651        // Check if we are watching for any of the transactions in this block.
652        let to_check: Vec<_> = block
653            .transactions()
654            .hashes()
655            .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
656            .collect();
657        for mut watcher in to_check {
658            // If `confirmations` is not more than 1 we can notify the watcher immediately.
659            let confirmations = watcher.config.required_confirmations;
660            if confirmations <= 1 {
661                watcher.notify(Ok(()));
662                continue;
663            }
664            // Otherwise add it to the waiting list.
665
666            // Set the block at which the transaction was received.
667            if let Some(set_block) = watcher.received_at_block {
668                warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
669                // We don't override the set value.
670            } else {
671                watcher.received_at_block = Some(block_height);
672            }
673            self.add_to_waiting_list(watcher, block_height);
674        }
675
676        self.check_confirmations(block_height);
677    }
678}
679
#[cfg(target_family = "wasm")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Starts the heartbeat as a background task and returns the [`HeartbeatHandle`] used to
    /// send it watch instructions.
    pub(crate) fn spawn(self) -> HeartbeatHandle {
        let (heartbeat_task, heartbeat_handle) = self.consume();
        heartbeat_task.spawn_task();
        heartbeat_handle
    }
}
689
690#[cfg(not(target_family = "wasm"))]
691impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
692    /// Spawn the heartbeat task, returning a [`HeartbeatHandle`].
693    pub(crate) fn spawn(self) -> HeartbeatHandle {
694        let (task, handle) = self.consume();
695        task.spawn_task();
696        handle
697    }
698}
699
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Turns the heartbeat into its driving future plus the handle used to talk to it.
    ///
    /// Watch instructions travel over a bounded channel with a capacity of 64.
    fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
        let (ix_tx, ixns) = mpsc::channel(64);
        (self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
    }

    /// The heartbeat event loop.
    ///
    /// Each iteration refreshes the pause state, then waits for whichever comes first: a new
    /// watch instruction, a new block, or the next timeout deadline. Timeouts are reaped after
    /// every iteration. The loop ends once the instruction channel is closed.
    async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
        'shutdown: loop {
            {
                self.update_pause_state();

                let next_reap = self.next_reap();
                let sleep = std::pin::pin!(sleep_until(next_reap.into()));

                // We bias the select so that we always handle new messages
                // before checking blocks, and reap timeouts are last.
                select! {
                    biased;

                    // Watch for new transactions.
                    ix_opt = ixns.recv() => match ix_opt {
                        Some(to_watch) => self.handle_watch_ix(to_watch),
                        None => break 'shutdown, // ix channel is closed
                    },

                    // Wake up to handle new blocks.
                    Some(block) = self.stream.next() => {
                        self.handle_new_block(block);
                    },

                    // This arm ensures we always wake up to reap timeouts,
                    // even if there are no other events.
                    _ = sleep => {},
                }
            }

            // Always reap timeouts
            self.reap_timeouts();
        }
    }
}