1use std::collections::HashMap;
2use std::fmt;
3use std::sync::Arc;
4use std::sync::atomic::AtomicUsize;
5use std::sync::atomic::Ordering;
6
7use futures::join;
8use itertools::Itertools;
9use jsonrpsee::ConnectionId;
10use jsonrpsee::SubscriptionMessage;
11use jsonrpsee::SubscriptionSink;
12use serde::ser::SerializeMap;
13use tokio::sync::RwLock;
14use tokio::sync::broadcast;
15use tokio::task::JoinHandle;
16use tokio::time::Duration;
17use tokio::time::timeout;
18
19use crate::GlobalState;
20use crate::eth::primitives::BlockHeader;
21use crate::eth::primitives::Hash;
22use crate::eth::primitives::LogFilter;
23use crate::eth::primitives::LogFilterInput;
24use crate::eth::primitives::LogMined;
25use crate::eth::primitives::RpcError;
26use crate::eth::primitives::StratusError;
27use crate::eth::primitives::UnixTimeNow;
28use crate::eth::rpc::RpcClientApp;
29use crate::ext::DisplayExt;
30use crate::ext::SleepReason;
31use crate::ext::not;
32use crate::ext::spawn;
33use crate::ext::traced_sleep;
34use crate::if_else;
35#[cfg(feature = "metrics")]
36use crate::infra::metrics;
37use crate::infra::tracing::warn_task_rx_closed;
38
/// Time the subscriptions cleaner task sleeps between two cleaning passes.
const CLEANING_FREQUENCY: Duration = Duration::from_secs(10);

/// Timeout applied to each individual notification sent to a subscriber sink.
const NOTIFICATION_TIMEOUT: Duration = Duration::from_secs(10);

/// Maximum time a notifier task waits for a new message before looping back to
/// re-check the global shutdown flag.
const NOTIFIER_SHUTDOWN_CHECK_INTERVAL: Duration = Duration::from_secs(2);
47
/// Subscription-kind labels passed to the active-subscriptions metric.
#[cfg(feature = "metrics")]
mod label {
    /// Label for `newPendingTransactions` subscriptions.
    pub(super) const PENDING_TXS: &str = "newPendingTransactions";
    /// Label for `newHeads` subscriptions.
    pub(super) const NEW_HEADS: &str = "newHeads";
    /// Label for `logs` subscriptions.
    pub(super) const LOGS: &str = "logs";
}
54
/// State of the JSON-RPC subscription subsystem: the registry of connected
/// subscriptions plus the handles of the background notifier tasks.
#[derive(Debug)]
pub struct RpcSubscriptions {
    /// Registry of connected subscriptions, shared with the background tasks.
    pub connected: Arc<RpcSubscriptionsConnected>,
    /// Join handles of the three notifier tasks.
    pub handles: RpcSubscriptionsHandles,
}
61
62impl RpcSubscriptions {
63 pub fn spawn(rx_pending_txs: broadcast::Receiver<Hash>, rx_blocks: broadcast::Receiver<BlockHeader>, rx_logs: broadcast::Receiver<LogMined>) -> Self {
65 let connected = Arc::new(RpcSubscriptionsConnected::default());
66
67 Self::spawn_subscriptions_cleaner(Arc::clone(&connected));
68 let handles = RpcSubscriptionsHandles {
69 new_pending_txs: Self::spawn_new_pending_txs_notifier(Arc::clone(&connected), rx_pending_txs),
70 new_heads: Self::spawn_new_heads_notifier(Arc::clone(&connected), rx_blocks),
71 logs: Self::spawn_logs_notifier(Arc::clone(&connected), rx_logs),
72 };
73
74 Self { connected, handles }
75 }
76
77 fn spawn_subscriptions_cleaner(subs: Arc<RpcSubscriptionsConnected>) -> JoinHandle<anyhow::Result<()>> {
79 const TASK_NAME: &str = "rpc::sub::cleaner";
80 spawn(TASK_NAME, async move {
81 loop {
82 if GlobalState::is_shutdown_warn(TASK_NAME) {
83 return Ok(());
84 }
85
86 let mut pending_txs_subs_cleaned = Vec::<RpcClientApp>::new();
88 let mut new_heads_subs_cleaned = Vec::<RpcClientApp>::new();
89 let mut logs_subs_cleaned = Vec::<(RpcClientApp, LogFilterInput)>::new();
90
91 subs.pending_txs.write().await.retain(|_, sub| {
93 let should_keep = not(sub.sink.is_closed());
94 if !should_keep {
95 pending_txs_subs_cleaned.push(sub.client.clone());
96 }
97 should_keep
98 });
99 subs.new_heads.write().await.retain(|_, sub| {
100 let should_keep = not(sub.sink.is_closed());
101 if !should_keep {
102 new_heads_subs_cleaned.push(sub.client.clone());
103 }
104 should_keep
105 });
106 subs.logs.write().await.retain(|_, connection_sub_map| {
107 connection_sub_map.retain(|_, sub| {
109 let should_keep = not(sub.inner.sink.is_closed());
110 if !should_keep {
111 logs_subs_cleaned.push((sub.inner.client.clone(), sub.filter.original_input.clone()));
112 }
113 should_keep
114 });
115
116 not(connection_sub_map.is_empty())
118 });
119
120 let amount_cleaned = pending_txs_subs_cleaned.len() + new_heads_subs_cleaned.len() + logs_subs_cleaned.len();
122 if amount_cleaned > 0 {
123 tracing::info!(
124 amount_cleaned,
125 pending_txs = ?pending_txs_subs_cleaned,
126 new_heads = ?new_heads_subs_cleaned,
127 logs = ?logs_subs_cleaned,
128 "cleaned subscriptions",
129 );
130 }
131
132 #[cfg(feature = "metrics")]
134 {
135 for client in pending_txs_subs_cleaned {
138 metrics::set_rpc_subscriptions_active(0, label::PENDING_TXS, client.to_string());
139 }
140 for client in new_heads_subs_cleaned {
141 metrics::set_rpc_subscriptions_active(0, label::NEW_HEADS, client.to_string());
142 }
143 for client in logs_subs_cleaned.into_iter().map(|(client, _)| client) {
144 metrics::set_rpc_subscriptions_active(0, label::LOGS, client.to_string());
145 }
146
147 sub_metrics::update_new_pending_txs_subscription_metrics(&(*subs.pending_txs.read().await));
148 sub_metrics::update_new_heads_subscription_metrics(&(*subs.new_heads.read().await));
149 sub_metrics::update_logs_subscription_metrics(&(*subs.logs.read().await));
150 }
151
152 traced_sleep(CLEANING_FREQUENCY, SleepReason::Interval).await;
154 }
155 })
156 }
157
158 fn spawn_new_pending_txs_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_tx_hash: broadcast::Receiver<Hash>) -> JoinHandle<anyhow::Result<()>> {
160 const TASK_NAME: &str = "rpc::sub::newPendingTransactions";
161 spawn(TASK_NAME, async move {
162 loop {
163 if GlobalState::is_shutdown_warn(TASK_NAME) {
164 return Ok(());
165 }
166
167 let tx_hash = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await {
168 Ok(Ok(tx)) => tx,
169 Ok(Err(_channel_closed)) => break,
170 Err(_timed_out) => continue,
171 };
172
173 let interested_subs = subs.pending_txs.read().await;
174 let subscribers = interested_subs.values().collect_vec();
175
176 if !subscribers.is_empty() {
177 let mut client_type_counts = std::collections::HashMap::new();
179
180 for sub in &subscribers {
181 let client_str = sub.client.to_string();
182 let client_type = Self::get_client_type(&client_str);
183
184 *client_type_counts.entry(client_type).or_insert(0) += 1;
185 }
186
187 let mut client_summary = Vec::new();
189 for (client_type, count) in client_type_counts.iter() {
190 client_summary.push(format!("{client_type}: {count}"));
191 }
192
193 tracing::info!(
194 tx_hash = ?tx_hash,
195 clients = ?client_summary.join(", "),
196 "notifying subscribers about new pending transaction"
197 );
198 }
199 let value = serde_json::value::RawValue::from_string(serde_json::to_string(&tx_hash)?)?;
200 Self::notify(subscribers, value);
201 }
202 warn_task_rx_closed(TASK_NAME);
203 Ok(())
204 })
205 }
206
207 fn spawn_new_heads_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_block: broadcast::Receiver<BlockHeader>) -> JoinHandle<anyhow::Result<()>> {
209 const TASK_NAME: &str = "rpc::sub::newHeads";
210 spawn(TASK_NAME, async move {
211 loop {
212 if GlobalState::is_shutdown_warn(TASK_NAME) {
213 return Ok(());
214 }
215
216 let block_header = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_block.recv()).await {
217 Ok(Ok(block)) => block,
218 Ok(Err(_channel_closed)) => break,
219 Err(_timed_out) => continue,
220 };
221
222 let interested_subs = subs.new_heads.read().await;
223 let subscribers = interested_subs.values().collect_vec();
224
225 if !subscribers.is_empty() {
226 let mut client_type_counts = std::collections::HashMap::new();
228
229 for sub in &subscribers {
230 let client_str = sub.client.to_string();
231 let client_type = Self::get_client_type(&client_str);
232
233 *client_type_counts.entry(client_type).or_insert(0) += 1;
234 }
235
236 let mut client_summary = Vec::new();
238 for (client_type, count) in client_type_counts.iter() {
239 client_summary.push(format!("{client_type}: {count}"));
240 }
241
242 tracing::info!(
243 block_number = ?block_header.number,
244 block_hash = ?block_header.hash,
245 clients = ?client_summary.join(", "),
246 "notifying subscribers about new block"
247 );
248 }
249
250 Self::notify(subscribers, block_header);
251 }
252 warn_task_rx_closed(TASK_NAME);
253 Ok(())
254 })
255 }
256
257 fn spawn_logs_notifier(subs: Arc<RpcSubscriptionsConnected>, mut rx_log_mined: broadcast::Receiver<LogMined>) -> JoinHandle<anyhow::Result<()>> {
259 const TASK_NAME: &str = "rpc::sub::logs";
260 spawn(TASK_NAME, async move {
261 loop {
262 if GlobalState::is_shutdown_warn(TASK_NAME) {
263 return Ok(());
264 }
265
266 let log = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_log_mined.recv()).await {
267 Ok(Ok(log)) => log,
268 Ok(Err(_channel_closed)) => break,
269 Err(_timed_out) => continue,
270 };
271
272 let interested_subs = subs.logs.read().await;
273 let matching_subscribers = interested_subs
274 .values()
275 .flat_map(HashMap::values)
276 .filter_map(|s| if_else!(s.filter.matches(&log), Some(&s.inner), None))
277 .collect_vec();
278
279 if !matching_subscribers.is_empty() {
280 let mut client_type_counts = std::collections::HashMap::new();
282
283 for sub in &matching_subscribers {
284 let client_str = sub.client.to_string();
285 let client_type = Self::get_client_type(&client_str);
286
287 *client_type_counts.entry(client_type).or_insert(0) += 1;
288 }
289
290 let mut client_summary = Vec::new();
292 for (client_type, count) in client_type_counts.iter() {
293 client_summary.push(format!("{client_type}: {count}"));
294 }
295
296 tracing::info!(
297 log_block_number = ?log.block_number,
298 log_tx_hash = ?log.transaction_hash,
299 clients = ?client_summary.join(", "),
300 "notifying subscribers about new logs"
301 );
302 }
303
304 Self::notify(matching_subscribers, log);
305 }
306 warn_task_rx_closed(TASK_NAME);
307 Ok(())
308 })
309 }
310
311 fn notify<T>(subs: Vec<&Subscription>, msg: T)
316 where
317 T: TryInto<SubscriptionMessage>,
318 T::Error: fmt::Debug,
319 {
320 if subs.is_empty() {
321 return;
322 }
323
324 let msg = match msg.try_into() {
325 Ok(msg) => msg,
326 Err(e) => {
327 tracing::error!(parent: None, reason = ?e, "failed to convert message into subscription message");
328 return;
329 }
330 };
331
332 for sub in subs {
333 if not(sub.is_active()) {
334 continue;
335 }
336
337 sub.inc_sent();
339
340 let sink = Arc::clone(&sub.sink);
342 let msg_clone = msg.clone();
343 spawn("rpc::sub::notify", async move {
344 if let Err(e) = sink.send_timeout(msg_clone, NOTIFICATION_TIMEOUT).await {
345 match e {
346 jsonrpsee::SendTimeoutError::Timeout(msg) => tracing::error!(reason = ?msg, "failed to send subscription notification"),
347 jsonrpsee::SendTimeoutError::Closed(msg) =>
348 tracing::info!(reason = ?msg, "failed to send subscription notification because the connection was closed"),
349 }
350 }
351 });
352 }
353 }
354
355 fn get_client_type(client_str: &str) -> String {
356 if let Some(index) = client_str.find("::") {
358 client_str[..index].to_string()
360 } else {
361 "other".to_string()
363 }
364 }
365}
366
/// Join handles of the three notifier tasks spawned by `RpcSubscriptions::spawn`.
#[derive(Debug)]
pub struct RpcSubscriptionsHandles {
    /// Handle of the `newPendingTransactions` notifier task.
    new_pending_txs: JoinHandle<anyhow::Result<()>>,
    /// Handle of the `newHeads` notifier task.
    new_heads: JoinHandle<anyhow::Result<()>>,
    /// Handle of the `logs` notifier task.
    logs: JoinHandle<anyhow::Result<()>>,
}
378
379impl RpcSubscriptionsHandles {
380 pub async fn stopped(self) {
381 let _ = join!(self.new_pending_txs, self.new_heads, self.logs);
382 }
383
384 pub fn abort(&self) {
385 self.new_pending_txs.abort();
386 self.new_heads.abort();
387 self.logs.abort();
388 }
389}
390
/// A single client subscription: who subscribed, the sink used to reach them and
/// a counter of notifications scheduled for it.
#[derive(Debug, derive_new::new)]
pub struct Subscription {
    /// Creation timestamp; filled with the type's default by the generated constructor.
    #[new(default)]
    created_at: UnixTimeNow,

    /// Client application that owns the subscription.
    client: RpcClientApp,
    /// Sink used to push notifications; shared with the spawned send tasks.
    sink: Arc<SubscriptionSink>,

    /// Number of notifications scheduled for this subscription; starts at the default (zero).
    #[new(default)]
    sent: AtomicUsize,
}
406
407impl Subscription {
408 fn is_active(&self) -> bool {
410 not(self.sink.is_closed())
411 }
412
413 fn inc_sent(&self) {
415 self.sent.fetch_add(1, Ordering::Relaxed);
416 }
417}
418
419impl serde::Serialize for Subscription {
420 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
421 where
422 S: serde::Serializer,
423 {
424 let mut s = serializer.serialize_map(Some(5))?;
425 s.serialize_entry("created_at", &self.created_at)?;
426 s.serialize_entry("client", &self.client)?;
427 s.serialize_entry("id", &self.sink.subscription_id())?;
428 s.serialize_entry("active", &self.is_active())?;
429 s.serialize_entry("sent", &self.sent.load(Ordering::Relaxed))?;
430 s.end()
431 }
432}
433
/// A `logs` subscription: a plain [`Subscription`] paired with the filter that
/// selects which logs are delivered to it.
#[derive(Debug, derive_more::Deref, derive_new::new, serde::Serialize)]
pub struct SubscriptionWithFilter {
    /// Underlying subscription; deref target, and flattened into this struct when serialized.
    #[deref]
    #[serde(flatten)]
    inner: Subscription,

    /// Filter a mined log must match to be sent to this subscription.
    filter: LogFilter,
}
442
/// Registry of the currently connected subscriptions, grouped by event kind.
/// Each map is guarded by its own async read-write lock.
#[derive(Debug, Default)]
pub struct RpcSubscriptionsConnected {
    /// `newPendingTransactions` subscriptions, keyed by connection id.
    pub pending_txs: RwLock<HashMap<ConnectionId, Subscription>>,
    /// `newHeads` subscriptions, keyed by connection id.
    pub new_heads: RwLock<HashMap<ConnectionId, Subscription>>,
    /// `logs` subscriptions, keyed by connection id and then by log filter.
    pub logs: RwLock<HashMap<ConnectionId, HashMap<LogFilter, SubscriptionWithFilter>>>,
}
450
451impl RpcSubscriptionsConnected {
452 pub async fn check_client_subscriptions(&self, max_subscriptions: u32, client: &RpcClientApp) -> Result<(), StratusError> {
454 let pending_txs = self.pending_txs.read().await.values().filter(|s| s.client == *client).count();
455 let new_heads = self.new_heads.read().await.values().filter(|s| s.client == *client).count();
456 let logs = self
457 .logs
458 .read()
459 .await
460 .values()
461 .flat_map(HashMap::values)
462 .filter(|s| s.client == *client)
463 .count();
464 tracing::info!(%pending_txs, %new_heads, %logs, "current client subscriptions");
465
466 if pending_txs + new_heads + logs >= max_subscriptions as usize {
467 return Err(RpcError::SubscriptionLimit { max: max_subscriptions }.into());
468 }
469
470 Ok(())
471 }
472
473 pub async fn add_new_pending_txs_subscription(&self, rpc_client: &RpcClientApp, sink: SubscriptionSink) {
475 tracing::info!(
476 id = sink.subscription_id().to_string_ext(),
477 %rpc_client,
478 "subscribing to newPendingTransactions event"
479 );
480 let mut subs = self.pending_txs.write().await;
481 subs.insert(sink.connection_id(), Subscription::new(rpc_client.clone(), sink.into()));
482
483 #[cfg(feature = "metrics")]
484 sub_metrics::update_new_pending_txs_subscription_metrics(&subs);
485 }
486
487 pub async fn add_new_heads_subscription(&self, rpc_client: &RpcClientApp, sink: SubscriptionSink) {
489 tracing::info!(
490 id = sink.subscription_id().to_string_ext(),
491 %rpc_client,
492 "subscribing to newHeads event"
493 );
494 let mut subs = self.new_heads.write().await;
495 subs.insert(sink.connection_id(), Subscription::new(rpc_client.clone(), sink.into()));
496
497 #[cfg(feature = "metrics")]
498 sub_metrics::update_new_heads_subscription_metrics(&subs);
499 }
500
501 pub async fn add_logs_subscription(&self, rpc_client: &RpcClientApp, filter: LogFilter, sink: SubscriptionSink) {
506 tracing::info!(
507 id = sink.subscription_id().to_string_ext(), ?filter,
508 %rpc_client,
509 "subscribing to logs event"
510 );
511 let mut subs = self.logs.write().await;
512 let filter_to_subscription_map = subs.entry(sink.connection_id()).or_default();
513
514 let inner = Subscription::new(rpc_client.clone(), sink.into());
517 filter_to_subscription_map.insert(filter.clone(), SubscriptionWithFilter::new(inner, filter));
518
519 #[cfg(feature = "metrics")]
520 sub_metrics::update_logs_subscription_metrics(&subs);
521 }
522}
523
/// Helpers that publish the number of active subscriptions per client as metrics.
#[cfg(feature = "metrics")]
mod sub_metrics {
    use super::ConnectionId;
    use super::HashMap;
    use super::Itertools;
    use super::LogFilter;
    use super::RpcClientApp;
    use super::Subscription;
    use super::SubscriptionWithFilter;
    use super::label;
    use super::metrics;

    /// Publishes per-client counts of `newPendingTransactions` subscriptions.
    pub fn update_new_pending_txs_subscription_metrics(subs: &HashMap<ConnectionId, Subscription>) {
        update_subscription_count(label::PENDING_TXS, subs.values());
    }

    /// Publishes per-client counts of `newHeads` subscriptions.
    pub fn update_new_heads_subscription_metrics(subs: &HashMap<ConnectionId, Subscription>) {
        update_subscription_count(label::NEW_HEADS, subs.values());
    }

    /// Publishes per-client counts of `logs` subscriptions across all connections and filters.
    pub fn update_logs_subscription_metrics(subs: &HashMap<ConnectionId, HashMap<LogFilter, SubscriptionWithFilter>>) {
        let all_subs = subs
            .values()
            .flat_map(|by_filter| by_filter.values())
            .map(|with_filter| &with_filter.inner);
        update_subscription_count(label::LOGS, all_subs);
    }

    /// Counts the subscriptions held by each client and sets the metric for every client
    /// that appears in the iterator. Clients absent from the iterator are left untouched.
    fn update_subscription_count<'a, I>(sub_label: &str, sub_client_app_iter: I)
    where
        I: Iterator<Item = &'a Subscription>,
    {
        let per_client: HashMap<&RpcClientApp, usize> = sub_client_app_iter.map(|sub| &sub.client).counts();

        per_client.into_iter().for_each(|(client, active)| {
            metrics::set_rpc_subscriptions_active(active as u64, sub_label, client.to_string());
        });
    }
}