stratus/eth/rpc/
rpc_server.rs

1//! RPC server for HTTP and WS.
2
3use std::collections::HashMap;
4use std::ops::Deref;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::sync::LazyLock;
8use std::time::Duration;
9
10use alloy_primitives::U256;
11use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
12use alloy_rpc_types_trace::geth::GethTrace;
13use anyhow::Result;
14use futures::join;
15use http::Method;
16use itertools::Itertools;
17use jsonrpsee::Extensions;
18use jsonrpsee::IntoSubscriptionCloseResponse;
19use jsonrpsee::PendingSubscriptionSink;
20use jsonrpsee::server::BatchRequestConfig;
21use jsonrpsee::server::RandomStringIdProvider;
22use jsonrpsee::server::RpcModule;
23use jsonrpsee::server::Server as RpcServer;
24use jsonrpsee::server::ServerConfig;
25use jsonrpsee::server::ServerHandle;
26use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
27use jsonrpsee::types::Params;
28use jsonrpsee::ws_client::RpcServiceBuilder;
29use parking_lot::RwLock;
30use serde_json::json;
31use tokio::runtime::Handle;
32use tokio::select;
33use tokio::sync::Semaphore;
34use tokio::sync::SemaphorePermit;
35use tower_http::cors::Any;
36use tower_http::cors::CorsLayer;
37use tracing::Instrument;
38use tracing::Span;
39use tracing::field;
40use tracing::info_span;
41
42use crate::GlobalState;
43use crate::NodeMode;
44use crate::alias::AlloyReceipt;
45use crate::alias::JsonValue;
46use crate::config::StratusConfig;
47use crate::eth::codegen;
48use crate::eth::codegen::CONTRACTS;
49use crate::eth::decode;
50use crate::eth::executor::Executor;
51use crate::eth::follower::consensus::Consensus;
52use crate::eth::follower::importer::ImporterConfig;
53use crate::eth::follower::importer::ImporterConsensus;
54use crate::eth::follower::importer::send_block_to_kafka;
55use crate::eth::miner::Miner;
56use crate::eth::miner::MinerMode;
57use crate::eth::primitives::Address;
58use crate::eth::primitives::BlockFilter;
59use crate::eth::primitives::Bytes;
60use crate::eth::primitives::CallInput;
61use crate::eth::primitives::ChainId;
62use crate::eth::primitives::ConsensusError;
63use crate::eth::primitives::DecodeInputError;
64use crate::eth::primitives::EvmExecution;
65use crate::eth::primitives::Hash;
66use crate::eth::primitives::ImporterError;
67use crate::eth::primitives::LogFilterInput;
68#[cfg(feature = "dev")]
69use crate::eth::primitives::Nonce;
70use crate::eth::primitives::PointInTime;
71use crate::eth::primitives::RpcError;
72use crate::eth::primitives::SlotIndex;
73#[cfg(feature = "dev")]
74use crate::eth::primitives::SlotValue;
75use crate::eth::primitives::StateError;
76use crate::eth::primitives::StorageError;
77use crate::eth::primitives::StratusError;
78use crate::eth::primitives::TransactionError;
79use crate::eth::primitives::TransactionInput;
80use crate::eth::primitives::TransactionStage;
81#[cfg(feature = "dev")]
82use crate::eth::primitives::Wei;
83use crate::eth::rpc::RpcContext;
84use crate::eth::rpc::RpcHttpMiddleware;
85use crate::eth::rpc::RpcMiddleware;
86use crate::eth::rpc::RpcServerConfig;
87use crate::eth::rpc::RpcSubscriptions;
88use crate::eth::rpc::next_rpc_param;
89use crate::eth::rpc::next_rpc_param_or_default;
90use crate::eth::rpc::rpc_parser::RpcExtensionsExt;
91use crate::eth::rpc::rpc_subscriptions::RpcSubscriptionsHandles;
92use crate::eth::storage::ReadKind;
93use crate::eth::storage::StratusStorage;
94use crate::ext::InfallibleExt;
95use crate::ext::WatchReceiverExt;
96use crate::ext::not;
97use crate::ext::parse_duration;
98use crate::ext::to_json_string;
99use crate::ext::to_json_value;
100use crate::infra::build_info;
101use crate::infra::kafka::KafkaConfig;
102use crate::infra::metrics;
103use crate::infra::tracing::SpanExt;
104use crate::log_and_err;
105// -----------------------------------------------------------------------------
106// Server
107// -----------------------------------------------------------------------------
108
109#[derive(Clone, derive_new::new)]
110pub struct Server {
111    // services
112    pub storage: Arc<StratusStorage>,
113    pub executor: Arc<Executor>,
114    pub miner: Arc<Miner>,
115    pub importer: Arc<RwLock<Option<Arc<ImporterConsensus>>>>,
116
117    // config
118    pub app_config: StratusConfig,
119    pub rpc_config: RpcServerConfig,
120    pub chain_id: ChainId,
121}
122
123impl Server {
124    pub async fn serve(self) -> Result<()> {
125        let this = Arc::new(self);
126        this.update_health().await;
127
128        const TASK_NAME: &str = "rpc-server";
129        let health_worker_handle = tokio::task::spawn({
130            let this = Arc::clone(&this);
131            async move {
132                this.health_worker().await;
133            }
134        });
135        this.update_health().await;
136        let mut health = GlobalState::get_health_receiver();
137        health.mark_unchanged();
138        let (server_handle, subscriptions) = loop {
139            let (server_handle, subscriptions) = this._serve().await?;
140            let server_handle_watch = server_handle.clone();
141
142            // await for cancellation or jsonrpsee to stop (should not happen) or health changes
143            select! {
144                // If the server stop unexpectedly, shutdown stratus and break the loop
145                _ = server_handle_watch.stopped() => {
146                    GlobalState::shutdown_from(TASK_NAME, "finished unexpectedly");
147                    break (server_handle, subscriptions);
148                },
149                // If a shutdown is requested, stop the server and break the loop
150                _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
151                    let _ = server_handle.stop();
152                    break (server_handle, subscriptions);
153                },
154                // If the health state changes to unhealthy, stop the server and subscriptions and recreate them (causing all connections to be dropped)
155                _ = health.wait_for_change(|healthy| GlobalState::restart_on_unhealthy() && !healthy) => {
156                    tracing::info!("health state changed to unhealthy, restarting the rpc server");
157                    let _ = server_handle.stop();
158                    subscriptions.abort();
159                    join!(server_handle.stopped(), subscriptions.stopped());
160                }
161            }
162        };
163        let res = join!(server_handle.stopped(), subscriptions.stopped(), health_worker_handle);
164        res.2?;
165        Ok(())
166    }
167
168    /// Starts JSON-RPC server.
169    async fn _serve(&self) -> anyhow::Result<(ServerHandle, RpcSubscriptionsHandles)> {
170        let this = self.clone();
171        const TASK_NAME: &str = "rpc-server";
172        tracing::info!(%this.rpc_config.rpc_address, %this.rpc_config.rpc_max_connections, "creating {}", TASK_NAME);
173
174        // configure subscriptions
175        let subs = RpcSubscriptions::spawn(
176            this.miner.notifier_pending_txs.subscribe(),
177            this.miner.notifier_blocks.subscribe(),
178            this.miner.notifier_logs.subscribe(),
179        );
180
181        // configure context
182        let ctx = RpcContext {
183            server: Arc::new(this.clone()),
184            client_version: "stratus",
185            subs: Arc::clone(&subs.connected),
186        };
187
188        // configure module
189        let mut module = RpcModule::<RpcContext>::new(ctx);
190        module = register_methods(module)?;
191
192        // configure middleware
193        let cors = CorsLayer::new().allow_methods([Method::POST]).allow_origin(Any).allow_headers(Any);
194        let rpc_middleware = RpcServiceBuilder::new().layer_fn(RpcMiddleware::new);
195        let http_middleware = tower::ServiceBuilder::new().layer(cors).layer_fn(RpcHttpMiddleware::new).layer(
196            ProxyGetRequestLayer::new([
197                ("/health", "stratus_health"),
198                ("/version", "stratus_version"),
199                ("/config", "stratus_config"),
200                ("/state", "stratus_state"),
201            ])
202            .unwrap(),
203        );
204
205        let server_config = ServerConfig::builder()
206            .set_id_provider(RandomStringIdProvider::new(8))
207            .max_connections(this.rpc_config.rpc_max_connections)
208            .max_response_body_size(this.rpc_config.rpc_max_response_size_bytes)
209            .set_batch_request_config(BatchRequestConfig::Limit(this.rpc_config.batch_request_limit))
210            .build();
211
212        // serve module
213        let server = RpcServer::builder()
214            .set_rpc_middleware(rpc_middleware)
215            .set_http_middleware(http_middleware)
216            .set_config(server_config)
217            .build(this.rpc_config.rpc_address)
218            .await?;
219
220        let handle_rpc_server = server.start(module);
221        Ok((handle_rpc_server, subs.handles))
222    }
223
224    pub async fn health_worker(&self) {
225        // Create an interval that ticks at the configured interval
226        let mut interval = tokio::time::interval(std::time::Duration::from_millis(self.rpc_config.health_check_interval_ms));
227
228        // Enter an infinite loop to periodically run the health check
229        loop {
230            // Call the health function and ignore its result
231            self.update_health().await;
232
233            // Wait for the next tick
234            interval.tick().await;
235            if GlobalState::is_shutdown() {
236                break;
237            }
238        }
239    }
240
241    async fn update_health(&self) {
242        let is_healthy = self.health().await;
243        GlobalState::set_health(is_healthy);
244        metrics::set_consensus_is_ready(if is_healthy { 1u64 } else { 0u64 });
245    }
246
247    fn read_importer(&self) -> Option<Arc<ImporterConsensus>> {
248        self.importer.read().as_ref().map(Arc::clone)
249    }
250
251    pub fn set_importer(&self, importer: Option<Arc<ImporterConsensus>>) {
252        *self.importer.write() = importer;
253    }
254
255    async fn health(&self) -> bool {
256        match GlobalState::get_node_mode() {
257            NodeMode::Leader | NodeMode::FakeLeader => true,
258            NodeMode::Follower =>
259                if GlobalState::is_importer_shutdown() {
260                    tracing::warn!("stratus is unhealthy because importer is shutdown");
261                    false
262                } else {
263                    match self.read_importer() {
264                        Some(importer) => importer.should_serve().await,
265                        None => {
266                            tracing::warn!("stratus is unhealthy because importer is not available");
267                            false
268                        }
269                    }
270                },
271        }
272    }
273}
274
275fn register_methods(mut module: RpcModule<RpcContext>) -> anyhow::Result<RpcModule<RpcContext>> {
276    // dev mode methods
277    #[cfg(feature = "dev")]
278    {
279        module.register_blocking_method("evm_setNextBlockTimestamp", evm_set_next_block_timestamp)?;
280        module.register_blocking_method("evm_mine", evm_mine)?;
281        module.register_blocking_method("hardhat_reset", stratus_reset)?;
282        module.register_blocking_method("stratus_reset", stratus_reset)?;
283        module.register_blocking_method("hardhat_setStorageAt", stratus_set_storage_at)?;
284        module.register_blocking_method("stratus_setStorageAt", stratus_set_storage_at)?;
285        module.register_blocking_method("hardhat_setNonce", stratus_set_nonce)?;
286        module.register_blocking_method("stratus_setNonce", stratus_set_nonce)?;
287        module.register_blocking_method("hardhat_setBalance", stratus_set_balance)?;
288        module.register_blocking_method("stratus_setBalance", stratus_set_balance)?;
289        module.register_blocking_method("hardhat_setCode", stratus_set_code)?;
290        module.register_blocking_method("stratus_setCode", stratus_set_code)?;
291        module.register_blocking_method("stratus_clearCache", stratus_clear_cache)?;
292    }
293
294    // stratus status
295    module.register_async_method("stratus_health", stratus_health)?;
296
297    // stratus admin
298    module.register_method("stratus_enableTransactions", stratus_enable_transactions)?;
299    module.register_method("stratus_disableTransactions", stratus_disable_transactions)?;
300    module.register_method("stratus_enableMiner", stratus_enable_miner)?;
301    module.register_method("stratus_disableMiner", stratus_disable_miner)?;
302    module.register_method("stratus_enableUnknownClients", stratus_enable_unknown_clients)?;
303    module.register_method("stratus_disableUnknownClients", stratus_disable_unknown_clients)?;
304    module.register_method("stratus_enableRestartOnUnhealthy", stratus_enable_restart_on_unhealthy)?;
305    module.register_method("stratus_disableRestartOnUnhealthy", stratus_disable_restart_on_unhealthy)?;
306    module.register_async_method("stratus_changeToLeader", stratus_change_to_leader)?;
307    module.register_async_method("stratus_changeToFollower", stratus_change_to_follower)?;
308    module.register_async_method("stratus_initImporter", stratus_init_importer)?;
309    module.register_method("stratus_shutdownImporter", stratus_shutdown_importer)?;
310    module.register_async_method("stratus_changeMinerMode", stratus_change_miner_mode)?;
311    module.register_async_method("stratus_emitBlockEvents", stratus_emit_block_events)?;
312
313    // stratus state
314    module.register_method("stratus_version", stratus_version)?;
315    module.register_method("stratus_config", stratus_config)?;
316    module.register_method("stratus_state", stratus_state)?;
317
318    module.register_async_method("stratus_getSubscriptions", stratus_get_subscriptions)?;
319    module.register_method("stratus_pendingTransactionsCount", stratus_pending_transactions_count)?;
320
321    // blockchain
322    module.register_method("net_version", net_version)?;
323    module.register_async_method("net_listening", net_listening)?;
324    module.register_method("eth_chainId", eth_chain_id)?;
325    module.register_method("web3_clientVersion", web3_client_version)?;
326
327    // gas
328    module.register_method("eth_gasPrice", eth_gas_price)?;
329
330    // stratus importing helpers
331    module.register_blocking_method("stratus_getBlockAndReceipts", stratus_get_block_and_receipts)?;
332    module.register_blocking_method("stratus_getBlockWithChanges", stratus_get_block_with_changes)?;
333
334    // block
335    module.register_blocking_method("eth_blockNumber", eth_block_number)?;
336    module.register_blocking_method("eth_getBlockByNumber", eth_get_block_by_number)?;
337    module.register_blocking_method("eth_getBlockByHash", eth_get_block_by_hash)?;
338    module.register_blocking_method("stratus_getBlockByTimestamp", stratus_get_block_by_timestamp)?;
339    module.register_method("eth_getUncleByBlockHashAndIndex", eth_get_uncle_by_block_hash_and_index)?;
340
341    // transactions
342    module.register_blocking_method("eth_getTransactionByHash", eth_get_transaction_by_hash)?;
343    module.register_blocking_method("eth_getTransactionReceipt", eth_get_transaction_receipt)?;
344    module.register_blocking_method("eth_estimateGas", eth_estimate_gas)?;
345    module.register_blocking_method("eth_call", eth_call)?;
346    module.register_blocking_method("eth_sendRawTransaction", eth_send_raw_transaction)?;
347    module.register_blocking_method("stratus_call", stratus_call)?;
348    module.register_blocking_method("stratus_getTransactionResult", stratus_get_transaction_result)?;
349    module.register_blocking_method("debug_traceTransaction", debug_trace_transaction)?;
350
351    // logs
352    module.register_blocking_method("eth_getLogs", eth_get_logs)?;
353
354    // account
355    module.register_method("eth_accounts", eth_accounts)?;
356    module.register_blocking_method("eth_getTransactionCount", eth_get_transaction_count)?;
357    module.register_blocking_method("eth_getBalance", eth_get_balance)?;
358    module.register_blocking_method("eth_getCode", eth_get_code)?;
359
360    // storage
361    module.register_blocking_method("eth_getStorageAt", eth_get_storage_at)?;
362
363    // subscriptions
364    module.register_subscription("eth_subscribe", "eth_subscription", "eth_unsubscribe", eth_subscribe)?;
365
366    Ok(module)
367}
368
369// -----------------------------------------------------------------------------
370// Debug
371// -----------------------------------------------------------------------------
372
373#[cfg(feature = "dev")]
374fn evm_mine(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
375    ctx.server.miner.mine_local_and_commit()?;
376    Ok(to_json_value(true))
377}
378
379#[cfg(feature = "dev")]
380fn stratus_clear_cache(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
381    ctx.server.storage.clear_cache();
382    Ok(to_json_value(true))
383}
384
385#[cfg(feature = "dev")]
386fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
387    use crate::eth::primitives::UnixTime;
388    use crate::log_and_err;
389
390    let (_, timestamp) = next_rpc_param::<UnixTime>(params.sequence())?;
391    let latest = ctx.server.storage.read_block(BlockFilter::Latest)?;
392    match latest {
393        Some(block) => UnixTime::set_offset(block.header.timestamp, timestamp)?,
394        None => return log_and_err!("reading latest block returned None")?,
395    }
396    Ok(to_json_value(timestamp))
397}
398
399// -----------------------------------------------------------------------------
400// Status - Health checks
401// -----------------------------------------------------------------------------
402
403#[allow(unused_variables)]
404async fn stratus_health(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
405    #[cfg(feature = "dev")]
406    ctx.server.update_health().await;
407
408    if GlobalState::is_shutdown() {
409        tracing::warn!("liveness check failed because of shutdown");
410        return Err(StateError::StratusShutdown.into());
411    }
412
413    if GlobalState::is_healthy() {
414        Ok(json!(true))
415    } else {
416        Err(StateError::StratusNotReady.into())
417    }
418}
419
420// -----------------------------------------------------------------------------
421// Stratus - Admin
422// -----------------------------------------------------------------------------
423
424#[cfg(feature = "dev")]
425fn stratus_reset(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
426    ctx.server.storage.reset_to_genesis()?;
427    Ok(to_json_value(true))
428}
429
430#[cfg(feature = "dev")]
431fn stratus_set_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
432    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
433    let (params, index) = next_rpc_param::<SlotIndex>(params)?;
434    let (_, value) = next_rpc_param::<SlotValue>(params)?;
435
436    tracing::info!(%address, %index, %value, "setting storage at address and index");
437
438    ctx.server.storage.set_storage_at(address, index, value)?;
439
440    Ok(to_json_value(true))
441}
442
443#[cfg(feature = "dev")]
444fn stratus_set_nonce(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
445    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
446    let (_, nonce) = next_rpc_param::<Nonce>(params)?;
447
448    tracing::info!(%address, %nonce, "setting nonce for address");
449
450    ctx.server.storage.set_nonce(address, nonce)?;
451
452    Ok(to_json_value(true))
453}
454
455#[cfg(feature = "dev")]
456fn stratus_set_balance(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
457    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
458    let (_, balance) = next_rpc_param::<Wei>(params)?;
459
460    tracing::info!(%address, %balance, "setting balance for address");
461
462    ctx.server.storage.set_balance(address, balance)?;
463
464    Ok(to_json_value(true))
465}
466
467#[cfg(feature = "dev")]
468fn stratus_set_code(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
469    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
470    let (_, code) = next_rpc_param::<Bytes>(params)?;
471
472    tracing::info!(%address, code_size = %code.0.len(), "setting code for address");
473
474    ctx.server.storage.set_code(address, code)?;
475
476    Ok(to_json_value(true))
477}
478
479static MODE_CHANGE_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
480
481async fn stratus_change_to_leader(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
482    ext.authentication().auth_admin()?;
483    let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
484    let _permit: SemaphorePermit = match permit {
485        Ok(permit) => permit,
486        Err(_) => return Err(StateError::ModeChangeInProgress.into()),
487    };
488
489    const LEADER_MINER_INTERVAL: Duration = Duration::from_secs(1);
490    tracing::info!("starting process to change node to leader");
491
492    if GlobalState::get_node_mode() == NodeMode::Leader {
493        tracing::info!("node is already in leader mode, no changes made");
494        return Ok(json!(false));
495    }
496
497    if GlobalState::is_transactions_enabled() {
498        tracing::error!("transactions are currently enabled, cannot change node mode");
499        return Err(StateError::TransactionsEnabled.into());
500    }
501
502    tracing::info!("shutting down importer");
503    let shutdown_importer_result = stratus_shutdown_importer(Params::new(None), &ctx, &ext);
504    match shutdown_importer_result {
505        Ok(_) => tracing::info!("importer shutdown successfully"),
506        Err(StratusError::Importer(ImporterError::AlreadyShutdown)) => {
507            tracing::warn!("importer is already shutdown, continuing");
508        }
509        Err(e) => {
510            tracing::error!(reason = ?e, "failed to shutdown importer");
511            return Err(e);
512        }
513    }
514
515    tracing::info!("wait for importer to shutdown");
516    GlobalState::wait_for_importer_to_finish().await;
517
518    let change_miner_mode_result = change_miner_mode(MinerMode::Interval(LEADER_MINER_INTERVAL), &ctx).await;
519    if let Err(e) = change_miner_mode_result {
520        tracing::error!(reason = ?e, "failed to change miner mode");
521        return Err(e);
522    }
523    tracing::info!("miner mode changed to interval(1s) successfully");
524
525    GlobalState::set_node_mode(NodeMode::Leader);
526    ctx.server.storage.clear_cache();
527    tracing::info!("node mode changed to leader successfully, cache cleared");
528
529    Ok(json!(true))
530}
531
532async fn stratus_change_to_follower(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
533    ext.authentication().auth_admin()?;
534    let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
535    let _permit: SemaphorePermit = match permit {
536        Ok(permit) => permit,
537        Err(_) => return Err(StateError::ModeChangeInProgress.into()),
538    };
539
540    tracing::info!("starting process to change node to follower");
541
542    if GlobalState::get_node_mode() == NodeMode::Follower {
543        tracing::info!("node is already in follower mode, no changes made");
544        return Ok(json!(false));
545    }
546
547    if GlobalState::is_transactions_enabled() {
548        tracing::error!("transactions are currently enabled, cannot change node mode");
549        return Err(StateError::TransactionsEnabled.into());
550    }
551
552    let pending_txs = ctx.server.storage.pending_transactions();
553    if not(pending_txs.is_empty()) {
554        tracing::error!(pending_txs = ?pending_txs.len(), "cannot change to follower mode with pending transactions");
555        return Err(StorageError::PendingTransactionsExist {
556            pending_txs: pending_txs.len(),
557        }
558        .into());
559    }
560
561    let change_miner_mode_result = change_miner_mode(MinerMode::External, &ctx).await;
562    if let Err(e) = change_miner_mode_result {
563        tracing::error!(reason = ?e, "failed to change miner mode");
564        return Err(e);
565    }
566    tracing::info!("miner mode changed to external successfully");
567
568    GlobalState::set_node_mode(NodeMode::Follower);
569    ctx.server.storage.clear_cache();
570    tracing::info!("storage cache cleared");
571
572    tracing::info!("initializing importer");
573    let init_importer_result = stratus_init_importer(params, Arc::clone(&ctx), ext).await;
574    match init_importer_result {
575        Ok(_) => tracing::info!("importer initialized successfully"),
576        Err(StratusError::Importer(ImporterError::AlreadyRunning)) => {
577            tracing::warn!("importer is already running, continuing");
578        }
579        Err(e) => {
580            tracing::error!(reason = ?e, "failed to initialize importer, reverting node mode to leader");
581            GlobalState::set_node_mode(NodeMode::Leader);
582            return Err(e);
583        }
584    }
585    tracing::info!("node mode changed to follower successfully");
586
587    Ok(json!(true))
588}
589
590async fn stratus_init_importer(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
591    ext.authentication().auth_admin()?;
592    let (params, external_rpc) = next_rpc_param::<String>(params.sequence())?;
593    let (params, external_rpc_ws) = next_rpc_param::<String>(params)?;
594    let (params, raw_external_rpc_timeout) = next_rpc_param::<String>(params)?;
595    let (params, raw_sync_interval) = next_rpc_param::<String>(params)?;
596    let (_, raw_external_rpc_max_response_size_bytes) = next_rpc_param::<String>(params)?;
597
598    let external_rpc_timeout = parse_duration(&raw_external_rpc_timeout).map_err(|e| {
599        tracing::error!(reason = ?e, "failed to parse external_rpc_timeout");
600        ImporterError::ConfigParseError
601    })?;
602
603    let sync_interval = parse_duration(&raw_sync_interval).map_err(|e| {
604        tracing::error!(reason = ?e, "failed to parse sync_interval");
605        ImporterError::ConfigParseError
606    })?;
607
608    let external_rpc_max_response_size_bytes = raw_external_rpc_max_response_size_bytes.parse::<u32>().map_err(|e| {
609        tracing::error!(reason = ?e, "failed to parse external_rpc_max_response_size_bytes");
610        ImporterError::ConfigParseError
611    })?;
612
613    let importer_config = ImporterConfig {
614        external_rpc,
615        external_rpc_ws: Some(external_rpc_ws),
616        external_rpc_timeout,
617        sync_interval,
618        external_rpc_max_response_size_bytes,
619        enable_block_changes_replication: std::env::var("ENABLE_BLOCK_CHANGES_REPLICATION")
620            .ok()
621            .is_some_and(|val| val == "1" || val == "true"),
622    };
623
624    importer_config.init_follower_importer(ctx).await
625}
626
627fn stratus_shutdown_importer(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<JsonValue, StratusError> {
628    ext.authentication().auth_admin()?;
629    if GlobalState::get_node_mode() != NodeMode::Follower {
630        tracing::error!("node is currently not a follower");
631        return Err(StateError::StratusNotFollower.into());
632    }
633
634    if GlobalState::is_importer_shutdown() && ctx.server.read_importer().is_none() {
635        tracing::error!("importer is already shut down");
636        return Err(ImporterError::AlreadyShutdown.into());
637    }
638
639    ctx.server.set_importer(None);
640
641    const TASK_NAME: &str = "rpc-server::importer-shutdown";
642    GlobalState::shutdown_importer_from(TASK_NAME, "received importer shutdown request");
643
644    Ok(json!(true))
645}
646
647async fn stratus_change_miner_mode(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
648    ext.authentication().auth_admin()?;
649    let (_, mode_str) = next_rpc_param::<String>(params.sequence())?;
650
651    let mode = MinerMode::from_str(&mode_str).map_err(|e| {
652        tracing::error!(reason = ?e, "failed to parse miner mode");
653        RpcError::MinerModeParamInvalid
654    })?;
655
656    change_miner_mode(mode, &ctx).await
657}
658
659/// Tries changing miner mode, returns `Ok(true)` if changed, and `Ok(false)` if no changing was necessary.
660///
661/// This function also enables the miner after changing it.
662async fn change_miner_mode(new_mode: MinerMode, ctx: &RpcContext) -> Result<JsonValue, StratusError> {
663    if GlobalState::is_transactions_enabled() {
664        tracing::error!("cannot change miner mode while transactions are enabled");
665        return Err(StateError::TransactionsEnabled.into());
666    }
667
668    let previous_mode = ctx.server.miner.mode();
669
670    if previous_mode == new_mode {
671        tracing::warn!(?new_mode, current = ?new_mode, "miner mode already set, skipping");
672        return Ok(json!(false));
673    }
674
675    if not(ctx.server.miner.is_paused()) && previous_mode.is_interval() {
676        return log_and_err!("can't change miner mode from Interval without pausing it first").map_err(Into::into);
677    }
678
679    match new_mode {
680        MinerMode::External => {
681            tracing::info!("changing miner mode to External");
682
683            let pending_txs = ctx.server.storage.pending_transactions();
684            if not(pending_txs.is_empty()) {
685                tracing::error!(pending_txs = ?pending_txs.len(), "cannot change miner mode to External with pending transactions");
686                return Err(StorageError::PendingTransactionsExist {
687                    pending_txs: pending_txs.len(),
688                }
689                .into());
690            }
691
692            ctx.server.miner.switch_to_external_mode().await;
693        }
694        MinerMode::Interval(duration) => {
695            tracing::info!(duration = ?duration, "changing miner mode to Interval");
696
697            if ctx.server.read_importer().is_some() {
698                tracing::error!("cannot change miner mode to Interval with consensus set");
699                return Err(ConsensusError::Set.into());
700            }
701
702            ctx.server.miner.start_interval_mining(duration).await;
703        }
704        MinerMode::Automine => {
705            return log_and_err!("Miner mode change to 'automine' is unsupported.").map_err(Into::into);
706        }
707    }
708
709    Ok(json!(true))
710}
711
712fn stratus_enable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
713    ext.authentication().auth_admin()?;
714    GlobalState::set_unknown_client_enabled(true);
715    Ok(GlobalState::is_unknown_client_enabled())
716}
717
718fn stratus_disable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
719    ext.authentication().auth_admin()?;
720    GlobalState::set_unknown_client_enabled(false);
721    Ok(GlobalState::is_unknown_client_enabled())
722}
723
724fn stratus_enable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
725    ext.authentication().auth_admin()?;
726    GlobalState::set_transactions_enabled(true);
727    Ok(GlobalState::is_transactions_enabled())
728}
729
730fn stratus_disable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
731    ext.authentication().auth_admin()?;
732    GlobalState::set_transactions_enabled(false);
733    Ok(GlobalState::is_transactions_enabled())
734}
735
736fn stratus_enable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
737    ext.authentication().auth_admin()?;
738    ctx.server.miner.unpause();
739    Ok(true)
740}
741
742fn stratus_disable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
743    ext.authentication().auth_admin()?;
744    ctx.server.miner.pause();
745    Ok(false)
746}
747
748/// Returns the count of executed transactions waiting to enter the next block.
749fn stratus_pending_transactions_count(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> usize {
750    ctx.server.storage.pending_transactions().len()
751}
752
753fn stratus_disable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
754    ext.authentication().auth_admin()?;
755    GlobalState::set_restart_on_unhealthy(false);
756    Ok(GlobalState::restart_on_unhealthy())
757}
758
759fn stratus_enable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
760    ext.authentication().auth_admin()?;
761    GlobalState::set_restart_on_unhealthy(true);
762    Ok(GlobalState::restart_on_unhealthy())
763}
764
765/// Emit the kafka events for the transactions in a block on-demand.
766async fn stratus_emit_block_events(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<(), StratusError> {
767    ext.authentication().auth_admin()?;
768    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
769    let (_, kafka_config_override) = next_rpc_param_or_default::<Option<KafkaConfig>>(params)?;
770
771    let Some(block) = ctx.server.storage.read_block(filter)? else {
772        return Err(StorageError::BlockNotFound { filter }.into());
773    };
774
775    let kafka_config = kafka_config_override.as_ref().or(ctx.server.app_config.kafka_config.as_ref());
776    let Some(kafka_connector) = kafka_config.map(|inner| inner.init()).transpose()? else {
777        return Err(StateError::Misconfigured {
778            details: "kafka configuration is not set",
779        }
780        .into());
781    };
782
783    send_block_to_kafka(&Some(kafka_connector), &block).await?;
784
785    Ok(())
786}
787
788// -----------------------------------------------------------------------------
789// Stratus - State
790// -----------------------------------------------------------------------------
791
792fn stratus_version(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
793    Ok(build_info::as_json())
794}
795
796fn stratus_config(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<StratusConfig, StratusError> {
797    ext.authentication().auth_admin()?;
798    Ok(ctx.server.app_config.clone())
799}
800
801fn stratus_state(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
802    Ok(GlobalState::get_global_state_as_json(ctx))
803}
804
805async fn stratus_get_subscriptions(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
806    // NOTE: this is a workaround for holding only one lock at a time
807    let pending_txs = serde_json::to_value(ctx.subs.pending_txs.read().await.values().collect_vec()).expect_infallible();
808    let new_heads = serde_json::to_value(ctx.subs.new_heads.read().await.values().collect_vec()).expect_infallible();
809    let logs = serde_json::to_value(ctx.subs.logs.read().await.values().flat_map(HashMap::values).collect_vec()).expect_infallible();
810
811    let response = json!({
812        "newPendingTransactions": pending_txs,
813        "newHeads": new_heads,
814        "logs": logs,
815    });
816    Ok(response)
817}
818
819// -----------------------------------------------------------------------------
820// Blockchain
821// -----------------------------------------------------------------------------
822
823async fn net_listening(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
824    let net_listening = stratus_health(params, ctx, ext).await;
825
826    tracing::info!(net_listening = ?net_listening, "network listening status");
827
828    net_listening
829}
830
831fn net_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
832    ctx.server.chain_id.to_string()
833}
834
835fn eth_chain_id(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
836    hex_num(ctx.server.chain_id)
837}
838
839fn web3_client_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
840    ctx.client_version.to_owned()
841}
842
843// -----------------------------------------------------------------------------
844// Gas
845// -----------------------------------------------------------------------------
846
847fn eth_gas_price(_: Params<'_>, _: &RpcContext, _: &Extensions) -> String {
848    hex_zero()
849}
850
851// -----------------------------------------------------------------------------
852// Block
853// -----------------------------------------------------------------------------
854
855fn eth_block_number(_params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
856    // enter span
857    let _middleware_enter = ext.enter_middleware_span();
858    let _method_enter = info_span!("rpc::eth_blockNumber", block_number = field::Empty).entered();
859
860    // execute
861    let block_number = ctx.server.storage.read_mined_block_number();
862    Span::with(|s| s.rec_str("block_number", &block_number));
863
864    Ok(to_json_value(block_number))
865}
866
867fn stratus_get_block_and_receipts(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
868    // enter span
869    let _middleware_enter = ext.enter_middleware_span();
870    let _method_enter = info_span!("rpc::stratus_getBlockAndReceipts").entered();
871
872    // parse params
873    let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
874
875    // track
876    tracing::info!(%filter, "reading block and receipts");
877
878    let Some(block) = ctx.server.storage.read_block(filter)? else {
879        tracing::info!(%filter, "block not found");
880        return Ok(JsonValue::Null);
881    };
882
883    tracing::info!(%filter, "block with transactions found");
884    let receipts = block.transactions.iter().cloned().map(AlloyReceipt::from).collect::<Vec<_>>();
885
886    Ok(json!({
887        "block": block.to_json_rpc_with_full_transactions(),
888        "receipts": receipts,
889    }))
890}
891
892fn stratus_get_block_with_changes(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
893    // enter span
894    let _middleware_enter = ext.enter_middleware_span();
895    let _method_enter = info_span!("rpc::stratus_getBlockWithChanges").entered();
896
897    // parse params
898    let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
899
900    // track
901    tracing::info!(%filter, "reading block and changes");
902
903    let Some(block) = ctx.server.storage.read_block_with_changes(filter)? else {
904        tracing::info!(%filter, "block not found");
905        return Ok(JsonValue::Null);
906    };
907
908    tracing::info!(%filter, "block with changes found");
909
910    Ok(json!(block))
911}
912
913fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
914    let _middleware_enter = ext.enter_middleware_span();
915
916    let _method_enter = info_span!(
917        "rpc::eth_getBlockByHash",
918        filter = field::Empty,
919        block_number = field::Empty,
920        found = field::Empty
921    )
922    .entered();
923
924    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
925    let (_, full_transactions) = next_rpc_param::<bool>(params)?;
926
927    match &filter {
928        BlockFilter::Hash(_) => (),
929        _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
930    }
931
932    eth_get_block_by_selector(filter, full_transactions, ctx)
933}
934
935fn eth_get_block_by_number(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
936    let _middleware_enter = ext.enter_middleware_span();
937
938    let _method_enter = info_span!(
939        "rpc::eth_getBlockByNumber",
940        filter = field::Empty,
941        block_number = field::Empty,
942        found = field::Empty
943    )
944    .entered();
945
946    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
947    let (_, full_transactions) = next_rpc_param::<bool>(params)?;
948
949    match &filter {
950        BlockFilter::Earliest | BlockFilter::Latest | BlockFilter::Number(_) | BlockFilter::Pending => (),
951        _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
952    }
953
954    eth_get_block_by_selector(filter, full_transactions, ctx)
955}
956
957#[inline(always)]
958fn eth_get_block_by_selector(filter: BlockFilter, full_transactions: bool, ctx: Arc<RpcContext>) -> Result<JsonValue, StratusError> {
959    // track
960    Span::with(|s| s.rec_str("filter", &filter));
961    tracing::info!(%filter, %full_transactions, "reading block");
962
963    // execute
964    let block = ctx.server.storage.read_block(filter)?;
965    Span::with(|s| {
966        s.record("found", block.is_some());
967        if let Some(ref block) = block {
968            s.rec_str("block_number", &block.number());
969        }
970    });
971    match (block, full_transactions) {
972        (Some(block), true) => {
973            tracing::info!(%filter, "block with full transactions found");
974            Ok(block.to_json_rpc_with_full_transactions())
975        }
976        (Some(block), false) => {
977            tracing::info!(%filter, "block with only hashes found");
978            Ok(block.to_json_rpc_with_transactions_hashes())
979        }
980        (None, _) => {
981            tracing::info!(%filter, "block not found");
982            Ok(JsonValue::Null)
983        }
984    }
985}
986
987fn eth_get_uncle_by_block_hash_and_index(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
988    Ok(JsonValue::Null)
989}
990
991fn stratus_get_block_by_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
992    let _middleware_enter = ext.enter_middleware_span();
993    let _method_enter = info_span!(
994        "rpc::stratus_getBlockByTimestamp",
995        timestamp = field::Empty,
996        block_number = field::Empty,
997        found = field::Empty
998    )
999    .entered();
1000
1001    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
1002    let (_, full_transactions) = next_rpc_param::<bool>(params)?;
1003
1004    match &filter {
1005        BlockFilter::Timestamp { .. } => (),
1006        _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
1007    }
1008
1009    eth_get_block_by_selector(filter, full_transactions, ctx)
1010}
1011
1012// -----------------------------------------------------------------------------
1013// Transaction
1014// -----------------------------------------------------------------------------
1015
1016fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1017    // enter span
1018    let _middleware_enter = ext.enter_middleware_span();
1019    let _method_enter = info_span!("rpc::eth_getTransactionByHash", tx_hash = field::Empty, found = field::Empty).entered();
1020
1021    // parse params
1022    let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1023
1024    // track
1025    Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1026    tracing::info!(%tx_hash, "reading transaction");
1027
1028    // execute
1029    let tx = ctx.server.storage.read_transaction(tx_hash)?;
1030    Span::with(|s| {
1031        s.record("found", tx.is_some());
1032    });
1033
1034    match tx {
1035        Some(tx) => {
1036            tracing::info!(%tx_hash, "transaction found");
1037            Ok(tx.to_json_rpc_transaction())
1038        }
1039        None => {
1040            tracing::info!(%tx_hash, "transaction not found");
1041            Ok(JsonValue::Null)
1042        }
1043    }
1044}
1045
1046fn rpc_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<Option<TransactionStage>, StratusError> {
1047    // parse params
1048    let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1049
1050    // track
1051    Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1052    tracing::info!("reading transaction receipt");
1053
1054    // execute
1055    let tx = ctx.server.storage.read_transaction(tx_hash)?;
1056    Span::with(|s| {
1057        s.record("found", tx.is_some());
1058    });
1059
1060    Ok(tx)
1061}
1062
1063fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1064    // enter span
1065    let _middleware_enter = ext.enter_middleware_span();
1066    let _method_enter = info_span!("rpc::eth_getTransactionReceipt", tx_hash = field::Empty, found = field::Empty).entered();
1067
1068    match rpc_get_transaction_receipt(params, ctx)? {
1069        Some(tx) => {
1070            tracing::info!("transaction receipt found");
1071            Ok(tx.to_json_rpc_receipt())
1072        }
1073        None => {
1074            tracing::info!("transaction receipt not found");
1075            Ok(JsonValue::Null)
1076        }
1077    }
1078}
1079
1080fn stratus_get_transaction_result(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1081    // enter span
1082    let _middleware_enter = ext.enter_middleware_span();
1083    let _method_enter = info_span!("rpc::stratus_get_transaction_result", tx_hash = field::Empty, found = field::Empty).entered();
1084
1085    match rpc_get_transaction_receipt(params, ctx)? {
1086        Some(tx) => {
1087            tracing::info!("transaction receipt found");
1088            Ok(to_json_value(tx.to_result().execution.result))
1089        }
1090        None => {
1091            tracing::info!("transaction receipt not found");
1092            Ok(JsonValue::Null)
1093        }
1094    }
1095}
1096
1097fn eth_estimate_gas(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1098    // enter span
1099    let _middleware_enter = ext.enter_middleware_span();
1100    let _method_enter = info_span!("rpc::eth_estimateGas", tx_from = field::Empty, tx_to = field::Empty).entered();
1101
1102    // parse params
1103    let (_, call) = next_rpc_param::<CallInput>(params.sequence())?;
1104
1105    // track
1106    Span::with(|s| {
1107        s.rec_opt("tx_from", &call.from);
1108        s.rec_opt("tx_to", &call.to);
1109    });
1110    tracing::info!("executing eth_estimateGas");
1111
1112    // execute
1113    match ctx.server.executor.execute_local_call(call, PointInTime::Mined) {
1114        // result is success
1115        Ok(result) if result.is_success() => {
1116            tracing::info!(tx_output = %result.output, "executed eth_estimateGas with success");
1117            let overestimated_gas = (result.gas_used.as_u64()) as f64 * 1.1;
1118            Ok(hex_num(U256::from(overestimated_gas as u64)))
1119        }
1120
1121        // result is failure
1122        Ok(result) => {
1123            tracing::warn!(tx_output = %result.output, "executed eth_estimateGas with failure");
1124            Err(TransactionError::RevertedCall { output: result.output }.into())
1125        }
1126
1127        // internal error
1128        Err(e) => {
1129            tracing::warn!(reason = ?e, "failed to execute eth_estimateGas");
1130            Err(e)
1131        }
1132    }
1133}
1134
1135fn rpc_call(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<EvmExecution, StratusError> {
1136    // parse params
1137    let (params, call) = next_rpc_param::<CallInput>(params.sequence())?;
1138    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1139
1140    // track
1141    Span::with(|s| {
1142        s.rec_opt("tx_from", &call.from);
1143        s.rec_opt("tx_to", &call.to);
1144        s.rec_str("filter", &filter);
1145    });
1146    tracing::info!(%filter, "executing eth_call");
1147
1148    // execute
1149    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1150    ctx.server.executor.execute_local_call(call, point_in_time)
1151}
1152
1153fn eth_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1154    // enter span
1155    let _middleware_enter = ext.enter_middleware_span();
1156    let _method_enter = info_span!("rpc::eth_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1157
1158    match rpc_call(params, ctx) {
1159        // result is success
1160        Ok(result) if result.is_success() => {
1161            tracing::info!(tx_output = %result.output, "executed eth_call with success");
1162            Ok(hex_data(result.output))
1163        }
1164        // result is failure
1165        Ok(result) => {
1166            tracing::warn!(tx_output = %result.output, "executed eth_call with failure");
1167            Err(TransactionError::RevertedCall { output: result.output }.into())
1168        }
1169        // internal error
1170        Err(e) => {
1171            tracing::warn!(reason = ?e, "failed to execute eth_call");
1172            Err(e)
1173        }
1174    }
1175}
1176
1177fn debug_trace_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1178    // enter span
1179    let _middleware_enter = ext.enter_middleware_span();
1180    let _method_enter = info_span!("rpc::debug_traceTransaction", tx_hash = field::Empty,).entered();
1181
1182    let (params, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1183    let (_, opts) = next_rpc_param_or_default::<Option<GethDebugTracingOptions>>(params)?;
1184    let trace_unsuccessful_only = ctx
1185        .server
1186        .rpc_config
1187        .rpc_debug_trace_unsuccessful_only
1188        .as_ref()
1189        .is_some_and(|inner| inner.contains(ext.rpc_client()));
1190
1191    match ctx.server.executor.trace_transaction(tx_hash, opts, trace_unsuccessful_only) {
1192        Ok(result) => {
1193            tracing::info!(?tx_hash, "executed debug_traceTransaction successfully");
1194
1195            // Enhance GethTrace with decoded information using serialization approach
1196            let enhanced_response = enhance_trace_with_decoded_info(&result);
1197
1198            Ok(enhanced_response)
1199        }
1200        Err(err) => {
1201            tracing::warn!(?err, "error executing debug_traceTransaction");
1202            Err(err)
1203        }
1204    }
1205}
1206
1207fn stratus_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1208    // enter span
1209    let _middleware_enter = ext.enter_middleware_span();
1210    let _method_enter = info_span!("rpc::stratus_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1211
1212    match rpc_call(params, ctx) {
1213        // result is success
1214        Ok(result) if result.is_success() => {
1215            tracing::info!(tx_output = %result.output, "executed stratus_call with success");
1216            Ok(hex_data(result.output))
1217        }
1218        // result is failure
1219        Ok(result) => {
1220            tracing::warn!(tx_output = %result.output, "executed stratus_call with failure");
1221            Err(TransactionError::RevertedCallWithReason {
1222                reason: (&result.output).into(),
1223            }
1224            .into())
1225        }
1226        // internal error
1227        Err(e) => {
1228            tracing::warn!(reason = ?e, "failed to execute stratus_call");
1229            Err(e)
1230        }
1231    }
1232}
1233
1234fn eth_send_raw_transaction(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1235    // enter span
1236    let _middleware_enter = ext.enter_middleware_span();
1237    let _method_enter = info_span!(
1238        "rpc::eth_sendRawTransaction",
1239        tx_hash = field::Empty,
1240        tx_from = field::Empty,
1241        tx_to = field::Empty,
1242        tx_nonce = field::Empty
1243    )
1244    .entered();
1245
1246    // get the pre-decoded transaction from extensions
1247    let (tx, tx_data) = match (ext.get::<TransactionInput>(), ext.get::<Bytes>()) {
1248        (Some(tx), Some(data)) => (tx.clone(), data.clone()),
1249        _ => {
1250            tracing::error!("failed to execute eth_sendRawTransaction because transaction input is not available");
1251            return Err(RpcError::TransactionInvalid {
1252                decode_error: "transaction input is not available".to_string(),
1253            }
1254            .into());
1255        }
1256    };
1257    let tx_hash = tx.transaction_info.hash;
1258
1259    // track
1260    Span::with(|s| {
1261        s.rec_str("tx_hash", &tx_hash);
1262        s.rec_str("tx_from", &tx.execution_info.signer);
1263        s.rec_opt("tx_to", &tx.execution_info.to);
1264        s.rec_str("tx_nonce", &tx.execution_info.nonce);
1265    });
1266
1267    if not(GlobalState::is_transactions_enabled()) {
1268        tracing::warn!(%tx_hash, "failed to execute eth_sendRawTransaction because transactions are disabled");
1269        return Err(StateError::TransactionsDisabled.into());
1270    }
1271
1272    // HOTFIX: this is a temporary stopgap measure to prevent type 4 transactions which currently cause the followers to crash
1273    if tx.transaction_info.tx_type.is_some_and(|t| t > 3) {
1274        tracing::warn!(%tx_hash, "rejecting unsuported transaction type");
1275        return Err(RpcError::ParameterInvalid.into());
1276    }
1277
1278    // execute locally or forward to leader
1279    match GlobalState::get_node_mode() {
1280        NodeMode::Leader | NodeMode::FakeLeader => match ctx.server.executor.execute_local_transaction(tx) {
1281            Ok(_) => Ok(hex_data(tx_hash)),
1282            Err(e) => {
1283                tracing::warn!(reason = ?e, ?tx_hash, "failed to execute eth_sendRawTransaction");
1284                Err(e)
1285            }
1286        },
1287        NodeMode::Follower => match &ctx.server.read_importer() {
1288            Some(importer) => match Handle::current().block_on(importer.forward_to_leader(tx_hash, tx_data, ext.rpc_client())) {
1289                Ok(hash) => Ok(hex_data(hash)),
1290                Err(e) => Err(e),
1291            },
1292            None => {
1293                tracing::error!("unable to forward transaction because consensus is temporarily unavailable for follower node");
1294                Err(ConsensusError::Unavailable.into())
1295            }
1296        },
1297    }
1298}
1299
1300// -----------------------------------------------------------------------------
1301// Logs
1302// -----------------------------------------------------------------------------
1303
1304fn eth_get_logs(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1305    const MAX_BLOCK_RANGE: u64 = 5_000;
1306
1307    // enter span
1308    let _middleware_enter = ext.enter_middleware_span();
1309    let _method_enter = info_span!(
1310        "rpc::eth_getLogs",
1311        filter = field::Empty,
1312        filter_from = field::Empty,
1313        filter_to = field::Empty,
1314        filter_range = field::Empty,
1315        filter_event = field::Empty
1316    )
1317    .entered();
1318
1319    // parse params
1320    let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params.sequence())?;
1321    let mut filter = filter_input.parse(&ctx.server.storage)?;
1322
1323    // for this operation, the filter always need the end block specified to calculate the difference
1324    let to_block = match filter.to_block {
1325        Some(block) => block,
1326        None => {
1327            let block = ctx.server.storage.read_mined_block_number();
1328            filter.to_block = Some(block);
1329            block
1330        }
1331    };
1332    let blocks_in_range = filter.from_block.count_to(to_block);
1333
1334    let event_name = codegen::event_names_from_filter(&filter);
1335
1336    // track
1337    Span::with(|s| {
1338        s.rec_str("filter", &to_json_string(&filter));
1339        s.rec_str("filter_from", &filter.from_block);
1340        s.rec_str("filter_to", &to_block);
1341        s.rec_str("filter_range", &blocks_in_range);
1342        s.rec_str("filter_event", &event_name);
1343    });
1344    tracing::info!(?filter, filter_event = %event_name, "reading logs");
1345
1346    // check range
1347    if blocks_in_range > MAX_BLOCK_RANGE {
1348        return Err(RpcError::BlockRangeInvalid {
1349            actual: blocks_in_range,
1350            max: MAX_BLOCK_RANGE,
1351        }
1352        .into());
1353    }
1354
1355    // execute
1356    let logs = ctx.server.storage.read_logs(&filter)?;
1357    Ok(JsonValue::Array(logs.into_iter().map(|x| x.to_json_rpc_log()).collect()))
1358}
1359
1360// -----------------------------------------------------------------------------
1361// Account
1362// -----------------------------------------------------------------------------
1363
1364fn eth_accounts(_: Params<'_>, _ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
1365    Ok(json!([]))
1366}
1367
1368fn eth_get_transaction_count(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1369    // enter span
1370    let _middleware_enter = ext.enter_middleware_span();
1371    let _method_enter = info_span!("rpc::eth_getTransactionCount", address = field::Empty, filter = field::Empty).entered();
1372
1373    // pare params
1374    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1375    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1376
1377    // track
1378    Span::with(|s| {
1379        s.rec_str("address", &address);
1380        s.rec_str("address", &filter);
1381    });
1382    tracing::info!(%address, %filter, "reading account nonce");
1383
1384    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1385    let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1386    Ok(hex_num(account.nonce))
1387}
1388
1389fn eth_get_balance(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1390    // enter span
1391    let _middleware_enter = ext.enter_middleware_span();
1392    let _method_enter = info_span!("rpc::eth_getBalance", address = field::Empty, filter = field::Empty).entered();
1393
1394    // parse params
1395    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1396    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1397
1398    // track
1399    Span::with(|s| {
1400        s.rec_str("address", &address);
1401        s.rec_str("filter", &filter);
1402    });
1403    tracing::info!(%address, %filter, "reading account native balance");
1404
1405    // execute
1406    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1407    let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1408    Ok(hex_num(account.balance))
1409}
1410
1411fn eth_get_code(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1412    // enter span
1413    let _middleware_enter = ext.enter_middleware_span();
1414    let _method_enter = info_span!("rpc::eth_getCode", address = field::Empty, filter = field::Empty).entered();
1415
1416    // parse params
1417    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1418    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1419
1420    // track
1421    Span::with(|s| {
1422        s.rec_str("address", &address);
1423        s.rec_str("filter", &filter);
1424    });
1425
1426    // execute
1427    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1428    let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1429
1430    Ok(account.bytecode.map(|bytecode| hex_data(bytecode.original_bytes())).unwrap_or_else(hex_null))
1431}
1432
1433// -----------------------------------------------------------------------------
1434// Subscriptions
1435// -----------------------------------------------------------------------------
1436
1437async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc<RpcContext>, ext: Extensions) -> impl IntoSubscriptionCloseResponse {
1438    // `middleware_enter` created to be used as a parent by `method_span`
1439    let middleware_enter = ext.enter_middleware_span();
1440    let method_span = info_span!("rpc::eth_subscribe", subscription = field::Empty, subscription_event = field::Empty);
1441    drop(middleware_enter);
1442
1443    async move {
1444        // parse params
1445        let client = ext.rpc_client();
1446        let (params, event) = match next_rpc_param::<String>(params.sequence()) {
1447            Ok((params, event)) => (params, event),
1448            Err(e) => {
1449                pending.reject(StratusError::from(e)).await;
1450                return Ok(());
1451            }
1452        };
1453
1454        // check subscription limits
1455        if let Err(e) = ctx.subs.check_client_subscriptions(ctx.server.rpc_config.rpc_max_subscriptions, client).await {
1456            pending.reject(e).await;
1457            return Ok(());
1458        }
1459
1460        // track
1461        Span::with(|s| s.rec_str("subscription", &event));
1462        tracing::info!(%event, "subscribing to rpc event");
1463
1464        // execute
1465        match event.deref() {
1466            "newPendingTransactions" => {
1467                ctx.subs.add_new_pending_txs_subscription(client, pending.accept().await?).await;
1468            }
1469
1470            "newHeads" => {
1471                ctx.subs.add_new_heads_subscription(client, pending.accept().await?).await;
1472            }
1473
1474            "logs" => {
1475                let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params)?;
1476                let filter = filter_input.parse(&ctx.server.storage)?;
1477
1478                let event_name = codegen::event_names_from_filter(&filter).to_string();
1479                Span::with(|s| s.rec_str("subscription_event", &event_name));
1480                tracing::info!(subscription_event = %event_name, "subscribing to logs with event filter");
1481
1482                ctx.subs.add_logs_subscription(client, filter, pending.accept().await?).await;
1483            }
1484
1485            // unsupported
1486            event => {
1487                pending
1488                    .reject(StratusError::from(RpcError::SubscriptionInvalid { event: event.to_string() }))
1489                    .await;
1490            }
1491        }
1492
1493        Ok(())
1494    }
1495    .instrument(method_span)
1496    .await
1497}
1498
1499// -----------------------------------------------------------------------------
1500// Storage
1501// -----------------------------------------------------------------------------
1502
1503fn eth_get_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1504    // enter span
1505    let _middleware_enter = ext.enter_middleware_span();
1506    let _method_enter = info_span!("rpc::eth_getStorageAt", address = field::Empty, index = field::Empty).entered();
1507
1508    // parse params
1509    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1510    let (params, index) = next_rpc_param::<SlotIndex>(params)?;
1511    let (_, block_filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1512
1513    Span::with(|s| {
1514        s.rec_str("address", &address);
1515        s.rec_str("index", &index);
1516    });
1517
1518    // execute
1519    let point_in_time = ctx.server.storage.translate_to_point_in_time(block_filter)?;
1520    let slot = ctx.server.storage.read_slot(address, index, point_in_time, ReadKind::RPC)?;
1521
1522    // It must be padded, even if it is zero.
1523    Ok(hex_num_zero_padded(slot.value.as_u256()))
1524}
1525
1526// -----------------------------------------------------------------------------
1527// Response helpers
1528// -----------------------------------------------------------------------------
1529
1530#[inline(always)]
1531fn hex_data<T: AsRef<[u8]>>(value: T) -> String {
1532    const_hex::encode_prefixed(value)
1533}
1534
1535#[inline(always)]
1536fn hex_num(value: impl Into<U256>) -> String {
1537    format!("{:#x}", value.into())
1538}
1539
1540#[inline(always)]
1541fn hex_num_zero_padded(value: impl Into<U256>) -> String {
1542    let width = 64 + 2; //the prefix is included in the total width
1543    format!("{:#0width$x}", value.into(), width = width)
1544}
1545
1546fn hex_zero() -> String {
1547    "0x0".to_owned()
1548}
1549
1550fn hex_null() -> String {
1551    "0x".to_owned()
1552}
1553
1554/// Enhances trace using serialized JSON modification
1555fn enhance_trace_with_decoded_info(trace: &GethTrace) -> JsonValue {
1556    match trace {
1557        GethTrace::CallTracer(call_frame) => {
1558            // Serialize first, then enhance
1559            let mut json = to_json_value(call_frame);
1560            enhance_serialized_call_frame(&mut json);
1561            json
1562        }
1563        _ => {
1564            // For non-CallTracer traces, return as-is without enhancement
1565            to_json_value(trace)
1566        }
1567    }
1568}
1569
1570fn enhance_serialized_call_frame(json: &mut JsonValue) {
1571    if let Some(json_obj) = json.as_object_mut() {
1572        json_obj
1573            .get("from")
1574            .and_then(|v| v.as_str())
1575            .and_then(|s| s.parse::<Address>().ok())
1576            .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1577            .and_then(|s| json_obj.insert("decodedFromContract".to_string(), json!(s)));
1578
1579        json_obj
1580            .get("to")
1581            .and_then(|v| if v.is_null() { None } else { v.as_str() })
1582            .and_then(|s| s.parse::<Address>().ok())
1583            .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1584            .and_then(|s| json_obj.insert("decodedToContract".to_string(), json!(s)));
1585
1586        json_obj
1587            .get("input")
1588            .and_then(|v| v.as_str())
1589            .and_then(|s| const_hex::decode(s.trim_start_matches("0x")).ok())
1590            .inspect(|input| {
1591                if let Some(signature) = codegen::function_sig_opt(input) {
1592                    json_obj.insert("decodedFunctionSignature".to_string(), json!(signature));
1593                }
1594
1595                match decode::decode_input_arguments(input) {
1596                    Ok(args) => {
1597                        json_obj.insert("decodedFunctionArguments".to_string(), json!(args));
1598                    }
1599                    Err(DecodeInputError::InvalidAbi { message }) => {
1600                        tracing::warn!(
1601                            message = %message,
1602                            "Invalid ABI stored"
1603                        );
1604                    }
1605                    _ => (),
1606                }
1607            });
1608        if let Some(calls) = json_obj.get_mut("calls").and_then(|v| v.as_array_mut()) {
1609            for call in calls {
1610                enhance_serialized_call_frame(call);
1611            }
1612        }
1613    }
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618    use alloy_primitives::Address;
1619    use alloy_primitives::Bytes;
1620    use alloy_primitives::U256;
1621    use alloy_rpc_types_trace::geth::CallFrame;
1622
1623    use super::*;
1624
1625    fn create_simple_test_call_structure() -> CallFrame {
1626        // Create deepest level calls (level 3)
1627        let deep_call_1 = CallFrame {
1628            from: "0x562689c910361ae21d12eadafbfca727b3bcbc24".parse::<Address>().unwrap(), // Maps to Compound_Agent_4
1629            to: Some("0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap()), // Maps to BRLCToken
1630            input: Bytes::from(
1631                const_hex::decode(
1632                    "70a08231000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e", // balanceOf function
1633                )
1634                .unwrap(),
1635            ),
1636            output: Some(Bytes::from(
1637                const_hex::decode("0000000000000000000000000000000000000000000000000de0b6b3a7640000").unwrap(),
1638            )),
1639            gas: U256::from(5000),
1640            gas_used: U256::from(3000),
1641            value: None,
1642            typ: "STATICCALL".to_string(),
1643            error: None,
1644            revert_reason: None,
1645            calls: Vec::new(),
1646            logs: Vec::new(),
1647        };
1648
1649        let deep_call_2 = CallFrame {
1650            from: "0x3181ab023a4d4788754258be5a3b8cf3d8276b98".parse::<Address>().unwrap(), // Maps to Cashier_BRLC_v2
1651            to: Some("0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap()), // Maps to USJIMToken
1652            input: Bytes::from(
1653                const_hex::decode(
1654                    "dd62ed3e000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e1", // allowance function
1655                )
1656                .unwrap(),
1657            ),
1658            output: Some(Bytes::from(
1659                const_hex::decode("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(),
1660            )),
1661            gas: U256::from(8000),
1662            gas_used: U256::from(5000),
1663            value: None,
1664            typ: "STATICCALL".to_string(),
1665            error: None,
1666            revert_reason: None,
1667            calls: Vec::new(),
1668            logs: Vec::new(),
1669        };
1670
1671        // Create level 2 nested calls using real contract addresses from CONTRACTS map
1672        let nested_call_1 = CallFrame {
1673            from: "0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap(), // Maps to BRLCToken
1674            to: Some("0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap()), // Maps to USJIMToken
1675            input: Bytes::from(
1676                const_hex::decode(
1677                    "a9059cbb000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000", // transfer function
1678                )
1679                .unwrap(),
1680            ),
1681            output: Some(Bytes::from(
1682                const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1683            )),
1684            gas: U256::from(21000),
1685            gas_used: U256::from(20000),
1686            value: None,
1687            typ: "CALL".to_string(),
1688            error: None,
1689            revert_reason: None,
1690            calls: vec![deep_call_1],
1691            logs: Vec::new(),
1692        };
1693
1694        let nested_call_2 = CallFrame {
1695            from: "0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap(), // Maps to USJIMToken
1696            to: Some("0x3181ab023a4d4788754258be5a3b8cf3d8276b98".parse::<Address>().unwrap()), // Maps to Cashier_BRLC_v2
1697            input: Bytes::from(
1698                const_hex::decode(
1699                    "095ea7b3000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000", // approve function
1700                )
1701                .unwrap(),
1702            ),
1703            output: Some(Bytes::from(
1704                const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1705            )),
1706            gas: U256::from(30000),
1707            gas_used: U256::from(25000),
1708            value: None,
1709            typ: "CALL".to_string(),
1710            error: None,
1711            revert_reason: None,
1712            calls: vec![deep_call_2],
1713            logs: Vec::new(),
1714        };
1715
1716        // Create main call containing nested calls (level 1)
1717        CallFrame {
1718            from: "0x742d35Cc6634C0532925a3b8D7C9be8813eeb02e".parse::<Address>().unwrap(),
1719            to: Some("0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap()), // BRLCToken
1720            input: Bytes::from(
1721                const_hex::decode(
1722                    "23b872dd000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e10000000000000000000000000000000000000000000000000de0b6b3a7640000", // transferFrom function
1723                )
1724                .unwrap(),
1725            ),
1726            output: Some(Bytes::from(
1727                const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1728            )),
1729            gas: U256::from(100000),
1730            gas_used: U256::from(85000),
1731            value: None,
1732            typ: "CALL".to_string(),
1733            error: None,
1734            revert_reason: None,
1735            calls: vec![nested_call_1, nested_call_2],
1736            logs: Vec::new(),
1737        }
1738    }
1739
1740    #[test]
1741    fn test_contract_name_decoding() {
1742        // Create a 3-level structure using real contract addresses
1743        let test_call = create_simple_test_call_structure();
1744        let geth_trace = GethTrace::CallTracer(test_call);
1745
1746        // Enhance the trace with decoded information
1747        let result = enhance_trace_with_decoded_info(&geth_trace);
1748        let result_str = serde_json::to_string_pretty(&result).unwrap();
1749
1750        assert!(result_str.contains("Cashier_BRLC_v2"));
1751        assert!(result_str.contains("BRLCToken"));
1752        assert!(result_str.contains("USJIMToken"));
1753        assert!(result_str.contains("Compound_Agent_4"));
1754
1755        // Verify function signature decoding for all the expected functions
1756        assert!(result_str.contains("transfer(address,uint256)"));
1757        assert!(result_str.contains("approve(address,uint256)"));
1758        assert!(result_str.contains("transferFrom(address,address,uint256)"));
1759        assert!(result_str.contains("balanceOf(address)"));
1760        assert!(result_str.contains("allowance(address,address)"));
1761    }
1762}