stratus/eth/follower/
consensus.rs1use std::sync::Arc;
2
3use strum::AsRefStr;
4
5use crate::eth::primitives::Bytes;
6use crate::eth::primitives::Hash;
7use crate::eth::primitives::StratusError;
8use crate::eth::rpc::RpcClientApp;
9use crate::infra::BlockchainClient;
10#[cfg(feature = "metrics")]
11use crate::infra::metrics;
12
13const MAX_ALLOWED_LAG_BLOCKS: u64 = 3;
14
15#[derive(Clone, Copy, Debug, AsRefStr)]
16#[strum(serialize_all = "lowercase")]
17pub enum LagDirection {
18 Ahead,
19 Behind,
20}
21
22#[derive(Clone, Copy, Debug)]
23pub struct LagStatus {
24 pub distance: u64,
25 pub direction: LagDirection,
26}
27
28impl LagStatus {
29 pub fn is_ahead(&self) -> bool {
30 matches!(self.direction, LagDirection::Ahead)
31 }
32
33 pub fn is_far_behind(&self) -> bool {
34 matches!(self.direction, LagDirection::Behind) && self.distance > MAX_ALLOWED_LAG_BLOCKS
35 }
36}
37
38#[allow(async_fn_in_trait)]
39pub trait Consensus: Send + Sync {
40 async fn should_serve(&self) -> bool {
42 let lag = match self.lag().await {
43 Ok(lag) => lag,
44 Err(err) => {
45 tracing::error!(?err, "failed to get the lag between this node and the leader");
46 return false;
47 }
48 };
49
50 if lag.is_far_behind() {
51 tracing::warn!(blocks_behind = lag.distance, "validator and replica are too far apart");
52 }
53
54 if lag.is_ahead() {
55 tracing::warn!(distance = lag.distance, "follower is ahead of the leader");
56 }
57
58 !(lag.is_far_behind() || lag.is_ahead())
59 }
60
61 async fn forward_to_leader(&self, tx_hash: Hash, tx_data: Bytes, rpc_client: &RpcClientApp) -> Result<Hash, StratusError> {
63 #[cfg(feature = "metrics")]
64 let start = metrics::now();
65
66 tracing::info!(%tx_hash, %rpc_client, "forwarding transaction to leader");
67
68 let hash = self.get_chain()?.send_raw_transaction_to_leader(tx_data.into(), rpc_client).await?;
69
70 #[cfg(feature = "metrics")]
71 metrics::inc_consensus_forward(start.elapsed());
72
73 Ok(hash)
74 }
75
76 fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>>;
77
78 async fn lag(&self) -> anyhow::Result<LagStatus>;
80}