1use std::cmp::max;
2use std::mem;
3use std::sync::Arc;
4
5#[cfg(feature = "metrics")]
6use alloy_consensus::Transaction;
7use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
8use alloy_rpc_types_trace::geth::GethTrace;
9use cfg_if::cfg_if;
10use parking_lot::Mutex;
11use tracing::Span;
12use tracing::debug_span;
13#[cfg(feature = "tracing")]
14use tracing::info_span;
15
16use super::evm_input::InspectorInput;
17use crate::GlobalState;
18#[cfg(feature = "metrics")]
19use crate::eth::codegen;
20use crate::eth::executor::Evm;
21use crate::eth::executor::EvmExecutionResult;
22use crate::eth::executor::EvmInput;
23use crate::eth::executor::ExecutorConfig;
24use crate::eth::executor::evm::EvmKind;
25use crate::eth::miner::Miner;
26use crate::eth::primitives::BlockNumber;
27use crate::eth::primitives::CallInput;
28use crate::eth::primitives::EvmExecution;
29use crate::eth::primitives::EvmExecutionMetrics;
30use crate::eth::primitives::ExecutionResult;
31use crate::eth::primitives::ExternalBlock;
32use crate::eth::primitives::ExternalReceipt;
33use crate::eth::primitives::ExternalReceipts;
34use crate::eth::primitives::ExternalTransaction;
35use crate::eth::primitives::Hash;
36use crate::eth::primitives::PointInTime;
37use crate::eth::primitives::RpcError;
38use crate::eth::primitives::StorageError;
39use crate::eth::primitives::StratusError;
40use crate::eth::primitives::TransactionError;
41use crate::eth::primitives::TransactionExecution;
42use crate::eth::primitives::TransactionInput;
43use crate::eth::primitives::UnexpectedError;
44use crate::eth::primitives::UnixTime;
45use crate::eth::storage::ReadKind;
46use crate::eth::storage::StratusStorage;
47#[cfg(feature = "metrics")]
48use crate::ext::OptionExt;
49use crate::ext::spawn_thread;
50use crate::ext::to_json_string;
51use crate::infra::metrics;
52use crate::infra::metrics::timed;
53use crate::infra::tracing::SpanExt;
54use crate::infra::tracing::warn_task_tx_closed;
55
56#[derive(Debug)]
61pub struct EvmTask {
62 pub span: Span,
63 pub input: EvmInput,
64 pub response_tx: oneshot::Sender<Result<EvmExecutionResult, StratusError>>,
65}
66
67impl EvmTask {
68 pub fn new(input: EvmInput, response_tx: oneshot::Sender<Result<EvmExecutionResult, StratusError>>) -> Self {
69 Self {
70 span: Span::current(),
71 input,
72 response_tx,
73 }
74 }
75}
76
77pub struct InspectorTask {
78 pub span: Span,
79 pub input: InspectorInput,
80 pub response_tx: oneshot::Sender<Result<GethTrace, StratusError>>,
81}
82
83impl InspectorTask {
84 pub fn new(input: InspectorInput, response_tx: oneshot::Sender<Result<GethTrace, StratusError>>) -> Self {
85 Self {
86 span: Span::current(),
87 input,
88 response_tx,
89 }
90 }
91}
92
93struct Evms {
99 pub tx_local: crossbeam_channel::Sender<EvmTask>,
101
102 pub tx_external: crossbeam_channel::Sender<EvmTask>,
104
105 pub call_present: crossbeam_channel::Sender<EvmTask>,
107
108 pub call_past: crossbeam_channel::Sender<EvmTask>,
110
111 pub inspector: crossbeam_channel::Sender<InspectorTask>,
112}
113
114impl Evms {
115 fn spawn(storage: Arc<StratusStorage>, config: &ExecutorConfig) -> Self {
117 fn evm_loop(task_name: &str, storage: Arc<StratusStorage>, config: ExecutorConfig, task_rx: crossbeam_channel::Receiver<EvmTask>, kind: EvmKind) {
119 let mut evm = Evm::new(storage, config, kind);
120
121 while let Ok(task) = task_rx.recv() {
123 if GlobalState::is_shutdown_warn(task_name) {
124 return;
125 }
126
127 let _enter = task.span.enter();
129 let result = evm.execute(task.input);
130 if let Err(e) = task.response_tx.send(result) {
131 tracing::error!(reason = ?e, "failed to send evm task execution result");
132 }
133 }
134 warn_task_tx_closed(task_name);
135 }
136
137 let spawn_evms = |task_name: &str, num_evms: usize, kind: EvmKind| {
139 let (evm_tx, evm_rx) = crossbeam_channel::unbounded::<EvmTask>();
140
141 for evm_index in 1..=num_evms {
142 let evm_task_name = format!("{task_name}-{evm_index}");
143 let evm_storage = Arc::clone(&storage);
144 let evm_config = config.clone();
145 let evm_rx = evm_rx.clone();
146 let thread_name = evm_task_name.clone();
147 spawn_thread(&thread_name, move || {
148 evm_loop(&evm_task_name, evm_storage, evm_config, evm_rx, kind);
149 });
150 }
151 evm_tx
152 };
153
154 fn inspector_loop(task_name: &str, storage: Arc<StratusStorage>, config: ExecutorConfig, task_rx: crossbeam_channel::Receiver<InspectorTask>) {
155 let mut evm = Evm::new(storage, config, EvmKind::Call);
156
157 while let Ok(task) = task_rx.recv() {
159 if GlobalState::is_shutdown_warn(task_name) {
160 return;
161 }
162
163 let _enter = task.span.enter();
165 let result = evm.inspect(task.input);
166 if let Err(e) = task.response_tx.send(result) {
167 tracing::error!(reason = ?e, "failed to send evm task execution result");
168 }
169 }
170 warn_task_tx_closed(task_name);
171 }
172
173 let spawn_inspectors = |task_name: &str, num_evms: usize| {
175 let (tx, rx) = crossbeam_channel::unbounded::<InspectorTask>();
176
177 for index in 1..=num_evms {
178 let task_name = format!("{task_name}-{index}");
179 let storage = Arc::clone(&storage);
180 let config = config.clone();
181 let rx = rx.clone();
182 let thread_name = task_name.clone();
183 spawn_thread(&thread_name, move || {
184 inspector_loop(&task_name, storage, config, rx);
185 });
186 }
187 tx
188 };
189
190 let tx_local = spawn_evms("evm-tx-serial", 1, EvmKind::Transaction);
191 let tx_external = spawn_evms("evm-tx-external", 1, EvmKind::Transaction);
192 let call_present = spawn_evms(
193 "evm-call-present",
194 max(config.executor_call_present_evms.unwrap_or(config.executor_evms / 2), 1),
195 EvmKind::Call,
196 );
197 let call_past = spawn_evms(
198 "evm-call-past",
199 max(config.executor_call_past_evms.unwrap_or(config.executor_evms / 4), 1),
200 EvmKind::Call,
201 );
202 let inspector = spawn_inspectors("inspector", max(config.executor_inspector_evms.unwrap_or(config.executor_evms / 4), 1));
203
204 Evms {
205 tx_local,
206 tx_external,
207 call_present,
208 call_past,
209 inspector,
210 }
211 }
212
213 fn execute(&self, evm_input: EvmInput, route: EvmRoute) -> Result<EvmExecutionResult, StratusError> {
215 let (execution_tx, execution_rx) = oneshot::channel::<Result<EvmExecutionResult, StratusError>>();
216
217 let task = EvmTask::new(evm_input, execution_tx);
218 let _ = match route {
219 EvmRoute::Local => self.tx_local.send(task),
220 EvmRoute::External => self.tx_external.send(task),
221 EvmRoute::CallPresent => self.call_present.send(task),
222 EvmRoute::CallPast => self.call_past.send(task),
223 };
224
225 match execution_rx.recv() {
226 Ok(result) => result,
227 Err(_) => Err(UnexpectedError::ChannelClosed { channel: "evm" }.into()),
228 }
229 }
230
231 fn inspect(&self, input: InspectorInput) -> Result<GethTrace, StratusError> {
232 let (inspector_tx, inspector_rx) = oneshot::channel::<Result<GethTrace, StratusError>>();
233 let task = InspectorTask::new(input, inspector_tx);
234 let _ = self.inspector.send(task);
235 match inspector_rx.recv() {
236 Ok(result) => result,
237 Err(_) => Err(UnexpectedError::ChannelClosed { channel: "evm" }.into()),
238 }
239 }
240}
241
242#[derive(Debug, Clone, Copy, strum::Display)]
243pub enum EvmRoute {
244 #[strum(to_string = "local")]
245 Local,
246
247 #[strum(to_string = "external")]
248 External,
249
250 #[strum(to_string = "call_present")]
251 CallPresent,
252
253 #[strum(to_string = "call_past")]
254 CallPast,
255}
256
257#[derive(Default)]
263pub struct ExecutorLocks {
264 transaction: Mutex<()>,
265}
266
267pub struct Executor {
268 locks: ExecutorLocks,
270
271 evms: Evms,
273
274 miner: Arc<Miner>,
276
277 storage: Arc<StratusStorage>,
279}
280
281impl Executor {
282 pub fn new(storage: Arc<StratusStorage>, miner: Arc<Miner>, config: ExecutorConfig) -> Self {
283 tracing::info!(?config, "creating executor");
284 let evms = Evms::spawn(Arc::clone(&storage), &config);
285 Self {
286 locks: ExecutorLocks::default(),
287 evms,
288 miner,
289 storage,
290 }
291 }
292
293 pub fn execute_external_block(&self, mut block: ExternalBlock, mut receipts: ExternalReceipts) -> anyhow::Result<()> {
301 #[cfg(feature = "metrics")]
303 let (start, mut block_metrics) = (metrics::now(), EvmExecutionMetrics::default());
304
305 #[cfg(feature = "tracing")]
306 let _span = info_span!("executor::external_block", block_number = %block.number()).entered();
307 tracing::info!(block_number = %block.number(), "reexecuting external block");
308
309 let block_number = block.number();
311 let block_timestamp = block.timestamp();
312 let block_transactions = mem::take(&mut block.transactions);
313
314 for tx in block_transactions.into_transactions() {
316 let receipt = receipts.try_remove(tx.hash())?;
317 self.execute_external_transaction(
318 tx,
319 receipt,
320 block_number,
321 block_timestamp,
322 #[cfg(feature = "metrics")]
323 &mut block_metrics,
324 )?;
325 }
326
327 #[cfg(feature = "metrics")]
329 {
330 metrics::inc_executor_external_block(start.elapsed());
331 metrics::inc_executor_external_block_account_reads(block_metrics.account_reads);
332 metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads);
333 }
334
335 Ok(())
336 }
337
338 fn execute_external_transaction(
343 &self,
344 tx: ExternalTransaction,
345 receipt: ExternalReceipt,
346 block_number: BlockNumber,
347 block_timestamp: UnixTime,
348 #[cfg(feature = "metrics")] block_metrics: &mut EvmExecutionMetrics,
349 ) -> anyhow::Result<()> {
350 #[cfg(feature = "metrics")]
352 let (start, tx_function, tx_contract) = (
353 metrics::now(),
354 codegen::function_sig(tx.inner.input()),
355 codegen::contract_name(&tx.0.to().map_into()),
356 );
357
358 #[cfg(feature = "tracing")]
359 let _span = info_span!("executor::external_transaction", tx_hash = %tx.hash()).entered();
360 tracing::info!(%block_number, tx_hash = %tx.hash(), "reexecuting external transaction");
361
362 let evm_input = EvmInput::from_external(&tx, &receipt, block_number, block_timestamp)?;
363
364 let tx_execution = match receipt.is_success() {
366 true => {
368 let evm_execution = self.evms.execute(evm_input.clone(), EvmRoute::External);
370
371 let mut evm_execution = match evm_execution {
373 Ok(inner) => inner,
374 Err(e) => {
375 let json_tx = to_json_string(&tx);
376 let json_receipt = to_json_string(&receipt);
377 tracing::error!(reason = ?e, %block_number, tx_hash = %tx.hash(), %json_tx, %json_receipt, "failed to reexecute external transaction");
378 return Err(e.into());
379 }
380 };
381
382 evm_execution.execution.apply_receipt(&receipt)?;
384
385 if let Err(e) = evm_execution.execution.compare_with_receipt(&receipt) {
387 let json_tx = to_json_string(&tx);
388 let json_receipt = to_json_string(&receipt);
389 let json_execution_logs = to_json_string(&evm_execution.execution.logs);
390 tracing::error!(reason = ?e, %block_number, tx_hash = %tx.hash(), %json_tx, %json_receipt, %json_execution_logs, "failed to reexecute external transaction");
391 return Err(e);
392 };
393
394 TransactionExecution::new(tx.try_into()?, evm_input, evm_execution)
395 }
396 false => {
399 let sender = self.storage.read_account(receipt.from.into(), PointInTime::Pending, ReadKind::Transaction)?;
400 let execution = EvmExecution::from_failed_external_transaction(sender, &receipt, block_timestamp)?;
401 let evm_result = EvmExecutionResult {
402 execution,
403 metrics: EvmExecutionMetrics::default(),
404 };
405 TransactionExecution::new(tx.try_into()?, evm_input, evm_result)
406 }
407 };
408
409 cfg_if! {
411 if #[cfg(feature = "metrics")] {
412 let tx_metrics = tx_execution.metrics();
413 let tx_gas = tx_execution.result.execution.gas;
414 }
415 }
416
417 self.miner.save_execution(tx_execution, false)?;
419
420 #[cfg(feature = "metrics")]
422 {
423 *block_metrics += tx_metrics;
424
425 metrics::inc_executor_external_transaction(start.elapsed(), tx_contract, tx_function);
426 metrics::inc_executor_external_transaction_account_reads(tx_metrics.account_reads, tx_contract, tx_function);
427 metrics::inc_executor_external_transaction_slot_reads(tx_metrics.slot_reads, tx_contract, tx_function);
428 metrics::inc_executor_external_transaction_gas(tx_gas.as_u64() as usize, tx_contract, tx_function);
429 }
430
431 Ok(())
432 }
433
434 #[tracing::instrument(name = "executor::local_transaction", skip_all, fields(tx_hash, tx_from, tx_to, tx_nonce))]
440 pub fn execute_local_transaction(&self, tx: TransactionInput) -> Result<(), StratusError> {
441 #[cfg(feature = "metrics")]
442 let function = codegen::function_sig(&tx.input);
443 #[cfg(feature = "metrics")]
444 let contract = codegen::contract_name(&tx.to);
445 #[cfg(feature = "metrics")]
446 let start = metrics::now();
447
448 tracing::debug!(tx_hash = %tx.hash, "executing local transaction");
449
450 Span::with(|s| {
452 s.rec_str("tx_hash", &tx.hash);
453 s.rec_str("tx_from", &tx.signer);
454 s.rec_opt("tx_to", &tx.to);
455 s.rec_str("tx_nonce", &tx.nonce);
456 });
457
458 const INFINITE_ATTEMPTS: usize = usize::MAX;
460
461 let _transaction_lock = self.locks.transaction.lock();
465
466 let tx_execution = self.execute_local_transaction_attempts(tx, INFINITE_ATTEMPTS);
468
469 #[cfg(feature = "metrics")]
470 metrics::inc_executor_local_transaction(start.elapsed(), tx_execution.is_ok(), contract, function);
471
472 tx_execution
473 }
474
475 fn execute_local_transaction_attempts(&self, tx_input: TransactionInput, max_attempts: usize) -> Result<(), StratusError> {
477 if tx_input.signer.is_zero() {
479 return Err(TransactionError::FromZeroAddress.into());
480 }
481
482 let mut attempt = 0;
484 loop {
485 attempt += 1;
486
487 let _span = debug_span!(
489 "executor::local_transaction_attempt",
490 %attempt,
491 tx_hash = %tx_input.hash,
492 tx_from = %tx_input.signer,
493 tx_to = tracing::field::Empty,
494 tx_nonce = %tx_input.nonce
495 )
496 .entered();
497 Span::with(|s| {
498 s.rec_opt("tx_to", &tx_input.to);
499 });
500
501 let (pending_header, _) = self.storage.read_pending_block_header();
503 let evm_input = EvmInput::from_eth_transaction(&tx_input, &pending_header);
504
505 tracing::debug!(
507 %attempt,
508 tx_hash = %tx_input.hash,
509 tx_nonce = %tx_input.nonce,
510 tx_from = %tx_input.from,
511 tx_signer = %tx_input.signer,
512 tx_to = ?tx_input.to,
513 tx_data_len = %tx_input.input.len(),
514 tx_data = %tx_input.input,
515 ?evm_input,
516 "executing local transaction attempt"
517 );
518
519 let evm_result = self.evms.execute(evm_input.clone(), EvmRoute::Local)?;
520
521 let tx_execution = TransactionExecution::new(tx_input.clone(), evm_input, evm_result);
524 #[cfg(feature = "metrics")]
525 let tx_metrics = tx_execution.metrics();
526 #[cfg(feature = "metrics")]
527 let gas_used = tx_execution.result.execution.gas;
528 #[cfg(feature = "metrics")]
529 let function = codegen::function_sig(&tx_input.input);
530 #[cfg(feature = "metrics")]
531 let contract = codegen::contract_name(&tx_input.to);
532
533 if let ExecutionResult::Reverted { reason } = &tx_execution.result.execution.result {
534 tracing::info!(?reason, "local transaction execution reverted");
535 #[cfg(feature = "metrics")]
536 metrics::inc_executor_local_transaction_reverts(contract, function, reason.0.as_ref());
537 }
538
539 match self.miner.save_execution(tx_execution, true) {
540 Ok(_) => {
541 #[cfg(feature = "metrics")]
543 {
544 metrics::inc_executor_local_transaction_account_reads(tx_metrics.account_reads, contract, function);
545 metrics::inc_executor_local_transaction_slot_reads(tx_metrics.slot_reads, contract, function);
546 metrics::inc_executor_local_transaction_gas(gas_used.as_u64() as usize, true, contract, function);
547 }
548 return Ok(());
549 }
550 Err(e) => match e {
551 StratusError::Storage(StorageError::EvmInputMismatch { ref expected, ref actual }) => {
552 tracing::warn!(?expected, ?actual, "evm input and block header mismatch");
553 if attempt >= max_attempts {
554 return Err(e);
555 }
556 continue;
557 }
558 _ => return Err(e),
559 },
560 }
561 }
562 }
563
564 #[tracing::instrument(name = "executor::local_call", skip_all, fields(from, to))]
566 pub fn execute_local_call(&self, call_input: CallInput, point_in_time: PointInTime) -> Result<EvmExecution, StratusError> {
567 #[cfg(feature = "metrics")]
568 let start = metrics::now();
569
570 Span::with(|s| {
571 s.rec_opt("from", &call_input.from);
572 s.rec_opt("to", &call_input.to);
573 });
574 tracing::info!(
575 from = ?call_input.from,
576 to = ?call_input.to,
577 data_len = call_input.data.len(),
578 data = %call_input.data,
579 %point_in_time,
580 "executing read-only local transaction"
581 );
582
583 let evm_input = match point_in_time {
585 PointInTime::Pending => {
586 let (pending_header, tx_count) = self.storage.read_pending_block_header();
587 EvmInput::from_pending_block(call_input.clone(), pending_header, tx_count)
588 }
589 point_in_time => {
590 let Some(block) = self.storage.read_block(point_in_time.into())? else {
591 return Err(RpcError::BlockFilterInvalid { filter: point_in_time.into() }.into());
592 };
593 EvmInput::from_mined_block(call_input.clone(), block, point_in_time)
594 }
595 };
596
597 let evm_route = match point_in_time {
598 PointInTime::Pending => EvmRoute::CallPresent,
599 PointInTime::Mined | PointInTime::MinedPast(_) => EvmRoute::CallPast,
600 };
601 let evm_result = self.evms.execute(evm_input, evm_route);
602
603 #[cfg(feature = "metrics")]
605 {
606 let function = codegen::function_sig(&call_input.data);
607 let contract = codegen::contract_name(&call_input.to);
608
609 match &evm_result {
610 Ok(evm_result) => {
611 metrics::inc_executor_local_call(start.elapsed(), true, contract, function);
612 metrics::inc_executor_local_call_account_reads(evm_result.metrics.account_reads, contract, function);
613 metrics::inc_executor_local_call_slot_reads(evm_result.metrics.slot_reads, contract, function);
614 metrics::inc_executor_local_call_gas(evm_result.execution.gas.as_u64() as usize, contract, function);
615 }
616 Err(_) => {
617 metrics::inc_executor_local_call(start.elapsed(), false, contract, function);
618 }
619 }
620 }
621
622 let execution = evm_result?.execution;
623 Ok(execution)
624 }
625
626 pub fn trace_transaction(&self, tx_hash: Hash, opts: Option<GethDebugTracingOptions>, trace_unsuccessful_only: bool) -> Result<GethTrace, StratusError> {
627 Span::with(|s| {
628 s.rec_str("tx_hash", &tx_hash);
629 });
630
631 tracing::info!("inspecting transaction");
632 let opts = opts.unwrap_or_default();
633 let tracer_type = opts.tracer.clone();
634
635 timed(|| {
636 self.evms.inspect(InspectorInput {
637 tx_hash,
638 opts,
639 trace_unsuccessful_only,
640 })
641 })
642 .with(|m| metrics::inc_evm_inspect(m.elapsed, serde_json::to_string(&tracer_type).unwrap_or_else(|_| "unkown".to_owned())))
643 }
644}