stratus/eth/rpc/rpc_subscriptions.rs
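//! JSON-RPC websocket subscription management: tracks connected subscribers for the
//! `newPendingTransactions`, `newHeads` and `logs` events and spawns the background
//! tasks that fan broadcast events out to them.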

use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use futures::join;
use itertools::Itertools;
use jsonrpsee::ConnectionId;
use jsonrpsee::SubscriptionMessage;
use jsonrpsee::SubscriptionSink;
use serde::ser::SerializeMap;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::timeout;

use crate::GlobalState;
use crate::eth::primitives::BlockHeader;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogFilterInput;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::RpcError;
use crate::eth::primitives::StratusError;
use crate::eth::primitives::UnixTimeNow;
use crate::eth::rpc::RpcClientApp;
use crate::ext::DisplayExt;
use crate::ext::SleepReason;
use crate::ext::not;
use crate::ext::spawn;
use crate::ext::traced_sleep;
use crate::if_else;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::infra::tracing::warn_task_rx_closed;

/// Frequency of cleaning up closed subscriptions.
const CLEANING_FREQUENCY: Duration = Duration::from_secs(10);

/// Timeout used when sending notifications to subscribers.
const NOTIFICATION_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum time a notifier waits for the next event before re-checking the shutdown flag.
const NOTIFIER_SHUTDOWN_CHECK_INTERVAL: Duration = Duration::from_secs(2);

#[cfg(feature = "metrics")]
mod label {
    pub(super) const PENDING_TXS: &str = "newPendingTransactions";
    pub(super) const NEW_HEADS: &str = "newHeads";
    pub(super) const LOGS: &str = "logs";
}

/// State of JSON-RPC websocket subscriptions.
#[derive(Debug)]
pub struct RpcSubscriptions {
    pub connected: Arc<RpcSubscriptionsConnected>,
    pub handles: RpcSubscriptionsHandles,
}

impl RpcSubscriptions {
    /// Creates a new subscription manager that automatically spawns all necessary tasks in the background.
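    ///
    /// A minimal usage sketch (the channel capacity of 1024 is illustrative, not a value taken from the rest of the codebase):
    ///
    /// ```ignore
    /// let (_tx_pending_txs, rx_pending_txs) = tokio::sync::broadcast::channel::<Hash>(1024);
    /// let (_tx_blocks, rx_blocks) = tokio::sync::broadcast::channel::<BlockHeader>(1024);
    /// let (_tx_logs, rx_logs) = tokio::sync::broadcast::channel::<LogMined>(1024);
    /// let subs = RpcSubscriptions::spawn(rx_pending_txs, rx_blocks, rx_logs);
    /// // later, during shutdown:
    /// subs.handles.abort();
    /// ```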
    pub fn spawn(rx_pending_txs: broadcast::Receiver<Hash>, rx_blocks: broadcast::Receiver<BlockHeader>, rx_logs: broadcast::Receiver<LogMined>) -> Self {
        let connected = Arc::new(RpcSubscriptionsConnected::default());

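        // the cleaner task's handle is not tracked in `handles`; it exits on its own once the global shutdown flag is set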
        Self::spawn_subscriptions_cleaner(Arc::clone(&connected));
        let handles = RpcSubscriptionsHandles {
            new_pending_txs: Self::spawn_new_pending_txs_notifier(Arc::clone(&connected), rx_pending_txs),
            new_heads: Self::spawn_new_heads_notifier(Arc::clone(&connected), rx_blocks),
            logs: Self::spawn_logs_notifier(Arc::clone(&connected), rx_logs),
        };

        Self { connected, handles }
    }

    /// Spawns a new task to clean up closed subscriptions from time to time.
    fn spawn_subscriptions_cleaner(subs: Arc<RpcSubscriptionsConnected>) -> JoinHandle<anyhow::Result<()>> {
        const TASK_NAME: &str = "rpc::sub::cleaner";
        spawn(TASK_NAME, async move {
            loop {
                if GlobalState::is_shutdown_warn(TASK_NAME) {
                    return Ok(());
                }

                // record which subscriptions were cleaned so they can be logged below
                let mut pending_txs_subs_cleaned = Vec::<RpcClientApp>::new();
                let mut new_heads_subs_cleaned = Vec::<RpcClientApp>::new();
                let mut logs_subs_cleaned = Vec::<(RpcClientApp, LogFilterInput)>::new();

                // remove closed subscriptions
                subs.pending_txs.write().await.retain(|_, sub| {
                    let should_keep = not(sub.sink.is_closed());
                    if !should_keep {
                        pending_txs_subs_cleaned.push(sub.client.clone());
                    }
                    should_keep
                });
                subs.new_heads.write().await.retain(|_, sub| {
                    let should_keep = not(sub.sink.is_closed());
                    if !should_keep {
                        new_heads_subs_cleaned.push(sub.client.clone());
                    }
                    should_keep
                });
                subs.logs.write().await.retain(|_, connection_sub_map| {
                    // clear inner map first
                    connection_sub_map.retain(|_, sub| {
                        let should_keep = not(sub.inner.sink.is_closed());
                        if !should_keep {
                            logs_subs_cleaned.push((sub.inner.client.clone(), sub.filter.original_input.clone()));
                        }
                        should_keep
                    });

                    // remove empty connection maps
                    not(connection_sub_map.is_empty())
                });

                // log cleaned subscriptions
                let amount_cleaned = pending_txs_subs_cleaned.len() + new_heads_subs_cleaned.len() + logs_subs_cleaned.len();
                if amount_cleaned > 0 {
                    tracing::info!(
                        amount_cleaned,
                        pending_txs = ?pending_txs_subs_cleaned,
                        new_heads = ?new_heads_subs_cleaned,
                        logs = ?logs_subs_cleaned,
                        "cleaned subscriptions",
                    );
                }

                // update metrics
                #[cfg(feature = "metrics")]
                {
                    // Reset gauges of the cleaned subscriptions to zero (which may be temporarily wrong);
                    // they are set back to their correct values right below.
                    for client in pending_txs_subs_cleaned {
                        metrics::set_rpc_subscriptions_active(0, label::PENDING_TXS, client.to_string());
                    }
                    for client in new_heads_subs_cleaned {
                        metrics::set_rpc_subscriptions_active(0, label::NEW_HEADS, client.to_string());
                    }
                    for client in logs_subs_cleaned.into_iter().map(|(client, _)| client) {
                        metrics::set_rpc_subscriptions_active(0, label::LOGS, client.to_string());
                    }

                    sub_metrics::update_new_pending_txs_subscription_metrics(&(*subs.pending_txs.read().await));
                    sub_metrics::update_new_heads_subscription_metrics(&(*subs.new_heads.read().await));
                    sub_metrics::update_logs_subscription_metrics(&(*subs.logs.read().await));
                }

                // await next iteration
                traced_sleep(CLEANING_FREQUENCY, SleepReason::Interval).await;
            }
        })
    }

    /// Spawns a new task that notifies subscribers about new executed transactions.
    fn spawn_new_pending_txs_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_tx_hash: broadcast::Receiver<Hash>) -> JoinHandle<anyhow::Result<()>> {
        const TASK_NAME: &str = "rpc::sub::newPendingTransactions";
        spawn(TASK_NAME, async move {
            loop {
                if GlobalState::is_shutdown_warn(TASK_NAME) {
                    return Ok(());
                }

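                // wait for the next transaction hash, waking up periodically so the shutdown check above runs again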
                let tx_hash = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await {
                    Ok(Ok(tx)) => tx,
                    Ok(Err(_channel_closed)) => break,
                    Err(_timed_out) => continue,
                };

                let interested_subs = subs.pending_txs.read().await;
                let subscribers = interested_subs.values().collect_vec();

                if !subscribers.is_empty() {
                    // Group clients by type and count how many of each type
                    let mut client_type_counts = std::collections::HashMap::new();

                    for sub in &subscribers {
                        let client_str = sub.client.to_string();
                        let client_type = Self::get_client_type(&client_str);

                        *client_type_counts.entry(client_type).or_insert(0) += 1;
                    }

                    // Format the log message
                    let mut client_summary = Vec::new();
                    for (client_type, count) in client_type_counts.iter() {
                        client_summary.push(format!("{client_type}: {count}"));
                    }

                    tracing::info!(
                        tx_hash = ?tx_hash,
                        clients = ?client_summary.join(", "),
                        "notifying subscribers about new pending transaction"
                    );
                }
                let value = serde_json::value::RawValue::from_string(serde_json::to_string(&tx_hash)?)?;
                Self::notify(subscribers, value);
            }
            warn_task_rx_closed(TASK_NAME);
            Ok(())
        })
    }

    /// Spawns a new task that notifies subscribers about newly created blocks.
    fn spawn_new_heads_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_block: broadcast::Receiver<BlockHeader>) -> JoinHandle<anyhow::Result<()>> {
        const TASK_NAME: &str = "rpc::sub::newHeads";
        spawn(TASK_NAME, async move {
            loop {
                if GlobalState::is_shutdown_warn(TASK_NAME) {
                    return Ok(());
                }

                let block_header = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_block.recv()).await {
                    Ok(Ok(block)) => block,
                    Ok(Err(_channel_closed)) => break,
                    Err(_timed_out) => continue,
                };

                let interested_subs = subs.new_heads.read().await;
                let subscribers = interested_subs.values().collect_vec();

                if !subscribers.is_empty() {
                    // Group clients by type and count how many of each type
                    let mut client_type_counts = std::collections::HashMap::new();

                    for sub in &subscribers {
                        let client_str = sub.client.to_string();
                        let client_type = Self::get_client_type(&client_str);

                        *client_type_counts.entry(client_type).or_insert(0) += 1;
                    }

                    // Format the log message
                    let mut client_summary = Vec::new();
                    for (client_type, count) in client_type_counts.iter() {
                        client_summary.push(format!("{client_type}: {count}"));
                    }

                    tracing::info!(
                        block_number = ?block_header.number,
                        block_hash = ?block_header.hash,
                        clients = ?client_summary.join(", "),
                        "notifying subscribers about new block"
                    );
                }

                Self::notify(subscribers, block_header);
            }
            warn_task_rx_closed(TASK_NAME);
            Ok(())
        })
    }

    /// Spawns a new task that notifies subscribers about new transaction logs.
    fn spawn_logs_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_log_mined: broadcast::Receiver<LogMined>) -> JoinHandle<anyhow::Result<()>> {
        const TASK_NAME: &str = "rpc::sub::logs";
        spawn(TASK_NAME, async move {
            loop {
                if GlobalState::is_shutdown_warn(TASK_NAME) {
                    return Ok(());
                }

                let log = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_log_mined.recv()).await {
                    Ok(Ok(log)) => log,
                    Ok(Err(_channel_closed)) => break,
                    Err(_timed_out) => continue,
                };

                let interested_subs = subs.logs.read().await;
                let matching_subscribers = interested_subs
                    .values()
                    .flat_map(HashMap::values)
                    .filter_map(|s| if_else!(s.filter.matches(&log), Some(&s.inner), None))
                    .collect_vec();

                if !matching_subscribers.is_empty() {
                    // Group clients by type and count how many of each type
                    let mut client_type_counts = std::collections::HashMap::new();

                    for sub in &matching_subscribers {
                        let client_str = sub.client.to_string();
                        let client_type = Self::get_client_type(&client_str);

                        *client_type_counts.entry(client_type).or_insert(0) += 1;
                    }

                    // Format the log message
                    let mut client_summary = Vec::new();
                    for (client_type, count) in client_type_counts.iter() {
                        client_summary.push(format!("{client_type}: {count}"));
                    }

                    tracing::info!(
                        log_block_number = ?log.block_number,
                        log_tx_hash = ?log.transaction_hash,
                        clients = ?client_summary.join(", "),
                        "notifying subscribers about new logs"
                    );
                }

                Self::notify(matching_subscribers, log);
            }
            warn_task_rx_closed(TASK_NAME);
            Ok(())
        })
    }

    // -------------------------------------------------------------------------
    // Helpers
    // -------------------------------------------------------------------------

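    /// Converts `msg` into a `SubscriptionMessage` and sends it to every still-active subscriber,
    /// spawning a short-lived task per sink so a slow or unresponsive client cannot block the notifier loop.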
    fn notify<T>(subs: Vec<&Subscription>, msg: T)
    where
        T: TryInto<SubscriptionMessage>,
        T::Error: fmt::Debug,
    {
        if subs.is_empty() {
            return;
        }

        let msg = match msg.try_into() {
            Ok(msg) => msg,
            Err(e) => {
                tracing::error!(parent: None, reason = ?e, "failed to convert message into subscription message");
                return;
            }
        };

        for sub in subs {
            if not(sub.is_active()) {
                continue;
            }

            // track metric
            sub.inc_sent();

            // send
            let sink = Arc::clone(&sub.sink);
            let msg_clone = msg.clone();
            spawn("rpc::sub::notify", async move {
                if let Err(e) = sink.send_timeout(msg_clone, NOTIFICATION_TIMEOUT).await {
                    match e {
                        jsonrpsee::SendTimeoutError::Timeout(msg) => tracing::error!(reason = ?msg, "failed to send subscription notification"),
                        jsonrpsee::SendTimeoutError::Closed(msg) =>
                            tracing::info!(reason = ?msg, "failed to send subscription notification because the connection was closed"),
                    }
                }
            });
        }
    }

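    /// Extracts the client "type" prefix (the part before the first `"::"`) from a client
    /// identification string, falling back to `"other"` when no `"::"` is present.
    /// For illustration, a hypothetical `"wallet::my-app"` would yield `"wallet"`.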
    fn get_client_type(client_str: &str) -> String {
        // Split the client string by "::" and take the first part
        if let Some(index) = client_str.find("::") {
            // Extract the type part (before the first "::")
            client_str[..index].to_string()
        } else {
            // If there's no "::", return "other"
            "other".to_string()
        }
    }
}

// -----------------------------------------------------------------------------
// Notifier handles
// -----------------------------------------------------------------------------

/// Handles of subscription background tasks.
#[derive(Debug)]
pub struct RpcSubscriptionsHandles {
    new_pending_txs: JoinHandle<anyhow::Result<()>>,
    new_heads: JoinHandle<anyhow::Result<()>>,
    logs: JoinHandle<anyhow::Result<()>>,
}

impl RpcSubscriptionsHandles {
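    /// Waits until all notifier tasks have finished.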
    pub async fn stopped(self) {
        let _ = join!(self.new_pending_txs, self.new_heads, self.logs);
    }

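    /// Aborts all notifier tasks immediately.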
    pub fn abort(&self) {
        self.new_pending_txs.abort();
        self.new_heads.abort();
        self.logs.abort();
    }
}

// -----------------------------------------------------------------------------
// Connected clients
// -----------------------------------------------------------------------------

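/// A single websocket subscription: the sink used to push notifications plus bookkeeping data
/// (owning client, creation time and number of messages sent).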
#[derive(Debug, derive_new::new)]
pub struct Subscription {
    #[new(default)]
    created_at: UnixTimeNow,

    client: RpcClientApp,
    sink: Arc<SubscriptionSink>,

    #[new(default)]
    sent: AtomicUsize,
}

impl Subscription {
    /// Checks if the subscription is still active.
    fn is_active(&self) -> bool {
        not(self.sink.is_closed())
    }

    /// Increments the number of messages sent to this subscription.
    fn inc_sent(&self) {
        self.sent.fetch_add(1, Ordering::Relaxed);
    }
}

impl serde::Serialize for Subscription {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut s = serializer.serialize_map(Some(5))?;
        s.serialize_entry("created_at", &self.created_at)?;
        s.serialize_entry("client", &self.client)?;
        s.serialize_entry("id", &self.sink.subscription_id())?;
        s.serialize_entry("active", &self.is_active())?;
        s.serialize_entry("sent", &self.sent.load(Ordering::Relaxed))?;
        s.end()
    }
}

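/// A `logs` subscription paired with the filter it was registered with.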
#[derive(Debug, derive_more::Deref, derive_new::new, serde::Serialize)]
pub struct SubscriptionWithFilter {
    #[deref]
    #[serde(flatten)]
    inner: Subscription,

    filter: LogFilter,
}

/// Active client subscriptions.
#[derive(Debug, Default)]
pub struct RpcSubscriptionsConnected {
    /// `newPendingTransactions` subscriptions, one per connection.
    pub pending_txs: RwLock<HashMap<ConnectionId, Subscription>>,
    /// `newHeads` subscriptions, one per connection.
    pub new_heads: RwLock<HashMap<ConnectionId, Subscription>>,
    /// `logs` subscriptions, keyed per connection and then per filter, so a single connection can hold several filters.
    pub logs: RwLock<HashMap<ConnectionId, HashMap<LogFilter, SubscriptionWithFilter>>>,
}

impl RpcSubscriptionsConnected {
    /// Checks the number of active subscriptions for a given client, returning an error if the client has already reached `max_subscriptions`.
    pub async fn check_client_subscriptions(&self, max_subscriptions: u32, client: &RpcClientApp) -> Result<(), StratusError> {
        let pending_txs = self.pending_txs.read().await.values().filter(|s| s.client == *client).count();
        let new_heads = self.new_heads.read().await.values().filter(|s| s.client == *client).count();
        let logs = self
            .logs
            .read()
            .await
            .values()
            .flat_map(HashMap::values)
            .filter(|s| s.client == *client)
            .count();
        tracing::info!(%pending_txs, %new_heads, %logs, "current client subscriptions");

        if pending_txs + new_heads + logs >= max_subscriptions as usize {
            return Err(RpcError::SubscriptionLimit { max: max_subscriptions }.into());
        }

        Ok(())
    }

    /// Adds a new subscriber to `newPendingTransactions` event.
    pub async fn add_new_pending_txs_subscription(&self, rpc_client: &RpcClientApp, sink: SubscriptionSink) {
        tracing::info!(
            id = sink.subscription_id().to_string_ext(),
            %rpc_client,
            "subscribing to newPendingTransactions event"
        );
        let mut subs = self.pending_txs.write().await;
        subs.insert(sink.connection_id(), Subscription::new(rpc_client.clone(), sink.into()));

        #[cfg(feature = "metrics")]
        sub_metrics::update_new_pending_txs_subscription_metrics(&subs);
    }

    /// Adds a new subscriber to `newHeads` event.
    pub async fn add_new_heads_subscription(&self, rpc_client: &RpcClientApp, sink: SubscriptionSink) {
        tracing::info!(
            id = sink.subscription_id().to_string_ext(),
            %rpc_client,
            "subscribing to newHeads event"
        );
        let mut subs = self.new_heads.write().await;
        subs.insert(sink.connection_id(), Subscription::new(rpc_client.clone(), sink.into()));

        #[cfg(feature = "metrics")]
        sub_metrics::update_new_heads_subscription_metrics(&subs);
    }

    /// Adds a new subscriber to `logs` event.
    ///
    /// If the same connection asks to subscribe with the same filter (which is redundant),
    /// the new subscription will overwrite the previous one.
    pub async fn add_logs_subscription(&self, rpc_client: &RpcClientApp, filter: LogFilter, sink: SubscriptionSink) {
        tracing::info!(
            id = sink.subscription_id().to_string_ext(), ?filter,
            %rpc_client,
            "subscribing to logs event"
        );
        let mut subs = self.logs.write().await;
        let filter_to_subscription_map = subs.entry(sink.connection_id()).or_default();

        // Insert the new subscription; if one already existed with the provided filter, overwrite
        // the previous sink with the newest one
        let inner = Subscription::new(rpc_client.clone(), sink.into());
        filter_to_subscription_map.insert(filter.clone(), SubscriptionWithFilter::new(inner, filter));

        #[cfg(feature = "metrics")]
        sub_metrics::update_logs_subscription_metrics(&subs);
    }
}

#[cfg(feature = "metrics")]
mod sub_metrics {
    use super::ConnectionId;
    use super::HashMap;
    use super::Itertools;
    use super::LogFilter;
    use super::RpcClientApp;
    use super::Subscription;
    use super::SubscriptionWithFilter;
    use super::label;
    use super::metrics;

    pub fn update_new_pending_txs_subscription_metrics(subs: &HashMap<ConnectionId, Subscription>) {
        update_subscription_count(label::PENDING_TXS, subs.values());
    }

    pub fn update_new_heads_subscription_metrics(subs: &HashMap<ConnectionId, Subscription>) {
        update_subscription_count(label::NEW_HEADS, subs.values());
    }

    pub fn update_logs_subscription_metrics(subs: &HashMap<ConnectionId, HashMap<LogFilter, SubscriptionWithFilter>>) {
        update_subscription_count(
            label::LOGS,
            subs.values().flat_map(HashMap::values).map(|sub_with_filter| &sub_with_filter.inner),
        );
    }

    fn update_subscription_count<'a, I>(sub_label: &str, sub_client_app_iter: I)
    where
        I: Iterator<Item = &'a Subscription>,
    {
        let client_counts: HashMap<&RpcClientApp, usize> = sub_client_app_iter.map(|sub| &sub.client).counts();

        for (client, count) in client_counts {
            metrics::set_rpc_subscriptions_active(count as u64, sub_label, client.to_string());
        }
    }
561}