alloy_rpc_types_eth/
pubsub.rs

//! Ethereum types for pub-sub
2
3use crate::{Filter, Header, Log, Transaction};
4use alloc::{boxed::Box, format};
5use alloy_primitives::B256;
6use alloy_serde::WithOtherFields;
7
8/// Subscription result.
9#[derive(Clone, Debug, PartialEq, Eq)]
10#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
11#[cfg_attr(feature = "serde", serde(untagged))]
12pub enum SubscriptionResult<T = Transaction> {
13    /// New block header.
14    Header(Box<WithOtherFields<Header>>),
15    /// Log
16    Log(Box<Log>),
17    /// Transaction hash
18    TransactionHash(B256),
19    /// Full Transaction
20    FullTransaction(Box<T>),
21    /// SyncStatus
22    SyncState(PubSubSyncStatus),
23}
24
25/// Response type for a SyncStatus subscription.
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
28#[cfg_attr(feature = "serde", serde(untagged))]
29pub enum PubSubSyncStatus {
30    /// If not currently syncing, this should always be `false`.
31    Simple(bool),
32    /// Syncing metadata.
33    Detailed(SyncStatusMetadata),
34}
35
/// Sync status metadata.
///
/// With the `serde` feature enabled, field names are (de)serialized in `camelCase`
/// (e.g. `startingBlock`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub struct SyncStatusMetadata {
    /// Whether the node is currently syncing.
    pub syncing: bool,
    /// The starting block.
    pub starting_block: u64,
    /// The current block.
    pub current_block: u64,
    /// The highest block.
    ///
    /// Optional: a missing field deserializes to `None`, and `None` is omitted when serializing.
    #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "Option::is_none"))]
    pub highest_block: Option<u64>,
}
51
/// Serializes the payload of whichever variant is active, without any variant wrapper or tag,
/// mirroring the untagged representation used for deserialization.
#[cfg(feature = "serde")]
impl<T> serde::Serialize for SubscriptionResult<T>
where
    T: serde::Serialize,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Every arm simply forwards to the inner value's own `Serialize` impl.
        match self {
            Self::Header(header) => header.serialize(serializer),
            Self::Log(log) => log.serialize(serializer),
            Self::TransactionHash(hash) => hash.serialize(serializer),
            Self::FullTransaction(tx) => tx.serialize(serializer),
            Self::SyncState(sync) => sync.serialize(serializer),
        }
    }
}
70
/// Subscription kind.
///
/// With the `serde` feature enabled, variants are (de)serialized in `camelCase`
/// (e.g. `newHeads`, `newPendingTransactions`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
pub enum SubscriptionKind {
    /// New block headers subscription.
    ///
    /// Fires a notification each time a new header is appended to the chain, including chain
    /// reorganizations. In case of a chain reorganization the subscription will emit all new
    /// headers for the new chain. Therefore the subscription can emit multiple headers on the same
    /// height.
    NewHeads,
    /// Logs subscription.
    ///
    /// Returns logs that are included in newly imported blocks and match the given filter
    /// criteria. In case of a chain reorganization, previously sent logs that are on the old
    /// chain will be resent with the removed property set to true. Logs from transactions that
    /// ended up in the new chain are emitted. Therefore, a subscription can emit logs for the
    /// same transaction multiple times.
    Logs,
    /// New Pending Transactions subscription.
    ///
    /// Returns the hash or full tx for all transactions that are added to the pending state and
    /// are signed with a key that is available in the node. When a transaction that was
    /// previously part of the canonical chain isn't part of the new canonical chain after a
    /// reorganization, it is emitted again.
    NewPendingTransactions,
    /// Node syncing status subscription.
    ///
    /// Indicates when the node starts or stops synchronizing. The result can either be a boolean
    /// indicating that the synchronization has started (true), finished (false) or an object with
    /// various progress indicators.
    Syncing,
}
105
106/// Any additional parameters for a subscription.
107#[derive(Clone, Debug, Default, PartialEq, Eq)]
108pub enum Params {
109    /// No parameters passed.
110    #[default]
111    None,
112    /// Log parameters.
113    Logs(Box<Filter>),
114    /// Boolean parameter for new pending transactions.
115    Bool(bool),
116}
117
118impl Params {
119    /// Returns true if it's a bool parameter.
120    #[inline]
121    pub const fn is_bool(&self) -> bool {
122        matches!(self, Self::Bool(_))
123    }
124
125    /// Returns true if it's a log parameter.
126    #[inline]
127    pub const fn is_logs(&self) -> bool {
128        matches!(self, Self::Logs(_))
129    }
130}
131
132impl From<Filter> for Params {
133    fn from(filter: Filter) -> Self {
134        Self::Logs(Box::new(filter))
135    }
136}
137
138impl From<bool> for Params {
139    fn from(value: bool) -> Self {
140        Self::Bool(value)
141    }
142}
143
/// Serializes the parameters without a variant wrapper:
///
/// - [`Params::None`] becomes an empty sequence,
/// - [`Params::Logs`] becomes the serialized filter,
/// - [`Params::Bool`] becomes the bare boolean.
#[cfg(feature = "serde")]
impl serde::Serialize for Params {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            Self::None => {
                // The element type is irrelevant because the slice is empty; it only has to be
                // something that implements `Serialize`.
                let empty: &[serde_json::Value] = &[];
                empty.serialize(serializer)
            }
            Self::Logs(logs) => logs.serialize(serializer),
            Self::Bool(full) => full.serialize(serializer),
        }
    }
}
157
/// Deserializes subscription parameters from their wire form:
///
/// - `null` or an empty array becomes [`Params::None`] (the empty array is what the `Serialize`
///   impl emits for `Params::None`, so accepting it makes that variant round-trip),
/// - a boolean becomes [`Params::Bool`],
/// - anything else is parsed as a [`Filter`] and becomes [`Params::Logs`].
///
/// # Errors
///
/// Returns a custom error prefixed with `Invalid Pub-Sub parameters:` when the value is none of
/// the above, i.e. when it cannot be parsed as a [`Filter`].
#[cfg(feature = "serde")]
impl<'a> serde::Deserialize<'a> for Params {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'a>,
    {
        use serde::de::Error;

        // Buffer into a generic JSON value first so the shape can be inspected before deciding
        // which variant to build.
        match serde_json::Value::deserialize(deserializer)? {
            serde_json::Value::Null => Ok(Self::None),
            serde_json::Value::Array(items) if items.is_empty() => Ok(Self::None),
            serde_json::Value::Bool(flag) => Ok(Self::Bool(flag)),
            other => serde_json::from_value::<Filter>(other)
                .map(Self::from)
                .map_err(|e| D::Error::custom(format!("Invalid Pub-Sub parameters: {e}"))),
        }
    }
}
181
// Unit tests for `Params`: variant predicates, `From` conversions and (with the `serde` feature)
// the JSON wire format.
#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;

    #[test]
    #[cfg(feature = "serde")]
    fn params_serde() {
        // Test deserialization of boolean parameter
        let s: Params = serde_json::from_str("true").unwrap();
        assert_eq!(s, Params::Bool(true));

        // Test deserialization of null (None) parameter
        let s: Params = serde_json::from_str("null").unwrap();
        assert_eq!(s, Params::None);

        // Test deserialization of log parameters
        let filter = Filter::default();
        let s: Params = serde_json::from_str(&serde_json::to_string(&filter).unwrap()).unwrap();
        assert_eq!(s, Params::Logs(Box::new(filter)));
    }

    #[test]
    fn params_is_bool() {
        // Check if the `is_bool` method correctly identifies boolean parameters
        let param = Params::Bool(true);
        assert!(param.is_bool());

        let param = Params::None;
        assert!(!param.is_bool());

        let param = Params::Logs(Box::default());
        assert!(!param.is_bool());
    }

    #[test]
    fn params_is_logs() {
        // Check if the `is_logs` method correctly identifies log parameters
        let param = Params::Logs(Box::default());
        assert!(param.is_logs());

        let param = Params::None;
        assert!(!param.is_logs());

        let param = Params::Bool(true);
        assert!(!param.is_logs());
    }

    #[test]
    fn params_from_filter() {
        let filter = Filter::default();
        let param: Params = filter.clone().into();
        assert_eq!(param, Params::Logs(Box::new(filter)));
    }

    #[test]
    fn params_from_bool() {
        let param: Params = true.into();
        assert_eq!(param, Params::Bool(true));

        let param: Params = false.into();
        assert_eq!(param, Params::Bool(false));
    }

    #[test]
    #[cfg(feature = "serde")]
    fn params_serialize_none() {
        let param = Params::None;
        let serialized = serde_json::to_string(&param).unwrap();
        assert_eq!(serialized, "[]");
    }

    #[test]
    #[cfg(feature = "serde")]
    fn params_serialize_bool() {
        let param = Params::Bool(true);
        let serialized = serde_json::to_string(&param).unwrap();
        assert_eq!(serialized, "true");

        let param = Params::Bool(false);
        let serialized = serde_json::to_string(&param).unwrap();
        assert_eq!(serialized, "false");
    }

    #[test]
    #[cfg(feature = "serde")]
    fn params_serialize_logs() {
        let filter = Filter::default();
        let param = Params::Logs(Box::new(filter.clone()));
        let serialized = serde_json::to_string(&param).unwrap();
        let expected = serde_json::to_string(&filter).unwrap();
        assert_eq!(serialized, expected);
    }
}