stratus/eth/follower/
consensus.rs

1use std::sync::Arc;
2
3use strum::AsRefStr;
4
5use crate::eth::primitives::Bytes;
6use crate::eth::primitives::Hash;
7use crate::eth::primitives::StratusError;
8use crate::eth::rpc::RpcClientApp;
9use crate::infra::BlockchainClient;
10#[cfg(feature = "metrics")]
11use crate::infra::metrics;
12
13const MAX_ALLOWED_LAG_BLOCKS: u64 = 3;
14
15#[derive(Clone, Copy, Debug, AsRefStr)]
16#[strum(serialize_all = "lowercase")]
17pub enum LagDirection {
18    Ahead,
19    Behind,
20}
21
22#[derive(Clone, Copy, Debug)]
23pub struct LagStatus {
24    pub distance: u64,
25    pub direction: LagDirection,
26}
27
28impl LagStatus {
29    pub fn is_ahead(&self) -> bool {
30        matches!(self.direction, LagDirection::Ahead)
31    }
32
33    pub fn is_far_behind(&self) -> bool {
34        matches!(self.direction, LagDirection::Behind) && self.distance > MAX_ALLOWED_LAG_BLOCKS
35    }
36}
37
38#[allow(async_fn_in_trait)]
39pub trait Consensus: Send + Sync {
40    /// Whether this node should serve requests.
41    async fn should_serve(&self) -> bool {
42        let lag = match self.lag().await {
43            Ok(lag) => lag,
44            Err(err) => {
45                tracing::error!(?err, "failed to get the lag between this node and the leader");
46                return false;
47            }
48        };
49
50        if lag.is_far_behind() {
51            tracing::warn!(blocks_behind = lag.distance, "validator and replica are too far apart");
52        }
53
54        if lag.is_ahead() {
55            tracing::warn!(distance = lag.distance, "follower is ahead of the leader");
56        }
57
58        !(lag.is_far_behind() || lag.is_ahead())
59    }
60
61    /// Forwards a transaction to leader.
62    async fn forward_to_leader(&self, tx_hash: Hash, tx_data: Bytes, rpc_client: &RpcClientApp) -> Result<Hash, StratusError> {
63        #[cfg(feature = "metrics")]
64        let start = metrics::now();
65
66        tracing::info!(%tx_hash, %rpc_client, "forwarding transaction to leader");
67
68        let hash = self.get_chain()?.send_raw_transaction_to_leader(tx_data.into(), rpc_client).await?;
69
70        #[cfg(feature = "metrics")]
71        metrics::inc_consensus_forward(start.elapsed());
72
73        Ok(hash)
74    }
75
76    fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>>;
77
78    /// Get the lag status between this node and the leader.
79    async fn lag(&self) -> anyhow::Result<LagStatus>;
80}