stratus/eth/executor/
executor.rs

1use std::cmp::max;
2use std::mem;
3use std::sync::Arc;
4
5#[cfg(feature = "metrics")]
6use alloy_consensus::Transaction;
7use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
8use alloy_rpc_types_trace::geth::GethTrace;
9use cfg_if::cfg_if;
10use parking_lot::Mutex;
11use tracing::Span;
12use tracing::debug_span;
13#[cfg(feature = "tracing")]
14use tracing::info_span;
15
16use super::evm_input::InspectorInput;
17use crate::GlobalState;
18#[cfg(feature = "metrics")]
19use crate::eth::codegen;
20use crate::eth::executor::Evm;
21use crate::eth::executor::EvmExecutionResult;
22use crate::eth::executor::EvmInput;
23use crate::eth::executor::ExecutorConfig;
24use crate::eth::executor::evm::EvmKind;
25use crate::eth::miner::Miner;
26use crate::eth::primitives::BlockNumber;
27use crate::eth::primitives::CallInput;
28use crate::eth::primitives::EvmExecution;
29use crate::eth::primitives::EvmExecutionMetrics;
30use crate::eth::primitives::ExecutionResult;
31use crate::eth::primitives::ExternalBlock;
32use crate::eth::primitives::ExternalReceipt;
33use crate::eth::primitives::ExternalReceipts;
34use crate::eth::primitives::ExternalTransaction;
35use crate::eth::primitives::Hash;
36use crate::eth::primitives::PointInTime;
37use crate::eth::primitives::RpcError;
38use crate::eth::primitives::StorageError;
39use crate::eth::primitives::StratusError;
40use crate::eth::primitives::TransactionError;
41use crate::eth::primitives::TransactionExecution;
42use crate::eth::primitives::TransactionInput;
43use crate::eth::primitives::UnexpectedError;
44use crate::eth::primitives::UnixTime;
45use crate::eth::storage::ReadKind;
46use crate::eth::storage::StratusStorage;
47#[cfg(feature = "metrics")]
48use crate::ext::OptionExt;
49use crate::ext::spawn_thread;
50use crate::ext::to_json_string;
51use crate::infra::metrics;
52use crate::infra::metrics::timed;
53use crate::infra::tracing::SpanExt;
54use crate::infra::tracing::warn_task_tx_closed;
55
56// -----------------------------------------------------------------------------
57// Evm task
58// -----------------------------------------------------------------------------
59
60#[derive(Debug)]
61pub struct EvmTask {
62    pub span: Span,
63    pub input: EvmInput,
64    pub response_tx: oneshot::Sender<Result<EvmExecutionResult, StratusError>>,
65}
66
67impl EvmTask {
68    pub fn new(input: EvmInput, response_tx: oneshot::Sender<Result<EvmExecutionResult, StratusError>>) -> Self {
69        Self {
70            span: Span::current(),
71            input,
72            response_tx,
73        }
74    }
75}
76
77pub struct InspectorTask {
78    pub span: Span,
79    pub input: InspectorInput,
80    pub response_tx: oneshot::Sender<Result<GethTrace, StratusError>>,
81}
82
83impl InspectorTask {
84    pub fn new(input: InspectorInput, response_tx: oneshot::Sender<Result<GethTrace, StratusError>>) -> Self {
85        Self {
86            span: Span::current(),
87            input,
88            response_tx,
89        }
90    }
91}
92
93// -----------------------------------------------------------------------------
94// Evm communication channels
95// -----------------------------------------------------------------------------
96
97/// Manages EVM pool and communication channels.
98struct Evms {
99    /// Pool for serial execution of transactions received via `eth_sendRawTransaction`. Usually contains a single EVM.
100    pub tx_local: crossbeam_channel::Sender<EvmTask>,
101
102    /// Pool for serial execution of external transactions received via `importer-online` or `importer-offline`. Usually contains a single EVM.
103    pub tx_external: crossbeam_channel::Sender<EvmTask>,
104
105    /// Pool for parallel execution of calls (eth_call and eth_estimateGas) reading from current state. Usually contains multiple EVMs.
106    pub call_present: crossbeam_channel::Sender<EvmTask>,
107
108    /// Pool for parallel execution of calls (eth_call and eth_estimateGas) reading from past state. Usually contains multiple EVMs.
109    pub call_past: crossbeam_channel::Sender<EvmTask>,
110
111    pub inspector: crossbeam_channel::Sender<InspectorTask>,
112}
113
114impl Evms {
115    /// Spawns EVM tasks in background.
116    fn spawn(storage: Arc<StratusStorage>, config: &ExecutorConfig) -> Self {
117        // function executed by evm threads
118        fn evm_loop(task_name: &str, storage: Arc<StratusStorage>, config: ExecutorConfig, task_rx: crossbeam_channel::Receiver<EvmTask>, kind: EvmKind) {
119            let mut evm = Evm::new(storage, config, kind);
120
121            // keep executing transactions until the channel is closed
122            while let Ok(task) = task_rx.recv() {
123                if GlobalState::is_shutdown_warn(task_name) {
124                    return;
125                }
126
127                // execute
128                let _enter = task.span.enter();
129                let result = evm.execute(task.input);
130                if let Err(e) = task.response_tx.send(result) {
131                    tracing::error!(reason = ?e, "failed to send evm task execution result");
132                }
133            }
134            warn_task_tx_closed(task_name);
135        }
136
137        // function that spawn evm threads
138        let spawn_evms = |task_name: &str, num_evms: usize, kind: EvmKind| {
139            let (evm_tx, evm_rx) = crossbeam_channel::unbounded::<EvmTask>();
140
141            for evm_index in 1..=num_evms {
142                let evm_task_name = format!("{task_name}-{evm_index}");
143                let evm_storage = Arc::clone(&storage);
144                let evm_config = config.clone();
145                let evm_rx = evm_rx.clone();
146                let thread_name = evm_task_name.clone();
147                spawn_thread(&thread_name, move || {
148                    evm_loop(&evm_task_name, evm_storage, evm_config, evm_rx, kind);
149                });
150            }
151            evm_tx
152        };
153
154        fn inspector_loop(task_name: &str, storage: Arc<StratusStorage>, config: ExecutorConfig, task_rx: crossbeam_channel::Receiver<InspectorTask>) {
155            let mut evm = Evm::new(storage, config, EvmKind::Call);
156
157            // keep executing transactions until the channel is closed
158            while let Ok(task) = task_rx.recv() {
159                if GlobalState::is_shutdown_warn(task_name) {
160                    return;
161                }
162
163                // execute
164                let _enter = task.span.enter();
165                let result = evm.inspect(task.input);
166                if let Err(e) = task.response_tx.send(result) {
167                    tracing::error!(reason = ?e, "failed to send evm task execution result");
168                }
169            }
170            warn_task_tx_closed(task_name);
171        }
172
173        // function that spawn inspector threads
174        let spawn_inspectors = |task_name: &str, num_evms: usize| {
175            let (tx, rx) = crossbeam_channel::unbounded::<InspectorTask>();
176
177            for index in 1..=num_evms {
178                let task_name = format!("{task_name}-{index}");
179                let storage = Arc::clone(&storage);
180                let config = config.clone();
181                let rx = rx.clone();
182                let thread_name = task_name.clone();
183                spawn_thread(&thread_name, move || {
184                    inspector_loop(&task_name, storage, config, rx);
185                });
186            }
187            tx
188        };
189
190        let tx_local = spawn_evms("evm-tx-serial", 1, EvmKind::Transaction);
191        let tx_external = spawn_evms("evm-tx-external", 1, EvmKind::Transaction);
192        let call_present = spawn_evms(
193            "evm-call-present",
194            max(config.executor_call_present_evms.unwrap_or(config.executor_evms / 2), 1),
195            EvmKind::Call,
196        );
197        let call_past = spawn_evms(
198            "evm-call-past",
199            max(config.executor_call_past_evms.unwrap_or(config.executor_evms / 4), 1),
200            EvmKind::Call,
201        );
202        let inspector = spawn_inspectors("inspector", max(config.executor_inspector_evms.unwrap_or(config.executor_evms / 4), 1));
203
204        Evms {
205            tx_local,
206            tx_external,
207            call_present,
208            call_past,
209            inspector,
210        }
211    }
212
213    /// Executes a transaction in the specified route.
214    fn execute(&self, evm_input: EvmInput, route: EvmRoute) -> Result<EvmExecutionResult, StratusError> {
215        let (execution_tx, execution_rx) = oneshot::channel::<Result<EvmExecutionResult, StratusError>>();
216
217        let task = EvmTask::new(evm_input, execution_tx);
218        let _ = match route {
219            EvmRoute::Local => self.tx_local.send(task),
220            EvmRoute::External => self.tx_external.send(task),
221            EvmRoute::CallPresent => self.call_present.send(task),
222            EvmRoute::CallPast => self.call_past.send(task),
223        };
224
225        match execution_rx.recv() {
226            Ok(result) => result,
227            Err(_) => Err(UnexpectedError::ChannelClosed { channel: "evm" }.into()),
228        }
229    }
230
231    fn inspect(&self, input: InspectorInput) -> Result<GethTrace, StratusError> {
232        let (inspector_tx, inspector_rx) = oneshot::channel::<Result<GethTrace, StratusError>>();
233        let task = InspectorTask::new(input, inspector_tx);
234        let _ = self.inspector.send(task);
235        match inspector_rx.recv() {
236            Ok(result) => result,
237            Err(_) => Err(UnexpectedError::ChannelClosed { channel: "evm" }.into()),
238        }
239    }
240}
241
242#[derive(Debug, Clone, Copy, strum::Display)]
243pub enum EvmRoute {
244    #[strum(to_string = "local")]
245    Local,
246
247    #[strum(to_string = "external")]
248    External,
249
250    #[strum(to_string = "call_present")]
251    CallPresent,
252
253    #[strum(to_string = "call_past")]
254    CallPast,
255}
256
257// -----------------------------------------------------------------------------
258// Executor
259// -----------------------------------------------------------------------------
260
261/// Locks used for local execution.
262#[derive(Default)]
263pub struct ExecutorLocks {
264    transaction: Mutex<()>,
265}
266
267pub struct Executor {
268    /// Executor inner locks.
269    locks: ExecutorLocks,
270
271    /// Channels to send transactions to background EVMs.
272    evms: Evms,
273
274    /// Mutex-wrapped miner for creating new blockchain blocks.
275    miner: Arc<Miner>,
276
277    /// Shared storage backend for persisting blockchain state.
278    storage: Arc<StratusStorage>,
279}
280
281impl Executor {
282    pub fn new(storage: Arc<StratusStorage>, miner: Arc<Miner>, config: ExecutorConfig) -> Self {
283        tracing::info!(?config, "creating executor");
284        let evms = Evms::spawn(Arc::clone(&storage), &config);
285        Self {
286            locks: ExecutorLocks::default(),
287            evms,
288            miner,
289            storage,
290        }
291    }
292
293    // -------------------------------------------------------------------------
294    // External transactions
295    // -------------------------------------------------------------------------
296
297    /// Reexecutes an external block locally and imports it to the temporary storage.
298    ///
299    /// Returns the remaining receipts that were not consumed by the execution.
300    pub fn execute_external_block(&self, mut block: ExternalBlock, mut receipts: ExternalReceipts) -> anyhow::Result<()> {
301        // track
302        #[cfg(feature = "metrics")]
303        let (start, mut block_metrics) = (metrics::now(), EvmExecutionMetrics::default());
304
305        #[cfg(feature = "tracing")]
306        let _span = info_span!("executor::external_block", block_number = %block.number()).entered();
307        tracing::info!(block_number = %block.number(), "reexecuting external block");
308
309        // track pending block
310        let block_number = block.number();
311        let block_timestamp = block.timestamp();
312        let block_transactions = mem::take(&mut block.transactions);
313
314        // determine how to execute each transaction
315        for tx in block_transactions.into_transactions() {
316            let receipt = receipts.try_remove(tx.hash())?;
317            self.execute_external_transaction(
318                tx,
319                receipt,
320                block_number,
321                block_timestamp,
322                #[cfg(feature = "metrics")]
323                &mut block_metrics,
324            )?;
325        }
326
327        // track block metrics
328        #[cfg(feature = "metrics")]
329        {
330            metrics::inc_executor_external_block(start.elapsed());
331            metrics::inc_executor_external_block_account_reads(block_metrics.account_reads);
332            metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads);
333        }
334
335        Ok(())
336    }
337
338    /// Reexecutes an external transaction locally ensuring it produces the same output.
339    ///
340    /// This function wraps `reexecute_external_tx_inner` and returns back the payload
341    /// to facilitate re-execution of parallel transactions that failed
342    fn execute_external_transaction(
343        &self,
344        tx: ExternalTransaction,
345        receipt: ExternalReceipt,
346        block_number: BlockNumber,
347        block_timestamp: UnixTime,
348        #[cfg(feature = "metrics")] block_metrics: &mut EvmExecutionMetrics,
349    ) -> anyhow::Result<()> {
350        // track
351        #[cfg(feature = "metrics")]
352        let (start, tx_function, tx_contract) = (
353            metrics::now(),
354            codegen::function_sig(tx.inner.input()),
355            codegen::contract_name(&tx.0.to().map_into()),
356        );
357
358        #[cfg(feature = "tracing")]
359        let _span = info_span!("executor::external_transaction", tx_hash = %tx.hash()).entered();
360        tracing::info!(%block_number, tx_hash = %tx.hash(), "reexecuting external transaction");
361
362        let evm_input = EvmInput::from_external(&tx, &receipt, block_number, block_timestamp)?;
363
364        // when transaction externally failed, create fake transaction instead of reexecuting
365        let tx_execution = match receipt.is_success() {
366            // successful external transaction, re-execute locally
367            true => {
368                // re-execute transaction
369                let evm_execution = self.evms.execute(evm_input.clone(), EvmRoute::External);
370
371                // handle re-execution result
372                let mut evm_execution = match evm_execution {
373                    Ok(inner) => inner,
374                    Err(e) => {
375                        let json_tx = to_json_string(&tx);
376                        let json_receipt = to_json_string(&receipt);
377                        tracing::error!(reason = ?e, %block_number, tx_hash = %tx.hash(), %json_tx, %json_receipt, "failed to reexecute external transaction");
378                        return Err(e.into());
379                    }
380                };
381
382                // update execution with receipt
383                evm_execution.execution.apply_receipt(&receipt)?;
384
385                // ensure it matches receipt before saving
386                if let Err(e) = evm_execution.execution.compare_with_receipt(&receipt) {
387                    let json_tx = to_json_string(&tx);
388                    let json_receipt = to_json_string(&receipt);
389                    let json_execution_logs = to_json_string(&evm_execution.execution.logs);
390                    tracing::error!(reason = ?e, %block_number, tx_hash = %tx.hash(), %json_tx, %json_receipt, %json_execution_logs, "failed to reexecute external transaction");
391                    return Err(e);
392                };
393
394                TransactionExecution::new(tx.try_into()?, evm_input, evm_execution)
395            }
396            //
397            // failed external transaction, re-create from receipt without re-executing
398            false => {
399                let sender = self.storage.read_account(receipt.from.into(), PointInTime::Pending, ReadKind::Transaction)?;
400                let execution = EvmExecution::from_failed_external_transaction(sender, &receipt, block_timestamp)?;
401                let evm_result = EvmExecutionResult {
402                    execution,
403                    metrics: EvmExecutionMetrics::default(),
404                };
405                TransactionExecution::new(tx.try_into()?, evm_input, evm_result)
406            }
407        };
408
409        // keep metrics info to avoid cloning when saving
410        cfg_if! {
411            if #[cfg(feature = "metrics")] {
412                let tx_metrics = tx_execution.metrics();
413                let tx_gas = tx_execution.result.execution.gas;
414            }
415        }
416
417        // persist state
418        self.miner.save_execution(tx_execution, false)?;
419
420        // track metrics
421        #[cfg(feature = "metrics")]
422        {
423            *block_metrics += tx_metrics;
424
425            metrics::inc_executor_external_transaction(start.elapsed(), tx_contract, tx_function);
426            metrics::inc_executor_external_transaction_account_reads(tx_metrics.account_reads, tx_contract, tx_function);
427            metrics::inc_executor_external_transaction_slot_reads(tx_metrics.slot_reads, tx_contract, tx_function);
428            metrics::inc_executor_external_transaction_gas(tx_gas.as_u64() as usize, tx_contract, tx_function);
429        }
430
431        Ok(())
432    }
433
434    // -------------------------------------------------------------------------
435    // Local transactions
436    // -------------------------------------------------------------------------
437
438    /// Executes a transaction persisting state changes.
439    #[tracing::instrument(name = "executor::local_transaction", skip_all, fields(tx_hash, tx_from, tx_to, tx_nonce))]
440    pub fn execute_local_transaction(&self, tx: TransactionInput) -> Result<(), StratusError> {
441        #[cfg(feature = "metrics")]
442        let function = codegen::function_sig(&tx.input);
443        #[cfg(feature = "metrics")]
444        let contract = codegen::contract_name(&tx.to);
445        #[cfg(feature = "metrics")]
446        let start = metrics::now();
447
448        tracing::debug!(tx_hash = %tx.hash, "executing local transaction");
449
450        // track
451        Span::with(|s| {
452            s.rec_str("tx_hash", &tx.hash);
453            s.rec_str("tx_from", &tx.signer);
454            s.rec_opt("tx_to", &tx.to);
455            s.rec_str("tx_nonce", &tx.nonce);
456        });
457
458        // execute according to the strategy
459        const INFINITE_ATTEMPTS: usize = usize::MAX;
460
461        // Executes transactions serially:
462        // * Uses a Mutex, so a new transactions starts executing only after the previous one is executed and persisted.
463        // * Without a Mutex, conflict can happen because the next transactions starts executing before the previous one is saved.
464        let _transaction_lock = self.locks.transaction.lock();
465
466        // execute transaction
467        let tx_execution = self.execute_local_transaction_attempts(tx, INFINITE_ATTEMPTS);
468
469        #[cfg(feature = "metrics")]
470        metrics::inc_executor_local_transaction(start.elapsed(), tx_execution.is_ok(), contract, function);
471
472        tx_execution
473    }
474
475    /// Executes a transaction until it reaches the max number of attempts.
476    fn execute_local_transaction_attempts(&self, tx_input: TransactionInput, max_attempts: usize) -> Result<(), StratusError> {
477        // validate
478        if tx_input.signer.is_zero() {
479            return Err(TransactionError::FromZeroAddress.into());
480        }
481
482        // executes transaction until no more conflicts
483        let mut attempt = 0;
484        loop {
485            attempt += 1;
486
487            // track
488            let _span = debug_span!(
489                "executor::local_transaction_attempt",
490                %attempt,
491                tx_hash = %tx_input.hash,
492                tx_from = %tx_input.signer,
493                tx_to = tracing::field::Empty,
494                tx_nonce = %tx_input.nonce
495            )
496            .entered();
497            Span::with(|s| {
498                s.rec_opt("tx_to", &tx_input.to);
499            });
500
501            // prepare evm input
502            let (pending_header, _) = self.storage.read_pending_block_header();
503            let evm_input = EvmInput::from_eth_transaction(&tx_input, &pending_header);
504
505            // execute transaction in evm (retry only in case of conflict, but do not retry on other failures)
506            tracing::debug!(
507                %attempt,
508                tx_hash = %tx_input.hash,
509                tx_nonce = %tx_input.nonce,
510                tx_from = %tx_input.from,
511                tx_signer = %tx_input.signer,
512                tx_to = ?tx_input.to,
513                tx_data_len = %tx_input.input.len(),
514                tx_data = %tx_input.input,
515                ?evm_input,
516                "executing local transaction attempt"
517            );
518
519            let evm_result = self.evms.execute(evm_input.clone(), EvmRoute::Local)?;
520
521            // save execution to temporary storage
522            // in case of failure, retry if conflict or abandon if unexpected error
523            let tx_execution = TransactionExecution::new(tx_input.clone(), evm_input, evm_result);
524            #[cfg(feature = "metrics")]
525            let tx_metrics = tx_execution.metrics();
526            #[cfg(feature = "metrics")]
527            let gas_used = tx_execution.result.execution.gas;
528            #[cfg(feature = "metrics")]
529            let function = codegen::function_sig(&tx_input.input);
530            #[cfg(feature = "metrics")]
531            let contract = codegen::contract_name(&tx_input.to);
532
533            if let ExecutionResult::Reverted { reason } = &tx_execution.result.execution.result {
534                tracing::info!(?reason, "local transaction execution reverted");
535                #[cfg(feature = "metrics")]
536                metrics::inc_executor_local_transaction_reverts(contract, function, reason.0.as_ref());
537            }
538
539            match self.miner.save_execution(tx_execution, true) {
540                Ok(_) => {
541                    // track metrics
542                    #[cfg(feature = "metrics")]
543                    {
544                        metrics::inc_executor_local_transaction_account_reads(tx_metrics.account_reads, contract, function);
545                        metrics::inc_executor_local_transaction_slot_reads(tx_metrics.slot_reads, contract, function);
546                        metrics::inc_executor_local_transaction_gas(gas_used.as_u64() as usize, true, contract, function);
547                    }
548                    return Ok(());
549                }
550                Err(e) => match e {
551                    StratusError::Storage(StorageError::EvmInputMismatch { ref expected, ref actual }) => {
552                        tracing::warn!(?expected, ?actual, "evm input and block header mismatch");
553                        if attempt >= max_attempts {
554                            return Err(e);
555                        }
556                        continue;
557                    }
558                    _ => return Err(e),
559                },
560            }
561        }
562    }
563
564    /// Executes a transaction without persisting state changes.
565    #[tracing::instrument(name = "executor::local_call", skip_all, fields(from, to))]
566    pub fn execute_local_call(&self, call_input: CallInput, point_in_time: PointInTime) -> Result<EvmExecution, StratusError> {
567        #[cfg(feature = "metrics")]
568        let start = metrics::now();
569
570        Span::with(|s| {
571            s.rec_opt("from", &call_input.from);
572            s.rec_opt("to", &call_input.to);
573        });
574        tracing::info!(
575            from = ?call_input.from,
576            to = ?call_input.to,
577            data_len = call_input.data.len(),
578            data = %call_input.data,
579            %point_in_time,
580            "executing read-only local transaction"
581        );
582
583        // execute
584        let evm_input = match point_in_time {
585            PointInTime::Pending => {
586                let (pending_header, tx_count) = self.storage.read_pending_block_header();
587                EvmInput::from_pending_block(call_input.clone(), pending_header, tx_count)
588            }
589            point_in_time => {
590                let Some(block) = self.storage.read_block(point_in_time.into())? else {
591                    return Err(RpcError::BlockFilterInvalid { filter: point_in_time.into() }.into());
592                };
593                EvmInput::from_mined_block(call_input.clone(), block, point_in_time)
594            }
595        };
596
597        let evm_route = match point_in_time {
598            PointInTime::Pending => EvmRoute::CallPresent,
599            PointInTime::Mined | PointInTime::MinedPast(_) => EvmRoute::CallPast,
600        };
601        let evm_result = self.evms.execute(evm_input, evm_route);
602
603        // track metrics
604        #[cfg(feature = "metrics")]
605        {
606            let function = codegen::function_sig(&call_input.data);
607            let contract = codegen::contract_name(&call_input.to);
608
609            match &evm_result {
610                Ok(evm_result) => {
611                    metrics::inc_executor_local_call(start.elapsed(), true, contract, function);
612                    metrics::inc_executor_local_call_account_reads(evm_result.metrics.account_reads, contract, function);
613                    metrics::inc_executor_local_call_slot_reads(evm_result.metrics.slot_reads, contract, function);
614                    metrics::inc_executor_local_call_gas(evm_result.execution.gas.as_u64() as usize, contract, function);
615                }
616                Err(_) => {
617                    metrics::inc_executor_local_call(start.elapsed(), false, contract, function);
618                }
619            }
620        }
621
622        let execution = evm_result?.execution;
623        Ok(execution)
624    }
625
626    pub fn trace_transaction(&self, tx_hash: Hash, opts: Option<GethDebugTracingOptions>, trace_unsuccessful_only: bool) -> Result<GethTrace, StratusError> {
627        Span::with(|s| {
628            s.rec_str("tx_hash", &tx_hash);
629        });
630
631        tracing::info!("inspecting transaction");
632        let opts = opts.unwrap_or_default();
633        let tracer_type = opts.tracer.clone();
634
635        timed(|| {
636            self.evms.inspect(InspectorInput {
637                tx_hash,
638                opts,
639                trace_unsuccessful_only,
640            })
641        })
642        .with(|m| metrics::inc_evm_inspect(m.elapsed, serde_json::to_string(&tracer_type).unwrap_or_else(|_| "unkown".to_owned())))
643    }
644}