// stratus/eth/follower/importer/importer_supervisor.rs
use std::sync::Arc;
2use std::sync::atomic::Ordering;
3use std::time::Duration;
4
5use anyhow::bail;
6use futures::try_join;
7use tokio::sync::mpsc;
8
9use crate::eth::executor::Executor;
10use crate::eth::follower::consensus::Consensus;
11use crate::eth::follower::consensus::LagDirection;
12use crate::eth::follower::consensus::LagStatus;
13use crate::eth::follower::importer::EXTERNAL_RPC_CURRENT_BLOCK;
14use crate::eth::follower::importer::ImporterMode;
15use crate::eth::follower::importer::LATEST_FETCHED_BLOCK_TIME;
16use crate::eth::follower::importer::fetchers::DataFetcher;
17use crate::eth::follower::importer::fetchers::block_with_changes::BlockWithChangesFetcher;
18use crate::eth::follower::importer::fetchers::block_with_receipts::BlockWithReceiptsFetcher;
19use crate::eth::follower::importer::importers::ImporterWorker;
20use crate::eth::follower::importer::importers::execution::ReexecutionWorker;
21use crate::eth::follower::importer::importers::fake_leader::FakeLeaderWorker;
22use crate::eth::follower::importer::importers::replication::ReplicationWorker;
23use crate::eth::follower::importer::start_number_fetcher;
24use crate::eth::miner::Miner;
25use crate::eth::primitives::BlockNumber;
26use crate::eth::storage::StratusStorage;
27use crate::ext::spawn;
28use crate::infra::BlockchainClient;
29use crate::infra::kafka::KafkaConnector;
30#[cfg(feature = "metrics")]
31use crate::infra::metrics;
32use crate::utils::DropTimer;
33
/// Supervisor pairing the block-with-receipts fetcher with the re-execution worker.
type ReexecutionFollower = ImporterSupervisor<BlockWithReceiptsFetcher, ReexecutionWorker>;
/// Supervisor pairing the block-with-receipts fetcher with the fake-leader worker.
type FakeLeader = ImporterSupervisor<BlockWithReceiptsFetcher, FakeLeaderWorker>;
/// Supervisor pairing the block-with-changes fetcher with the replication worker.
type ReplicationFollower = ImporterSupervisor<BlockWithChangesFetcher, ReplicationWorker>;
37
/// Pairs a [`DataFetcher`] with an [`ImporterWorker`] that consumes the
/// fetcher's post-processed output.
///
/// Trait bounds are intentionally left off the struct definition and live on
/// the impl blocks instead: the fields don't need the associated types here,
/// and bounds on a struct force every mention of the type to repeat them
/// (idiomatic Rust keeps struct definitions bound-free).
pub struct ImporterSupervisor<Fetcher, Importer> {
    // Produces block data and pushes it into the importer's backlog channel.
    fetcher: Fetcher,
    // Drains the backlog channel and applies the fetched block data.
    importer: Importer,
}
46
47impl ReexecutionFollower {
48 fn new(executor: Arc<Executor>, miner: Arc<Miner>, chain: Arc<BlockchainClient>, kafka_connector: Option<KafkaConnector>) -> Self {
49 let importer = ReexecutionWorker {
50 executor,
51 miner,
52 kafka_connector,
53 };
54
55 let fetcher = BlockWithReceiptsFetcher { chain: Arc::clone(&chain) };
56
57 Self { fetcher, importer }
58 }
59}
60
61impl FakeLeader {
62 fn new(executor: Arc<Executor>, miner: Arc<Miner>, chain: Arc<BlockchainClient>) -> Self {
63 let importer = FakeLeaderWorker { executor, miner };
64
65 let fetcher = BlockWithReceiptsFetcher { chain: Arc::clone(&chain) };
66
67 Self { fetcher, importer }
68 }
69}
70
71impl ReplicationFollower {
72 fn new(storage: Arc<StratusStorage>, miner: Arc<Miner>, chain: Arc<BlockchainClient>, kafka_connector: Option<KafkaConnector>) -> Self {
73 let importer = ReplicationWorker { miner, kafka_connector };
74
75 let fetcher = BlockWithChangesFetcher { chain, storage };
76
77 Self { fetcher, importer }
78 }
79}
80
81impl<Fetcher, Importer> ImporterSupervisor<Fetcher, Importer>
82where
83 Fetcher: DataFetcher + 'static,
84 Importer: ImporterWorker<DataType = Fetcher::PostProcessType> + 'static,
85{
86 async fn run(self, resume_from: BlockNumber, sync_interval: Duration, chain: Arc<BlockchainClient>) -> anyhow::Result<()> {
87 let _timer = DropTimer::start("importer-online::run_importer_online");
88
89 let number_fetcher_task = spawn("importer::number-fetcher", start_number_fetcher(Arc::clone(&chain), sync_interval));
91
92 let (backlog_tx, backlog_rx) = mpsc::channel(10_000);
93 let importer_task = spawn("importer::importer", self.importer.run(backlog_rx));
94
95 let fetcher_task = spawn("importer::fetcher", self.fetcher.run(backlog_tx, resume_from));
96
97 let results = try_join!(importer_task, fetcher_task, number_fetcher_task)?;
98 results.0?;
99 results.1?;
100 results.2?;
101
102 Ok(())
103 }
104}
105
106pub async fn start_importer(
107 importer_mode: ImporterMode,
108 storage: Arc<StratusStorage>,
109 executor: Arc<Executor>,
110 miner: Arc<Miner>,
111 chain: Arc<BlockchainClient>,
112 kafka_connector: Option<KafkaConnector>,
113 sync_interval: Duration,
114) -> anyhow::Result<()> {
115 let resume_from: BlockNumber = storage.read_block_number_to_resume_import()?;
116
117 match importer_mode {
118 ImporterMode::BlockWithChanges => {
119 ReplicationFollower::new(storage, miner, Arc::clone(&chain), kafka_connector)
120 .run(resume_from, sync_interval, chain)
121 .await?;
122 }
123 ImporterMode::ReexecutionFollower => {
124 ReexecutionFollower::new(executor, miner, Arc::clone(&chain), kafka_connector)
125 .run(resume_from, sync_interval, chain)
126 .await?;
127 }
128 ImporterMode::FakeLeader =>
129 FakeLeader::new(executor, miner, Arc::clone(&chain))
130 .run(resume_from, sync_interval, chain)
131 .await?,
132 }
133 Ok(())
134}
/// [`Consensus`] implementation backed by the importer's view of the leader.
pub struct ImporterConsensus {
    // Local storage, read for this follower's own mined block number.
    pub storage: Arc<StratusStorage>,
    // RPC client for the upstream (leader) chain.
    pub chain: Arc<BlockchainClient>,
}
139
140impl Consensus for ImporterConsensus {
141 async fn lag(&self) -> anyhow::Result<LagStatus> {
142 let last_fetched_time = LATEST_FETCHED_BLOCK_TIME.load(Ordering::Relaxed);
143
144 if last_fetched_time == 0 {
145 bail!("stratus has not been able to connect to the leader yet");
146 }
147
148 let elapsed = chrono::Utc::now().timestamp() as u64 - last_fetched_time;
149 if elapsed > 4 {
150 Err(anyhow::anyhow!(
151 "too much time elapsed without communicating with the leader. elapsed: {elapsed}s"
152 ))
153 } else {
154 let leader_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::SeqCst);
155 let follower_block = self.storage.read_mined_block_number().as_u64();
156
157 let direction = if follower_block > leader_block {
158 LagDirection::Ahead
159 } else {
160 LagDirection::Behind
161 };
162 let distance = leader_block.abs_diff(follower_block);
163 #[cfg(feature = "metrics")]
164 metrics::set_importer_online_lag_blocks(distance, direction.as_ref());
165
166 Ok(LagStatus { distance, direction })
167 }
168 }
169
170 fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
171 Ok(&self.chain)
172 }
173}