stratus/eth/rpc/
rpc_server.rs

1//! RPC server for HTTP and WS.
2
3use std::collections::HashMap;
4use std::ops::Deref;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::sync::LazyLock;
8use std::time::Duration;
9
10use alloy_primitives::U256;
11use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
12use alloy_rpc_types_trace::geth::GethTrace;
13use anyhow::Result;
14use futures::join;
15use http::Method;
16use itertools::Itertools;
17use jsonrpsee::Extensions;
18use jsonrpsee::IntoSubscriptionCloseResponse;
19use jsonrpsee::PendingSubscriptionSink;
20use jsonrpsee::server::BatchRequestConfig;
21use jsonrpsee::server::RandomStringIdProvider;
22use jsonrpsee::server::RpcModule;
23use jsonrpsee::server::Server as RpcServer;
24use jsonrpsee::server::ServerConfig;
25use jsonrpsee::server::ServerHandle;
26use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
27use jsonrpsee::types::Params;
28use jsonrpsee::ws_client::RpcServiceBuilder;
29use parking_lot::RwLock;
30use serde_json::json;
31use tokio::runtime::Handle;
32use tokio::select;
33use tokio::sync::Semaphore;
34use tokio::sync::SemaphorePermit;
35use tower_http::cors::Any;
36use tower_http::cors::CorsLayer;
37use tracing::Instrument;
38use tracing::Span;
39use tracing::field;
40use tracing::info_span;
41
42use crate::GlobalState;
43use crate::NodeMode;
44use crate::alias::AlloyReceipt;
45use crate::alias::JsonValue;
46use crate::config::StratusConfig;
47use crate::eth::codegen;
48use crate::eth::codegen::CONTRACTS;
49use crate::eth::decode;
50use crate::eth::executor::Executor;
51use crate::eth::follower::consensus::Consensus;
52use crate::eth::follower::importer::ImporterConfig;
53use crate::eth::follower::importer::ImporterConsensus;
54use crate::eth::miner::Miner;
55use crate::eth::miner::MinerMode;
56use crate::eth::primitives::Address;
57use crate::eth::primitives::BlockFilter;
58use crate::eth::primitives::Bytes;
59use crate::eth::primitives::CallInput;
60use crate::eth::primitives::ChainId;
61use crate::eth::primitives::ConsensusError;
62use crate::eth::primitives::DecodeInputError;
63use crate::eth::primitives::EvmExecution;
64use crate::eth::primitives::Hash;
65use crate::eth::primitives::ImporterError;
66use crate::eth::primitives::LogFilterInput;
67#[cfg(feature = "dev")]
68use crate::eth::primitives::Nonce;
69use crate::eth::primitives::PointInTime;
70use crate::eth::primitives::RpcError;
71use crate::eth::primitives::SlotIndex;
72#[cfg(feature = "dev")]
73use crate::eth::primitives::SlotValue;
74use crate::eth::primitives::StateError;
75use crate::eth::primitives::StorageError;
76use crate::eth::primitives::StratusError;
77use crate::eth::primitives::TransactionError;
78use crate::eth::primitives::TransactionInput;
79use crate::eth::primitives::TransactionStage;
80#[cfg(feature = "dev")]
81use crate::eth::primitives::Wei;
82use crate::eth::rpc::RpcContext;
83use crate::eth::rpc::RpcHttpMiddleware;
84use crate::eth::rpc::RpcMiddleware;
85use crate::eth::rpc::RpcServerConfig;
86use crate::eth::rpc::RpcSubscriptions;
87use crate::eth::rpc::next_rpc_param;
88use crate::eth::rpc::next_rpc_param_or_default;
89use crate::eth::rpc::rpc_parser::RpcExtensionsExt;
90use crate::eth::rpc::rpc_subscriptions::RpcSubscriptionsHandles;
91use crate::eth::storage::ReadKind;
92use crate::eth::storage::StratusStorage;
93use crate::ext::InfallibleExt;
94use crate::ext::WatchReceiverExt;
95use crate::ext::not;
96use crate::ext::parse_duration;
97use crate::ext::to_json_string;
98use crate::ext::to_json_value;
99use crate::infra::build_info;
100use crate::infra::metrics;
101use crate::infra::tracing::SpanExt;
102use crate::log_and_err;
103// -----------------------------------------------------------------------------
104// Server
105// -----------------------------------------------------------------------------
106
107#[derive(Clone, derive_new::new)]
108pub struct Server {
109    // services
110    pub storage: Arc<StratusStorage>,
111    pub executor: Arc<Executor>,
112    pub miner: Arc<Miner>,
113    pub importer: Arc<RwLock<Option<Arc<ImporterConsensus>>>>,
114
115    // config
116    pub app_config: StratusConfig,
117    pub rpc_config: RpcServerConfig,
118    pub chain_id: ChainId,
119}
120
121impl Server {
122    pub async fn serve(self) -> Result<()> {
123        let this = Arc::new(self);
124        this.update_health().await;
125
126        const TASK_NAME: &str = "rpc-server";
127        let health_worker_handle = tokio::task::spawn({
128            let this = Arc::clone(&this);
129            async move {
130                this.health_worker().await;
131            }
132        });
133        this.update_health().await;
134        let mut health = GlobalState::get_health_receiver();
135        health.mark_unchanged();
136        let (server_handle, subscriptions) = loop {
137            let (server_handle, subscriptions) = this._serve().await?;
138            let server_handle_watch = server_handle.clone();
139
140            // await for cancellation or jsonrpsee to stop (should not happen) or health changes
141            select! {
142                // If the server stop unexpectedly, shutdown stratus and break the loop
143                _ = server_handle_watch.stopped() => {
144                    GlobalState::shutdown_from(TASK_NAME, "finished unexpectedly");
145                    break (server_handle, subscriptions);
146                },
147                // If a shutdown is requested, stop the server and break the loop
148                _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
149                    let _ = server_handle.stop();
150                    break (server_handle, subscriptions);
151                },
152                // If the health state changes to unhealthy, stop the server and subscriptions and recreate them (causing all connections to be dropped)
153                _ = health.wait_for_change(|healthy| GlobalState::restart_on_unhealthy() && !healthy) => {
154                    tracing::info!("health state changed to unhealthy, restarting the rpc server");
155                    let _ = server_handle.stop();
156                    subscriptions.abort();
157                    join!(server_handle.stopped(), subscriptions.stopped());
158                }
159            }
160        };
161        let res = join!(server_handle.stopped(), subscriptions.stopped(), health_worker_handle);
162        res.2?;
163        Ok(())
164    }
165
166    /// Starts JSON-RPC server.
167    async fn _serve(&self) -> anyhow::Result<(ServerHandle, RpcSubscriptionsHandles)> {
168        let this = self.clone();
169        const TASK_NAME: &str = "rpc-server";
170        tracing::info!(%this.rpc_config.rpc_address, %this.rpc_config.rpc_max_connections, "creating {}", TASK_NAME);
171
172        // configure subscriptions
173        let subs = RpcSubscriptions::spawn(
174            this.miner.notifier_pending_txs.subscribe(),
175            this.miner.notifier_blocks.subscribe(),
176            this.miner.notifier_logs.subscribe(),
177        );
178
179        // configure context
180        let ctx = RpcContext {
181            server: Arc::new(this.clone()),
182            client_version: "stratus",
183            subs: Arc::clone(&subs.connected),
184        };
185
186        // configure module
187        let mut module = RpcModule::<RpcContext>::new(ctx);
188        module = register_methods(module)?;
189
190        // configure middleware
191        let cors = CorsLayer::new().allow_methods([Method::POST]).allow_origin(Any).allow_headers(Any);
192        let rpc_middleware = RpcServiceBuilder::new().layer_fn(RpcMiddleware::new);
193        let http_middleware = tower::ServiceBuilder::new().layer(cors).layer_fn(RpcHttpMiddleware::new).layer(
194            ProxyGetRequestLayer::new([
195                ("/health", "stratus_health"),
196                ("/version", "stratus_version"),
197                ("/config", "stratus_config"),
198                ("/state", "stratus_state"),
199            ])
200            .unwrap(),
201        );
202
203        let server_config = ServerConfig::builder()
204            .set_id_provider(RandomStringIdProvider::new(8))
205            .max_connections(this.rpc_config.rpc_max_connections)
206            .max_response_body_size(this.rpc_config.rpc_max_response_size_bytes)
207            .set_batch_request_config(BatchRequestConfig::Limit(this.rpc_config.batch_request_limit))
208            .build();
209
210        // serve module
211        let server = RpcServer::builder()
212            .set_rpc_middleware(rpc_middleware)
213            .set_http_middleware(http_middleware)
214            .set_config(server_config)
215            .build(this.rpc_config.rpc_address)
216            .await?;
217
218        let handle_rpc_server = server.start(module);
219        Ok((handle_rpc_server, subs.handles))
220    }
221
222    pub async fn health_worker(&self) {
223        // Create an interval that ticks at the configured interval
224        let mut interval = tokio::time::interval(std::time::Duration::from_millis(self.rpc_config.health_check_interval_ms));
225
226        // Enter an infinite loop to periodically run the health check
227        loop {
228            // Call the health function and ignore its result
229            self.update_health().await;
230
231            // Wait for the next tick
232            interval.tick().await;
233            if GlobalState::is_shutdown() {
234                break;
235            }
236        }
237    }
238
239    async fn update_health(&self) {
240        let is_healthy = self.health().await;
241        GlobalState::set_health(is_healthy);
242        metrics::set_consensus_is_ready(if is_healthy { 1u64 } else { 0u64 });
243    }
244
245    fn read_importer(&self) -> Option<Arc<ImporterConsensus>> {
246        self.importer.read().as_ref().map(Arc::clone)
247    }
248
249    pub fn set_importer(&self, importer: Option<Arc<ImporterConsensus>>) {
250        *self.importer.write() = importer;
251    }
252
253    async fn health(&self) -> bool {
254        match GlobalState::get_node_mode() {
255            NodeMode::Leader | NodeMode::FakeLeader => true,
256            NodeMode::Follower =>
257                if GlobalState::is_importer_shutdown() {
258                    tracing::warn!("stratus is unhealthy because importer is shutdown");
259                    false
260                } else {
261                    match self.read_importer() {
262                        Some(importer) => importer.should_serve().await,
263                        None => {
264                            tracing::warn!("stratus is unhealthy because importer is not available");
265                            false
266                        }
267                    }
268                },
269        }
270    }
271}
272
273fn register_methods(mut module: RpcModule<RpcContext>) -> anyhow::Result<RpcModule<RpcContext>> {
274    // dev mode methods
275    #[cfg(feature = "dev")]
276    {
277        module.register_blocking_method("evm_setNextBlockTimestamp", evm_set_next_block_timestamp)?;
278        module.register_blocking_method("evm_mine", evm_mine)?;
279        module.register_blocking_method("hardhat_reset", stratus_reset)?;
280        module.register_blocking_method("stratus_reset", stratus_reset)?;
281        module.register_blocking_method("hardhat_setStorageAt", stratus_set_storage_at)?;
282        module.register_blocking_method("stratus_setStorageAt", stratus_set_storage_at)?;
283        module.register_blocking_method("hardhat_setNonce", stratus_set_nonce)?;
284        module.register_blocking_method("stratus_setNonce", stratus_set_nonce)?;
285        module.register_blocking_method("hardhat_setBalance", stratus_set_balance)?;
286        module.register_blocking_method("stratus_setBalance", stratus_set_balance)?;
287        module.register_blocking_method("hardhat_setCode", stratus_set_code)?;
288        module.register_blocking_method("stratus_setCode", stratus_set_code)?;
289        module.register_blocking_method("stratus_clearCache", stratus_clear_cache)?;
290    }
291
292    // stratus status
293    module.register_async_method("stratus_health", stratus_health)?;
294
295    // stratus admin
296    module.register_method("stratus_enableTransactions", stratus_enable_transactions)?;
297    module.register_method("stratus_disableTransactions", stratus_disable_transactions)?;
298    module.register_method("stratus_enableMiner", stratus_enable_miner)?;
299    module.register_method("stratus_disableMiner", stratus_disable_miner)?;
300    module.register_method("stratus_enableUnknownClients", stratus_enable_unknown_clients)?;
301    module.register_method("stratus_disableUnknownClients", stratus_disable_unknown_clients)?;
302    module.register_method("stratus_enableRestartOnUnhealthy", stratus_enable_restart_on_unhealthy)?;
303    module.register_method("stratus_disableRestartOnUnhealthy", stratus_disable_restart_on_unhealthy)?;
304    module.register_async_method("stratus_changeToLeader", stratus_change_to_leader)?;
305    module.register_async_method("stratus_changeToFollower", stratus_change_to_follower)?;
306    module.register_async_method("stratus_initImporter", stratus_init_importer)?;
307    module.register_method("stratus_shutdownImporter", stratus_shutdown_importer)?;
308    module.register_async_method("stratus_changeMinerMode", stratus_change_miner_mode)?;
309
310    // stratus state
311    module.register_method("stratus_version", stratus_version)?;
312    module.register_method("stratus_config", stratus_config)?;
313    module.register_method("stratus_state", stratus_state)?;
314
315    module.register_async_method("stratus_getSubscriptions", stratus_get_subscriptions)?;
316    module.register_method("stratus_pendingTransactionsCount", stratus_pending_transactions_count)?;
317
318    // blockchain
319    module.register_method("net_version", net_version)?;
320    module.register_async_method("net_listening", net_listening)?;
321    module.register_method("eth_chainId", eth_chain_id)?;
322    module.register_method("web3_clientVersion", web3_client_version)?;
323
324    // gas
325    module.register_method("eth_gasPrice", eth_gas_price)?;
326
327    // stratus importing helpers
328    module.register_blocking_method("stratus_getBlockAndReceipts", stratus_get_block_and_receipts)?;
329    module.register_blocking_method("stratus_getBlockWithChanges", stratus_get_block_with_changes)?;
330
331    // block
332    module.register_blocking_method("eth_blockNumber", eth_block_number)?;
333    module.register_blocking_method("eth_getBlockByNumber", eth_get_block_by_number)?;
334    module.register_blocking_method("eth_getBlockByHash", eth_get_block_by_hash)?;
335    module.register_blocking_method("stratus_getBlockByTimestamp", stratus_get_block_by_timestamp)?;
336    module.register_method("eth_getUncleByBlockHashAndIndex", eth_get_uncle_by_block_hash_and_index)?;
337
338    // transactions
339    module.register_blocking_method("eth_getTransactionByHash", eth_get_transaction_by_hash)?;
340    module.register_blocking_method("eth_getTransactionReceipt", eth_get_transaction_receipt)?;
341    module.register_blocking_method("eth_estimateGas", eth_estimate_gas)?;
342    module.register_blocking_method("eth_call", eth_call)?;
343    module.register_blocking_method("eth_sendRawTransaction", eth_send_raw_transaction)?;
344    module.register_blocking_method("stratus_call", stratus_call)?;
345    module.register_blocking_method("stratus_getTransactionResult", stratus_get_transaction_result)?;
346    module.register_blocking_method("debug_traceTransaction", debug_trace_transaction)?;
347
348    // logs
349    module.register_blocking_method("eth_getLogs", eth_get_logs)?;
350
351    // account
352    module.register_method("eth_accounts", eth_accounts)?;
353    module.register_blocking_method("eth_getTransactionCount", eth_get_transaction_count)?;
354    module.register_blocking_method("eth_getBalance", eth_get_balance)?;
355    module.register_blocking_method("eth_getCode", eth_get_code)?;
356
357    // storage
358    module.register_blocking_method("eth_getStorageAt", eth_get_storage_at)?;
359
360    // subscriptions
361    module.register_subscription("eth_subscribe", "eth_subscription", "eth_unsubscribe", eth_subscribe)?;
362
363    Ok(module)
364}
365
366// -----------------------------------------------------------------------------
367// Debug
368// -----------------------------------------------------------------------------
369
370#[cfg(feature = "dev")]
371fn evm_mine(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
372    ctx.server.miner.mine_local_and_commit()?;
373    Ok(to_json_value(true))
374}
375
376#[cfg(feature = "dev")]
377fn stratus_clear_cache(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
378    ctx.server.storage.clear_cache();
379    Ok(to_json_value(true))
380}
381
382#[cfg(feature = "dev")]
383fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
384    use crate::eth::primitives::UnixTime;
385    use crate::log_and_err;
386
387    let (_, timestamp) = next_rpc_param::<UnixTime>(params.sequence())?;
388    let latest = ctx.server.storage.read_block(BlockFilter::Latest)?;
389    match latest {
390        Some(block) => UnixTime::set_offset(block.header.timestamp, timestamp)?,
391        None => return log_and_err!("reading latest block returned None")?,
392    }
393    Ok(to_json_value(timestamp))
394}
395
396// -----------------------------------------------------------------------------
397// Status - Health checks
398// -----------------------------------------------------------------------------
399
400#[allow(unused_variables)]
401async fn stratus_health(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
402    #[cfg(feature = "dev")]
403    ctx.server.update_health().await;
404
405    if GlobalState::is_shutdown() {
406        tracing::warn!("liveness check failed because of shutdown");
407        return Err(StateError::StratusShutdown.into());
408    }
409
410    if GlobalState::is_healthy() {
411        Ok(json!(true))
412    } else {
413        Err(StateError::StratusNotReady.into())
414    }
415}
416
417// -----------------------------------------------------------------------------
418// Stratus - Admin
419// -----------------------------------------------------------------------------
420
421#[cfg(feature = "dev")]
422fn stratus_reset(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
423    ctx.server.storage.reset_to_genesis()?;
424    Ok(to_json_value(true))
425}
426
427#[cfg(feature = "dev")]
428fn stratus_set_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
429    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
430    let (params, index) = next_rpc_param::<SlotIndex>(params)?;
431    let (_, value) = next_rpc_param::<SlotValue>(params)?;
432
433    tracing::info!(%address, %index, %value, "setting storage at address and index");
434
435    ctx.server.storage.set_storage_at(address, index, value)?;
436
437    Ok(to_json_value(true))
438}
439
440#[cfg(feature = "dev")]
441fn stratus_set_nonce(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
442    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
443    let (_, nonce) = next_rpc_param::<Nonce>(params)?;
444
445    tracing::info!(%address, %nonce, "setting nonce for address");
446
447    ctx.server.storage.set_nonce(address, nonce)?;
448
449    Ok(to_json_value(true))
450}
451
452#[cfg(feature = "dev")]
453fn stratus_set_balance(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
454    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
455    let (_, balance) = next_rpc_param::<Wei>(params)?;
456
457    tracing::info!(%address, %balance, "setting balance for address");
458
459    ctx.server.storage.set_balance(address, balance)?;
460
461    Ok(to_json_value(true))
462}
463
464#[cfg(feature = "dev")]
465fn stratus_set_code(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
466    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
467    let (_, code) = next_rpc_param::<Bytes>(params)?;
468
469    tracing::info!(%address, code_size = %code.0.len(), "setting code for address");
470
471    ctx.server.storage.set_code(address, code)?;
472
473    Ok(to_json_value(true))
474}
475
476static MODE_CHANGE_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
477
478async fn stratus_change_to_leader(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
479    ext.authentication().auth_admin()?;
480    let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
481    let _permit: SemaphorePermit = match permit {
482        Ok(permit) => permit,
483        Err(_) => return Err(StateError::ModeChangeInProgress.into()),
484    };
485
486    const LEADER_MINER_INTERVAL: Duration = Duration::from_secs(1);
487    tracing::info!("starting process to change node to leader");
488
489    if GlobalState::get_node_mode() == NodeMode::Leader {
490        tracing::info!("node is already in leader mode, no changes made");
491        return Ok(json!(false));
492    }
493
494    if GlobalState::is_transactions_enabled() {
495        tracing::error!("transactions are currently enabled, cannot change node mode");
496        return Err(StateError::TransactionsEnabled.into());
497    }
498
499    tracing::info!("shutting down importer");
500    let shutdown_importer_result = stratus_shutdown_importer(Params::new(None), &ctx, &ext);
501    match shutdown_importer_result {
502        Ok(_) => tracing::info!("importer shutdown successfully"),
503        Err(StratusError::Importer(ImporterError::AlreadyShutdown)) => {
504            tracing::warn!("importer is already shutdown, continuing");
505        }
506        Err(e) => {
507            tracing::error!(reason = ?e, "failed to shutdown importer");
508            return Err(e);
509        }
510    }
511
512    tracing::info!("wait for importer to shutdown");
513    GlobalState::wait_for_importer_to_finish().await;
514
515    let change_miner_mode_result = change_miner_mode(MinerMode::Interval(LEADER_MINER_INTERVAL), &ctx).await;
516    if let Err(e) = change_miner_mode_result {
517        tracing::error!(reason = ?e, "failed to change miner mode");
518        return Err(e);
519    }
520    tracing::info!("miner mode changed to interval(1s) successfully");
521
522    GlobalState::set_node_mode(NodeMode::Leader);
523    ctx.server.storage.clear_cache();
524    tracing::info!("node mode changed to leader successfully, cache cleared");
525
526    Ok(json!(true))
527}
528
529async fn stratus_change_to_follower(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
530    ext.authentication().auth_admin()?;
531    let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
532    let _permit: SemaphorePermit = match permit {
533        Ok(permit) => permit,
534        Err(_) => return Err(StateError::ModeChangeInProgress.into()),
535    };
536
537    tracing::info!("starting process to change node to follower");
538
539    if GlobalState::get_node_mode() == NodeMode::Follower {
540        tracing::info!("node is already in follower mode, no changes made");
541        return Ok(json!(false));
542    }
543
544    if GlobalState::is_transactions_enabled() {
545        tracing::error!("transactions are currently enabled, cannot change node mode");
546        return Err(StateError::TransactionsEnabled.into());
547    }
548
549    let pending_txs = ctx.server.storage.pending_transactions();
550    if not(pending_txs.is_empty()) {
551        tracing::error!(pending_txs = ?pending_txs.len(), "cannot change to follower mode with pending transactions");
552        return Err(StorageError::PendingTransactionsExist {
553            pending_txs: pending_txs.len(),
554        }
555        .into());
556    }
557
558    let change_miner_mode_result = change_miner_mode(MinerMode::External, &ctx).await;
559    if let Err(e) = change_miner_mode_result {
560        tracing::error!(reason = ?e, "failed to change miner mode");
561        return Err(e);
562    }
563    tracing::info!("miner mode changed to external successfully");
564
565    GlobalState::set_node_mode(NodeMode::Follower);
566    ctx.server.storage.clear_cache();
567    tracing::info!("storage cache cleared");
568
569    tracing::info!("initializing importer");
570    let init_importer_result = stratus_init_importer(params, Arc::clone(&ctx), ext).await;
571    match init_importer_result {
572        Ok(_) => tracing::info!("importer initialized successfully"),
573        Err(StratusError::Importer(ImporterError::AlreadyRunning)) => {
574            tracing::warn!("importer is already running, continuing");
575        }
576        Err(e) => {
577            tracing::error!(reason = ?e, "failed to initialize importer, reverting node mode to leader");
578            GlobalState::set_node_mode(NodeMode::Leader);
579            return Err(e);
580        }
581    }
582    tracing::info!("node mode changed to follower successfully");
583
584    Ok(json!(true))
585}
586
587async fn stratus_init_importer(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
588    ext.authentication().auth_admin()?;
589    let (params, external_rpc) = next_rpc_param::<String>(params.sequence())?;
590    let (params, external_rpc_ws) = next_rpc_param::<String>(params)?;
591    let (params, raw_external_rpc_timeout) = next_rpc_param::<String>(params)?;
592    let (params, raw_sync_interval) = next_rpc_param::<String>(params)?;
593    let (_, raw_external_rpc_max_response_size_bytes) = next_rpc_param::<String>(params)?;
594
595    let external_rpc_timeout = parse_duration(&raw_external_rpc_timeout).map_err(|e| {
596        tracing::error!(reason = ?e, "failed to parse external_rpc_timeout");
597        ImporterError::ConfigParseError
598    })?;
599
600    let sync_interval = parse_duration(&raw_sync_interval).map_err(|e| {
601        tracing::error!(reason = ?e, "failed to parse sync_interval");
602        ImporterError::ConfigParseError
603    })?;
604
605    let external_rpc_max_response_size_bytes = raw_external_rpc_max_response_size_bytes.parse::<u32>().map_err(|e| {
606        tracing::error!(reason = ?e, "failed to parse external_rpc_max_response_size_bytes");
607        ImporterError::ConfigParseError
608    })?;
609
610    let importer_config = ImporterConfig {
611        external_rpc,
612        external_rpc_ws: Some(external_rpc_ws),
613        external_rpc_timeout,
614        sync_interval,
615        external_rpc_max_response_size_bytes,
616        enable_block_changes_replication: std::env::var("ENABLE_BLOCK_CHANGES_REPLICATION")
617            .ok()
618            .is_some_and(|val| val == "1" || val == "true"),
619    };
620
621    importer_config.init_follower_importer(ctx).await
622}
623
624fn stratus_shutdown_importer(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<JsonValue, StratusError> {
625    ext.authentication().auth_admin()?;
626    if GlobalState::get_node_mode() != NodeMode::Follower {
627        tracing::error!("node is currently not a follower");
628        return Err(StateError::StratusNotFollower.into());
629    }
630
631    if GlobalState::is_importer_shutdown() && ctx.server.read_importer().is_none() {
632        tracing::error!("importer is already shut down");
633        return Err(ImporterError::AlreadyShutdown.into());
634    }
635
636    ctx.server.set_importer(None);
637
638    const TASK_NAME: &str = "rpc-server::importer-shutdown";
639    GlobalState::shutdown_importer_from(TASK_NAME, "received importer shutdown request");
640
641    Ok(json!(true))
642}
643
644async fn stratus_change_miner_mode(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
645    ext.authentication().auth_admin()?;
646    let (_, mode_str) = next_rpc_param::<String>(params.sequence())?;
647
648    let mode = MinerMode::from_str(&mode_str).map_err(|e| {
649        tracing::error!(reason = ?e, "failed to parse miner mode");
650        RpcError::MinerModeParamInvalid
651    })?;
652
653    change_miner_mode(mode, &ctx).await
654}
655
656/// Tries changing miner mode, returns `Ok(true)` if changed, and `Ok(false)` if no changing was necessary.
657///
658/// This function also enables the miner after changing it.
659async fn change_miner_mode(new_mode: MinerMode, ctx: &RpcContext) -> Result<JsonValue, StratusError> {
660    if GlobalState::is_transactions_enabled() {
661        tracing::error!("cannot change miner mode while transactions are enabled");
662        return Err(StateError::TransactionsEnabled.into());
663    }
664
665    let previous_mode = ctx.server.miner.mode();
666
667    if previous_mode == new_mode {
668        tracing::warn!(?new_mode, current = ?new_mode, "miner mode already set, skipping");
669        return Ok(json!(false));
670    }
671
672    if not(ctx.server.miner.is_paused()) && previous_mode.is_interval() {
673        return log_and_err!("can't change miner mode from Interval without pausing it first").map_err(Into::into);
674    }
675
676    match new_mode {
677        MinerMode::External => {
678            tracing::info!("changing miner mode to External");
679
680            let pending_txs = ctx.server.storage.pending_transactions();
681            if not(pending_txs.is_empty()) {
682                tracing::error!(pending_txs = ?pending_txs.len(), "cannot change miner mode to External with pending transactions");
683                return Err(StorageError::PendingTransactionsExist {
684                    pending_txs: pending_txs.len(),
685                }
686                .into());
687            }
688
689            ctx.server.miner.switch_to_external_mode().await;
690        }
691        MinerMode::Interval(duration) => {
692            tracing::info!(duration = ?duration, "changing miner mode to Interval");
693
694            if ctx.server.read_importer().is_some() {
695                tracing::error!("cannot change miner mode to Interval with consensus set");
696                return Err(ConsensusError::Set.into());
697            }
698
699            ctx.server.miner.start_interval_mining(duration).await;
700        }
701        MinerMode::Automine => {
702            return log_and_err!("Miner mode change to 'automine' is unsupported.").map_err(Into::into);
703        }
704    }
705
706    Ok(json!(true))
707}
708
709fn stratus_enable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
710    ext.authentication().auth_admin()?;
711    GlobalState::set_unknown_client_enabled(true);
712    Ok(GlobalState::is_unknown_client_enabled())
713}
714
715fn stratus_disable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
716    ext.authentication().auth_admin()?;
717    GlobalState::set_unknown_client_enabled(false);
718    Ok(GlobalState::is_unknown_client_enabled())
719}
720
721fn stratus_enable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
722    ext.authentication().auth_admin()?;
723    GlobalState::set_transactions_enabled(true);
724    Ok(GlobalState::is_transactions_enabled())
725}
726
727fn stratus_disable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
728    ext.authentication().auth_admin()?;
729    GlobalState::set_transactions_enabled(false);
730    Ok(GlobalState::is_transactions_enabled())
731}
732
733fn stratus_enable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
734    ext.authentication().auth_admin()?;
735    ctx.server.miner.unpause();
736    Ok(true)
737}
738
739fn stratus_disable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
740    ext.authentication().auth_admin()?;
741    ctx.server.miner.pause();
742    Ok(false)
743}
744
745/// Returns the count of executed transactions waiting to enter the next block.
746fn stratus_pending_transactions_count(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> usize {
747    ctx.server.storage.pending_transactions().len()
748}
749
750fn stratus_disable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
751    ext.authentication().auth_admin()?;
752    GlobalState::set_restart_on_unhealthy(false);
753    Ok(GlobalState::restart_on_unhealthy())
754}
755
756fn stratus_enable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
757    ext.authentication().auth_admin()?;
758    GlobalState::set_restart_on_unhealthy(true);
759    Ok(GlobalState::restart_on_unhealthy())
760}
761
762// -----------------------------------------------------------------------------
763// Stratus - State
764// -----------------------------------------------------------------------------
765
766fn stratus_version(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
767    Ok(build_info::as_json())
768}
769
770fn stratus_config(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<StratusConfig, StratusError> {
771    ext.authentication().auth_admin()?;
772    Ok(ctx.server.app_config.clone())
773}
774
775fn stratus_state(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
776    Ok(GlobalState::get_global_state_as_json(ctx))
777}
778
779async fn stratus_get_subscriptions(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
780    // NOTE: this is a workaround for holding only one lock at a time
781    let pending_txs = serde_json::to_value(ctx.subs.pending_txs.read().await.values().collect_vec()).expect_infallible();
782    let new_heads = serde_json::to_value(ctx.subs.new_heads.read().await.values().collect_vec()).expect_infallible();
783    let logs = serde_json::to_value(ctx.subs.logs.read().await.values().flat_map(HashMap::values).collect_vec()).expect_infallible();
784
785    let response = json!({
786        "newPendingTransactions": pending_txs,
787        "newHeads": new_heads,
788        "logs": logs,
789    });
790    Ok(response)
791}
792
793// -----------------------------------------------------------------------------
794// Blockchain
795// -----------------------------------------------------------------------------
796
797async fn net_listening(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
798    let net_listening = stratus_health(params, ctx, ext).await;
799
800    tracing::info!(net_listening = ?net_listening, "network listening status");
801
802    net_listening
803}
804
805fn net_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
806    ctx.server.chain_id.to_string()
807}
808
809fn eth_chain_id(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
810    hex_num(ctx.server.chain_id)
811}
812
813fn web3_client_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
814    ctx.client_version.to_owned()
815}
816
817// -----------------------------------------------------------------------------
818// Gas
819// -----------------------------------------------------------------------------
820
821fn eth_gas_price(_: Params<'_>, _: &RpcContext, _: &Extensions) -> String {
822    hex_zero()
823}
824
825// -----------------------------------------------------------------------------
826// Block
827// -----------------------------------------------------------------------------
828
829fn eth_block_number(_params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
830    // enter span
831    let _middleware_enter = ext.enter_middleware_span();
832    let _method_enter = info_span!("rpc::eth_blockNumber", block_number = field::Empty).entered();
833
834    // execute
835    let block_number = ctx.server.storage.read_mined_block_number();
836    Span::with(|s| s.rec_str("block_number", &block_number));
837
838    Ok(to_json_value(block_number))
839}
840
841fn stratus_get_block_and_receipts(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
842    // enter span
843    let _middleware_enter = ext.enter_middleware_span();
844    let _method_enter = info_span!("rpc::stratus_getBlockAndReceipts").entered();
845
846    // parse params
847    let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
848
849    // track
850    tracing::info!(%filter, "reading block and receipts");
851
852    let Some(block) = ctx.server.storage.read_block(filter)? else {
853        tracing::info!(%filter, "block not found");
854        return Ok(JsonValue::Null);
855    };
856
857    tracing::info!(%filter, "block with transactions found");
858    let receipts = block.transactions.iter().cloned().map(AlloyReceipt::from).collect::<Vec<_>>();
859
860    Ok(json!({
861        "block": block.to_json_rpc_with_full_transactions(),
862        "receipts": receipts,
863    }))
864}
865
866fn stratus_get_block_with_changes(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
867    // enter span
868    let _middleware_enter = ext.enter_middleware_span();
869    let _method_enter = info_span!("rpc::stratus_getBlockWithChanges").entered();
870
871    // parse params
872    let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
873
874    // track
875    tracing::info!(%filter, "reading block and changes");
876
877    let Some(block) = ctx.server.storage.read_block_with_changes(filter)? else {
878        tracing::info!(%filter, "block not found");
879        return Ok(JsonValue::Null);
880    };
881
882    tracing::info!(%filter, "block with changes found");
883
884    Ok(json!(block))
885}
886
887fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
888    let _middleware_enter = ext.enter_middleware_span();
889
890    let _method_enter = info_span!(
891        "rpc::eth_getBlockByHash",
892        filter = field::Empty,
893        block_number = field::Empty,
894        found = field::Empty
895    )
896    .entered();
897
898    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
899    let (_, full_transactions) = next_rpc_param::<bool>(params)?;
900
901    match &filter {
902        BlockFilter::Hash(_) => (),
903        _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
904    }
905
906    eth_get_block_by_selector(filter, full_transactions, ctx)
907}
908
909fn eth_get_block_by_number(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
910    let _middleware_enter = ext.enter_middleware_span();
911
912    let _method_enter = info_span!(
913        "rpc::eth_getBlockByNumber",
914        filter = field::Empty,
915        block_number = field::Empty,
916        found = field::Empty
917    )
918    .entered();
919
920    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
921    let (_, full_transactions) = next_rpc_param::<bool>(params)?;
922
923    match &filter {
924        BlockFilter::Earliest | BlockFilter::Latest | BlockFilter::Number(_) | BlockFilter::Pending => (),
925        _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
926    }
927
928    eth_get_block_by_selector(filter, full_transactions, ctx)
929}
930
931#[inline(always)]
932fn eth_get_block_by_selector(filter: BlockFilter, full_transactions: bool, ctx: Arc<RpcContext>) -> Result<JsonValue, StratusError> {
933    // track
934    Span::with(|s| s.rec_str("filter", &filter));
935    tracing::info!(%filter, %full_transactions, "reading block");
936
937    // execute
938    let block = ctx.server.storage.read_block(filter)?;
939    Span::with(|s| {
940        s.record("found", block.is_some());
941        if let Some(ref block) = block {
942            s.rec_str("block_number", &block.number());
943        }
944    });
945    match (block, full_transactions) {
946        (Some(block), true) => {
947            tracing::info!(%filter, "block with full transactions found");
948            Ok(block.to_json_rpc_with_full_transactions())
949        }
950        (Some(block), false) => {
951            tracing::info!(%filter, "block with only hashes found");
952            Ok(block.to_json_rpc_with_transactions_hashes())
953        }
954        (None, _) => {
955            tracing::info!(%filter, "block not found");
956            Ok(JsonValue::Null)
957        }
958    }
959}
960
961fn eth_get_uncle_by_block_hash_and_index(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
962    Ok(JsonValue::Null)
963}
964
965fn stratus_get_block_by_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
966    let _middleware_enter = ext.enter_middleware_span();
967    let _method_enter = info_span!(
968        "rpc::stratus_getBlockByTimestamp",
969        timestamp = field::Empty,
970        block_number = field::Empty,
971        found = field::Empty
972    )
973    .entered();
974
975    let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
976    let (_, full_transactions) = next_rpc_param::<bool>(params)?;
977
978    match &filter {
979        BlockFilter::Timestamp { .. } => (),
980        _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
981    }
982
983    eth_get_block_by_selector(filter, full_transactions, ctx)
984}
985
986// -----------------------------------------------------------------------------
987// Transaction
988// -----------------------------------------------------------------------------
989
990fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
991    // enter span
992    let _middleware_enter = ext.enter_middleware_span();
993    let _method_enter = info_span!("rpc::eth_getTransactionByHash", tx_hash = field::Empty, found = field::Empty).entered();
994
995    // parse params
996    let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
997
998    // track
999    Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1000    tracing::info!(%tx_hash, "reading transaction");
1001
1002    // execute
1003    let tx = ctx.server.storage.read_transaction(tx_hash)?;
1004    Span::with(|s| {
1005        s.record("found", tx.is_some());
1006    });
1007
1008    match tx {
1009        Some(tx) => {
1010            tracing::info!(%tx_hash, "transaction found");
1011            Ok(tx.to_json_rpc_transaction())
1012        }
1013        None => {
1014            tracing::info!(%tx_hash, "transaction not found");
1015            Ok(JsonValue::Null)
1016        }
1017    }
1018}
1019
1020fn rpc_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<Option<TransactionStage>, StratusError> {
1021    // parse params
1022    let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1023
1024    // track
1025    Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1026    tracing::info!("reading transaction receipt");
1027
1028    // execute
1029    let tx = ctx.server.storage.read_transaction(tx_hash)?;
1030    Span::with(|s| {
1031        s.record("found", tx.is_some());
1032    });
1033
1034    Ok(tx)
1035}
1036
1037fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1038    // enter span
1039    let _middleware_enter = ext.enter_middleware_span();
1040    let _method_enter = info_span!("rpc::eth_getTransactionReceipt", tx_hash = field::Empty, found = field::Empty).entered();
1041
1042    match rpc_get_transaction_receipt(params, ctx)? {
1043        Some(tx) => {
1044            tracing::info!("transaction receipt found");
1045            Ok(tx.to_json_rpc_receipt())
1046        }
1047        None => {
1048            tracing::info!("transaction receipt not found");
1049            Ok(JsonValue::Null)
1050        }
1051    }
1052}
1053
1054fn stratus_get_transaction_result(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1055    // enter span
1056    let _middleware_enter = ext.enter_middleware_span();
1057    let _method_enter = info_span!("rpc::stratus_get_transaction_result", tx_hash = field::Empty, found = field::Empty).entered();
1058
1059    match rpc_get_transaction_receipt(params, ctx)? {
1060        Some(tx) => {
1061            tracing::info!("transaction receipt found");
1062            Ok(to_json_value(tx.to_result().execution.result))
1063        }
1064        None => {
1065            tracing::info!("transaction receipt not found");
1066            Ok(JsonValue::Null)
1067        }
1068    }
1069}
1070
1071fn eth_estimate_gas(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1072    // enter span
1073    let _middleware_enter = ext.enter_middleware_span();
1074    let _method_enter = info_span!("rpc::eth_estimateGas", tx_from = field::Empty, tx_to = field::Empty).entered();
1075
1076    // parse params
1077    let (_, call) = next_rpc_param::<CallInput>(params.sequence())?;
1078
1079    // track
1080    Span::with(|s| {
1081        s.rec_opt("tx_from", &call.from);
1082        s.rec_opt("tx_to", &call.to);
1083    });
1084    tracing::info!("executing eth_estimateGas");
1085
1086    // execute
1087    match ctx.server.executor.execute_local_call(call, PointInTime::Mined) {
1088        // result is success
1089        Ok(result) if result.is_success() => {
1090            tracing::info!(tx_output = %result.output, "executed eth_estimateGas with success");
1091            let overestimated_gas = (result.gas_used.as_u64()) as f64 * 1.1;
1092            Ok(hex_num(U256::from(overestimated_gas as u64)))
1093        }
1094
1095        // result is failure
1096        Ok(result) => {
1097            tracing::warn!(tx_output = %result.output, "executed eth_estimateGas with failure");
1098            Err(TransactionError::RevertedCall { output: result.output }.into())
1099        }
1100
1101        // internal error
1102        Err(e) => {
1103            tracing::warn!(reason = ?e, "failed to execute eth_estimateGas");
1104            Err(e)
1105        }
1106    }
1107}
1108
1109fn rpc_call(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<EvmExecution, StratusError> {
1110    // parse params
1111    let (params, call) = next_rpc_param::<CallInput>(params.sequence())?;
1112    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1113
1114    // track
1115    Span::with(|s| {
1116        s.rec_opt("tx_from", &call.from);
1117        s.rec_opt("tx_to", &call.to);
1118        s.rec_str("filter", &filter);
1119    });
1120    tracing::info!(%filter, "executing eth_call");
1121
1122    // execute
1123    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1124    ctx.server.executor.execute_local_call(call, point_in_time)
1125}
1126
1127fn eth_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1128    // enter span
1129    let _middleware_enter = ext.enter_middleware_span();
1130    let _method_enter = info_span!("rpc::eth_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1131
1132    match rpc_call(params, ctx) {
1133        // result is success
1134        Ok(result) if result.is_success() => {
1135            tracing::info!(tx_output = %result.output, "executed eth_call with success");
1136            Ok(hex_data(result.output))
1137        }
1138        // result is failure
1139        Ok(result) => {
1140            tracing::warn!(tx_output = %result.output, "executed eth_call with failure");
1141            Err(TransactionError::RevertedCall { output: result.output }.into())
1142        }
1143        // internal error
1144        Err(e) => {
1145            tracing::warn!(reason = ?e, "failed to execute eth_call");
1146            Err(e)
1147        }
1148    }
1149}
1150
1151fn debug_trace_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1152    // enter span
1153    let _middleware_enter = ext.enter_middleware_span();
1154    let _method_enter = info_span!("rpc::debug_traceTransaction", tx_hash = field::Empty,).entered();
1155
1156    let (params, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1157    let (_, opts) = next_rpc_param_or_default::<Option<GethDebugTracingOptions>>(params)?;
1158    let trace_unsuccessful_only = ctx
1159        .server
1160        .rpc_config
1161        .rpc_debug_trace_unsuccessful_only
1162        .as_ref()
1163        .is_some_and(|inner| inner.contains(ext.rpc_client()));
1164
1165    match ctx.server.executor.trace_transaction(tx_hash, opts, trace_unsuccessful_only) {
1166        Ok(result) => {
1167            tracing::info!(?tx_hash, "executed debug_traceTransaction successfully");
1168
1169            // Enhance GethTrace with decoded information using serialization approach
1170            let enhanced_response = enhance_trace_with_decoded_info(&result);
1171
1172            Ok(enhanced_response)
1173        }
1174        Err(err) => {
1175            tracing::warn!(?err, "error executing debug_traceTransaction");
1176            Err(err)
1177        }
1178    }
1179}
1180
1181fn stratus_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1182    // enter span
1183    let _middleware_enter = ext.enter_middleware_span();
1184    let _method_enter = info_span!("rpc::stratus_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1185
1186    match rpc_call(params, ctx) {
1187        // result is success
1188        Ok(result) if result.is_success() => {
1189            tracing::info!(tx_output = %result.output, "executed stratus_call with success");
1190            Ok(hex_data(result.output))
1191        }
1192        // result is failure
1193        Ok(result) => {
1194            tracing::warn!(tx_output = %result.output, "executed stratus_call with failure");
1195            Err(TransactionError::RevertedCallWithReason {
1196                reason: (&result.output).into(),
1197            }
1198            .into())
1199        }
1200        // internal error
1201        Err(e) => {
1202            tracing::warn!(reason = ?e, "failed to execute stratus_call");
1203            Err(e)
1204        }
1205    }
1206}
1207
1208fn eth_send_raw_transaction(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1209    // enter span
1210    let _middleware_enter = ext.enter_middleware_span();
1211    let _method_enter = info_span!(
1212        "rpc::eth_sendRawTransaction",
1213        tx_hash = field::Empty,
1214        tx_from = field::Empty,
1215        tx_to = field::Empty,
1216        tx_nonce = field::Empty
1217    )
1218    .entered();
1219
1220    // get the pre-decoded transaction from extensions
1221    let (tx, tx_data) = match (ext.get::<TransactionInput>(), ext.get::<Bytes>()) {
1222        (Some(tx), Some(data)) => (tx.clone(), data.clone()),
1223        _ => {
1224            tracing::error!("failed to execute eth_sendRawTransaction because transaction input is not available");
1225            return Err(RpcError::TransactionInvalid {
1226                decode_error: "transaction input is not available".to_string(),
1227            }
1228            .into());
1229        }
1230    };
1231    let tx_hash = tx.transaction_info.hash;
1232
1233    // track
1234    Span::with(|s| {
1235        s.rec_str("tx_hash", &tx_hash);
1236        s.rec_str("tx_from", &tx.execution_info.signer);
1237        s.rec_opt("tx_to", &tx.execution_info.to);
1238        s.rec_str("tx_nonce", &tx.execution_info.nonce);
1239    });
1240
1241    if not(GlobalState::is_transactions_enabled()) {
1242        tracing::warn!(%tx_hash, "failed to execute eth_sendRawTransaction because transactions are disabled");
1243        return Err(StateError::TransactionsDisabled.into());
1244    }
1245
1246    // execute locally or forward to leader
1247    match GlobalState::get_node_mode() {
1248        NodeMode::Leader | NodeMode::FakeLeader => match ctx.server.executor.execute_local_transaction(tx) {
1249            Ok(_) => Ok(hex_data(tx_hash)),
1250            Err(e) => {
1251                tracing::warn!(reason = ?e, ?tx_hash, "failed to execute eth_sendRawTransaction");
1252                Err(e)
1253            }
1254        },
1255        NodeMode::Follower => match &ctx.server.read_importer() {
1256            Some(importer) => match Handle::current().block_on(importer.forward_to_leader(tx_hash, tx_data, ext.rpc_client())) {
1257                Ok(hash) => Ok(hex_data(hash)),
1258                Err(e) => Err(e),
1259            },
1260            None => {
1261                tracing::error!("unable to forward transaction because consensus is temporarily unavailable for follower node");
1262                Err(ConsensusError::Unavailable.into())
1263            }
1264        },
1265    }
1266}
1267
1268// -----------------------------------------------------------------------------
1269// Logs
1270// -----------------------------------------------------------------------------
1271
1272fn eth_get_logs(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1273    const MAX_BLOCK_RANGE: u64 = 5_000;
1274
1275    // enter span
1276    let _middleware_enter = ext.enter_middleware_span();
1277    let _method_enter = info_span!(
1278        "rpc::eth_getLogs",
1279        filter = field::Empty,
1280        filter_from = field::Empty,
1281        filter_to = field::Empty,
1282        filter_range = field::Empty,
1283        filter_event = field::Empty
1284    )
1285    .entered();
1286
1287    // parse params
1288    let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params.sequence())?;
1289    let mut filter = filter_input.parse(&ctx.server.storage)?;
1290
1291    // for this operation, the filter always need the end block specified to calculate the difference
1292    let to_block = match filter.to_block {
1293        Some(block) => block,
1294        None => {
1295            let block = ctx.server.storage.read_mined_block_number();
1296            filter.to_block = Some(block);
1297            block
1298        }
1299    };
1300    let blocks_in_range = filter.from_block.count_to(to_block);
1301
1302    let event_name = codegen::event_names_from_filter(&filter);
1303
1304    // track
1305    Span::with(|s| {
1306        s.rec_str("filter", &to_json_string(&filter));
1307        s.rec_str("filter_from", &filter.from_block);
1308        s.rec_str("filter_to", &to_block);
1309        s.rec_str("filter_range", &blocks_in_range);
1310        s.rec_str("filter_event", &event_name);
1311    });
1312    tracing::info!(?filter, filter_event = %event_name, "reading logs");
1313
1314    // check range
1315    if blocks_in_range > MAX_BLOCK_RANGE {
1316        return Err(RpcError::BlockRangeInvalid {
1317            actual: blocks_in_range,
1318            max: MAX_BLOCK_RANGE,
1319        }
1320        .into());
1321    }
1322
1323    // execute
1324    let logs = ctx.server.storage.read_logs(&filter)?;
1325    Ok(JsonValue::Array(logs.into_iter().map(|x| x.to_json_rpc_log()).collect()))
1326}
1327
1328// -----------------------------------------------------------------------------
1329// Account
1330// -----------------------------------------------------------------------------
1331
1332fn eth_accounts(_: Params<'_>, _ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
1333    Ok(json!([]))
1334}
1335
1336fn eth_get_transaction_count(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1337    // enter span
1338    let _middleware_enter = ext.enter_middleware_span();
1339    let _method_enter = info_span!("rpc::eth_getTransactionCount", address = field::Empty, filter = field::Empty).entered();
1340
1341    // pare params
1342    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1343    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1344
1345    // track
1346    Span::with(|s| {
1347        s.rec_str("address", &address);
1348        s.rec_str("address", &filter);
1349    });
1350    tracing::info!(%address, %filter, "reading account nonce");
1351
1352    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1353    let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1354    Ok(hex_num(account.nonce))
1355}
1356
1357fn eth_get_balance(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1358    // enter span
1359    let _middleware_enter = ext.enter_middleware_span();
1360    let _method_enter = info_span!("rpc::eth_getBalance", address = field::Empty, filter = field::Empty).entered();
1361
1362    // parse params
1363    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1364    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1365
1366    // track
1367    Span::with(|s| {
1368        s.rec_str("address", &address);
1369        s.rec_str("filter", &filter);
1370    });
1371    tracing::info!(%address, %filter, "reading account native balance");
1372
1373    // execute
1374    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1375    let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1376    Ok(hex_num(account.balance))
1377}
1378
1379fn eth_get_code(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1380    // enter span
1381    let _middleware_enter = ext.enter_middleware_span();
1382    let _method_enter = info_span!("rpc::eth_getCode", address = field::Empty, filter = field::Empty).entered();
1383
1384    // parse params
1385    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1386    let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1387
1388    // track
1389    Span::with(|s| {
1390        s.rec_str("address", &address);
1391        s.rec_str("filter", &filter);
1392    });
1393
1394    // execute
1395    let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1396    let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1397
1398    Ok(account.bytecode.map(|bytecode| hex_data(bytecode.original_bytes())).unwrap_or_else(hex_null))
1399}
1400
1401// -----------------------------------------------------------------------------
1402// Subscriptions
1403// -----------------------------------------------------------------------------
1404
1405async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc<RpcContext>, ext: Extensions) -> impl IntoSubscriptionCloseResponse {
1406    // `middleware_enter` created to be used as a parent by `method_span`
1407    let middleware_enter = ext.enter_middleware_span();
1408    let method_span = info_span!("rpc::eth_subscribe", subscription = field::Empty, subscription_event = field::Empty);
1409    drop(middleware_enter);
1410
1411    async move {
1412        // parse params
1413        let client = ext.rpc_client();
1414        let (params, event) = match next_rpc_param::<String>(params.sequence()) {
1415            Ok((params, event)) => (params, event),
1416            Err(e) => {
1417                pending.reject(StratusError::from(e)).await;
1418                return Ok(());
1419            }
1420        };
1421
1422        // check subscription limits
1423        if let Err(e) = ctx.subs.check_client_subscriptions(ctx.server.rpc_config.rpc_max_subscriptions, client).await {
1424            pending.reject(e).await;
1425            return Ok(());
1426        }
1427
1428        // track
1429        Span::with(|s| s.rec_str("subscription", &event));
1430        tracing::info!(%event, "subscribing to rpc event");
1431
1432        // execute
1433        match event.deref() {
1434            "newPendingTransactions" => {
1435                ctx.subs.add_new_pending_txs_subscription(client, pending.accept().await?).await;
1436            }
1437
1438            "newHeads" => {
1439                ctx.subs.add_new_heads_subscription(client, pending.accept().await?).await;
1440            }
1441
1442            "logs" => {
1443                let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params)?;
1444                let filter = filter_input.parse(&ctx.server.storage)?;
1445
1446                let event_name = codegen::event_names_from_filter(&filter).to_string();
1447                Span::with(|s| s.rec_str("subscription_event", &event_name));
1448                tracing::info!(subscription_event = %event_name, "subscribing to logs with event filter");
1449
1450                ctx.subs.add_logs_subscription(client, filter, pending.accept().await?).await;
1451            }
1452
1453            // unsupported
1454            event => {
1455                pending
1456                    .reject(StratusError::from(RpcError::SubscriptionInvalid { event: event.to_string() }))
1457                    .await;
1458            }
1459        }
1460
1461        Ok(())
1462    }
1463    .instrument(method_span)
1464    .await
1465}
1466
1467// -----------------------------------------------------------------------------
1468// Storage
1469// -----------------------------------------------------------------------------
1470
1471fn eth_get_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1472    // enter span
1473    let _middleware_enter = ext.enter_middleware_span();
1474    let _method_enter = info_span!("rpc::eth_getStorageAt", address = field::Empty, index = field::Empty).entered();
1475
1476    // parse params
1477    let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1478    let (params, index) = next_rpc_param::<SlotIndex>(params)?;
1479    let (_, block_filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1480
1481    Span::with(|s| {
1482        s.rec_str("address", &address);
1483        s.rec_str("index", &index);
1484    });
1485
1486    // execute
1487    let point_in_time = ctx.server.storage.translate_to_point_in_time(block_filter)?;
1488    let slot = ctx.server.storage.read_slot(address, index, point_in_time, ReadKind::RPC)?;
1489
1490    // It must be padded, even if it is zero.
1491    Ok(hex_num_zero_padded(slot.value.as_u256()))
1492}
1493
1494// -----------------------------------------------------------------------------
1495// Response helpers
1496// -----------------------------------------------------------------------------
1497
1498#[inline(always)]
1499fn hex_data<T: AsRef<[u8]>>(value: T) -> String {
1500    const_hex::encode_prefixed(value)
1501}
1502
1503#[inline(always)]
1504fn hex_num(value: impl Into<U256>) -> String {
1505    format!("{:#x}", value.into())
1506}
1507
1508#[inline(always)]
1509fn hex_num_zero_padded(value: impl Into<U256>) -> String {
1510    let width = 64 + 2; //the prefix is included in the total width
1511    format!("{:#0width$x}", value.into(), width = width)
1512}
1513
1514fn hex_zero() -> String {
1515    "0x0".to_owned()
1516}
1517
1518fn hex_null() -> String {
1519    "0x".to_owned()
1520}
1521
1522/// Enhances trace using serialized JSON modification
1523fn enhance_trace_with_decoded_info(trace: &GethTrace) -> JsonValue {
1524    match trace {
1525        GethTrace::CallTracer(call_frame) => {
1526            // Serialize first, then enhance
1527            let mut json = to_json_value(call_frame);
1528            enhance_serialized_call_frame(&mut json);
1529            json
1530        }
1531        _ => {
1532            // For non-CallTracer traces, return as-is without enhancement
1533            to_json_value(trace)
1534        }
1535    }
1536}
1537
1538fn enhance_serialized_call_frame(json: &mut JsonValue) {
1539    if let Some(json_obj) = json.as_object_mut() {
1540        json_obj
1541            .get("from")
1542            .and_then(|v| v.as_str())
1543            .and_then(|s| s.parse::<Address>().ok())
1544            .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1545            .and_then(|s| json_obj.insert("decodedFromContract".to_string(), json!(s)));
1546
1547        json_obj
1548            .get("to")
1549            .and_then(|v| if v.is_null() { None } else { v.as_str() })
1550            .and_then(|s| s.parse::<Address>().ok())
1551            .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1552            .and_then(|s| json_obj.insert("decodedToContract".to_string(), json!(s)));
1553
1554        json_obj
1555            .get("input")
1556            .and_then(|v| v.as_str())
1557            .and_then(|s| const_hex::decode(s.trim_start_matches("0x")).ok())
1558            .inspect(|input| {
1559                if let Some(signature) = codegen::function_sig_opt(input) {
1560                    json_obj.insert("decodedFunctionSignature".to_string(), json!(signature));
1561                }
1562
1563                match decode::decode_input_arguments(input) {
1564                    Ok(args) => {
1565                        json_obj.insert("decodedFunctionArguments".to_string(), json!(args));
1566                    }
1567                    Err(DecodeInputError::InvalidAbi { message }) => {
1568                        tracing::warn!(
1569                            message = %message,
1570                            "Invalid ABI stored"
1571                        );
1572                    }
1573                    _ => (),
1574                }
1575            });
1576        if let Some(calls) = json_obj.get_mut("calls").and_then(|v| v.as_array_mut()) {
1577            for call in calls {
1578                enhance_serialized_call_frame(call);
1579            }
1580        }
1581    }
1582}
1583
1584#[cfg(test)]
1585mod tests {
1586    use alloy_primitives::Address;
1587    use alloy_primitives::Bytes;
1588    use alloy_primitives::U256;
1589    use alloy_rpc_types_trace::geth::CallFrame;
1590
1591    use super::*;
1592
1593    fn create_simple_test_call_structure() -> CallFrame {
1594        // Create deepest level calls (level 3)
1595        let deep_call_1 = CallFrame {
1596            from: "0x562689c910361ae21d12eadafbfca727b3bcbc24".parse::<Address>().unwrap(), // Maps to Compound_Agent_4
1597            to: Some("0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap()), // Maps to BRLCToken
1598            input: Bytes::from(
1599                const_hex::decode(
1600                    "70a08231000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e", // balanceOf function
1601                )
1602                .unwrap(),
1603            ),
1604            output: Some(Bytes::from(
1605                const_hex::decode("0000000000000000000000000000000000000000000000000de0b6b3a7640000").unwrap(),
1606            )),
1607            gas: U256::from(5000),
1608            gas_used: U256::from(3000),
1609            value: None,
1610            typ: "STATICCALL".to_string(),
1611            error: None,
1612            revert_reason: None,
1613            calls: Vec::new(),
1614            logs: Vec::new(),
1615        };
1616
1617        let deep_call_2 = CallFrame {
1618            from: "0x3181ab023a4d4788754258be5a3b8cf3d8276b98".parse::<Address>().unwrap(), // Maps to Cashier_BRLC_v2
1619            to: Some("0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap()), // Maps to USJIMToken
1620            input: Bytes::from(
1621                const_hex::decode(
1622                    "dd62ed3e000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e1", // allowance function
1623                )
1624                .unwrap(),
1625            ),
1626            output: Some(Bytes::from(
1627                const_hex::decode("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(),
1628            )),
1629            gas: U256::from(8000),
1630            gas_used: U256::from(5000),
1631            value: None,
1632            typ: "STATICCALL".to_string(),
1633            error: None,
1634            revert_reason: None,
1635            calls: Vec::new(),
1636            logs: Vec::new(),
1637        };
1638
1639        // Create level 2 nested calls using real contract addresses from CONTRACTS map
1640        let nested_call_1 = CallFrame {
1641            from: "0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap(), // Maps to BRLCToken
1642            to: Some("0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap()), // Maps to USJIMToken
1643            input: Bytes::from(
1644                const_hex::decode(
1645                    "a9059cbb000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000", // transfer function
1646                )
1647                .unwrap(),
1648            ),
1649            output: Some(Bytes::from(
1650                const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1651            )),
1652            gas: U256::from(21000),
1653            gas_used: U256::from(20000),
1654            value: None,
1655            typ: "CALL".to_string(),
1656            error: None,
1657            revert_reason: None,
1658            calls: vec![deep_call_1],
1659            logs: Vec::new(),
1660        };
1661
1662        let nested_call_2 = CallFrame {
1663            from: "0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap(), // Maps to USJIMToken
1664            to: Some("0x3181ab023a4d4788754258be5a3b8cf3d8276b98".parse::<Address>().unwrap()), // Maps to Cashier_BRLC_v2
1665            input: Bytes::from(
1666                const_hex::decode(
1667                    "095ea7b3000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000", // approve function
1668                )
1669                .unwrap(),
1670            ),
1671            output: Some(Bytes::from(
1672                const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1673            )),
1674            gas: U256::from(30000),
1675            gas_used: U256::from(25000),
1676            value: None,
1677            typ: "CALL".to_string(),
1678            error: None,
1679            revert_reason: None,
1680            calls: vec![deep_call_2],
1681            logs: Vec::new(),
1682        };
1683
1684        // Create main call containing nested calls (level 1)
1685        CallFrame {
1686            from: "0x742d35Cc6634C0532925a3b8D7C9be8813eeb02e".parse::<Address>().unwrap(),
1687            to: Some("0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap()), // BRLCToken
1688            input: Bytes::from(
1689                const_hex::decode(
1690                    "23b872dd000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e10000000000000000000000000000000000000000000000000de0b6b3a7640000", // transferFrom function
1691                )
1692                .unwrap(),
1693            ),
1694            output: Some(Bytes::from(
1695                const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1696            )),
1697            gas: U256::from(100000),
1698            gas_used: U256::from(85000),
1699            value: None,
1700            typ: "CALL".to_string(),
1701            error: None,
1702            revert_reason: None,
1703            calls: vec![nested_call_1, nested_call_2],
1704            logs: Vec::new(),
1705        }
1706    }
1707
1708    #[test]
1709    fn test_contract_name_decoding() {
1710        // Create a 3-level structure using real contract addresses
1711        let test_call = create_simple_test_call_structure();
1712        let geth_trace = GethTrace::CallTracer(test_call);
1713
1714        // Enhance the trace with decoded information
1715        let result = enhance_trace_with_decoded_info(&geth_trace);
1716        let result_str = serde_json::to_string_pretty(&result).unwrap();
1717
1718        assert!(result_str.contains("Cashier_BRLC_v2"));
1719        assert!(result_str.contains("BRLCToken"));
1720        assert!(result_str.contains("USJIMToken"));
1721        assert!(result_str.contains("Compound_Agent_4"));
1722
1723        // Verify function signature decoding for all the expected functions
1724        assert!(result_str.contains("transfer(address,uint256)"));
1725        assert!(result_str.contains("approve(address,uint256)"));
1726        assert!(result_str.contains("transferFrom(address,address,uint256)"));
1727        assert!(result_str.contains("balanceOf(address)"));
1728        assert!(result_str.contains("allowance(address,address)"));
1729    }
1730}