1use std::borrow::Cow;
2use std::cmp::min;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU64;
5use std::sync::atomic::Ordering;
6use std::time::Duration;
7
8use alloy_rpc_types_eth::BlockTransactions;
9use anyhow::anyhow;
10use futures::StreamExt;
11use futures::try_join;
12use tokio::sync::mpsc;
13use tokio::task::yield_now;
14use tokio::time::timeout;
15use tracing::Span;
16
17use crate::GlobalState;
18use crate::eth::executor::Executor;
19use crate::eth::follower::consensus::Consensus;
20use crate::eth::miner::Miner;
21use crate::eth::miner::miner::CommitItem;
22use crate::eth::miner::miner::interval_miner::mine_and_commit;
23use crate::eth::primitives::Block;
24use crate::eth::primitives::BlockNumber;
25use crate::eth::primitives::ExecutionChanges;
26use crate::eth::primitives::ExternalBlock;
27use crate::eth::primitives::ExternalReceipt;
28use crate::eth::primitives::ExternalReceipts;
29use crate::eth::primitives::StratusError;
30use crate::eth::primitives::TransactionError;
31use crate::eth::storage::StratusStorage;
32use crate::ext::DisplayExt;
33use crate::ext::SleepReason;
34use crate::ext::spawn;
35use crate::ext::traced_sleep;
36use crate::globals::IMPORTER_ONLINE_TASKS_SEMAPHORE;
37use crate::infra::BlockchainClient;
38use crate::infra::kafka::KafkaConnector;
39#[cfg(feature = "metrics")]
40use crate::infra::metrics;
41use crate::infra::tracing::SpanExt;
42use crate::infra::tracing::warn_task_rx_closed;
43use crate::infra::tracing::warn_task_tx_closed;
44use crate::ledger::events::transaction_to_events;
45use crate::log_and_err;
46use crate::utils::DropTimer;
47#[cfg(feature = "metrics")]
48use crate::utils::calculate_tps;
49
/// Selects how the importer turns blocks fetched from the external RPC into local blocks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ImporterMode {
    /// Re-executes each external block with its receipts, then mines and commits it.
    NormalFollower,
    /// Executes the external block's transactions as local transactions and mines them locally.
    FakeLeader,
    /// Fetches blocks that already carry their execution changes and commits them without re-execution.
    BlockWithChanges,
}
59
60static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);
66
67static LATEST_FETCHED_BLOCK_TIME: AtomicU64 = AtomicU64::new(0);
69
70fn set_external_rpc_current_block(new_number: BlockNumber) {
72 LATEST_FETCHED_BLOCK_TIME.store(chrono::Utc::now().timestamp() as u64, Ordering::Relaxed);
73 let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
74 Some(current_number.max(new_number.as_u64()))
75 });
76}
77
/// Number of block downloads the fetchers keep in flight at once (results are still consumed in order).
const PARALLEL_BLOCKS: usize = 3;

/// How long to wait for a `newHeads` event before resubscribing and polling the block number over HTTP.
const TIMEOUT_NEW_HEADS: Duration = Duration::from_secs(2);
86
87pub struct Importer {
88 executor: Arc<Executor>,
89
90 miner: Arc<Miner>,
91
92 storage: Arc<StratusStorage>,
93
94 chain: Arc<BlockchainClient>,
95
96 sync_interval: Duration,
97
98 kafka_connector: Option<Arc<KafkaConnector>>,
99
100 importer_mode: ImporterMode,
101}
102
103impl Importer {
104 pub fn new(
105 executor: Arc<Executor>,
106 miner: Arc<Miner>,
107 storage: Arc<StratusStorage>,
108 chain: Arc<BlockchainClient>,
109 kafka_connector: Option<Arc<KafkaConnector>>,
110 sync_interval: Duration,
111 importer_mode: ImporterMode,
112 ) -> Self {
113 tracing::info!("creating importer");
114
115 Self {
116 executor,
117 miner,
118 storage,
119 chain,
120 sync_interval,
121 kafka_connector,
122 importer_mode,
123 }
124 }
125
126 fn should_shutdown(task_name: &str) -> bool {
132 GlobalState::is_shutdown_warn(task_name) || GlobalState::is_importer_shutdown_warn(task_name)
133 }
134
135 pub async fn run_importer_online(self: Arc<Self>) -> anyhow::Result<()> {
140 let _timer = DropTimer::start("importer-online::run_importer_online");
141
142 let storage = &self.storage;
143 let number: BlockNumber = storage.read_block_number_to_resume_import()?;
144
145 match self.importer_mode {
146 ImporterMode::NormalFollower | ImporterMode::FakeLeader => {
147 let (backlog_tx, backlog_rx) = mpsc::channel(10_000);
149
150 let task_executor = spawn(
152 "importer::executor",
153 Importer::start_block_executor(
154 Arc::clone(&self.executor),
155 Arc::clone(&self.miner),
156 backlog_rx,
157 self.kafka_connector.clone(),
158 self.importer_mode,
159 ),
160 );
161
162 let number_fetcher_chain = Arc::clone(&self.chain);
164 let task_number_fetcher = spawn(
165 "importer::number-fetcher",
166 Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval),
167 );
168
169 let block_fetcher_chain = Arc::clone(&self.chain);
171 let task_block_fetcher = spawn(
172 "importer::block-fetcher",
173 Importer::start_block_fetcher(block_fetcher_chain, backlog_tx, number),
174 );
175
176 let results = try_join!(task_executor, task_block_fetcher, task_number_fetcher)?;
178 results.0?;
179 results.1?;
180 results.2?;
181 }
182 ImporterMode::BlockWithChanges => {
183 let (backlog_tx, backlog_rx) = mpsc::channel(10_000);
185
186 let task_saver = spawn(
188 "importer::executor",
189 Importer::start_block_saver(Arc::clone(&self.miner), backlog_rx, self.kafka_connector.clone()),
190 );
191
192 let number_fetcher_chain = Arc::clone(&self.chain);
194 let task_number_fetcher = spawn(
195 "importer::number-fetcher",
196 Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval),
197 );
198
199 let block_fetcher_chain = Arc::clone(&self.chain);
201 let task_block_fetcher = spawn(
202 "importer::block-with-changes-fetcher",
203 Importer::start_block_with_changes_fetcher(block_fetcher_chain, backlog_tx, number),
204 );
205
206 let results = try_join!(task_saver, task_block_fetcher, task_number_fetcher)?;
207 results.0?;
208 results.1?;
209 results.2?;
210 }
211 }
212
213 Ok(())
214 }
215
216 pub const TASKS_COUNT: usize = 3;
221
222 async fn start_block_executor(
224 executor: Arc<Executor>,
225 miner: Arc<Miner>,
226 mut backlog_rx: mpsc::Receiver<(ExternalBlock, Vec<ExternalReceipt>)>,
227 kafka_connector: Option<Arc<KafkaConnector>>,
228 importer_mode: ImporterMode,
229 ) -> anyhow::Result<()> {
230 const TASK_NAME: &str = "block-executor";
231 let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
232
233 loop {
234 if Self::should_shutdown(TASK_NAME) {
235 return Ok(());
236 }
237
238 let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
239 Ok(Some(inner)) => inner,
240 Ok(None) => break, Err(_timed_out) => {
242 tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
243 continue;
244 }
245 };
246
247 #[cfg(feature = "metrics")]
248 let (start, block_number, block_tx_len, receipts_len) = (metrics::now(), block.number(), block.transactions.len(), receipts.len());
249
250 if let ImporterMode::FakeLeader = importer_mode {
251 for tx in block.0.transactions.into_transactions() {
252 tracing::info!(?tx, "executing tx as fake miner");
253 if let Err(e) = executor.execute_local_transaction(tx.try_into()?) {
254 match e {
255 StratusError::Transaction(TransactionError::Nonce { transaction: _, account: _ }) => {
256 tracing::warn!(reason = ?e, "transaction failed, was this node restarted?");
257 }
258 _ => {
259 tracing::error!(reason = ?e, "transaction failed");
260 GlobalState::shutdown_from("Importer (FakeMiner)", "Transaction Failed");
261 return Err(anyhow!(e));
262 }
263 }
264 }
265 }
266 mine_and_commit(&miner);
267 continue;
268 }
269
270 if let Err(e) = executor.execute_external_block(block.clone(), ExternalReceipts::from(receipts)) {
271 let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
272 return log_and_err!(reason = e, message);
273 };
274
275 #[cfg(feature = "metrics")]
277 {
278 let duration = start.elapsed();
279 let tps = calculate_tps(duration, block_tx_len);
280
281 tracing::info!(
282 tps,
283 %block_number,
284 duration = %duration.to_string_ext(),
285 %receipts_len,
286 "reexecuted external block",
287 );
288 }
289
290 let (mined_block, changes) = match miner.mine_external(block) {
291 Ok((mined_block, changes)) => {
292 tracing::info!(number = %mined_block.number(), "mined external block");
293 (mined_block, changes)
294 }
295 Err(e) => {
296 let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block");
297 return log_and_err!(reason = e, message);
298 }
299 };
300
301 if let Some(ref kafka_conn) = kafka_connector {
302 let events = mined_block
303 .transactions
304 .iter()
305 .flat_map(|tx| transaction_to_events(mined_block.header.timestamp, Cow::Borrowed(tx)));
306
307 kafka_conn.send_buffered(events, 50).await?;
308 }
309
310 match miner.commit(CommitItem::Block(mined_block), changes) {
311 Ok(_) => {
312 tracing::info!("committed external block");
313 }
314 Err(e) => {
315 let message = GlobalState::shutdown_from(TASK_NAME, "failed to commit external block");
316 return log_and_err!(reason = e, message);
317 }
318 }
319
320 #[cfg(feature = "metrics")]
321 {
322 metrics::inc_n_importer_online_transactions_total(receipts_len as u64);
323 metrics::inc_import_online_mined_block(start.elapsed());
324 }
325 }
326
327 warn_task_tx_closed(TASK_NAME);
328 Ok(())
329 }
330
331 async fn start_block_saver(
332 miner: Arc<Miner>,
333 mut backlog_rx: mpsc::Receiver<(Block, ExecutionChanges)>,
334 kafka_connector: Option<Arc<KafkaConnector>>,
335 ) -> anyhow::Result<()> {
336 const TASK_NAME: &str = "block-saver";
337 let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
338
339 loop {
340 if Self::should_shutdown(TASK_NAME) {
341 return Ok(());
342 }
343
344 let (block, changes) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
345 Ok(Some(inner)) => inner,
346 Ok(None) => break, Err(_timed_out) => {
348 tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
349 continue;
350 }
351 };
352
353 tracing::info!(block_number = %block.number(), "received block with changes");
354
355 #[cfg(feature = "metrics")]
356 let (start, block_tx_len) = (metrics::now(), block.transactions.len());
357
358 if let Some(ref kafka_conn) = kafka_connector {
359 let events = block
360 .transactions
361 .iter()
362 .flat_map(|tx| transaction_to_events(block.header.timestamp, Cow::Borrowed(tx)));
363
364 kafka_conn.send_buffered(events, 50).await?;
365 }
366
367 miner.commit(CommitItem::ReplicationBlock(block), changes)?;
368
369 #[cfg(feature = "metrics")]
370 {
371 metrics::inc_n_importer_online_transactions_total(block_tx_len as u64);
372 metrics::inc_import_online_mined_block(start.elapsed());
373 }
374 }
375
376 warn_task_tx_closed(TASK_NAME);
377 Ok(())
378 }
379
380 async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Duration) -> anyhow::Result<()> {
386 const TASK_NAME: &str = "external-number-fetcher";
387 let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
388
389 let mut sub_new_heads = if chain.supports_ws() {
392 tracing::info!("{} subscribing to newHeads event", TASK_NAME);
393
394 match chain.subscribe_new_heads().await {
395 Ok(sub) => {
396 tracing::info!("{} subscribed to newHeads events", TASK_NAME);
397 Some(sub)
398 }
399 Err(e) => {
400 let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event");
401 return log_and_err!(reason = e, message);
402 }
403 }
404 } else {
405 tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME);
406 None
407 };
408
409 loop {
411 if Self::should_shutdown(TASK_NAME) {
412 return Ok(());
413 }
414
415 if let Some(sub) = &mut sub_new_heads {
418 tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME);
419 match timeout(TIMEOUT_NEW_HEADS, sub.next()).await {
420 Ok(Some(Ok(block))) => {
421 tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME);
422 set_external_rpc_current_block(block.number());
423 continue;
424 }
425 Ok(None) =>
426 if !Self::should_shutdown(TASK_NAME) {
427 tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME);
428 },
429 Ok(Some(Err(e))) =>
430 if !Self::should_shutdown(TASK_NAME) {
431 tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME);
432 },
433 Err(_) =>
434 if !Self::should_shutdown(TASK_NAME) {
435 tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME);
436 },
437 }
438
439 if Self::should_shutdown(TASK_NAME) {
440 return Ok(());
441 }
442
443 if chain.supports_ws() {
446 tracing::info!("{} resubscribing to newHeads event", TASK_NAME);
447 match chain.subscribe_new_heads().await {
448 Ok(sub) => {
449 tracing::info!("{} resubscribed to newHeads event", TASK_NAME);
450 sub_new_heads = Some(sub);
451 }
452 Err(e) =>
453 if !Self::should_shutdown(TASK_NAME) {
454 tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME);
455 },
456 }
457 }
458 }
459
460 if Self::should_shutdown(TASK_NAME) {
461 return Ok(());
462 }
463
464 tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME);
466 match chain.fetch_block_number().await {
467 Ok(block_number) => {
468 tracing::info!(
469 %block_number,
470 sync_interval = %sync_interval.to_string_ext(),
471 "fetched current block number via http. awaiting sync interval to retrieve again."
472 );
473 set_external_rpc_current_block(block_number);
474 traced_sleep(sync_interval, SleepReason::SyncData).await;
475 }
476 Err(e) =>
477 if !Self::should_shutdown(TASK_NAME) {
478 tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
479 },
480 }
481 }
482 }
483
484 async fn start_block_fetcher(
490 chain: Arc<BlockchainClient>,
491 backlog_tx: mpsc::Sender<(ExternalBlock, Vec<ExternalReceipt>)>,
492 mut importer_block_number: BlockNumber,
493 ) -> anyhow::Result<()> {
494 const TASK_NAME: &str = "external-block-fetcher";
495 let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
496
497 loop {
498 if Self::should_shutdown(TASK_NAME) {
499 return Ok(());
500 }
501
502 let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
504 if importer_block_number.as_u64() > external_rpc_current_block {
505 yield_now().await;
506 continue;
507 }
508
509 let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; let mut blocks_to_fetch = min(blocks_behind, 1_000); tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
513
514 let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
515 while blocks_to_fetch > 0 {
516 blocks_to_fetch -= 1;
517 tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number));
518 importer_block_number = importer_block_number.next_block_number();
519 }
520
521 let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
523 while let Some((mut block, mut receipts)) = tasks.next().await {
524 let block_number = block.number();
525 let BlockTransactions::Full(transactions) = &mut block.transactions else {
526 return Err(anyhow!("expected full transactions, got hashes or uncle"));
527 };
528
529 if transactions.len() != receipts.len() {
530 return Err(anyhow!(
531 "block {} has mismatched transaction and receipt length: {} transactions but {} receipts",
532 block_number,
533 transactions.len(),
534 receipts.len()
535 ));
536 }
537
538 transactions.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
540 receipts.sort_by(|a, b| a.transaction_index.cmp(&b.transaction_index));
541
542 for window in transactions.windows(2) {
544 let tx_index = window[0].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
545 let next_tx_index = window[1].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
546 if tx_index + 1 != next_tx_index {
547 tracing::error!(tx_index, next_tx_index, "two consecutive transactions must have consecutive indices");
548 }
549 }
550 for window in receipts.windows(2) {
551 let tx_index = window[0].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
552 let next_tx_index = window[1].transaction_index.ok_or(anyhow!("missing transaction index"))? as u32;
553 if tx_index + 1 != next_tx_index {
554 tracing::error!(tx_index, next_tx_index, "two consecutive receipts must have consecutive indices");
555 }
556 }
557
558 if backlog_tx.send((block, receipts)).await.is_err() {
559 warn_task_rx_closed(TASK_NAME);
560 return Ok(());
561 }
562 }
563 }
564 }
565
566 async fn start_block_with_changes_fetcher(
568 chain: Arc<BlockchainClient>,
569 backlog_tx: mpsc::Sender<(Block, ExecutionChanges)>,
570 mut importer_block_number: BlockNumber,
571 ) -> anyhow::Result<()> {
572 const TASK_NAME: &str = "block-with-changes-fetcher";
573 let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
574
575 loop {
576 if Self::should_shutdown(TASK_NAME) {
577 return Ok(());
578 }
579
580 let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
582 if importer_block_number.as_u64() > external_rpc_current_block {
583 yield_now().await;
584 continue;
585 }
586
587 let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; let mut blocks_to_fetch = min(blocks_behind, 1_000); tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
591
592 let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
593 while blocks_to_fetch > 0 {
594 blocks_to_fetch -= 1;
595 tasks.push(fetch_block_with_changes(Arc::clone(&chain), importer_block_number));
596 importer_block_number = importer_block_number.next_block_number();
597 }
598
599 let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
601 while let Some(block) = tasks.next().await {
602 if backlog_tx.send(block).await.is_err() {
603 warn_task_rx_closed(TASK_NAME);
604 return Ok(());
605 }
606 }
607 }
608 }
609}
610
611async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (ExternalBlock, Vec<ExternalReceipt>) {
615 const RETRY_DELAY: Duration = Duration::from_millis(10);
616 Span::with(|s| {
617 s.rec_str("block_number", &block_number);
618 });
619
620 loop {
621 tracing::info!(%block_number, "fetching block and receipts");
622
623 match chain.fetch_block_and_receipts(block_number).await {
624 Ok(Some(response)) => return (response.block, response.receipts),
625 Ok(None) => {
626 tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block and receipts not available yet, retrying with delay.");
627 traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
628 }
629 Err(e) => {
630 tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
631 traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
632 }
633 };
634 }
635}
636
637async fn fetch_block_with_changes(chain: Arc<BlockchainClient>, block_number: BlockNumber) -> (Block, ExecutionChanges) {
638 const RETRY_DELAY: Duration = Duration::from_millis(10);
639 Span::with(|s| {
640 s.rec_str("block_number", &block_number);
641 });
642
643 loop {
644 tracing::info!(%block_number, "fetching block and changes");
645
646 match chain.fetch_block_with_changes(block_number).await {
647 Ok(Some(response)) => {
648 return response;
649 }
650 Ok(None) => {
651 tracing::warn!(%block_number, delay_ms = %RETRY_DELAY.as_millis(), "block and receipts not available yet, retrying with delay.");
652 traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
653 }
654 Err(e) => {
655 tracing::warn!(reason = ?e, %block_number, delay_ms = %RETRY_DELAY.as_millis(), "failed to fetch block and receipts, retrying with delay.");
656 traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
657 }
658 };
659 }
660}
661
662impl Consensus for Importer {
663 async fn lag(&self) -> anyhow::Result<u64> {
664 let elapsed = chrono::Utc::now().timestamp() as u64 - LATEST_FETCHED_BLOCK_TIME.load(Ordering::Relaxed);
665 if elapsed > 4 {
666 Err(anyhow::anyhow!(
667 "too much time elapsed without communicating with the leader. elapsed: {}s",
668 elapsed
669 ))
670 } else {
671 Ok(EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::SeqCst) - self.storage.read_mined_block_number().as_u64())
672 }
673 }
674
675 fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
676 Ok(&self.chain)
677 }
678}