stratus/eth/follower/importer/
importer_supervisor.rs

1use std::sync::Arc;
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4
5use anyhow::bail;
6use futures::try_join;
7use tokio::sync::mpsc;
8
9use crate::eth::executor::Executor;
10use crate::eth::follower::consensus::Consensus;
11use crate::eth::follower::consensus::LagDirection;
12use crate::eth::follower::consensus::LagStatus;
13use crate::eth::follower::importer::EXTERNAL_RPC_CURRENT_BLOCK;
14use crate::eth::follower::importer::ImporterMode;
15use crate::eth::follower::importer::LATEST_FETCHED_BLOCK_TIME;
16use crate::eth::follower::importer::fetchers::DataFetcher;
17use crate::eth::follower::importer::fetchers::block_with_changes::BlockWithChangesFetcher;
18use crate::eth::follower::importer::fetchers::block_with_receipts::BlockWithReceiptsFetcher;
19use crate::eth::follower::importer::importers::ImporterWorker;
20use crate::eth::follower::importer::importers::execution::ReexecutionWorker;
21use crate::eth::follower::importer::importers::fake_leader::FakeLeaderWorker;
22use crate::eth::follower::importer::importers::replication::ReplicationWorker;
23use crate::eth::follower::importer::start_number_fetcher;
24use crate::eth::miner::Miner;
25use crate::eth::primitives::BlockNumber;
26use crate::eth::storage::StratusStorage;
27use crate::ext::spawn;
28use crate::infra::BlockchainClient;
29use crate::infra::kafka::KafkaConnector;
30#[cfg(feature = "metrics")]
31use crate::infra::metrics;
32use crate::utils::DropTimer;
33
34type ReexecutionFollower = ImporterSupervisor<BlockWithReceiptsFetcher, ReexecutionWorker>;
35type FakeLeader = ImporterSupervisor<BlockWithReceiptsFetcher, FakeLeaderWorker>;
36type ReplicationFollower = ImporterSupervisor<BlockWithChangesFetcher, ReplicationWorker>;
37
38pub struct ImporterSupervisor<Fetcher, Importer>
39where
40    Fetcher: DataFetcher,
41    Importer: ImporterWorker<DataType = Fetcher::PostProcessType>,
42{
43    fetcher: Fetcher,
44    importer: Importer,
45}
46
47impl ReexecutionFollower {
48    fn new(executor: Arc<Executor>, miner: Arc<Miner>, chain: Arc<BlockchainClient>, kafka_connector: Option<KafkaConnector>) -> Self {
49        let importer = ReexecutionWorker {
50            executor,
51            miner,
52            kafka_connector,
53        };
54
55        let fetcher = BlockWithReceiptsFetcher { chain: Arc::clone(&chain) };
56
57        Self { fetcher, importer }
58    }
59}
60
61impl FakeLeader {
62    fn new(executor: Arc<Executor>, miner: Arc<Miner>, chain: Arc<BlockchainClient>) -> Self {
63        let importer = FakeLeaderWorker { executor, miner };
64
65        let fetcher = BlockWithReceiptsFetcher { chain: Arc::clone(&chain) };
66
67        Self { fetcher, importer }
68    }
69}
70
71impl ReplicationFollower {
72    fn new(storage: Arc<StratusStorage>, miner: Arc<Miner>, chain: Arc<BlockchainClient>, kafka_connector: Option<KafkaConnector>) -> Self {
73        let importer = ReplicationWorker { miner, kafka_connector };
74
75        let fetcher = BlockWithChangesFetcher { chain, storage };
76
77        Self { fetcher, importer }
78    }
79}
80
81impl<Fetcher, Importer> ImporterSupervisor<Fetcher, Importer>
82where
83    Fetcher: DataFetcher + 'static,
84    Importer: ImporterWorker<DataType = Fetcher::PostProcessType> + 'static,
85{
86    async fn run(self, resume_from: BlockNumber, sync_interval: Duration, chain: Arc<BlockchainClient>) -> anyhow::Result<()> {
87        let _timer = DropTimer::start("importer-online::run_importer_online");
88
89        // Spawn common tasks: number fetcher
90        let number_fetcher_task = spawn("importer::number-fetcher", start_number_fetcher(Arc::clone(&chain), sync_interval));
91
92        let (backlog_tx, backlog_rx) = mpsc::channel(10_000);
93        let importer_task = spawn("importer::importer", self.importer.run(backlog_rx));
94
95        let fetcher_task = spawn("importer::fetcher", self.fetcher.run(backlog_tx, resume_from));
96
97        let results = try_join!(importer_task, fetcher_task, number_fetcher_task)?;
98        results.0?;
99        results.1?;
100        results.2?;
101
102        Ok(())
103    }
104}
105
106pub async fn start_importer(
107    importer_mode: ImporterMode,
108    storage: Arc<StratusStorage>,
109    executor: Arc<Executor>,
110    miner: Arc<Miner>,
111    chain: Arc<BlockchainClient>,
112    kafka_connector: Option<KafkaConnector>,
113    sync_interval: Duration,
114) -> anyhow::Result<()> {
115    let resume_from: BlockNumber = storage.read_block_number_to_resume_import()?;
116
117    match importer_mode {
118        ImporterMode::BlockWithChanges => {
119            ReplicationFollower::new(storage, miner, Arc::clone(&chain), kafka_connector)
120                .run(resume_from, sync_interval, chain)
121                .await?;
122        }
123        ImporterMode::ReexecutionFollower => {
124            ReexecutionFollower::new(executor, miner, Arc::clone(&chain), kafka_connector)
125                .run(resume_from, sync_interval, chain)
126                .await?;
127        }
128        ImporterMode::FakeLeader =>
129            FakeLeader::new(executor, miner, Arc::clone(&chain))
130                .run(resume_from, sync_interval, chain)
131                .await?,
132    }
133    Ok(())
134}
135pub struct ImporterConsensus {
136    pub storage: Arc<StratusStorage>,
137    pub chain: Arc<BlockchainClient>,
138}
139
140impl Consensus for ImporterConsensus {
141    async fn lag(&self) -> anyhow::Result<LagStatus> {
142        let last_fetched_time = LATEST_FETCHED_BLOCK_TIME.load(Ordering::Relaxed);
143
144        if last_fetched_time == 0 {
145            bail!("stratus has not been able to connect to the leader yet");
146        }
147
148        let elapsed = chrono::Utc::now().timestamp() as u64 - last_fetched_time;
149        if elapsed > 4 {
150            Err(anyhow::anyhow!(
151                "too much time elapsed without communicating with the leader. elapsed: {elapsed}s"
152            ))
153        } else {
154            let leader_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::SeqCst);
155            let follower_block = self.storage.read_mined_block_number().as_u64();
156
157            let direction = if follower_block > leader_block {
158                LagDirection::Ahead
159            } else {
160                LagDirection::Behind
161            };
162            let distance = leader_block.abs_diff(follower_block);
163            #[cfg(feature = "metrics")]
164            metrics::set_importer_online_lag_blocks(distance, direction.as_ref());
165
166            Ok(LagStatus { distance, direction })
167        }
168    }
169
170    fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
171        Ok(&self.chain)
172    }
173}