stratus/eth/follower/importer/
mod.rs

1mod fetchers;
2pub(crate) mod importer_config;
3#[allow(clippy::module_inception)]
4mod importer_supervisor;
5mod importers;
6use std::borrow::Cow;
7use std::sync::Arc;
8use std::sync::atomic::AtomicU64;
9use std::sync::atomic::Ordering;
10use std::time::Duration;
11
12use anyhow::bail;
13pub use importer_config::ImporterConfig;
14pub use importer_supervisor::ImporterConsensus;
15use itertools::Itertools;
16use tokio::sync::mpsc;
17use tokio::time::timeout;
18use tracing::Span;
19
20use crate::GlobalState;
21use crate::eth::primitives::Block;
22use crate::eth::primitives::BlockNumber;
23use crate::eth::primitives::ExecutionChanges;
24use crate::eth::storage::StratusStorage;
25use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
26use crate::ext::DisplayExt;
27use crate::ext::SleepReason;
28use crate::ext::traced_sleep;
29use crate::globals::IMPORTER_ONLINE_TASKS_SEMAPHORE;
30use crate::infra::BlockchainClient;
31use crate::infra::kafka::KafkaConnector;
32#[cfg(feature = "metrics")]
33use crate::infra::metrics;
34use crate::infra::tracing::SpanExt;
35use crate::ledger::events::transaction_to_events;
36use crate::log_and_err;
37
/// How the importer obtains blocks from the external blockchain and applies them locally.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ImporterMode {
    /// A normal follower imports an already-mined block.
    ReexecutionFollower,
    /// Fake leader fetches a block, re-executes its transactions and then mines its own block.
    FakeLeader,
    /// Fetches a block together with its pre-computed changes.
    BlockWithChanges,
}
47
48// -----------------------------------------------------------------------------
49// Globals
50// -----------------------------------------------------------------------------
51
52/// Current block number of the external RPC blockchain.
53static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);
54
55/// Timestamp of when EXTERNAL_RPC_CURRENT_BLOCK was updated last.
56static LATEST_FETCHED_BLOCK_TIME: AtomicU64 = AtomicU64::new(0);
57
58/// Only sets the external RPC current block number if it is equals or greater than the current one.
59fn set_external_rpc_current_block(new_number: BlockNumber) {
60    LATEST_FETCHED_BLOCK_TIME.store(chrono::Utc::now().timestamp() as u64, Ordering::Relaxed);
61    let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
62        Some(current_number.max(new_number.as_u64()))
63    });
64}
65
66// -----------------------------------------------------------------------------
67// Constants
68// -----------------------------------------------------------------------------
69
/// How long to wait for a `newHeads` subscription event before falling back to HTTP polling.
const TIMEOUT_NEW_HEADS: Duration = Duration::from_secs(2);
/// Number of importer tasks.
///
/// NOTE(review): presumably the number of tasks coordinated through
/// `IMPORTER_ONLINE_TASKS_SEMAPHORE` — confirm against the callers.
pub const TASKS_COUNT: usize = 3;
73
74async fn receive_with_timeout<T>(rx: &mut mpsc::Receiver<T>) -> anyhow::Result<Option<T>> {
75    match timeout(Duration::from_secs(2), rx.recv()).await {
76        Ok(Some(inner)) => Ok(Some(inner)),
77        Ok(None) => bail!("channel closed"),
78        Err(_timed_out) => {
79            tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
80            Ok(None)
81        }
82    }
83}
84
85/// Send block transactions to Kafka
86async fn send_block_to_kafka(kafka_connector: &Option<KafkaConnector>, block: &Block) -> anyhow::Result<()> {
87    if let Some(kafka_conn) = kafka_connector {
88        let events = block
89            .transactions
90            .iter()
91            .flat_map(|tx| transaction_to_events(block.header.timestamp, Cow::Borrowed(tx)));
92
93        kafka_conn.send_buffered(events, 50).await?;
94    }
95    Ok(())
96}
97
/// Records the online-importer metrics for one imported block.
///
/// Adds `block_tx_len` to the imported-transactions counter, then reports the time elapsed
/// since `start` through `metrics::inc_import_online_mined_block`.
#[cfg(feature = "metrics")]
fn record_import_metrics(block_tx_len: usize, start: std::time::Instant) {
    let tx_count = block_tx_len as u64;
    metrics::inc_n_importer_online_transactions_total(tx_count);

    let import_duration = start.elapsed();
    metrics::inc_import_online_mined_block(import_duration);
}
104
105// -----------------------------------------------------------------------------
106// Number fetcher
107// -----------------------------------------------------------------------------
108
109/// Retrieves the blockchain current block number.
110async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
111    const TASK_NAME: &str = "external-number-fetcher";
112    let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
113
114    // initial newHeads subscriptions.
115    // abort application if cannot subscribe.
116    let mut sub_new_heads = if chain.supports_ws() {
117        tracing::info!("{} subscribing to newHeads event", TASK_NAME);
118
119        match chain.subscribe_new_heads().await {
120            Ok(sub) => {
121                tracing::info!("{} subscribed to newHeads events", TASK_NAME);
122                Some(sub)
123            }
124            Err(e) => {
125                let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
126                return log_and_err!(reason = e, message);
127            }
128        }
129    } else {
130        tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
131        None
132    };
133
134    // keep reading websocket subscription or polling via http.
135    loop {
136        if should_shutdown(TASK_NAME) {
137            return Ok(());
138        }
139
140        // if we have a subscription, try to read from subscription.
141        // in case of failure, re-subscribe because current subscription may have been closed in the server.
142        if let Some(sub) = &mut sub_new_heads {
143            tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
144            match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
145                Ok(Some(Ok(block))) => {
146                    tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
147                    set_external_rpc_current_block(block.number());
148                    continue;
149                }
150                Ok(None) =>
151                    if !should_shutdown(TASK_NAME) {
152                        tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
153                    },
154                Ok(Some(Err(e))) =>
155                    if !should_shutdown(TASK_NAME) {
156                        tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
157                    },
158                Err(_) =>
159                    if !should_shutdown(TASK_NAME) {
160                        tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
161                    },
162            }
163
164            if should_shutdown(TASK_NAME) {
165                return Ok(());
166            }
167
168            // resubscribe if necessary.
169            // only update the existing subscription if succedeed, otherwise we will try again in the next iteration.
170            if chain.supports_ws() {
171                tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
172                match chain.subscribe_new_heads().await {
173                    Ok(sub) => {
174                        tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
175                        sub_new_heads = Some(sub);
176                    }
177                    Err(e) =>
178                        if !should_shutdown(TASK_NAME) {
179                            tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
180                        },
181                }
182            }
183        }
184
185        if should_shutdown(TASK_NAME) {
186            return Ok(());
187        }
188
189        // fallback to polling
190        tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
191        match chain.fetch_block_number().await {
192            Ok(block_number) => {
193                tracing::info!(
194                    %block_number,
195                    sync_interval = %sync_interval.to_string_ext(),
196                    "fetched current block number via http. awaiting sync interval to retrieve again."
197                );
198                set_external_rpc_current_block(block_number);
199                traced_sleep(sync_interval, SleepReason::SyncData).await;
200            }
201            Err(e) =>
202                if !should_shutdown(TASK_NAME) {
203                    tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
204                },
205        }
206    }
207}
208
209fn should_shutdown(task_name: &str) -> bool {
210    GlobalState::is_shutdown_warn(task_name) || GlobalState::is_importer_shutdown_warn(task_name)
211}
212
213fn create_execution_changes(storage: &Arc<StratusStorage>, changes: BlockChangesRocksdb) -> anyhow::Result<ExecutionChanges> {
214    let addresses = changes.account_changes.keys().copied().map_into().collect_vec();
215    let accounts = storage.perm.read_accounts(addresses)?;
216    let mut exec_changes = changes.to_incomplete_execution_changes();
217    for (addr, acc) in accounts {
218        match exec_changes.accounts.entry(addr) {
219            std::collections::hash_map::Entry::Occupied(mut entry) => {
220                let item = entry.get_mut();
221                item.apply_original(acc);
222            }
223            std::collections::hash_map::Entry::Vacant(_) => unreachable!("we got the addresses from the changes"),
224        }
225    }
226    Ok(exec_changes)
227}
228
229// -----------------------------------------------------------------------------
230// Helpers
231// -----------------------------------------------------------------------------
232
233/// Generic retry logic for fetching data from blockchain
234async fn fetch_with_retry<T, F, Fut>(block_number: BlockNumber, fetch_fn: F, operation_name: &str) -> T
235where
236    F: Fn(BlockNumber) -> Fut,
237    Fut: std::future::Future<Output = anyhow::Result<Option<T>>>,
238{
239    const RETRY_DELAY: Duration = Duration::from_millis(10);
240    Span::with(|s| {
241        s.rec_str("block_number", &block_number);
242    });
243
244    loop {
245        tracing::info!(%block_number, "fetching {}", operation_name);
246
247        match fetch_fn(block_number).await {
248            Ok(Some(response)) => return response,
249            Ok(None) => {
250                tracing::warn!(
251                    %block_number,
252                    delay_ms = %RETRY_DELAY.as_millis(),
253                    "{} not available yet, retrying with delay.",
254                    operation_name
255                );
256                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
257            }
258            Err(e) => {
259                tracing::warn!(
260                    reason = ?e,
261                    %block_number,
262                    delay_ms = %RETRY_DELAY.as_millis(),
263                    "failed to fetch {}, retrying with delay.",
264                    operation_name
265                );
266                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
267            }
268        };
269    }
270}