stratus/eth/follower/importer/mod.rs
mod fetchers;
pub(crate) mod importer_config;
#[allow(clippy::module_inception)]
mod importer_supervisor;
mod importers;
use std::borrow::Cow;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;

use anyhow::bail;
pub use importer_config::ImporterConfig;
pub use importer_supervisor::ImporterConsensus;
use itertools::Itertools;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tracing::Span;

use crate::GlobalState;
use crate::eth::primitives::Block;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::storage::StratusStorage;
use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
use crate::ext::DisplayExt;
use crate::ext::SleepReason;
use crate::ext::traced_sleep;
use crate::globals::IMPORTER_ONLINE_TASKS_SEMAPHORE;
use crate::infra::BlockchainClient;
use crate::infra::kafka::KafkaConnector;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::infra::tracing::SpanExt;
use crate::ledger::events::transaction_to_events;
use crate::log_and_err;

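/// Operating mode of the follower importer. How each variant is handled lives in the
/// `fetchers` and `importers` submodules rather than in this file.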
#[derive(Clone, Copy)]
pub enum ImporterMode {
    ReexecutionFollower,
    FakeLeader,
    BlockWithChanges,
}

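/// Highest block number observed so far on the external RPC endpoint (updated monotonically
/// by `set_external_rpc_current_block`).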
static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);

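/// Unix timestamp (seconds) of the last time the external block number was refreshed.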
static LATEST_FETCHED_BLOCK_TIME: AtomicU64 = AtomicU64::new(0);

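/// Records a block number reported by the external RPC, keeping only the highest value seen
/// and refreshing `LATEST_FETCHED_BLOCK_TIME`.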
fn set_external_rpc_current_block(new_number: BlockNumber) {
    LATEST_FETCHED_BLOCK_TIME.store(chrono::Utc::now().timestamp() as u64, Ordering::Relaxed);
    let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
        Some(current_number.max(new_number.as_u64()))
    });
}

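/// How long to wait for a `newHeads` notification before treating the subscription read as timed out.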
const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000);
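/// Number of importer tasks.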
pub const TASKS_COUNT: usize = 3;

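/// Receives the next item from the channel, returning `Ok(None)` (with a warning) if nothing
/// arrives within 2 seconds; fails only if the channel has been closed.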
async fn receive_with_timeout<T>(rx: &mut mpsc::Receiver<T>) -> anyhow::Result<Option<T>> {
    match timeout(Duration::from_secs(2), rx.recv()).await {
        Ok(Some(inner)) => Ok(Some(inner)),
        Ok(None) => bail!("channel closed"),
        Err(_timed_out) => {
            tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
            Ok(None)
        }
    }
}

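/// Converts the block's transactions into ledger events and publishes them to Kafka when a
/// connector is configured; does nothing otherwise.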
async fn send_block_to_kafka(kafka_connector: &Option<KafkaConnector>, block: &Block) -> anyhow::Result<()> {
    if let Some(kafka_conn) = kafka_connector {
        let events = block
            .transactions
            .iter()
            .flat_map(|tx| transaction_to_events(block.header.timestamp, Cow::Borrowed(tx)));

        kafka_conn.send_buffered(events, 50).await?;
    }
    Ok(())
}

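/// Records import metrics: the number of transactions in the block and the elapsed time since `start`.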
#[cfg(feature = "metrics")]
fn record_import_metrics(block_tx_len: usize, start: std::time::Instant) {
    metrics::inc_n_importer_online_transactions_total(block_tx_len as u64);
    metrics::inc_import_online_mined_block(start.elapsed());
}

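/// Task that keeps `EXTERNAL_RPC_CURRENT_BLOCK` up to date, preferring a `newHeads` websocket
/// subscription and falling back to HTTP polling every `sync_interval`.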
async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
    const TASK_NAME: &str = "external-number-fetcher";
    let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;

    let mut sub_new_heads = if chain.supports_ws() {
        tracing::info!("{} subscribing to newHeads event", TASK_NAME);

        match chain.subscribe_new_heads().await {
            Ok(sub) => {
                tracing::info!("{} subscribed to newHeads events", TASK_NAME);
                Some(sub)
            }
            Err(e) => {
                let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
                return log_and_err!(reason = e, message);
            }
        }
    } else {
        tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
        None
    };

    loop {
        if should_shutdown(TASK_NAME) {
            return Ok(());
        }

        if let Some(sub) = &mut sub_new_heads {
            tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
            match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
                Ok(Some(Ok(block))) => {
                    tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
                    set_external_rpc_current_block(block.number());
                    continue;
                }
                Ok(None) =>
                    if !should_shutdown(TASK_NAME) {
                        tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
                    },
                Ok(Some(Err(e))) =>
                    if !should_shutdown(TASK_NAME) {
                        tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
                    },
                Err(_) =>
                    if !should_shutdown(TASK_NAME) {
                        tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
                    },
            }

            if should_shutdown(TASK_NAME) {
                return Ok(());
            }

            if chain.supports_ws() {
                tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
                match chain.subscribe_new_heads().await {
                    Ok(sub) => {
                        tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
                        sub_new_heads = Some(sub);
                    }
                    Err(e) =>
                        if !should_shutdown(TASK_NAME) {
                            tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
                        },
                }
            }
        }

        if should_shutdown(TASK_NAME) {
            return Ok(());
        }

        tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
        match chain.fetch_block_number().await {
            Ok(block_number) => {
                tracing::info!(
                    %block_number,
                    sync_interval = %sync_interval.to_string_ext(),
                    "fetched current block number via http. awaiting sync interval to retrieve again."
                );
                set_external_rpc_current_block(block_number);
                traced_sleep(sync_interval, SleepReason::SyncData).await;
            }
            Err(e) =>
                if !should_shutdown(TASK_NAME) {
                    tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
                },
        }
    }
}

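/// Whether the task should stop because a global or importer-specific shutdown was requested.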
fn should_shutdown(task_name: &str) -> bool {
    GlobalState::is_shutdown_warn(task_name) || GlobalState::is_importer_shutdown_warn(task_name)
}

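/// Rebuilds full `ExecutionChanges` from RocksDB block changes by reading each touched account's
/// original state from permanent storage and applying it to the incomplete changes.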
fn create_execution_changes(storage: &Arc<StratusStorage>, changes: BlockChangesRocksdb) -> anyhow::Result<ExecutionChanges> {
    let addresses = changes.account_changes.keys().copied().map_into().collect_vec();
    let accounts = storage.perm.read_accounts(addresses)?;
    let mut exec_changes = changes.to_incomplete_execution_changes();
    for (addr, acc) in accounts {
        match exec_changes.accounts.entry(addr) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                let item = entry.get_mut();
                item.apply_original(acc);
            }
            std::collections::hash_map::Entry::Vacant(_) => unreachable!("we got the addresses from the changes"),
        }
    }
    Ok(exec_changes)
}

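/// Calls `fetch_fn` for `block_number` until it yields a value, retrying every 10ms when the
/// data is not available yet or the fetch fails.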
async fn fetch_with_retry<T, F, Fut>(block_number: BlockNumber, fetch_fn: F, operation_name: &str) -> T
where
    F: Fn(BlockNumber) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<Option<T>>>,
{
    const RETRY_DELAY: Duration = Duration::from_millis(10);
    Span::with(|s| {
        s.rec_str("block_number", &block_number);
    });

    loop {
        tracing::info!(%block_number, "fetching {}", operation_name);

        match fetch_fn(block_number).await {
            Ok(Some(response)) => return response,
            Ok(None) => {
                tracing::warn!(
                    %block_number,
                    delay_ms = %RETRY_DELAY.as_millis(),
                    "{} not available yet, retrying with delay.",
                    operation_name
                );
                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
            }
            Err(e) => {
                tracing::warn!(
                    reason = ?e,
                    %block_number,
                    delay_ms = %RETRY_DELAY.as_millis(),
                    "failed to fetch {}, retrying with delay.",
                    operation_name
                );
                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
            }
        };
    }
}