1use std::mem;
2use std::sync::Arc;
3
4#[cfg(feature = "metrics")]
5use alloy_consensus::Transaction;
6use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
7use alloy_rpc_types_trace::geth::GethTrace;
8use anyhow::bail;
9use cfg_if::cfg_if;
10use parking_lot::Mutex;
11use tracing::Span;
12use tracing::debug_span;
13#[cfg(feature = "tracing")]
14use tracing::info_span;
15
16use super::evm_input::InspectorInput;
17use crate::GlobalState;
18#[cfg(feature = "metrics")]
19use crate::eth::codegen;
20use crate::eth::executor::Evm;
21use crate::eth::executor::EvmExecutionResult;
22use crate::eth::executor::EvmInput;
23use crate::eth::executor::ExecutorConfig;
24use crate::eth::executor::evm::EvmKind;
25use crate::eth::miner::Miner;
26use crate::eth::primitives::BlockNumber;
27use crate::eth::primitives::CallInput;
28use crate::eth::primitives::EvmExecution;
29use crate::eth::primitives::EvmExecutionMetrics;
30use crate::eth::primitives::ExecutionResult;
31use crate::eth::primitives::ExternalBlock;
32use crate::eth::primitives::ExternalReceipt;
33use crate::eth::primitives::ExternalReceipts;
34use crate::eth::primitives::ExternalTransaction;
35use crate::eth::primitives::Hash;
36use crate::eth::primitives::PointInTime;
37use crate::eth::primitives::RpcError;
38use crate::eth::primitives::StorageError;
39use crate::eth::primitives::StratusError;
40use crate::eth::primitives::TransactionError;
41use crate::eth::primitives::TransactionExecution;
42use crate::eth::primitives::TransactionInput;
43use crate::eth::primitives::UnexpectedError;
44use crate::eth::primitives::UnixTime;
45use crate::eth::storage::ReadKind;
46use crate::eth::storage::StratusStorage;
47#[cfg(feature = "metrics")]
48use crate::ext::OptionExt;
49use crate::ext::spawn_thread;
50use crate::ext::to_json_string;
51use crate::infra::metrics;
52use crate::infra::metrics::timed;
53use crate::infra::tracing::SpanExt;
54use crate::infra::tracing::warn_task_tx_closed;
55
56#[derive(Debug)]
61pub struct EvmTask {
62 pub span: Span,
63 pub input: EvmInput,
64 pub response_tx: oneshot::Sender<Result<EvmExecutionResult, StratusError>>,
65}
66
67impl EvmTask {
68 pub fn new(input: EvmInput, response_tx: oneshot::Sender<Result<EvmExecutionResult, StratusError>>) -> Self {
69 Self {
70 span: Span::current(),
71 input,
72 response_tx,
73 }
74 }
75}
76
77pub struct InspectorTask {
78 pub span: Span,
79 pub input: InspectorInput,
80 pub response_tx: oneshot::Sender<Result<GethTrace, StratusError>>,
81}
82
83impl InspectorTask {
84 pub fn new(input: InspectorInput, response_tx: oneshot::Sender<Result<GethTrace, StratusError>>) -> Self {
85 Self {
86 span: Span::current(),
87 input,
88 response_tx,
89 }
90 }
91}
92
93struct Evms {
99 pub tx: crossbeam_channel::Sender<EvmTask>,
101
102 pub call_present: crossbeam_channel::Sender<EvmTask>,
104
105 pub call_past: crossbeam_channel::Sender<EvmTask>,
107
108 pub inspector: crossbeam_channel::Sender<InspectorTask>,
110}
111
112impl Evms {
113 fn spawn(storage: Arc<StratusStorage>, config: &ExecutorConfig) -> Self {
115 fn evm_loop(task_name: &str, storage: Arc<StratusStorage>, config: ExecutorConfig, task_rx: crossbeam_channel::Receiver<EvmTask>, kind: EvmKind) {
117 let mut evm = Evm::new(storage, config, kind);
118
119 while let Ok(task) = task_rx.recv() {
121 if GlobalState::is_shutdown_warn(task_name) {
122 return;
123 }
124
125 let _enter = task.span.enter();
127 let result = evm.execute(task.input);
128 if let Err(e) = task.response_tx.send(result) {
129 tracing::error!(reason = ?e, "failed to send evm task execution result");
130 }
131 }
132 warn_task_tx_closed(task_name);
133 }
134
135 let spawn_evms = |task_name: &str, num_evms: usize, kind: EvmKind| {
137 let (evm_tx, evm_rx) = crossbeam_channel::unbounded::<EvmTask>();
138
139 for evm_index in 1..=num_evms {
140 let evm_task_name = format!("{task_name}-{evm_index}");
141 let evm_storage = Arc::clone(&storage);
142 let evm_config = config.clone();
143 let evm_rx = evm_rx.clone();
144 let thread_name = evm_task_name.clone();
145 spawn_thread(&thread_name, move || {
146 evm_loop(&evm_task_name, evm_storage, evm_config, evm_rx, kind);
147 });
148 }
149 evm_tx
150 };
151
152 fn inspector_loop(task_name: &str, storage: Arc<StratusStorage>, config: ExecutorConfig, task_rx: crossbeam_channel::Receiver<InspectorTask>) {
153 let mut evm = Evm::new(storage, config, EvmKind::Call);
154
155 while let Ok(task) = task_rx.recv() {
157 if GlobalState::is_shutdown_warn(task_name) {
158 return;
159 }
160
161 let _enter = task.span.enter();
163 let result = evm.inspect(task.input);
164 if let Err(e) = task.response_tx.send(result) {
165 tracing::error!(reason = ?e, "failed to send evm task execution result");
166 }
167 }
168 warn_task_tx_closed(task_name);
169 }
170
171 let spawn_inspectors = |task_name: &str, num_evms: usize| {
173 let (tx, rx) = crossbeam_channel::unbounded::<InspectorTask>();
174
175 for index in 1..=num_evms {
176 let task_name = format!("{task_name}-{index}");
177 let storage = Arc::clone(&storage);
178 let config = config.clone();
179 let rx = rx.clone();
180 let thread_name = task_name.clone();
181 spawn_thread(&thread_name, move || {
182 inspector_loop(&task_name, storage, config, rx);
183 });
184 }
185 tx
186 };
187
188 let tx = spawn_evms("evm-tx", 1, EvmKind::Transaction);
189 let call_present = spawn_evms("evm-call-present", config.call_present_evms, EvmKind::Call);
190 let call_past = spawn_evms("evm-call-past", config.call_past_evms, EvmKind::Call);
191 let inspector = spawn_inspectors("inspector", config.inspector_evms);
192
193 Evms {
194 tx,
195 call_present,
196 call_past,
197 inspector,
198 }
199 }
200
201 fn execute(&self, evm_input: EvmInput, route: EvmRoute) -> Result<EvmExecutionResult, StratusError> {
203 let (execution_tx, execution_rx) = oneshot::channel::<Result<EvmExecutionResult, StratusError>>();
204
205 let task = EvmTask::new(evm_input, execution_tx);
206 let _ = match route {
207 EvmRoute::Transaction => self.tx.send(task),
208 EvmRoute::CallPresent => self.call_present.send(task),
209 EvmRoute::CallPast => self.call_past.send(task),
210 };
211
212 match execution_rx.recv() {
213 Ok(result) => result,
214 Err(_) => Err(UnexpectedError::ChannelClosed { channel: "evm" }.into()),
215 }
216 }
217
218 fn inspect(&self, input: InspectorInput) -> Result<GethTrace, StratusError> {
219 let (inspector_tx, inspector_rx) = oneshot::channel::<Result<GethTrace, StratusError>>();
220 let task = InspectorTask::new(input, inspector_tx);
221 let _ = self.inspector.send(task);
222 match inspector_rx.recv() {
223 Ok(result) => result,
224 Err(_) => Err(UnexpectedError::ChannelClosed { channel: "evm" }.into()),
225 }
226 }
227}
228
229#[derive(Debug, Clone, Copy, strum::Display)]
230pub enum EvmRoute {
231 #[strum(to_string = "transaction")]
232 Transaction,
233
234 #[strum(to_string = "call_present")]
235 CallPresent,
236
237 #[strum(to_string = "call_past")]
238 CallPast,
239}
240
241#[derive(Default)]
247pub struct ExecutorLocks {
248 transaction: Mutex<()>,
249}
250
251pub struct Executor {
252 locks: ExecutorLocks,
254
255 evms: Evms,
257
258 miner: Arc<Miner>,
260
261 storage: Arc<StratusStorage>,
263}
264
265impl Executor {
266 pub fn new(storage: Arc<StratusStorage>, miner: Arc<Miner>, config: ExecutorConfig) -> Self {
267 tracing::info!(?config, "creating executor");
268 let evms = Evms::spawn(Arc::clone(&storage), &config);
269 Self {
270 locks: ExecutorLocks::default(),
271 evms,
272 miner,
273 storage,
274 }
275 }
276
277 pub fn execute_external_block(&self, mut block: ExternalBlock, mut receipts: ExternalReceipts) -> anyhow::Result<()> {
285 #[cfg(feature = "metrics")]
287 let (start, mut block_metrics) = (metrics::now(), EvmExecutionMetrics::default());
288
289 #[cfg(feature = "tracing")]
290 let _span = info_span!("executor::external_block", block_number = %block.number()).entered();
291 tracing::info!(block_number = %block.number(), "reexecuting external block");
292
293 self.storage.set_pending_from_external(&block);
294
295 let block_number = block.number();
297 let block_timestamp = block.timestamp();
298 let block_transactions = mem::take(&mut block.transactions);
299
300 for tx in block_transactions.into_transactions() {
302 let receipt = receipts.try_remove(tx.hash())?;
303 self.execute_external_transaction(
304 tx,
305 receipt,
306 block_number,
307 block_timestamp,
308 #[cfg(feature = "metrics")]
309 &mut block_metrics,
310 )?;
311 }
312
313 #[cfg(feature = "metrics")]
315 {
316 metrics::inc_executor_external_block(start.elapsed());
317 metrics::inc_executor_external_block_account_reads(block_metrics.account_reads);
318 metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads);
319 }
320
321 Ok(())
322 }
323
324 fn execute_external_transaction(
329 &self,
330 tx: ExternalTransaction,
331 receipt: ExternalReceipt,
332 block_number: BlockNumber,
333 block_timestamp: UnixTime,
334 #[cfg(feature = "metrics")] block_metrics: &mut EvmExecutionMetrics,
335 ) -> anyhow::Result<()> {
336 #[cfg(feature = "metrics")]
338 let (start, tx_function, tx_contract) = (
339 metrics::now(),
340 codegen::function_sig(tx.inner.input()),
341 codegen::contract_name(&tx.0.to().map_into()),
342 );
343
344 #[cfg(feature = "tracing")]
345 let _span = info_span!("executor::external_transaction", tx_hash = %tx.hash()).entered();
346 tracing::info!(%block_number, tx_hash = %tx.hash(), "reexecuting external transaction");
347
348 let tx_input: TransactionInput = tx.try_into()?;
349 let (pending_block, _) = self.storage.read_pending_block_header();
350 let mut evm_input = EvmInput::from_eth_transaction(&tx_input.execution_info, pending_block.number, *pending_block.timestamp);
351
352 let tx_execution = match receipt.is_success() {
354 true => {
356 let evm_execution = self.evms.execute(evm_input.clone(), EvmRoute::Transaction);
358
359 let mut evm_execution = match evm_execution {
361 Ok(inner) => inner,
362 Err(e) => {
363 let json_tx = to_json_string(&tx_input);
364 let json_receipt = to_json_string(&receipt);
365 tracing::error!(reason = ?e, %block_number, tx_hash = %tx_input.transaction_info.hash, %json_tx, %json_receipt, "failed to reexecute external transaction");
366 return Err(e.into());
367 }
368 };
369
370 evm_execution.execution.apply_receipt(&receipt)?;
372
373 if let Err(e) = evm_execution.execution.compare_with_receipt(&receipt) {
375 let json_tx = to_json_string(&tx_input);
376 let json_receipt = to_json_string(&receipt);
377 let json_execution_logs = to_json_string(&evm_execution.execution.logs);
378 tracing::error!(reason = ?e, %block_number, tx_hash = %tx_input.transaction_info.hash, %json_tx, %json_receipt, %json_execution_logs, "failed to reexecute external transaction");
379 return Err(e);
380 };
381
382 TransactionExecution::new(tx_input.transaction_info, tx_input.signature, evm_input, evm_execution)
383 }
384 false => {
387 let sender = self.storage.read_account(receipt.from.into(), PointInTime::Pending, ReadKind::Transaction)?;
388 if tx_input.execution_info.nonce != sender.nonce {
389 bail!(
390 "reverted external transaction should have the correct nonce. address: {:?}, input: {:?}, sender: {:?}",
391 tx_input.execution_info.signer,
392 tx_input.execution_info.nonce,
393 sender.nonce
394 );
395 }
396 let execution = EvmExecution::from_failed_external_transaction(sender, &receipt, block_timestamp)?;
397 let evm_result = EvmExecutionResult {
398 execution,
399 metrics: EvmExecutionMetrics::default(),
400 };
401
402 evm_input.gas_limit = tx_input.execution_info.gas_limit;
403 evm_input.gas_price = tx_input.execution_info.gas_price;
404
405 TransactionExecution::new(tx_input.transaction_info, tx_input.signature, evm_input, evm_result)
406 }
407 };
408
409 cfg_if! {
411 if #[cfg(feature = "metrics")] {
412 let tx_metrics = tx_execution.metrics();
413 let tx_gas = tx_execution.result.execution.gas_used;
414 }
415 }
416
417 self.miner.save_execution(tx_execution)?;
419
420 #[cfg(feature = "metrics")]
422 {
423 *block_metrics += tx_metrics;
424
425 metrics::inc_executor_external_transaction(start.elapsed(), tx_contract, tx_function);
426 metrics::inc_executor_external_transaction_account_reads(tx_metrics.account_reads, tx_contract, tx_function);
427 metrics::inc_executor_external_transaction_slot_reads(tx_metrics.slot_reads, tx_contract, tx_function);
428 metrics::inc_executor_external_transaction_gas(tx_gas.as_u64() as usize, tx_contract, tx_function);
429 }
430
431 Ok(())
432 }
433
434 #[tracing::instrument(name = "executor::local_transaction", skip_all, fields(tx_hash, tx_from, tx_to, tx_nonce))]
440 pub fn execute_local_transaction(&self, tx: TransactionInput) -> Result<(), StratusError> {
441 #[cfg(feature = "metrics")]
442 let function = codegen::function_sig(&tx.execution_info.input);
443 #[cfg(feature = "metrics")]
444 let contract = codegen::contract_name(&tx.execution_info.to);
445 #[cfg(feature = "metrics")]
446 let start = metrics::now();
447
448 tracing::debug!(tx_hash = %tx.transaction_info.hash, "executing local transaction");
449
450 Span::with(|s| {
452 s.rec_str("tx_hash", &tx.transaction_info.hash);
453 s.rec_str("tx_from", &tx.execution_info.signer);
454 s.rec_opt("tx_to", &tx.execution_info.to);
455 s.rec_str("tx_nonce", &tx.execution_info.nonce);
456 });
457
458 const INFINITE_ATTEMPTS: usize = usize::MAX;
460
461 let _transaction_lock = self.locks.transaction.lock();
465
466 let tx_execution = self.execute_local_transaction_attempts(tx, INFINITE_ATTEMPTS);
468
469 #[cfg(feature = "metrics")]
470 metrics::inc_executor_local_transaction(start.elapsed(), tx_execution.is_ok(), contract, function);
471
472 tx_execution
473 }
474
475 fn execute_local_transaction_attempts(&self, tx_input: TransactionInput, max_attempts: usize) -> Result<(), StratusError> {
477 if tx_input.execution_info.signer.is_zero() {
479 return Err(TransactionError::FromZeroAddress.into());
480 }
481
482 let mut attempt = 0;
484 loop {
485 attempt += 1;
486
487 let _span = debug_span!(
489 "executor::local_transaction_attempt",
490 %attempt,
491 tx_hash = %tx_input.transaction_info.hash,
492 tx_from = %tx_input.execution_info.signer,
493 tx_to = tracing::field::Empty,
494 tx_nonce = %tx_input.execution_info.nonce
495 )
496 .entered();
497 Span::with(|s| {
498 s.rec_opt("tx_to", &tx_input.execution_info.to);
499 });
500
501 let (pending_header, _) = self.storage.read_pending_block_header();
503 let evm_input = EvmInput::from_eth_transaction(&tx_input.execution_info, pending_header.number, *pending_header.timestamp);
504
505 tracing::debug!(
507 %attempt,
508 tx_hash = %tx_input.transaction_info.hash,
509 tx_nonce = %tx_input.execution_info.nonce,
510 tx_signer = %tx_input.execution_info.signer,
511 tx_to = ?tx_input.execution_info.to,
512 tx_data_len = %tx_input.execution_info.input.len(),
513 tx_data = %tx_input.execution_info.input,
514 ?evm_input,
515 "executing local transaction attempt"
516 );
517
518 let evm_result = self.evms.execute(evm_input.clone(), EvmRoute::Transaction)?;
519
520 let tx_execution = TransactionExecution::new(tx_input.transaction_info.clone(), tx_input.signature.clone(), evm_input, evm_result);
523 #[cfg(feature = "metrics")]
524 let tx_metrics = tx_execution.metrics();
525 #[cfg(feature = "metrics")]
526 let gas_used = tx_execution.result.execution.gas_used;
527 #[cfg(feature = "metrics")]
528 let function = codegen::function_sig(&tx_input.execution_info.input);
529 #[cfg(feature = "metrics")]
530 let contract = codegen::contract_name(&tx_input.execution_info.to);
531
532 if let ExecutionResult::Reverted { reason } = &tx_execution.result.execution.result {
533 tracing::info!(?reason, "local transaction execution reverted");
534 #[cfg(feature = "metrics")]
535 metrics::inc_executor_local_transaction_reverts(contract, function, reason.0.as_ref());
536 }
537
538 match self.miner.save_execution(tx_execution) {
539 Ok(_) => {
540 #[cfg(feature = "metrics")]
542 {
543 metrics::inc_executor_local_transaction_account_reads(tx_metrics.account_reads, contract, function);
544 metrics::inc_executor_local_transaction_slot_reads(tx_metrics.slot_reads, contract, function);
545 metrics::inc_executor_local_transaction_gas(gas_used.as_u64() as usize, true, contract, function);
546 }
547 return Ok(());
548 }
549 Err(e) => match e {
550 StratusError::Storage(StorageError::EvmInputMismatch { ref expected, ref actual }) => {
551 tracing::warn!(?expected, ?actual, "evm input and block header mismatch");
552 if attempt >= max_attempts {
553 return Err(e);
554 }
555 continue;
556 }
557 _ => return Err(e),
558 },
559 }
560 }
561 }
562
563 #[tracing::instrument(name = "executor::local_call", skip_all, fields(from, to))]
565 pub fn execute_local_call(&self, call_input: CallInput, point_in_time: PointInTime) -> Result<EvmExecution, StratusError> {
566 #[cfg(feature = "metrics")]
567 let start = metrics::now();
568
569 Span::with(|s| {
570 s.rec_opt("from", &call_input.from);
571 s.rec_opt("to", &call_input.to);
572 });
573 tracing::info!(
574 from = ?call_input.from,
575 to = ?call_input.to,
576 data_len = call_input.data.len(),
577 data = %call_input.data,
578 %point_in_time,
579 "executing read-only local transaction"
580 );
581
582 let evm_input = match point_in_time {
584 PointInTime::Pending => {
585 let (pending_header, tx_count) = self.storage.read_pending_block_header();
586 EvmInput::from_pending_block(call_input.clone(), pending_header, tx_count)
587 }
588 point_in_time => {
589 let Some(block) = self.storage.read_block(point_in_time.into())? else {
590 return Err(RpcError::BlockFilterInvalid { filter: point_in_time.into() }.into());
591 };
592 EvmInput::from_mined_block(call_input.clone(), block, point_in_time)
593 }
594 };
595
596 let evm_route = match point_in_time {
597 PointInTime::Pending => EvmRoute::CallPresent,
598 PointInTime::Mined | PointInTime::MinedPast(_) => EvmRoute::CallPast,
599 };
600 let evm_result = self.evms.execute(evm_input, evm_route);
601
602 #[cfg(feature = "metrics")]
604 {
605 let function = codegen::function_sig(&call_input.data);
606 let contract = codegen::contract_name(&call_input.to);
607
608 match &evm_result {
609 Ok(evm_result) => {
610 metrics::inc_executor_local_call(start.elapsed(), true, contract, function);
611 metrics::inc_executor_local_call_account_reads(evm_result.metrics.account_reads, contract, function);
612 metrics::inc_executor_local_call_slot_reads(evm_result.metrics.slot_reads, contract, function);
613 metrics::inc_executor_local_call_gas(evm_result.execution.gas_used.as_u64() as usize, contract, function);
614 }
615 Err(_) => {
616 metrics::inc_executor_local_call(start.elapsed(), false, contract, function);
617 }
618 }
619 }
620
621 let execution = evm_result?.execution;
622 Ok(execution)
623 }
624
625 pub fn trace_transaction(&self, tx_hash: Hash, opts: Option<GethDebugTracingOptions>, trace_unsuccessful_only: bool) -> Result<GethTrace, StratusError> {
626 Span::with(|s| {
627 s.rec_str("tx_hash", &tx_hash);
628 });
629
630 tracing::info!("inspecting transaction");
631 let opts = opts.unwrap_or_default();
632 let tracer_type = opts.tracer.clone();
633
634 timed(|| {
635 self.evms.inspect(InspectorInput {
636 tx_hash,
637 opts,
638 trace_unsuccessful_only,
639 })
640 })
641 .with(|m| metrics::inc_evm_inspect(m.elapsed, serde_json::to_string(&tracer_type).unwrap_or_else(|_| "unkown".to_owned())))
642 }
643}