1use crate::{blocks::Paused, Provider, RootProvider};
4use alloy_consensus::BlockHeader;
5use alloy_json_rpc::RpcError;
6use alloy_network::{BlockResponse, Network};
7use alloy_primitives::{
8 map::{B256HashMap, B256HashSet},
9 TxHash, B256,
10};
11use alloy_transport::{utils::Spawnable, TransportError};
12use futures::{future::pending, stream::StreamExt, FutureExt, Stream};
13use std::{
14 collections::{BTreeMap, VecDeque},
15 fmt,
16 future::Future,
17 sync::Arc,
18 time::Duration,
19};
20use tokio::{
21 select,
22 sync::{mpsc, oneshot},
23};
24
25#[cfg(target_family = "wasm")]
26use wasmtimer::{
27 std::Instant,
28 tokio::{interval, sleep_until},
29};
30
31#[cfg(not(target_family = "wasm"))]
32use {
33 std::time::Instant,
34 tokio::time::{interval, sleep_until},
35};
36
/// Errors that can occur while registering, watching, or fetching the receipt of a pending
/// transaction.
#[derive(Debug, thiserror::Error)]
pub enum PendingTransactionError {
    /// Registering the pending transaction with the watcher failed.
    #[error("failed to register pending transaction to watch")]
    FailedToRegister,

    /// A transport error, forwarded transparently.
    #[error(transparent)]
    TransportError(#[from] TransportError),

    /// Receiving the watcher's result over the oneshot channel failed.
    #[error(transparent)]
    Recv(#[from] oneshot::error::RecvError),

    /// The watcher reported an error (see [`WatchTxError`]).
    #[error(transparent)]
    TxWatcher(#[from] WatchTxError),
}
56
/// Builder that pairs a [`PendingTransactionConfig`] with the [`RootProvider`] used to watch the
/// transaction.
///
/// Nothing is watched until [`register`](Self::register), [`watch`](Self::watch) or
/// [`get_receipt`](Self::get_receipt) is called.
#[must_use = "this type does nothing unless you call `register`, `watch` or `get_receipt`"]
#[derive(Debug)]
#[doc(alias = "PendingTxBuilder")]
pub struct PendingTransactionBuilder<N: Network> {
    /// What to watch for: transaction hash, required confirmations and timeout.
    config: PendingTransactionConfig,
    /// Provider used to register the watcher and to fetch the receipt.
    provider: RootProvider<N>,
}
97
98impl<N: Network> PendingTransactionBuilder<N> {
99 pub const fn new(provider: RootProvider<N>, tx_hash: TxHash) -> Self {
101 Self::from_config(provider, PendingTransactionConfig::new(tx_hash))
102 }
103
104 pub const fn from_config(provider: RootProvider<N>, config: PendingTransactionConfig) -> Self {
106 Self { config, provider }
107 }
108
109 pub const fn inner(&self) -> &PendingTransactionConfig {
111 &self.config
112 }
113
114 pub fn into_inner(self) -> PendingTransactionConfig {
116 self.config
117 }
118
119 pub const fn provider(&self) -> &RootProvider<N> {
121 &self.provider
122 }
123
124 pub fn split(self) -> (RootProvider<N>, PendingTransactionConfig) {
126 (self.provider, self.config)
127 }
128
129 pub fn inspect<F: FnOnce(&Self)>(self, f: F) -> Self {
131 f(&self);
132 self
133 }
134
135 #[doc(alias = "transaction_hash")]
137 pub const fn tx_hash(&self) -> &TxHash {
138 self.config.tx_hash()
139 }
140
141 #[doc(alias = "set_transaction_hash")]
143 pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
144 self.config.set_tx_hash(tx_hash);
145 }
146
147 #[doc(alias = "with_transaction_hash")]
149 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
150 self.config.tx_hash = tx_hash;
151 self
152 }
153
154 #[doc(alias = "confirmations")]
156 pub const fn required_confirmations(&self) -> u64 {
157 self.config.required_confirmations()
158 }
159
160 #[doc(alias = "set_confirmations")]
162 pub const fn set_required_confirmations(&mut self, confirmations: u64) {
163 self.config.set_required_confirmations(confirmations);
164 }
165
166 #[doc(alias = "with_confirmations")]
168 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
169 self.config.required_confirmations = confirmations;
170 self
171 }
172
173 pub const fn timeout(&self) -> Option<Duration> {
175 self.config.timeout()
176 }
177
178 pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
180 self.config.set_timeout(timeout);
181 }
182
183 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
185 self.config.timeout = timeout;
186 self
187 }
188
189 #[doc(alias = "build")]
199 pub async fn register(self) -> Result<PendingTransaction, PendingTransactionError> {
200 self.provider.watch_pending_transaction(self.config).await
201 }
202
203 pub async fn watch(self) -> Result<TxHash, PendingTransactionError> {
211 self.register().await?.await
212 }
213
214 pub async fn get_receipt(self) -> Result<N::ReceiptResponse, PendingTransactionError> {
226 let hash = self.config.tx_hash;
227 let required_confirmations = self.config.required_confirmations;
228 let mut pending_tx = self.provider.watch_pending_transaction(self.config).await?;
229
230 let mut interval = interval(self.provider.client().poll_interval());
234
235 loop {
236 let mut confirmed = false;
237
238 let tick_fut = if required_confirmations > 1 {
242 pending::<()>().right_future()
243 } else {
244 interval.tick().map(|_| ()).left_future()
245 };
246
247 select! {
248 _ = tick_fut => {},
249 res = &mut pending_tx => {
250 let _ = res?;
251 confirmed = true;
252 }
253 }
254
255 let receipt = self.provider.get_transaction_receipt(hash).await?;
257 if let Some(receipt) = receipt {
258 return Ok(receipt);
259 }
260
261 if confirmed {
262 return Err(RpcError::NullResp.into());
263 }
264 }
265 }
266}
267
/// Configuration describing which transaction to watch and when to consider it done.
///
/// Attach a provider with [`with_provider`](Self::with_provider) to obtain a
/// [`PendingTransactionBuilder`].
#[must_use = "this type does nothing unless you call `with_provider`"]
#[derive(Clone, Debug)]
#[doc(alias = "PendingTxConfig", alias = "TxPendingConfig")]
pub struct PendingTransactionConfig {
    /// Hash of the transaction to watch.
    #[doc(alias = "transaction_hash")]
    tx_hash: TxHash,

    /// Number of confirmations to wait for; [`new`](Self::new) sets this to 1.
    required_confirmations: u64,

    /// Optional timeout; [`new`](Self::new) sets this to `None`.
    timeout: Option<Duration>,
}
286
287impl PendingTransactionConfig {
288 pub const fn new(tx_hash: TxHash) -> Self {
290 Self { tx_hash, required_confirmations: 1, timeout: None }
291 }
292
293 #[doc(alias = "transaction_hash")]
295 pub const fn tx_hash(&self) -> &TxHash {
296 &self.tx_hash
297 }
298
299 #[doc(alias = "set_transaction_hash")]
301 pub const fn set_tx_hash(&mut self, tx_hash: TxHash) {
302 self.tx_hash = tx_hash;
303 }
304
305 #[doc(alias = "with_transaction_hash")]
307 pub const fn with_tx_hash(mut self, tx_hash: TxHash) -> Self {
308 self.tx_hash = tx_hash;
309 self
310 }
311
312 #[doc(alias = "confirmations")]
314 pub const fn required_confirmations(&self) -> u64 {
315 self.required_confirmations
316 }
317
318 #[doc(alias = "set_confirmations")]
320 pub const fn set_required_confirmations(&mut self, confirmations: u64) {
321 self.required_confirmations = confirmations;
322 }
323
324 #[doc(alias = "with_confirmations")]
326 pub const fn with_required_confirmations(mut self, confirmations: u64) -> Self {
327 self.required_confirmations = confirmations;
328 self
329 }
330
331 pub const fn timeout(&self) -> Option<Duration> {
333 self.timeout
334 }
335
336 pub const fn set_timeout(&mut self, timeout: Option<Duration>) {
338 self.timeout = timeout;
339 }
340
341 pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
343 self.timeout = timeout;
344 self
345 }
346
347 pub const fn with_provider<N: Network>(
349 self,
350 provider: RootProvider<N>,
351 ) -> PendingTransactionBuilder<N> {
352 PendingTransactionBuilder::from_config(provider, self)
353 }
354}
355
356impl From<TxHash> for PendingTransactionConfig {
357 fn from(tx_hash: TxHash) -> Self {
358 Self::new(tx_hash)
359 }
360}
361
/// Errors reported by the transaction watcher to a [`PendingTransaction`].
#[derive(Debug, thiserror::Error)]
pub enum WatchTxError {
    /// The transaction was not confirmed before its configured timeout elapsed.
    #[error("transaction was not confirmed within the timeout")]
    Timeout,
}
369
/// A single watch request tracked by the heartbeat: the watch configuration plus the channel
/// used to report the outcome.
#[doc(alias = "TransactionWatcher")]
struct TxWatcher {
    /// Hash, required confirmations and timeout of the watched transaction.
    config: PendingTransactionConfig,
    /// Height of the block the transaction was seen in, if known. The heartbeat uses it to
    /// compute the confirmation height and to decide what a reorg invalidates.
    received_at_block: Option<u64>,
    /// One-shot sender that delivers the final result to the matching [`PendingTransaction`].
    tx: oneshot::Sender<Result<(), WatchTxError>>,
}
379
380impl TxWatcher {
381 fn notify(self, result: Result<(), WatchTxError>) {
383 debug!(tx=%self.config.tx_hash, "notifying");
384 let _ = self.tx.send(result);
385 }
386}
387
/// Handle to a transaction that is being watched.
///
/// Implements [`Future`]: it resolves to the transaction hash once the watcher reports success,
/// or to a [`PendingTransactionError`] otherwise.
#[doc(alias = "PendingTx", alias = "TxPending")]
pub struct PendingTransaction {
    /// Hash of the watched transaction.
    #[doc(alias = "transaction_hash")]
    pub(crate) tx_hash: TxHash,
    /// Receives the watcher's verdict: `Ok(())` on success or a [`WatchTxError`].
    pub(crate) rx: oneshot::Receiver<Result<(), WatchTxError>>,
}
402
403impl fmt::Debug for PendingTransaction {
404 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405 f.debug_struct("PendingTransaction").field("tx_hash", &self.tx_hash).finish()
406 }
407}
408
409impl PendingTransaction {
410 pub fn ready(tx_hash: TxHash) -> Self {
412 let (tx, rx) = oneshot::channel();
413 tx.send(Ok(())).ok(); Self { tx_hash, rx }
415 }
416
417 #[doc(alias = "transaction_hash")]
419 pub const fn tx_hash(&self) -> &TxHash {
420 &self.tx_hash
421 }
422}
423
424impl Future for PendingTransaction {
425 type Output = Result<TxHash, PendingTransactionError>;
426
427 fn poll(
428 mut self: std::pin::Pin<&mut Self>,
429 cx: &mut std::task::Context<'_>,
430 ) -> std::task::Poll<Self::Output> {
431 self.rx.poll_unpin(cx).map(|res| {
432 res??;
433 Ok(self.tx_hash)
434 })
435 }
436}
437
/// Cloneable handle used to submit watch requests to a running [`Heartbeat`] task.
#[derive(Clone, Debug)]
pub(crate) struct HeartbeatHandle {
    /// Sending half of the channel on which the heartbeat receives [`TxWatcher`]s.
    tx: mpsc::Sender<TxWatcher>,
}
443
444impl HeartbeatHandle {
445 #[doc(alias = "watch_transaction")]
447 pub(crate) async fn watch_tx(
448 &self,
449 config: PendingTransactionConfig,
450 received_at_block: Option<u64>,
451 ) -> Result<PendingTransaction, PendingTransactionConfig> {
452 let (tx, rx) = oneshot::channel();
453 let tx_hash = config.tx_hash;
454 match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
455 Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
456 Err(e) => Err(e.0.config),
457 }
458 }
459}
460
/// State of the task that tracks watched transactions against a stream of incoming blocks.
pub(crate) struct Heartbeat<N, S> {
    /// Fused stream of incoming blocks.
    stream: futures::stream::Fuse<S>,

    /// Recently seen blocks as `(height, transaction hashes)`, oldest first.
    past_blocks: VecDeque<(u64, B256HashSet)>,

    /// Watchers whose transaction has not been seen in a block yet, keyed by transaction hash.
    unconfirmed: B256HashMap<TxWatcher>,

    /// Watchers waiting for more confirmations, keyed by the height at which they are notified.
    waiting_confs: BTreeMap<u64, Vec<TxWatcher>>,

    /// Timeout deadlines mapped to the hash of the transaction to time out.
    reap_at: BTreeMap<Instant, B256>,

    /// Shared pause flag, set to paused while nothing is being watched.
    paused: Arc<Paused>,

    /// Marker tying the heartbeat to network `N` without storing a value of it.
    _network: std::marker::PhantomData<N>,
}
483
484impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
485 pub(crate) fn new(stream: S, is_paused: Arc<Paused>) -> Self {
487 Self {
488 stream: stream.fuse(),
489 past_blocks: Default::default(),
490 unconfirmed: Default::default(),
491 waiting_confs: Default::default(),
492 reap_at: Default::default(),
493 paused: is_paused,
494 _network: Default::default(),
495 }
496 }
497
498 fn check_confirmations(&mut self, current_height: u64) {
500 let to_keep = self.waiting_confs.split_off(&(current_height + 1));
501 let to_notify = std::mem::replace(&mut self.waiting_confs, to_keep);
502 for watcher in to_notify.into_values().flatten() {
503 watcher.notify(Ok(()));
504 }
505 }
506
507 fn next_reap(&self) -> Instant {
510 self.reap_at
511 .first_key_value()
512 .map(|(k, _)| *k)
513 .unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
514 }
515
516 fn reap_timeouts(&mut self) {
518 let now = Instant::now();
519 let to_keep = self.reap_at.split_off(&now);
520 let to_reap = std::mem::replace(&mut self.reap_at, to_keep);
521
522 for tx_hash in to_reap.values() {
523 if let Some(watcher) = self.unconfirmed.remove(tx_hash) {
524 debug!(tx=%tx_hash, "reaped");
525 watcher.notify(Err(WatchTxError::Timeout));
526 }
527 }
528 }
529
530 fn move_reorg_to_unconfirmed(&mut self, new_height: u64) {
534 for waiters in self.waiting_confs.values_mut() {
535 *waiters = std::mem::take(waiters).into_iter().filter_map(|watcher| {
536 if let Some(received_at_block) = watcher.received_at_block {
537 if received_at_block >= new_height {
539 let hash = watcher.config.tx_hash;
540 debug!(tx=%hash, %received_at_block, %new_height, "return to unconfirmed due to reorg");
541 self.unconfirmed.insert(hash, watcher);
542 return None;
543 }
544 }
545 Some(watcher)
546 }).collect();
547 }
548 }
549
550 fn has_pending_transactions(&self) -> bool {
552 !self.unconfirmed.is_empty() || !self.waiting_confs.is_empty()
553 }
554
555 fn update_pause_state(&mut self) {
557 let should_pause = !self.has_pending_transactions();
558 if self.paused.is_paused() != should_pause {
559 debug!(paused = should_pause, "updating heartbeat pause state");
560 self.paused.set_paused(should_pause);
561 }
562 }
563
564 fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
567 debug!(tx=%to_watch.config.tx_hash, "watching");
569 trace!(?to_watch.config, ?to_watch.received_at_block);
570 if let Some(received_at_block) = to_watch.received_at_block {
571 let confirmations = to_watch.config.required_confirmations;
574 let confirmed_at = received_at_block + confirmations - 1;
575 let current_height =
576 self.past_blocks.back().map(|(h, _)| *h).unwrap_or(received_at_block);
577
578 if confirmed_at <= current_height {
579 to_watch.notify(Ok(()));
580 } else {
581 self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
582 }
583 return;
584 }
585
586 if let Some(timeout) = to_watch.config.timeout {
587 self.reap_at.insert(Instant::now() + timeout, to_watch.config.tx_hash);
588 }
589 for (block_height, txs) in self.past_blocks.iter().rev() {
592 if txs.contains(&to_watch.config.tx_hash) {
593 let confirmations = to_watch.config.required_confirmations;
594 let confirmed_at = *block_height + confirmations - 1;
595 let current_height = self.past_blocks.back().map(|(h, _)| *h).unwrap();
596
597 if confirmed_at <= current_height {
598 to_watch.notify(Ok(()));
599 } else {
600 debug!(tx=%to_watch.config.tx_hash, %block_height, confirmations, "adding to waiting list");
601 let mut to_watch = to_watch;
603 if to_watch.received_at_block.is_none() {
604 to_watch.received_at_block = Some(*block_height);
605 }
606 self.waiting_confs.entry(confirmed_at).or_default().push(to_watch);
607 }
608 return;
609 }
610 }
611
612 self.unconfirmed.insert(to_watch.config.tx_hash, to_watch);
613 }
614
615 fn add_to_waiting_list(&mut self, watcher: TxWatcher, block_height: u64) {
616 let confirmations = watcher.config.required_confirmations;
617 debug!(tx=%watcher.config.tx_hash, %block_height, confirmations, "adding to waiting list");
618 self.waiting_confs.entry(block_height + confirmations - 1).or_default().push(watcher);
619 }
620
621 fn handle_new_block(&mut self, block: N::BlockResponse) {
625 let block_height = block.header().as_ref().number();
626 debug!(%block_height, "handling block");
627
628 const MAX_BLOCKS_TO_RETAIN: usize = 10;
635 if self.past_blocks.len() >= MAX_BLOCKS_TO_RETAIN {
636 self.past_blocks.pop_front();
637 }
638 if let Some((last_height, _)) = self.past_blocks.back().as_ref() {
639 if *last_height + 1 != block_height {
641 debug!(block_height, last_height, "reorg/unpause detected");
644 self.move_reorg_to_unconfirmed(block_height);
645 self.past_blocks.retain(|(h, _)| *h < block_height);
647 }
648 }
649 self.past_blocks.push_back((block_height, block.transactions().hashes().collect()));
650
651 let to_check: Vec<_> = block
653 .transactions()
654 .hashes()
655 .filter_map(|tx_hash| self.unconfirmed.remove(&tx_hash))
656 .collect();
657 for mut watcher in to_check {
658 let confirmations = watcher.config.required_confirmations;
660 if confirmations <= 1 {
661 watcher.notify(Ok(()));
662 continue;
663 }
664 if let Some(set_block) = watcher.received_at_block {
668 warn!(tx=%watcher.config.tx_hash, set_block=%set_block, new_block=%block_height, "received_at_block already set");
669 } else {
671 watcher.received_at_block = Some(block_height);
672 }
673 self.add_to_waiting_list(watcher, block_height);
674 }
675
676 self.check_confirmations(block_height);
677 }
678}
679
#[cfg(target_family = "wasm")]
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
    /// Spawns the heartbeat as a background task and returns the handle used to submit watch
    /// requests to it.
    pub(crate) fn spawn(self) -> HeartbeatHandle {
        let (driver, handle) = self.consume();
        driver.spawn_task();
        handle
    }
}
689
690#[cfg(not(target_family = "wasm"))]
691impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + Send + 'static> Heartbeat<N, S> {
692 pub(crate) fn spawn(self) -> HeartbeatHandle {
694 let (task, handle) = self.consume();
695 task.spawn_task();
696 handle
697 }
698}
699
700impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
701 fn consume(self) -> (impl Future<Output = ()>, HeartbeatHandle) {
702 let (ix_tx, ixns) = mpsc::channel(64);
703 (self.into_future(ixns), HeartbeatHandle { tx: ix_tx })
704 }
705
706 async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
707 'shutdown: loop {
708 {
709 self.update_pause_state();
710
711 let next_reap = self.next_reap();
712 let sleep = std::pin::pin!(sleep_until(next_reap.into()));
713
714 select! {
717 biased;
718
719 ix_opt = ixns.recv() => match ix_opt {
721 Some(to_watch) => self.handle_watch_ix(to_watch),
722 None => break 'shutdown, },
724
725 Some(block) = self.stream.next() => {
727 self.handle_new_block(block);
728 },
729
730 _ = sleep => {},
733 }
734 }
735
736 self.reap_timeouts();
738 }
739 }
740}