stratus/eth/follower/importer/
importer.rs

1use std::borrow::Cow;
2use std::cmp::min;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU64;
5use std::sync::atomic::Ordering;
6use std::time::Duration;
7
8use alloy_rpc_types_eth::BlockTransactions;
9use anyhow::anyhow;
10use futures::StreamExt;
11use futures::try_join;
12use tokio::sync::mpsc;
13use tokio::task::yield_now;
14use tokio::time::timeout;
15use tracing::Span;
16
17use crate::GlobalState;
18use crate::eth::executor::Executor;
19use crate::eth::follower::consensus::Consensus;
20use crate::eth::miner::Miner;
21use crate::eth::miner::miner::CommitItem;
22use crate::eth::miner::miner::interval_miner::mine_and_commit;
23use crate::eth::primitives::Block;
24use crate::eth::primitives::BlockNumber;
25use crate::eth::primitives::ExecutionChanges;
26use crate::eth::primitives::ExternalBlock;
27use crate::eth::primitives::ExternalReceipt;
28use crate::eth::primitives::ExternalReceipts;
29use crate::eth::primitives::StratusError;
30use crate::eth::primitives::TransactionError;
31use crate::eth::storage::StratusStorage;
32use crate::ext::DisplayExt;
33use crate::ext::SleepReason;
34use crate::ext::spawn;
35use crate::ext::traced_sleep;
36use crate::globals::IMPORTER_ONLINE_TASKS_SEMAPHORE;
37use crate::infra::BlockchainClient;
38use crate::infra::kafka::KafkaConnector;
39#[cfg(feature = "metrics")]
40use crate::infra::metrics;
41use crate::infra::tracing::SpanExt;
42use crate::infra::tracing::warn_task_rx_closed;
43use crate::infra::tracing::warn_task_tx_closed;
44use crate::ledger::events::transaction_to_events;
45use crate::log_and_err;
46use crate::utils::DropTimer;
47#[cfg(feature = "metrics")]
48use crate::utils::calculate_tps;
49
/// Strategy the importer uses to bring blocks from the external chain into the local node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ImporterMode {
    /// A normal follower re-executes each fetched block and imports it as an external block.
    NormalFollower,
    /// Fake leader fetches a block, re-executes its transactions locally and then mines its own block.
    FakeLeader,
    /// Fetches a block together with its pre-computed changes and commits it without re-executing.
    BlockWithChanges,
}
59
60// -----------------------------------------------------------------------------
61// Globals
62// -----------------------------------------------------------------------------
63
/// Highest block number observed so far on the external RPC blockchain (0 until the first observation).
static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);

/// Unix timestamp, in seconds, of the last time a block number was reported by the external RPC
/// (0 until the first report).
static LATEST_FETCHED_BLOCK_TIME: AtomicU64 = AtomicU64::new(0);
69
70/// Only sets the external RPC current block number if it is equals or greater than the current one.
71fn set_external_rpc_current_block(new_number: BlockNumber) {
72    LATEST_FETCHED_BLOCK_TIME.store(chrono::Utc::now().timestamp() as u64, Ordering::Relaxed);
73    let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
74        Some(current_number.max(new_number.as_u64()))
75    });
76}
77
78// -----------------------------------------------------------------------------
79// Constants
80// -----------------------------------------------------------------------------
/// Maximum number of block downloads kept in flight at the same time by the block fetchers.
const PARALLEL_BLOCKS: usize = 3;

/// How long the number fetcher waits for a `newHeads` event before treating the read as failed
/// and falling back to HTTP polling.
const TIMEOUT_NEW_HEADS: Duration = Duration::from_secs(2);
86
87pub struct Importer {
88    executor: Arc<Executor>,
89
90    miner: Arc<Miner>,
91
92    storage: Arc<StratusStorage>,
93
94    chain: Arc<BlockchainClient>,
95
96    sync_interval: Duration,
97
98    kafka_connector: Option<Arc<KafkaConnector>>,
99
100    importer_mode: ImporterMode,
101}
102
103impl Importer {
104    pub fn new(
105        executor: Arc<Executor>,
106        miner: Arc<Miner>,
107        storage: Arc<StratusStorage>,
108        chain: Arc<BlockchainClient>,
109        kafka_connector: Option<Arc<KafkaConnector>>,
110        sync_interval: Duration,
111        importer_mode: ImporterMode,
112    ) -> Self {
113        tracing::info!("creating importer");
114
115        Self {
116            executor,
117            miner,
118            storage,
119            chain,
120            sync_interval,
121            kafka_connector,
122            importer_mode,
123        }
124    }
125
126    // -----------------------------------------------------------------------------
127    // Shutdown
128    // -----------------------------------------------------------------------------
129
130    /// Checks if the importer should shutdown
131    fn should_shutdown(task_name: &str) -> bool {
132        GlobalState::is_shutdown_warn(task_name) || GlobalState::is_importer_shutdown_warn(task_name)
133    }
134
135    // -----------------------------------------------------------------------------
136    // Execution
137    // -----------------------------------------------------------------------------
138
139    pub async fn run_importer_online(self: Arc<Self>) -> anyhow::Result<()> {
140        let _timer = DropTimer::start("importer-online::run_importer_online");
141
142        let storage = &self.storage;
143        let number: BlockNumber = storage.read_block_number_to_resume_import()?;
144
145        match self.importer_mode {
146            ImporterMode::NormalFollower | ImporterMode::FakeLeader => {
147                // Use existing block fetcher and executor approach
148                let (backlog_tx, backlog_rx) = mpsc::channel(10_000);
149
150                // Spawn block executor
151                let task_executor = spawn(
152                    "importer::executor",
153                    Importer::start_block_executor(
154                        Arc::clone(&self.executor),
155                        Arc::clone(&self.miner),
156                        backlog_rx,
157                        self.kafka_connector.clone(),
158                        self.importer_mode,
159                    ),
160                );
161
162                // Spawn block number fetcher
163                let number_fetcher_chain = Arc::clone(&self.chain);
164                let task_number_fetcher = spawn(
165                    "importer::number-fetcher",
166                    Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval),
167                );
168
169                // Spawn block fetcher
170                let block_fetcher_chain = Arc::clone(&self.chain);
171                let task_block_fetcher = spawn(
172                    "importer::block-fetcher",
173                    Importer::start_block_fetcher(block_fetcher_chain, backlog_tx, number),
174                );
175
176                // Await all tasks
177                let results = try_join!(task_executor, task_block_fetcher, task_number_fetcher)?;
178                results.0?;
179                results.1?;
180                results.2?;
181            }
182            ImporterMode::BlockWithChanges => {
183                // Use existing block fetcher and executor approach
184                let (backlog_tx, backlog_rx) = mpsc::channel(10_000);
185
186                // Spawn block executor
187                let task_saver = spawn(
188                    "importer::executor",
189                    Importer::start_block_saver(Arc::clone(&self.miner), backlog_rx, self.kafka_connector.clone()),
190                );
191
192                // Spawn block number fetcher
193                let number_fetcher_chain = Arc::clone(&self.chain);
194                let task_number_fetcher = spawn(
195                    "importer::number-fetcher",
196                    Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval),
197                );
198
199                // Spawn block fetcher
200                let block_fetcher_chain = Arc::clone(&self.chain);
201                let task_block_fetcher = spawn(
202                    "importer::block-with-changes-fetcher",
203                    Importer::start_block_with_changes_fetcher(block_fetcher_chain, backlog_tx, number),
204                );
205
206                let results = try_join!(task_saver, task_block_fetcher, task_number_fetcher)?;
207                results.0?;
208                results.1?;
209                results.2?;
210            }
211        }
212
213        Ok(())
214    }
215
216    // -----------------------------------------------------------------------------
217    // Executor
218    // -----------------------------------------------------------------------------
219
220    pub const TASKS_COUNT: usize = 3;
221
222    // Executes external blocks and persist them to storage.
223    async fn start_block_executor(
224        executor: Arc<Executor>,
225        miner: Arc<Miner>,
226        mut backlog_rx: mpsc::Receiver<(ExternalBlock, Vec<ExternalReceipt>)>,
227        kafka_connector: Option<Arc<KafkaConnector>>,
228        importer_mode: ImporterMode,
229    ) -> anyhow::Result<()> {
230        const TASK_NAME: &str = "block-executor";
231        let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
232
233        loop {
234            if Self::should_shutdown(TASK_NAME) {
235                return Ok(());
236            }
237
238            let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
239                Ok(Some(inner)) => inner,
240                Ok(None) => break, // channel closed
241                Err(_timed_out) => {
242                    tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
243                    continue;
244                }
245            };
246
247            #[cfg(feature = "metrics")]
248            let (start, block_number, block_tx_len, receipts_len) = (metrics::now(), block.number(), block.transactions.len(), receipts.len());
249
250            if let ImporterMode::FakeLeader = importer_mode {
251                for tx in block.0.transactions.into_transactions() {
252                    tracing::info!(?tx, "executing tx as fake miner");
253                    if let Err(e) = executor.execute_local_transaction(tx.try_into()?) {
254                        match e {
255                            StratusError::Transaction(TransactionError::Nonce { transaction: _, account: _ }) => {
256                                tracing::warn!(reason = ?e, "transaction failed, was this node restarted?");
257                            }
258                            _ => {
259                                tracing::error!(reason = ?e, "transaction failed");
260                                GlobalState::shutdown_from("Importer (FakeMiner)", "Transaction Failed");
261                                return Err(anyhow!(e));
262                            }
263                        }
264                    }
265                }
266                mine_and_commit(&miner);
267                continue;
268            }
269
270            if let Err(e) = executor.execute_external_block(block.clone(), ExternalReceipts::from(receipts)) {
271                let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
272                return log_and_err!(reason = e, message);
273            };
274
275            // statistics
276            #[cfg(feature = "metrics")]
277            {
278                let duration = start.elapsed();
279                let tps = calculate_tps(duration, block_tx_len);
280
281                tracing::info!(
282                    tps,
283                    %block_number,
284                    duration = %duration.to_string_ext(),
285                    %receipts_len,
286                    "reexecuted external block",
287                );
288            }
289
290            let (mined_block, changes) = match miner.mine_external(block) {
291                Ok((mined_block, changes)) => {
292                    tracing::info!(number = %mined_block.number(), "mined external block");
293                    (mined_block, changes)
294                }
295                Err(e) => {
296                    let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
297                    return log_and_err!(reason = e, message);
298                }
299            };
300
301            if let Some(ref kafka_conn) = kafka_connector {
302                let events = mined_block
303                    .transactions
304                    .iter()
305                    .flat_map(|tx| transaction_to_events(mined_block.header.timestamp, Cow::Borrowed(tx)));
306
307                kafka_conn.send_buffered(events, 50).await?;
308            }
309
310            match miner.commit(CommitItem::Block(mined_block), changes) {
311                Ok(_) => {
312                    tracing::info!("committed external block");
313                }
314                Err(e) => {
315                    let message = GlobalState::shutdown_from(TASK_NAME, "failed to commit external block");
316                    return log_and_err!(reason = e, message);
317                }
318            }
319
320            #[cfg(feature = "metrics")]
321            {
322                metrics::inc_n_importer_online_transactions_total(receipts_len as u64);
323                metrics::inc_import_online_mined_block(start.elapsed());
324            }
325        }
326
327        warn_task_tx_closed(TASK_NAME);
328        Ok(())
329    }
330
331    async fn start_block_saver(
332        miner: Arc<Miner>,
333        mut backlog_rx: mpsc::Receiver<(Block, ExecutionChanges)>,
334        kafka_connector: Option<Arc<KafkaConnector>>,
335    ) -> anyhow::Result<()> {
336        const TASK_NAME: &str = "block-saver";
337        let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
338
339        loop {
340            if Self::should_shutdown(TASK_NAME) {
341                return Ok(());
342            }
343
344            let (block, changes) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
345                Ok(Some(inner)) => inner,
346                Ok(None) => break, // channel closed
347                Err(_timed_out) => {
348                    tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
349                    continue;
350                }
351            };
352
353            tracing::info!(block_number = %block.number(), "received block with changes");
354
355            #[cfg(feature = "metrics")]
356            let (start, block_tx_len) = (metrics::now(), block.transactions.len());
357
358            if let Some(ref kafka_conn) = kafka_connector {
359                let events = block
360                    .transactions
361                    .iter()
362                    .flat_map(|tx| transaction_to_events(block.header.timestamp, Cow::Borrowed(tx)));
363
364                kafka_conn.send_buffered(events, 50).await?;
365            }
366
367            miner.commit(CommitItem::ReplicationBlock(block), changes)?;
368
369            #[cfg(feature = "metrics")]
370            {
371                metrics::inc_n_importer_online_transactions_total(block_tx_len as u64);
372                metrics::inc_import_online_mined_block(start.elapsed());
373            }
374        }
375
376        warn_task_tx_closed(TASK_NAME);
377        Ok(())
378    }
379
380    // -----------------------------------------------------------------------------
381    // Number fetcher
382    // -----------------------------------------------------------------------------
383
384    /// Retrieves the blockchain current block number.
385    async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
386        const TASK_NAME: &str = "external-number-fetcher";
387        let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
388
389        // initial newHeads subscriptions.
390        // abort application if cannot subscribe.
391        let mut sub_new_heads = if chain.supports_ws() {
392            tracing::info!("{} subscribing to newHeads event", TASK_NAME);
393
394            match chain.subscribe_new_heads().await {
395                Ok(sub) => {
396                    tracing::info!("{} subscribed to newHeads events", TASK_NAME);
397                    Some(sub)
398                }
399                Err(e) => {
400                    let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
401                    return log_and_err!(reason = e, message);
402                }
403            }
404        } else {
405            tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
406            None
407        };
408
409        // keep reading websocket subscription or polling via http.
410        loop {
411            if Self::should_shutdown(TASK_NAME) {
412                return Ok(());
413            }
414
415            // if we have a subscription, try to read from subscription.
416            // in case of failure, re-subscribe because current subscription may have been closed in the server.
417            if let Some(sub) = &mut sub_new_heads {
418                tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
419                match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
420                    Ok(Some(Ok(block))) => {
421                        tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
422                        set_external_rpc_current_block(block.number());
423                        continue;
424                    }
425                    Ok(None) =>
426                        if !Self::should_shutdown(TASK_NAME) {
427                            tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
428                        },
429                    Ok(Some(Err(e))) =>
430                        if !Self::should_shutdown(TASK_NAME) {
431                            tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
432                        },
433                    Err(_) =>
434                        if !Self::should_shutdown(TASK_NAME) {
435                            tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
436                        },
437                }
438
439                if Self::should_shutdown(TASK_NAME) {
440                    return Ok(());
441                }
442
443                // resubscribe if necessary.
444                // only update the existing subscription if succedeed, otherwise we will try again in the next iteration.
445                if chain.supports_ws() {
446                    tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
447                    match chain.subscribe_new_heads().await {
448                        Ok(sub) => {
449                            tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
450                            sub_new_heads = Some(sub);
451                        }
452                        Err(e) =>
453                            if !Self::should_shutdown(TASK_NAME) {
454                                tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
455                            },
456                    }
457                }
458            }
459
460            if Self::should_shutdown(TASK_NAME) {
461                return Ok(());
462            }
463
464            // fallback to polling
465            tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
466            match chain.fetch_block_number().await {
467                Ok(block_number) => {
468                    tracing::info!(
469                        %block_number,
470                        sync_interval = %sync_interval.to_string_ext(),
471                        "fetched current block number via http. awaiting sync interval to retrieve again."
472                    );
473                    set_external_rpc_current_block(block_number);
474                    traced_sleep(sync_interval, SleepReason::SyncData).await;
475                }
476                Err(e) =>
477                    if !Self::should_shutdown(TASK_NAME) {
478                        tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
479                    },
480            }
481        }
482    }
483
484    // -----------------------------------------------------------------------------
485    // Block fetcher
486    // -----------------------------------------------------------------------------
487
488    /// Retrieves blocks and receipts.
489    async fn start_block_fetcher(
490        chain: Arc<BlockchainClient>,
491        backlog_tx: mpsc::Sender<(ExternalBlock, Vec<ExternalReceipt>)>,
492        mut importer_block_number: BlockNumber,
493    ) -> anyhow::Result<()> {
494        const TASK_NAME: &str = "external-block-fetcher";
495        let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
496
497        loop {
498            if Self::should_shutdown(TASK_NAME) {
499                return Ok(());
500            }
501
502            // if we are ahead of current block number, await until we are behind again
503            let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
504            if importer_block_number.as_u64() > external_rpc_current_block {
505                yield_now().await;
506                continue;
507            }
508
509            // we are behind current, so we will fetch multiple blocks in parallel to catch up
510            let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber
511            let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
512            tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
513
514            let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
515            while blocks_to_fetch > 0 {
516                blocks_to_fetch -= 1;
517                tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
518                importer_block_number = importer_block_number.next_block_number();
519            }
520
521            // keep fetching in order
522            let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
523            while let Some((mut block, mut receipts)) = tasks.next().await {
524                let block_number = block.number();
525                let BlockTransactions::Full(transactions) = &mut block.transactions else {
526                    return Err(anyhow!("expected full transactions, got hashes or uncle"));
527                };
528
529                if transactions.len() != receipts.len() {
530                    return Err(anyhow!(
531                        "block {} has mismatched transaction and receipt length: {} transactions but {} receipts",
532                        block_number,
533                        transactions.len(),
534                        receipts.len()
535                    ));
536                }
537
538                // Stably sort transactions and receipts by transaction_index
539                transactions.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
540                receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
541
542                // perform additional checks on the transaction index
543                for window in transactions.windows(2) {
544                    let tx_index = window[0].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
545                    let next_tx_index = window[1].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
546                    if tx_index + 1 != next_tx_index {
547                        tracing::error!(tx_index, next_tx_index, "two consecutive transactions must have consecutive indices");
548                    }
549                }
550                for window in receipts.windows(2) {
551                    let tx_index = window[0].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
552                    let next_tx_index = window[1].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
553                    if tx_index + 1 != next_tx_index {
554                        tracing::error!(tx_index, next_tx_index, "two consecutive receipts must have consecutive indices");
555                    }
556                }
557
558                if backlog_tx.send((block, receipts)).await.is_err() {
559                    warn_task_rx_closed(TASK_NAME);
560                    return Ok(());
561                }
562            }
563        }
564    }
565
566    /// Retrieves blocks with changes.
567    async fn start_block_with_changes_fetcher(
568        chain: Arc<BlockchainClient>,
569        backlog_tx: mpsc::Sender<(Block, ExecutionChanges)>,
570        mut importer_block_number: BlockNumber,
571    ) -> anyhow::Result<()> {
572        const TASK_NAME: &str = "block-with-changes-fetcher";
573        let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
574
575        loop {
576            if Self::should_shutdown(TASK_NAME) {
577                return Ok(());
578            }
579
580            // if we are ahead of current block number, await until we are behind again
581            let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
582            if importer_block_number.as_u64() > external_rpc_current_block {
583                yield_now().await;
584                continue;
585            }
586
587            // we are behind current, so we will fetch multiple blocks in parallel to catch up
588            let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber
589            let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
590            tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
591
592            let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
593            while blocks_to_fetch > 0 {
594                blocks_to_fetch -= 1;
595                tasks.push(fetch_block_with_changes(Arc::clone(&chain), importer_block_number));
596                importer_block_number = importer_block_number.next_block_number();
597            }
598
599            // keep fetching in order
600            let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
601            while let Some(block) = tasks.next().await {
602                if backlog_tx.send(block).await.is_err() {
603                    warn_task_rx_closed(TASK_NAME);
604                    return Ok(());
605                }
606            }
607        }
608    }
609}
610
611// -----------------------------------------------------------------------------
612// Helpers
613// -----------------------------------------------------------------------------
614async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
615    const RETRY_DELAY: Duration = Duration::from_millis(10);
616    Span::with(|s| {
617        s.rec_str("block_number", &block_number);
618    });
619
620    loop {
621        tracing::info!(%block_number, "fetching block and receipts");
622
623        match chain.fetch_block_and_receipts(block_number).await {
624            Ok(Some(response)) => return (response.block, response.receipts),
625            Ok(None) => {
626                tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block and receipts not available yet, retrying with delay.");
627                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
628            }
629            Err(e) => {
630                tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
631                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
632            }
633        };
634    }
635}
636
637async fn fetch_block_with_changes(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (Block, ExecutionChanges) {
638    const RETRY_DELAY: Duration = Duration::from_millis(10);
639    Span::with(|s| {
640        s.rec_str("block_number", &block_number);
641    });
642
643    loop {
644        tracing::info!(%block_number, "fetching block and changes");
645
646        match chain.fetch_block_with_changes(block_number).await {
647            Ok(Some(response)) => {
648                return response;
649            }
650            Ok(None) => {
651                tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block and receipts not available yet, retrying with delay.");
652                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
653            }
654            Err(e) => {
655                tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
656                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
657            }
658        };
659    }
660}
661
662impl Consensus for Importer {
663    async fn lag(&self) -> anyhow::Result<u64> {
664        let elapsed = chrono::Utc::now().timestamp() as u64 - LATEST_FETCHED_BLOCK_TIME.load(Ordering::Relaxed);
665        if elapsed > 4 {
666            Err(anyhow::anyhow!(
667                "too much time elapsed without communicating with the leader. elapsed: {}s",
668                elapsed
669            ))
670        } else {
671            Ok(EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::SeqCst) - self.storage.read_mined_block_number().as_u64())
672        }
673    }
674
675    fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
676        Ok(&self.chain)
677    }
678}