1use std::collections::HashMap;
4use std::ops::Deref;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::sync::LazyLock;
8use std::time::Duration;
9
10use alloy_primitives::U256;
11use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
12use alloy_rpc_types_trace::geth::GethTrace;
13use anyhow::Result;
14use futures::join;
15use http::Method;
16use itertools::Itertools;
17use jsonrpsee::Extensions;
18use jsonrpsee::IntoSubscriptionCloseResponse;
19use jsonrpsee::PendingSubscriptionSink;
20use jsonrpsee::server::BatchRequestConfig;
21use jsonrpsee::server::RandomStringIdProvider;
22use jsonrpsee::server::RpcModule;
23use jsonrpsee::server::Server as RpcServer;
24use jsonrpsee::server::ServerConfig;
25use jsonrpsee::server::ServerHandle;
26use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
27use jsonrpsee::types::Params;
28use jsonrpsee::ws_client::RpcServiceBuilder;
29use parking_lot::RwLock;
30use serde_json::json;
31use tokio::runtime::Handle;
32use tokio::select;
33use tokio::sync::Semaphore;
34use tokio::sync::SemaphorePermit;
35use tower_http::cors::Any;
36use tower_http::cors::CorsLayer;
37use tracing::Instrument;
38use tracing::Span;
39use tracing::field;
40use tracing::info_span;
41
42use crate::GlobalState;
43use crate::NodeMode;
44use crate::alias::AlloyReceipt;
45use crate::alias::JsonValue;
46use crate::config::StratusConfig;
47use crate::eth::codegen;
48use crate::eth::codegen::CONTRACTS;
49use crate::eth::decode;
50use crate::eth::executor::Executor;
51use crate::eth::follower::consensus::Consensus;
52use crate::eth::follower::importer::ImporterConfig;
53use crate::eth::follower::importer::ImporterConsensus;
54use crate::eth::miner::Miner;
55use crate::eth::miner::MinerMode;
56use crate::eth::primitives::Address;
57use crate::eth::primitives::BlockFilter;
58use crate::eth::primitives::Bytes;
59use crate::eth::primitives::CallInput;
60use crate::eth::primitives::ChainId;
61use crate::eth::primitives::ConsensusError;
62use crate::eth::primitives::DecodeInputError;
63use crate::eth::primitives::EvmExecution;
64use crate::eth::primitives::Hash;
65use crate::eth::primitives::ImporterError;
66use crate::eth::primitives::LogFilterInput;
67#[cfg(feature = "dev")]
68use crate::eth::primitives::Nonce;
69use crate::eth::primitives::PointInTime;
70use crate::eth::primitives::RpcError;
71use crate::eth::primitives::SlotIndex;
72#[cfg(feature = "dev")]
73use crate::eth::primitives::SlotValue;
74use crate::eth::primitives::StateError;
75use crate::eth::primitives::StorageError;
76use crate::eth::primitives::StratusError;
77use crate::eth::primitives::TransactionError;
78use crate::eth::primitives::TransactionInput;
79use crate::eth::primitives::TransactionStage;
80#[cfg(feature = "dev")]
81use crate::eth::primitives::Wei;
82use crate::eth::rpc::RpcContext;
83use crate::eth::rpc::RpcHttpMiddleware;
84use crate::eth::rpc::RpcMiddleware;
85use crate::eth::rpc::RpcServerConfig;
86use crate::eth::rpc::RpcSubscriptions;
87use crate::eth::rpc::next_rpc_param;
88use crate::eth::rpc::next_rpc_param_or_default;
89use crate::eth::rpc::rpc_parser::RpcExtensionsExt;
90use crate::eth::rpc::rpc_subscriptions::RpcSubscriptionsHandles;
91use crate::eth::storage::ReadKind;
92use crate::eth::storage::StratusStorage;
93use crate::ext::InfallibleExt;
94use crate::ext::WatchReceiverExt;
95use crate::ext::not;
96use crate::ext::parse_duration;
97use crate::ext::to_json_string;
98use crate::ext::to_json_value;
99use crate::infra::build_info;
100use crate::infra::metrics;
101use crate::infra::tracing::SpanExt;
102use crate::log_and_err;
103#[derive(Clone, derive_new::new)]
108pub struct Server {
109 pub storage: Arc<StratusStorage>,
111 pub executor: Arc<Executor>,
112 pub miner: Arc<Miner>,
113 pub importer: Arc<RwLock<Option<Arc<ImporterConsensus>>>>,
114
115 pub app_config: StratusConfig,
117 pub rpc_config: RpcServerConfig,
118 pub chain_id: ChainId,
119}
120
121impl Server {
122 pub async fn serve(self) -> Result<()> {
123 let this = Arc::new(self);
124 this.update_health().await;
125
126 const TASK_NAME: &str = "rpc-server";
127 let health_worker_handle = tokio::task::spawn({
128 let this = Arc::clone(&this);
129 async move {
130 this.health_worker().await;
131 }
132 });
133 this.update_health().await;
134 let mut health = GlobalState::get_health_receiver();
135 health.mark_unchanged();
136 let (server_handle, subscriptions) = loop {
137 let (server_handle, subscriptions) = this._serve().await?;
138 let server_handle_watch = server_handle.clone();
139
140 select! {
142 _ = server_handle_watch.stopped() => {
144 GlobalState::shutdown_from(TASK_NAME, "finished unexpectedly");
145 break (server_handle, subscriptions);
146 },
147 _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
149 let _ = server_handle.stop();
150 break (server_handle, subscriptions);
151 },
152 _ = health.wait_for_change(|healthy| GlobalState::restart_on_unhealthy() && !healthy) => {
154 tracing::info!("health state changed to unhealthy, restarting the rpc server");
155 let _ = server_handle.stop();
156 subscriptions.abort();
157 join!(server_handle.stopped(), subscriptions.stopped());
158 }
159 }
160 };
161 let res = join!(server_handle.stopped(), subscriptions.stopped(), health_worker_handle);
162 res.2?;
163 Ok(())
164 }
165
166 async fn _serve(&self) -> anyhow::Result<(ServerHandle, RpcSubscriptionsHandles)> {
168 let this = self.clone();
169 const TASK_NAME: &str = "rpc-server";
170 tracing::info!(%this.rpc_config.rpc_address, %this.rpc_config.rpc_max_connections, "creating {}", TASK_NAME);
171
172 let subs = RpcSubscriptions::spawn(
174 this.miner.notifier_pending_txs.subscribe(),
175 this.miner.notifier_blocks.subscribe(),
176 this.miner.notifier_logs.subscribe(),
177 );
178
179 let ctx = RpcContext {
181 server: Arc::new(this.clone()),
182 client_version: "stratus",
183 subs: Arc::clone(&subs.connected),
184 };
185
186 let mut module = RpcModule::<RpcContext>::new(ctx);
188 module = register_methods(module)?;
189
190 let cors = CorsLayer::new().allow_methods([Method::POST]).allow_origin(Any).allow_headers(Any);
192 let rpc_middleware = RpcServiceBuilder::new().layer_fn(RpcMiddleware::new);
193 let http_middleware = tower::ServiceBuilder::new().layer(cors).layer_fn(RpcHttpMiddleware::new).layer(
194 ProxyGetRequestLayer::new([
195 ("/health", "stratus_health"),
196 ("/version", "stratus_version"),
197 ("/config", "stratus_config"),
198 ("/state", "stratus_state"),
199 ])
200 .unwrap(),
201 );
202
203 let server_config = ServerConfig::builder()
204 .set_id_provider(RandomStringIdProvider::new(8))
205 .max_connections(this.rpc_config.rpc_max_connections)
206 .max_response_body_size(this.rpc_config.rpc_max_response_size_bytes)
207 .set_batch_request_config(BatchRequestConfig::Limit(this.rpc_config.batch_request_limit))
208 .build();
209
210 let server = RpcServer::builder()
212 .set_rpc_middleware(rpc_middleware)
213 .set_http_middleware(http_middleware)
214 .set_config(server_config)
215 .build(this.rpc_config.rpc_address)
216 .await?;
217
218 let handle_rpc_server = server.start(module);
219 Ok((handle_rpc_server, subs.handles))
220 }
221
222 pub async fn health_worker(&self) {
223 let mut interval = tokio::time::interval(std::time::Duration::from_millis(self.rpc_config.health_check_interval_ms));
225
226 loop {
228 self.update_health().await;
230
231 interval.tick().await;
233 if GlobalState::is_shutdown() {
234 break;
235 }
236 }
237 }
238
239 async fn update_health(&self) {
240 let is_healthy = self.health().await;
241 GlobalState::set_health(is_healthy);
242 metrics::set_consensus_is_ready(if is_healthy { 1u64 } else { 0u64 });
243 }
244
245 fn read_importer(&self) -> Option<Arc<ImporterConsensus>> {
246 self.importer.read().as_ref().map(Arc::clone)
247 }
248
249 pub fn set_importer(&self, importer: Option<Arc<ImporterConsensus>>) {
250 *self.importer.write() = importer;
251 }
252
253 async fn health(&self) -> bool {
254 match GlobalState::get_node_mode() {
255 NodeMode::Leader | NodeMode::FakeLeader => true,
256 NodeMode::Follower =>
257 if GlobalState::is_importer_shutdown() {
258 tracing::warn!("stratus is unhealthy because importer is shutdown");
259 false
260 } else {
261 match self.read_importer() {
262 Some(importer) => importer.should_serve().await,
263 None => {
264 tracing::warn!("stratus is unhealthy because importer is not available");
265 false
266 }
267 }
268 },
269 }
270 }
271}
272
273fn register_methods(mut module: RpcModule<RpcContext>) -> anyhow::Result<RpcModule<RpcContext>> {
274 #[cfg(feature = "dev")]
276 {
277 module.register_blocking_method("evm_setNextBlockTimestamp", evm_set_next_block_timestamp)?;
278 module.register_blocking_method("evm_mine", evm_mine)?;
279 module.register_blocking_method("hardhat_reset", stratus_reset)?;
280 module.register_blocking_method("stratus_reset", stratus_reset)?;
281 module.register_blocking_method("hardhat_setStorageAt", stratus_set_storage_at)?;
282 module.register_blocking_method("stratus_setStorageAt", stratus_set_storage_at)?;
283 module.register_blocking_method("hardhat_setNonce", stratus_set_nonce)?;
284 module.register_blocking_method("stratus_setNonce", stratus_set_nonce)?;
285 module.register_blocking_method("hardhat_setBalance", stratus_set_balance)?;
286 module.register_blocking_method("stratus_setBalance", stratus_set_balance)?;
287 module.register_blocking_method("hardhat_setCode", stratus_set_code)?;
288 module.register_blocking_method("stratus_setCode", stratus_set_code)?;
289 module.register_blocking_method("stratus_clearCache", stratus_clear_cache)?;
290 }
291
292 module.register_async_method("stratus_health", stratus_health)?;
294
295 module.register_method("stratus_enableTransactions", stratus_enable_transactions)?;
297 module.register_method("stratus_disableTransactions", stratus_disable_transactions)?;
298 module.register_method("stratus_enableMiner", stratus_enable_miner)?;
299 module.register_method("stratus_disableMiner", stratus_disable_miner)?;
300 module.register_method("stratus_enableUnknownClients", stratus_enable_unknown_clients)?;
301 module.register_method("stratus_disableUnknownClients", stratus_disable_unknown_clients)?;
302 module.register_method("stratus_enableRestartOnUnhealthy", stratus_enable_restart_on_unhealthy)?;
303 module.register_method("stratus_disableRestartOnUnhealthy", stratus_disable_restart_on_unhealthy)?;
304 module.register_async_method("stratus_changeToLeader", stratus_change_to_leader)?;
305 module.register_async_method("stratus_changeToFollower", stratus_change_to_follower)?;
306 module.register_async_method("stratus_initImporter", stratus_init_importer)?;
307 module.register_method("stratus_shutdownImporter", stratus_shutdown_importer)?;
308 module.register_async_method("stratus_changeMinerMode", stratus_change_miner_mode)?;
309
310 module.register_method("stratus_version", stratus_version)?;
312 module.register_method("stratus_config", stratus_config)?;
313 module.register_method("stratus_state", stratus_state)?;
314
315 module.register_async_method("stratus_getSubscriptions", stratus_get_subscriptions)?;
316 module.register_method("stratus_pendingTransactionsCount", stratus_pending_transactions_count)?;
317
318 module.register_method("net_version", net_version)?;
320 module.register_async_method("net_listening", net_listening)?;
321 module.register_method("eth_chainId", eth_chain_id)?;
322 module.register_method("web3_clientVersion", web3_client_version)?;
323
324 module.register_method("eth_gasPrice", eth_gas_price)?;
326
327 module.register_blocking_method("stratus_getBlockAndReceipts", stratus_get_block_and_receipts)?;
329 module.register_blocking_method("stratus_getBlockWithChanges", stratus_get_block_with_changes)?;
330
331 module.register_blocking_method("eth_blockNumber", eth_block_number)?;
333 module.register_blocking_method("eth_getBlockByNumber", eth_get_block_by_number)?;
334 module.register_blocking_method("eth_getBlockByHash", eth_get_block_by_hash)?;
335 module.register_blocking_method("stratus_getBlockByTimestamp", stratus_get_block_by_timestamp)?;
336 module.register_method("eth_getUncleByBlockHashAndIndex", eth_get_uncle_by_block_hash_and_index)?;
337
338 module.register_blocking_method("eth_getTransactionByHash", eth_get_transaction_by_hash)?;
340 module.register_blocking_method("eth_getTransactionReceipt", eth_get_transaction_receipt)?;
341 module.register_blocking_method("eth_estimateGas", eth_estimate_gas)?;
342 module.register_blocking_method("eth_call", eth_call)?;
343 module.register_blocking_method("eth_sendRawTransaction", eth_send_raw_transaction)?;
344 module.register_blocking_method("stratus_call", stratus_call)?;
345 module.register_blocking_method("stratus_getTransactionResult", stratus_get_transaction_result)?;
346 module.register_blocking_method("debug_traceTransaction", debug_trace_transaction)?;
347
348 module.register_blocking_method("eth_getLogs", eth_get_logs)?;
350
351 module.register_method("eth_accounts", eth_accounts)?;
353 module.register_blocking_method("eth_getTransactionCount", eth_get_transaction_count)?;
354 module.register_blocking_method("eth_getBalance", eth_get_balance)?;
355 module.register_blocking_method("eth_getCode", eth_get_code)?;
356
357 module.register_blocking_method("eth_getStorageAt", eth_get_storage_at)?;
359
360 module.register_subscription("eth_subscribe", "eth_subscription", "eth_unsubscribe", eth_subscribe)?;
362
363 Ok(module)
364}
365
366#[cfg(feature = "dev")]
371fn evm_mine(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
372 ctx.server.miner.mine_local_and_commit()?;
373 Ok(to_json_value(true))
374}
375
376#[cfg(feature = "dev")]
377fn stratus_clear_cache(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
378 ctx.server.storage.clear_cache();
379 Ok(to_json_value(true))
380}
381
382#[cfg(feature = "dev")]
383fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
384 use crate::eth::primitives::UnixTime;
385 use crate::log_and_err;
386
387 let (_, timestamp) = next_rpc_param::<UnixTime>(params.sequence())?;
388 let latest = ctx.server.storage.read_block(BlockFilter::Latest)?;
389 match latest {
390 Some(block) => UnixTime::set_offset(block.header.timestamp, timestamp)?,
391 None => return log_and_err!("reading latest block returned None")?,
392 }
393 Ok(to_json_value(timestamp))
394}
395
396#[allow(unused_variables)]
401async fn stratus_health(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
402 #[cfg(feature = "dev")]
403 ctx.server.update_health().await;
404
405 if GlobalState::is_shutdown() {
406 tracing::warn!("liveness check failed because of shutdown");
407 return Err(StateError::StratusShutdown.into());
408 }
409
410 if GlobalState::is_healthy() {
411 Ok(json!(true))
412 } else {
413 Err(StateError::StratusNotReady.into())
414 }
415}
416
417#[cfg(feature = "dev")]
422fn stratus_reset(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
423 ctx.server.storage.reset_to_genesis()?;
424 Ok(to_json_value(true))
425}
426
427#[cfg(feature = "dev")]
428fn stratus_set_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
429 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
430 let (params, index) = next_rpc_param::<SlotIndex>(params)?;
431 let (_, value) = next_rpc_param::<SlotValue>(params)?;
432
433 tracing::info!(%address, %index, %value, "setting storage at address and index");
434
435 ctx.server.storage.set_storage_at(address, index, value)?;
436
437 Ok(to_json_value(true))
438}
439
440#[cfg(feature = "dev")]
441fn stratus_set_nonce(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
442 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
443 let (_, nonce) = next_rpc_param::<Nonce>(params)?;
444
445 tracing::info!(%address, %nonce, "setting nonce for address");
446
447 ctx.server.storage.set_nonce(address, nonce)?;
448
449 Ok(to_json_value(true))
450}
451
452#[cfg(feature = "dev")]
453fn stratus_set_balance(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
454 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
455 let (_, balance) = next_rpc_param::<Wei>(params)?;
456
457 tracing::info!(%address, %balance, "setting balance for address");
458
459 ctx.server.storage.set_balance(address, balance)?;
460
461 Ok(to_json_value(true))
462}
463
464#[cfg(feature = "dev")]
465fn stratus_set_code(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
466 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
467 let (_, code) = next_rpc_param::<Bytes>(params)?;
468
469 tracing::info!(%address, code_size = %code.0.len(), "setting code for address");
470
471 ctx.server.storage.set_code(address, code)?;
472
473 Ok(to_json_value(true))
474}
475
476static MODE_CHANGE_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
477
478async fn stratus_change_to_leader(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
479 ext.authentication().auth_admin()?;
480 let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
481 let _permit: SemaphorePermit = match permit {
482 Ok(permit) => permit,
483 Err(_) => return Err(StateError::ModeChangeInProgress.into()),
484 };
485
486 const LEADER_MINER_INTERVAL: Duration = Duration::from_secs(1);
487 tracing::info!("starting process to change node to leader");
488
489 if GlobalState::get_node_mode() == NodeMode::Leader {
490 tracing::info!("node is already in leader mode, no changes made");
491 return Ok(json!(false));
492 }
493
494 if GlobalState::is_transactions_enabled() {
495 tracing::error!("transactions are currently enabled, cannot change node mode");
496 return Err(StateError::TransactionsEnabled.into());
497 }
498
499 tracing::info!("shutting down importer");
500 let shutdown_importer_result = stratus_shutdown_importer(Params::new(None), &ctx, &ext);
501 match shutdown_importer_result {
502 Ok(_) => tracing::info!("importer shutdown successfully"),
503 Err(StratusError::Importer(ImporterError::AlreadyShutdown)) => {
504 tracing::warn!("importer is already shutdown, continuing");
505 }
506 Err(e) => {
507 tracing::error!(reason = ?e, "failed to shutdown importer");
508 return Err(e);
509 }
510 }
511
512 tracing::info!("wait for importer to shutdown");
513 GlobalState::wait_for_importer_to_finish().await;
514
515 let change_miner_mode_result = change_miner_mode(MinerMode::Interval(LEADER_MINER_INTERVAL), &ctx).await;
516 if let Err(e) = change_miner_mode_result {
517 tracing::error!(reason = ?e, "failed to change miner mode");
518 return Err(e);
519 }
520 tracing::info!("miner mode changed to interval(1s) successfully");
521
522 GlobalState::set_node_mode(NodeMode::Leader);
523 ctx.server.storage.clear_cache();
524 tracing::info!("node mode changed to leader successfully, cache cleared");
525
526 Ok(json!(true))
527}
528
529async fn stratus_change_to_follower(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
530 ext.authentication().auth_admin()?;
531 let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
532 let _permit: SemaphorePermit = match permit {
533 Ok(permit) => permit,
534 Err(_) => return Err(StateError::ModeChangeInProgress.into()),
535 };
536
537 tracing::info!("starting process to change node to follower");
538
539 if GlobalState::get_node_mode() == NodeMode::Follower {
540 tracing::info!("node is already in follower mode, no changes made");
541 return Ok(json!(false));
542 }
543
544 if GlobalState::is_transactions_enabled() {
545 tracing::error!("transactions are currently enabled, cannot change node mode");
546 return Err(StateError::TransactionsEnabled.into());
547 }
548
549 let pending_txs = ctx.server.storage.pending_transactions();
550 if not(pending_txs.is_empty()) {
551 tracing::error!(pending_txs = ?pending_txs.len(), "cannot change to follower mode with pending transactions");
552 return Err(StorageError::PendingTransactionsExist {
553 pending_txs: pending_txs.len(),
554 }
555 .into());
556 }
557
558 let change_miner_mode_result = change_miner_mode(MinerMode::External, &ctx).await;
559 if let Err(e) = change_miner_mode_result {
560 tracing::error!(reason = ?e, "failed to change miner mode");
561 return Err(e);
562 }
563 tracing::info!("miner mode changed to external successfully");
564
565 GlobalState::set_node_mode(NodeMode::Follower);
566 ctx.server.storage.clear_cache();
567 tracing::info!("storage cache cleared");
568
569 tracing::info!("initializing importer");
570 let init_importer_result = stratus_init_importer(params, Arc::clone(&ctx), ext).await;
571 match init_importer_result {
572 Ok(_) => tracing::info!("importer initialized successfully"),
573 Err(StratusError::Importer(ImporterError::AlreadyRunning)) => {
574 tracing::warn!("importer is already running, continuing");
575 }
576 Err(e) => {
577 tracing::error!(reason = ?e, "failed to initialize importer, reverting node mode to leader");
578 GlobalState::set_node_mode(NodeMode::Leader);
579 return Err(e);
580 }
581 }
582 tracing::info!("node mode changed to follower successfully");
583
584 Ok(json!(true))
585}
586
587async fn stratus_init_importer(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
588 ext.authentication().auth_admin()?;
589 let (params, external_rpc) = next_rpc_param::<String>(params.sequence())?;
590 let (params, external_rpc_ws) = next_rpc_param::<String>(params)?;
591 let (params, raw_external_rpc_timeout) = next_rpc_param::<String>(params)?;
592 let (params, raw_sync_interval) = next_rpc_param::<String>(params)?;
593 let (_, raw_external_rpc_max_response_size_bytes) = next_rpc_param::<String>(params)?;
594
595 let external_rpc_timeout = parse_duration(&raw_external_rpc_timeout).map_err(|e| {
596 tracing::error!(reason = ?e, "failed to parse external_rpc_timeout");
597 ImporterError::ConfigParseError
598 })?;
599
600 let sync_interval = parse_duration(&raw_sync_interval).map_err(|e| {
601 tracing::error!(reason = ?e, "failed to parse sync_interval");
602 ImporterError::ConfigParseError
603 })?;
604
605 let external_rpc_max_response_size_bytes = raw_external_rpc_max_response_size_bytes.parse::<u32>().map_err(|e| {
606 tracing::error!(reason = ?e, "failed to parse external_rpc_max_response_size_bytes");
607 ImporterError::ConfigParseError
608 })?;
609
610 let importer_config = ImporterConfig {
611 external_rpc,
612 external_rpc_ws: Some(external_rpc_ws),
613 external_rpc_timeout,
614 sync_interval,
615 external_rpc_max_response_size_bytes,
616 enable_block_changes_replication: std::env::var("ENABLE_BLOCK_CHANGES_REPLICATION")
617 .ok()
618 .is_some_and(|val| val == "1" || val == "true"),
619 };
620
621 importer_config.init_follower_importer(ctx).await
622}
623
624fn stratus_shutdown_importer(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<JsonValue, StratusError> {
625 ext.authentication().auth_admin()?;
626 if GlobalState::get_node_mode() != NodeMode::Follower {
627 tracing::error!("node is currently not a follower");
628 return Err(StateError::StratusNotFollower.into());
629 }
630
631 if GlobalState::is_importer_shutdown() && ctx.server.read_importer().is_none() {
632 tracing::error!("importer is already shut down");
633 return Err(ImporterError::AlreadyShutdown.into());
634 }
635
636 ctx.server.set_importer(None);
637
638 const TASK_NAME: &str = "rpc-server::importer-shutdown";
639 GlobalState::shutdown_importer_from(TASK_NAME, "received importer shutdown request");
640
641 Ok(json!(true))
642}
643
644async fn stratus_change_miner_mode(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
645 ext.authentication().auth_admin()?;
646 let (_, mode_str) = next_rpc_param::<String>(params.sequence())?;
647
648 let mode = MinerMode::from_str(&mode_str).map_err(|e| {
649 tracing::error!(reason = ?e, "failed to parse miner mode");
650 RpcError::MinerModeParamInvalid
651 })?;
652
653 change_miner_mode(mode, &ctx).await
654}
655
656async fn change_miner_mode(new_mode: MinerMode, ctx: &RpcContext) -> Result<JsonValue, StratusError> {
660 if GlobalState::is_transactions_enabled() {
661 tracing::error!("cannot change miner mode while transactions are enabled");
662 return Err(StateError::TransactionsEnabled.into());
663 }
664
665 let previous_mode = ctx.server.miner.mode();
666
667 if previous_mode == new_mode {
668 tracing::warn!(?new_mode, current = ?new_mode, "miner mode already set, skipping");
669 return Ok(json!(false));
670 }
671
672 if not(ctx.server.miner.is_paused()) && previous_mode.is_interval() {
673 return log_and_err!("can't change miner mode from Interval without pausing it first").map_err(Into::into);
674 }
675
676 match new_mode {
677 MinerMode::External => {
678 tracing::info!("changing miner mode to External");
679
680 let pending_txs = ctx.server.storage.pending_transactions();
681 if not(pending_txs.is_empty()) {
682 tracing::error!(pending_txs = ?pending_txs.len(), "cannot change miner mode to External with pending transactions");
683 return Err(StorageError::PendingTransactionsExist {
684 pending_txs: pending_txs.len(),
685 }
686 .into());
687 }
688
689 ctx.server.miner.switch_to_external_mode().await;
690 }
691 MinerMode::Interval(duration) => {
692 tracing::info!(duration = ?duration, "changing miner mode to Interval");
693
694 if ctx.server.read_importer().is_some() {
695 tracing::error!("cannot change miner mode to Interval with consensus set");
696 return Err(ConsensusError::Set.into());
697 }
698
699 ctx.server.miner.start_interval_mining(duration).await;
700 }
701 MinerMode::Automine => {
702 return log_and_err!("Miner mode change to 'automine' is unsupported.").map_err(Into::into);
703 }
704 }
705
706 Ok(json!(true))
707}
708
709fn stratus_enable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
710 ext.authentication().auth_admin()?;
711 GlobalState::set_unknown_client_enabled(true);
712 Ok(GlobalState::is_unknown_client_enabled())
713}
714
715fn stratus_disable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
716 ext.authentication().auth_admin()?;
717 GlobalState::set_unknown_client_enabled(false);
718 Ok(GlobalState::is_unknown_client_enabled())
719}
720
721fn stratus_enable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
722 ext.authentication().auth_admin()?;
723 GlobalState::set_transactions_enabled(true);
724 Ok(GlobalState::is_transactions_enabled())
725}
726
727fn stratus_disable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
728 ext.authentication().auth_admin()?;
729 GlobalState::set_transactions_enabled(false);
730 Ok(GlobalState::is_transactions_enabled())
731}
732
733fn stratus_enable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
734 ext.authentication().auth_admin()?;
735 ctx.server.miner.unpause();
736 Ok(true)
737}
738
739fn stratus_disable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
740 ext.authentication().auth_admin()?;
741 ctx.server.miner.pause();
742 Ok(false)
743}
744
745fn stratus_pending_transactions_count(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> usize {
747 ctx.server.storage.pending_transactions().len()
748}
749
750fn stratus_disable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
751 ext.authentication().auth_admin()?;
752 GlobalState::set_restart_on_unhealthy(false);
753 Ok(GlobalState::restart_on_unhealthy())
754}
755
756fn stratus_enable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
757 ext.authentication().auth_admin()?;
758 GlobalState::set_restart_on_unhealthy(true);
759 Ok(GlobalState::restart_on_unhealthy())
760}
761
762fn stratus_version(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
767 Ok(build_info::as_json())
768}
769
770fn stratus_config(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<StratusConfig, StratusError> {
771 ext.authentication().auth_admin()?;
772 Ok(ctx.server.app_config.clone())
773}
774
775fn stratus_state(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
776 Ok(GlobalState::get_global_state_as_json(ctx))
777}
778
779async fn stratus_get_subscriptions(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
780 let pending_txs = serde_json::to_value(ctx.subs.pending_txs.read().await.values().collect_vec()).expect_infallible();
782 let new_heads = serde_json::to_value(ctx.subs.new_heads.read().await.values().collect_vec()).expect_infallible();
783 let logs = serde_json::to_value(ctx.subs.logs.read().await.values().flat_map(HashMap::values).collect_vec()).expect_infallible();
784
785 let response = json!({
786 "newPendingTransactions": pending_txs,
787 "newHeads": new_heads,
788 "logs": logs,
789 });
790 Ok(response)
791}
792
793async fn net_listening(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
798 let net_listening = stratus_health(params, ctx, ext).await;
799
800 tracing::info!(net_listening = ?net_listening, "network listening status");
801
802 net_listening
803}
804
805fn net_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
806 ctx.server.chain_id.to_string()
807}
808
809fn eth_chain_id(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
810 hex_num(ctx.server.chain_id)
811}
812
813fn web3_client_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
814 ctx.client_version.to_owned()
815}
816
817fn eth_gas_price(_: Params<'_>, _: &RpcContext, _: &Extensions) -> String {
822 hex_zero()
823}
824
825fn eth_block_number(_params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
830 let _middleware_enter = ext.enter_middleware_span();
832 let _method_enter = info_span!("rpc::eth_blockNumber", block_number = field::Empty).entered();
833
834 let block_number = ctx.server.storage.read_mined_block_number();
836 Span::with(|s| s.rec_str("block_number", &block_number));
837
838 Ok(to_json_value(block_number))
839}
840
841fn stratus_get_block_and_receipts(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
842 let _middleware_enter = ext.enter_middleware_span();
844 let _method_enter = info_span!("rpc::stratus_getBlockAndReceipts").entered();
845
846 let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
848
849 tracing::info!(%filter, "reading block and receipts");
851
852 let Some(block) = ctx.server.storage.read_block(filter)? else {
853 tracing::info!(%filter, "block not found");
854 return Ok(JsonValue::Null);
855 };
856
857 tracing::info!(%filter, "block with transactions found");
858 let receipts = block.transactions.iter().cloned().map(AlloyReceipt::from).collect::<Vec<_>>();
859
860 Ok(json!({
861 "block": block.to_json_rpc_with_full_transactions(),
862 "receipts": receipts,
863 }))
864}
865
866fn stratus_get_block_with_changes(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
867 let _middleware_enter = ext.enter_middleware_span();
869 let _method_enter = info_span!("rpc::stratus_getBlockWithChanges").entered();
870
871 let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
873
874 tracing::info!(%filter, "reading block and changes");
876
877 let Some(block) = ctx.server.storage.read_block_with_changes(filter)? else {
878 tracing::info!(%filter, "block not found");
879 return Ok(JsonValue::Null);
880 };
881
882 tracing::info!(%filter, "block with changes found");
883
884 Ok(json!(block))
885}
886
887fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
888 let _middleware_enter = ext.enter_middleware_span();
889
890 let _method_enter = info_span!(
891 "rpc::eth_getBlockByHash",
892 filter = field::Empty,
893 block_number = field::Empty,
894 found = field::Empty
895 )
896 .entered();
897
898 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
899 let (_, full_transactions) = next_rpc_param::<bool>(params)?;
900
901 match &filter {
902 BlockFilter::Hash(_) => (),
903 _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
904 }
905
906 eth_get_block_by_selector(filter, full_transactions, ctx)
907}
908
909fn eth_get_block_by_number(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
910 let _middleware_enter = ext.enter_middleware_span();
911
912 let _method_enter = info_span!(
913 "rpc::eth_getBlockByNumber",
914 filter = field::Empty,
915 block_number = field::Empty,
916 found = field::Empty
917 )
918 .entered();
919
920 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
921 let (_, full_transactions) = next_rpc_param::<bool>(params)?;
922
923 match &filter {
924 BlockFilter::Earliest | BlockFilter::Latest | BlockFilter::Number(_) | BlockFilter::Pending => (),
925 _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
926 }
927
928 eth_get_block_by_selector(filter, full_transactions, ctx)
929}
930
931#[inline(always)]
932fn eth_get_block_by_selector(filter: BlockFilter, full_transactions: bool, ctx: Arc<RpcContext>) -> Result<JsonValue, StratusError> {
933 Span::with(|s| s.rec_str("filter", &filter));
935 tracing::info!(%filter, %full_transactions, "reading block");
936
937 let block = ctx.server.storage.read_block(filter)?;
939 Span::with(|s| {
940 s.record("found", block.is_some());
941 if let Some(ref block) = block {
942 s.rec_str("block_number", &block.number());
943 }
944 });
945 match (block, full_transactions) {
946 (Some(block), true) => {
947 tracing::info!(%filter, "block with full transactions found");
948 Ok(block.to_json_rpc_with_full_transactions())
949 }
950 (Some(block), false) => {
951 tracing::info!(%filter, "block with only hashes found");
952 Ok(block.to_json_rpc_with_transactions_hashes())
953 }
954 (None, _) => {
955 tracing::info!(%filter, "block not found");
956 Ok(JsonValue::Null)
957 }
958 }
959}
960
961fn eth_get_uncle_by_block_hash_and_index(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
962 Ok(JsonValue::Null)
963}
964
965fn stratus_get_block_by_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
966 let _middleware_enter = ext.enter_middleware_span();
967 let _method_enter = info_span!(
968 "rpc::stratus_getBlockByTimestamp",
969 timestamp = field::Empty,
970 block_number = field::Empty,
971 found = field::Empty
972 )
973 .entered();
974
975 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
976 let (_, full_transactions) = next_rpc_param::<bool>(params)?;
977
978 match &filter {
979 BlockFilter::Timestamp { .. } => (),
980 _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
981 }
982
983 eth_get_block_by_selector(filter, full_transactions, ctx)
984}
985
986fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
991 let _middleware_enter = ext.enter_middleware_span();
993 let _method_enter = info_span!("rpc::eth_getTransactionByHash", tx_hash = field::Empty, found = field::Empty).entered();
994
995 let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
997
998 Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1000 tracing::info!(%tx_hash, "reading transaction");
1001
1002 let tx = ctx.server.storage.read_transaction(tx_hash)?;
1004 Span::with(|s| {
1005 s.record("found", tx.is_some());
1006 });
1007
1008 match tx {
1009 Some(tx) => {
1010 tracing::info!(%tx_hash, "transaction found");
1011 Ok(tx.to_json_rpc_transaction())
1012 }
1013 None => {
1014 tracing::info!(%tx_hash, "transaction not found");
1015 Ok(JsonValue::Null)
1016 }
1017 }
1018}
1019
1020fn rpc_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<Option<TransactionStage>, StratusError> {
1021 let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1023
1024 Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1026 tracing::info!("reading transaction receipt");
1027
1028 let tx = ctx.server.storage.read_transaction(tx_hash)?;
1030 Span::with(|s| {
1031 s.record("found", tx.is_some());
1032 });
1033
1034 Ok(tx)
1035}
1036
1037fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1038 let _middleware_enter = ext.enter_middleware_span();
1040 let _method_enter = info_span!("rpc::eth_getTransactionReceipt", tx_hash = field::Empty, found = field::Empty).entered();
1041
1042 match rpc_get_transaction_receipt(params, ctx)? {
1043 Some(tx) => {
1044 tracing::info!("transaction receipt found");
1045 Ok(tx.to_json_rpc_receipt())
1046 }
1047 None => {
1048 tracing::info!("transaction receipt not found");
1049 Ok(JsonValue::Null)
1050 }
1051 }
1052}
1053
1054fn stratus_get_transaction_result(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1055 let _middleware_enter = ext.enter_middleware_span();
1057 let _method_enter = info_span!("rpc::stratus_get_transaction_result", tx_hash = field::Empty, found = field::Empty).entered();
1058
1059 match rpc_get_transaction_receipt(params, ctx)? {
1060 Some(tx) => {
1061 tracing::info!("transaction receipt found");
1062 Ok(to_json_value(tx.to_result().execution.result))
1063 }
1064 None => {
1065 tracing::info!("transaction receipt not found");
1066 Ok(JsonValue::Null)
1067 }
1068 }
1069}
1070
1071fn eth_estimate_gas(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1072 let _middleware_enter = ext.enter_middleware_span();
1074 let _method_enter = info_span!("rpc::eth_estimateGas", tx_from = field::Empty, tx_to = field::Empty).entered();
1075
1076 let (_, call) = next_rpc_param::<CallInput>(params.sequence())?;
1078
1079 Span::with(|s| {
1081 s.rec_opt("tx_from", &call.from);
1082 s.rec_opt("tx_to", &call.to);
1083 });
1084 tracing::info!("executing eth_estimateGas");
1085
1086 match ctx.server.executor.execute_local_call(call, PointInTime::Mined) {
1088 Ok(result) if result.is_success() => {
1090 tracing::info!(tx_output = %result.output, "executed eth_estimateGas with success");
1091 let overestimated_gas = (result.gas_used.as_u64()) as f64 * 1.1;
1092 Ok(hex_num(U256::from(overestimated_gas as u64)))
1093 }
1094
1095 Ok(result) => {
1097 tracing::warn!(tx_output = %result.output, "executed eth_estimateGas with failure");
1098 Err(TransactionError::RevertedCall { output: result.output }.into())
1099 }
1100
1101 Err(e) => {
1103 tracing::warn!(reason = ?e, "failed to execute eth_estimateGas");
1104 Err(e)
1105 }
1106 }
1107}
1108
1109fn rpc_call(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<EvmExecution, StratusError> {
1110 let (params, call) = next_rpc_param::<CallInput>(params.sequence())?;
1112 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1113
1114 Span::with(|s| {
1116 s.rec_opt("tx_from", &call.from);
1117 s.rec_opt("tx_to", &call.to);
1118 s.rec_str("filter", &filter);
1119 });
1120 tracing::info!(%filter, "executing eth_call");
1121
1122 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1124 ctx.server.executor.execute_local_call(call, point_in_time)
1125}
1126
1127fn eth_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1128 let _middleware_enter = ext.enter_middleware_span();
1130 let _method_enter = info_span!("rpc::eth_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1131
1132 match rpc_call(params, ctx) {
1133 Ok(result) if result.is_success() => {
1135 tracing::info!(tx_output = %result.output, "executed eth_call with success");
1136 Ok(hex_data(result.output))
1137 }
1138 Ok(result) => {
1140 tracing::warn!(tx_output = %result.output, "executed eth_call with failure");
1141 Err(TransactionError::RevertedCall { output: result.output }.into())
1142 }
1143 Err(e) => {
1145 tracing::warn!(reason = ?e, "failed to execute eth_call");
1146 Err(e)
1147 }
1148 }
1149}
1150
1151fn debug_trace_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1152 let _middleware_enter = ext.enter_middleware_span();
1154 let _method_enter = info_span!("rpc::debug_traceTransaction", tx_hash = field::Empty,).entered();
1155
1156 let (params, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1157 let (_, opts) = next_rpc_param_or_default::<Option<GethDebugTracingOptions>>(params)?;
1158 let trace_unsuccessful_only = ctx
1159 .server
1160 .rpc_config
1161 .rpc_debug_trace_unsuccessful_only
1162 .as_ref()
1163 .is_some_and(|inner| inner.contains(ext.rpc_client()));
1164
1165 match ctx.server.executor.trace_transaction(tx_hash, opts, trace_unsuccessful_only) {
1166 Ok(result) => {
1167 tracing::info!(?tx_hash, "executed debug_traceTransaction successfully");
1168
1169 let enhanced_response = enhance_trace_with_decoded_info(&result);
1171
1172 Ok(enhanced_response)
1173 }
1174 Err(err) => {
1175 tracing::warn!(?err, "error executing debug_traceTransaction");
1176 Err(err)
1177 }
1178 }
1179}
1180
1181fn stratus_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1182 let _middleware_enter = ext.enter_middleware_span();
1184 let _method_enter = info_span!("rpc::stratus_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1185
1186 match rpc_call(params, ctx) {
1187 Ok(result) if result.is_success() => {
1189 tracing::info!(tx_output = %result.output, "executed stratus_call with success");
1190 Ok(hex_data(result.output))
1191 }
1192 Ok(result) => {
1194 tracing::warn!(tx_output = %result.output, "executed stratus_call with failure");
1195 Err(TransactionError::RevertedCallWithReason {
1196 reason: (&result.output).into(),
1197 }
1198 .into())
1199 }
1200 Err(e) => {
1202 tracing::warn!(reason = ?e, "failed to execute stratus_call");
1203 Err(e)
1204 }
1205 }
1206}
1207
1208fn eth_send_raw_transaction(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1209 let _middleware_enter = ext.enter_middleware_span();
1211 let _method_enter = info_span!(
1212 "rpc::eth_sendRawTransaction",
1213 tx_hash = field::Empty,
1214 tx_from = field::Empty,
1215 tx_to = field::Empty,
1216 tx_nonce = field::Empty
1217 )
1218 .entered();
1219
1220 let (tx, tx_data) = match (ext.get::<TransactionInput>(), ext.get::<Bytes>()) {
1222 (Some(tx), Some(data)) => (tx.clone(), data.clone()),
1223 _ => {
1224 tracing::error!("failed to execute eth_sendRawTransaction because transaction input is not available");
1225 return Err(RpcError::TransactionInvalid {
1226 decode_error: "transaction input is not available".to_string(),
1227 }
1228 .into());
1229 }
1230 };
1231 let tx_hash = tx.transaction_info.hash;
1232
1233 Span::with(|s| {
1235 s.rec_str("tx_hash", &tx_hash);
1236 s.rec_str("tx_from", &tx.execution_info.signer);
1237 s.rec_opt("tx_to", &tx.execution_info.to);
1238 s.rec_str("tx_nonce", &tx.execution_info.nonce);
1239 });
1240
1241 if not(GlobalState::is_transactions_enabled()) {
1242 tracing::warn!(%tx_hash, "failed to execute eth_sendRawTransaction because transactions are disabled");
1243 return Err(StateError::TransactionsDisabled.into());
1244 }
1245
1246 match GlobalState::get_node_mode() {
1248 NodeMode::Leader | NodeMode::FakeLeader => match ctx.server.executor.execute_local_transaction(tx) {
1249 Ok(_) => Ok(hex_data(tx_hash)),
1250 Err(e) => {
1251 tracing::warn!(reason = ?e, ?tx_hash, "failed to execute eth_sendRawTransaction");
1252 Err(e)
1253 }
1254 },
1255 NodeMode::Follower => match &ctx.server.read_importer() {
1256 Some(importer) => match Handle::current().block_on(importer.forward_to_leader(tx_hash, tx_data, ext.rpc_client())) {
1257 Ok(hash) => Ok(hex_data(hash)),
1258 Err(e) => Err(e),
1259 },
1260 None => {
1261 tracing::error!("unable to forward transaction because consensus is temporarily unavailable for follower node");
1262 Err(ConsensusError::Unavailable.into())
1263 }
1264 },
1265 }
1266}
1267
1268fn eth_get_logs(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1273 const MAX_BLOCK_RANGE: u64 = 5_000;
1274
1275 let _middleware_enter = ext.enter_middleware_span();
1277 let _method_enter = info_span!(
1278 "rpc::eth_getLogs",
1279 filter = field::Empty,
1280 filter_from = field::Empty,
1281 filter_to = field::Empty,
1282 filter_range = field::Empty,
1283 filter_event = field::Empty
1284 )
1285 .entered();
1286
1287 let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params.sequence())?;
1289 let mut filter = filter_input.parse(&ctx.server.storage)?;
1290
1291 let to_block = match filter.to_block {
1293 Some(block) => block,
1294 None => {
1295 let block = ctx.server.storage.read_mined_block_number();
1296 filter.to_block = Some(block);
1297 block
1298 }
1299 };
1300 let blocks_in_range = filter.from_block.count_to(to_block);
1301
1302 let event_name = codegen::event_names_from_filter(&filter);
1303
1304 Span::with(|s| {
1306 s.rec_str("filter", &to_json_string(&filter));
1307 s.rec_str("filter_from", &filter.from_block);
1308 s.rec_str("filter_to", &to_block);
1309 s.rec_str("filter_range", &blocks_in_range);
1310 s.rec_str("filter_event", &event_name);
1311 });
1312 tracing::info!(?filter, filter_event = %event_name, "reading logs");
1313
1314 if blocks_in_range > MAX_BLOCK_RANGE {
1316 return Err(RpcError::BlockRangeInvalid {
1317 actual: blocks_in_range,
1318 max: MAX_BLOCK_RANGE,
1319 }
1320 .into());
1321 }
1322
1323 let logs = ctx.server.storage.read_logs(&filter)?;
1325 Ok(JsonValue::Array(logs.into_iter().map(|x| x.to_json_rpc_log()).collect()))
1326}
1327
1328fn eth_accounts(_: Params<'_>, _ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
1333 Ok(json!([]))
1334}
1335
1336fn eth_get_transaction_count(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1337 let _middleware_enter = ext.enter_middleware_span();
1339 let _method_enter = info_span!("rpc::eth_getTransactionCount", address = field::Empty, filter = field::Empty).entered();
1340
1341 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1343 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1344
1345 Span::with(|s| {
1347 s.rec_str("address", &address);
1348 s.rec_str("address", &filter);
1349 });
1350 tracing::info!(%address, %filter, "reading account nonce");
1351
1352 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1353 let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1354 Ok(hex_num(account.nonce))
1355}
1356
1357fn eth_get_balance(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1358 let _middleware_enter = ext.enter_middleware_span();
1360 let _method_enter = info_span!("rpc::eth_getBalance", address = field::Empty, filter = field::Empty).entered();
1361
1362 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1364 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1365
1366 Span::with(|s| {
1368 s.rec_str("address", &address);
1369 s.rec_str("filter", &filter);
1370 });
1371 tracing::info!(%address, %filter, "reading account native balance");
1372
1373 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1375 let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1376 Ok(hex_num(account.balance))
1377}
1378
1379fn eth_get_code(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1380 let _middleware_enter = ext.enter_middleware_span();
1382 let _method_enter = info_span!("rpc::eth_getCode", address = field::Empty, filter = field::Empty).entered();
1383
1384 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1386 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1387
1388 Span::with(|s| {
1390 s.rec_str("address", &address);
1391 s.rec_str("filter", &filter);
1392 });
1393
1394 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1396 let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1397
1398 Ok(account.bytecode.map(|bytecode| hex_data(bytecode.original_bytes())).unwrap_or_else(hex_null))
1399}
1400
1401async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc<RpcContext>, ext: Extensions) -> impl IntoSubscriptionCloseResponse {
1406 let middleware_enter = ext.enter_middleware_span();
1408 let method_span = info_span!("rpc::eth_subscribe", subscription = field::Empty, subscription_event = field::Empty);
1409 drop(middleware_enter);
1410
1411 async move {
1412 let client = ext.rpc_client();
1414 let (params, event) = match next_rpc_param::<String>(params.sequence()) {
1415 Ok((params, event)) => (params, event),
1416 Err(e) => {
1417 pending.reject(StratusError::from(e)).await;
1418 return Ok(());
1419 }
1420 };
1421
1422 if let Err(e) = ctx.subs.check_client_subscriptions(ctx.server.rpc_config.rpc_max_subscriptions, client).await {
1424 pending.reject(e).await;
1425 return Ok(());
1426 }
1427
1428 Span::with(|s| s.rec_str("subscription", &event));
1430 tracing::info!(%event, "subscribing to rpc event");
1431
1432 match event.deref() {
1434 "newPendingTransactions" => {
1435 ctx.subs.add_new_pending_txs_subscription(client, pending.accept().await?).await;
1436 }
1437
1438 "newHeads" => {
1439 ctx.subs.add_new_heads_subscription(client, pending.accept().await?).await;
1440 }
1441
1442 "logs" => {
1443 let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params)?;
1444 let filter = filter_input.parse(&ctx.server.storage)?;
1445
1446 let event_name = codegen::event_names_from_filter(&filter).to_string();
1447 Span::with(|s| s.rec_str("subscription_event", &event_name));
1448 tracing::info!(subscription_event = %event_name, "subscribing to logs with event filter");
1449
1450 ctx.subs.add_logs_subscription(client, filter, pending.accept().await?).await;
1451 }
1452
1453 event => {
1455 pending
1456 .reject(StratusError::from(RpcError::SubscriptionInvalid { event: event.to_string() }))
1457 .await;
1458 }
1459 }
1460
1461 Ok(())
1462 }
1463 .instrument(method_span)
1464 .await
1465}
1466
1467fn eth_get_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1472 let _middleware_enter = ext.enter_middleware_span();
1474 let _method_enter = info_span!("rpc::eth_getStorageAt", address = field::Empty, index = field::Empty).entered();
1475
1476 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1478 let (params, index) = next_rpc_param::<SlotIndex>(params)?;
1479 let (_, block_filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1480
1481 Span::with(|s| {
1482 s.rec_str("address", &address);
1483 s.rec_str("index", &index);
1484 });
1485
1486 let point_in_time = ctx.server.storage.translate_to_point_in_time(block_filter)?;
1488 let slot = ctx.server.storage.read_slot(address, index, point_in_time, ReadKind::RPC)?;
1489
1490 Ok(hex_num_zero_padded(slot.value.as_u256()))
1492}
1493
1494#[inline(always)]
1499fn hex_data<T: AsRef<[u8]>>(value: T) -> String {
1500 const_hex::encode_prefixed(value)
1501}
1502
1503#[inline(always)]
1504fn hex_num(value: impl Into<U256>) -> String {
1505 format!("{:#x}", value.into())
1506}
1507
1508#[inline(always)]
1509fn hex_num_zero_padded(value: impl Into<U256>) -> String {
1510 let width = 64 + 2; format!("{:#0width$x}", value.into(), width = width)
1512}
1513
1514fn hex_zero() -> String {
1515 "0x0".to_owned()
1516}
1517
1518fn hex_null() -> String {
1519 "0x".to_owned()
1520}
1521
1522fn enhance_trace_with_decoded_info(trace: &GethTrace) -> JsonValue {
1524 match trace {
1525 GethTrace::CallTracer(call_frame) => {
1526 let mut json = to_json_value(call_frame);
1528 enhance_serialized_call_frame(&mut json);
1529 json
1530 }
1531 _ => {
1532 to_json_value(trace)
1534 }
1535 }
1536}
1537
1538fn enhance_serialized_call_frame(json: &mut JsonValue) {
1539 if let Some(json_obj) = json.as_object_mut() {
1540 json_obj
1541 .get("from")
1542 .and_then(|v| v.as_str())
1543 .and_then(|s| s.parse::<Address>().ok())
1544 .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1545 .and_then(|s| json_obj.insert("decodedFromContract".to_string(), json!(s)));
1546
1547 json_obj
1548 .get("to")
1549 .and_then(|v| if v.is_null() { None } else { v.as_str() })
1550 .and_then(|s| s.parse::<Address>().ok())
1551 .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1552 .and_then(|s| json_obj.insert("decodedToContract".to_string(), json!(s)));
1553
1554 json_obj
1555 .get("input")
1556 .and_then(|v| v.as_str())
1557 .and_then(|s| const_hex::decode(s.trim_start_matches("0x")).ok())
1558 .inspect(|input| {
1559 if let Some(signature) = codegen::function_sig_opt(input) {
1560 json_obj.insert("decodedFunctionSignature".to_string(), json!(signature));
1561 }
1562
1563 match decode::decode_input_arguments(input) {
1564 Ok(args) => {
1565 json_obj.insert("decodedFunctionArguments".to_string(), json!(args));
1566 }
1567 Err(DecodeInputError::InvalidAbi { message }) => {
1568 tracing::warn!(
1569 message = %message,
1570 "Invalid ABI stored"
1571 );
1572 }
1573 _ => (),
1574 }
1575 });
1576 if let Some(calls) = json_obj.get_mut("calls").and_then(|v| v.as_array_mut()) {
1577 for call in calls {
1578 enhance_serialized_call_frame(call);
1579 }
1580 }
1581 }
1582}
1583
1584#[cfg(test)]
1585mod tests {
1586 use alloy_primitives::Address;
1587 use alloy_primitives::Bytes;
1588 use alloy_primitives::U256;
1589 use alloy_rpc_types_trace::geth::CallFrame;
1590
1591 use super::*;
1592
1593 fn create_simple_test_call_structure() -> CallFrame {
1594 let deep_call_1 = CallFrame {
1596 from: "0x562689c910361ae21d12eadafbfca727b3bcbc24".parse::<Address>().unwrap(), to: Some("0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap()), input: Bytes::from(
1599 const_hex::decode(
1600 "70a08231000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e", )
1602 .unwrap(),
1603 ),
1604 output: Some(Bytes::from(
1605 const_hex::decode("0000000000000000000000000000000000000000000000000de0b6b3a7640000").unwrap(),
1606 )),
1607 gas: U256::from(5000),
1608 gas_used: U256::from(3000),
1609 value: None,
1610 typ: "STATICCALL".to_string(),
1611 error: None,
1612 revert_reason: None,
1613 calls: Vec::new(),
1614 logs: Vec::new(),
1615 };
1616
1617 let deep_call_2 = CallFrame {
1618 from: "0x3181ab023a4d4788754258be5a3b8cf3d8276b98".parse::<Address>().unwrap(), to: Some("0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap()), input: Bytes::from(
1621 const_hex::decode(
1622 "dd62ed3e000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e1", )
1624 .unwrap(),
1625 ),
1626 output: Some(Bytes::from(
1627 const_hex::decode("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap(),
1628 )),
1629 gas: U256::from(8000),
1630 gas_used: U256::from(5000),
1631 value: None,
1632 typ: "STATICCALL".to_string(),
1633 error: None,
1634 revert_reason: None,
1635 calls: Vec::new(),
1636 logs: Vec::new(),
1637 };
1638
1639 let nested_call_1 = CallFrame {
1641 from: "0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap(), to: Some("0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap()), input: Bytes::from(
1644 const_hex::decode(
1645 "a9059cbb000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000", )
1647 .unwrap(),
1648 ),
1649 output: Some(Bytes::from(
1650 const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1651 )),
1652 gas: U256::from(21000),
1653 gas_used: U256::from(20000),
1654 value: None,
1655 typ: "CALL".to_string(),
1656 error: None,
1657 revert_reason: None,
1658 calls: vec![deep_call_1],
1659 logs: Vec::new(),
1660 };
1661
1662 let nested_call_2 = CallFrame {
1663 from: "0x6d8da3c039d1d78622f27d4739e1e00b324afaaa".parse::<Address>().unwrap(), to: Some("0x3181ab023a4d4788754258be5a3b8cf3d8276b98".parse::<Address>().unwrap()), input: Bytes::from(
1666 const_hex::decode(
1667 "095ea7b3000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000", )
1669 .unwrap(),
1670 ),
1671 output: Some(Bytes::from(
1672 const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1673 )),
1674 gas: U256::from(30000),
1675 gas_used: U256::from(25000),
1676 value: None,
1677 typ: "CALL".to_string(),
1678 error: None,
1679 revert_reason: None,
1680 calls: vec![deep_call_2],
1681 logs: Vec::new(),
1682 };
1683
1684 CallFrame {
1686 from: "0x742d35Cc6634C0532925a3b8D7C9be8813eeb02e".parse::<Address>().unwrap(),
1687 to: Some("0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53".parse::<Address>().unwrap()), input: Bytes::from(
1689 const_hex::decode(
1690 "23b872dd000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e10000000000000000000000000000000000000000000000000de0b6b3a7640000", )
1692 .unwrap(),
1693 ),
1694 output: Some(Bytes::from(
1695 const_hex::decode("0000000000000000000000000000000000000000000000000000000000000001").unwrap(),
1696 )),
1697 gas: U256::from(100000),
1698 gas_used: U256::from(85000),
1699 value: None,
1700 typ: "CALL".to_string(),
1701 error: None,
1702 revert_reason: None,
1703 calls: vec![nested_call_1, nested_call_2],
1704 logs: Vec::new(),
1705 }
1706 }
1707
1708 #[test]
1709 fn test_contract_name_decoding() {
1710 let test_call = create_simple_test_call_structure();
1712 let geth_trace = GethTrace::CallTracer(test_call);
1713
1714 let result = enhance_trace_with_decoded_info(&geth_trace);
1716 let result_str = serde_json::to_string_pretty(&result).unwrap();
1717
1718 assert!(result_str.contains("Cashier_BRLC_v2"));
1719 assert!(result_str.contains("BRLCToken"));
1720 assert!(result_str.contains("USJIMToken"));
1721 assert!(result_str.contains("Compound_Agent_4"));
1722
1723 assert!(result_str.contains("transfer(address,uint256)"));
1725 assert!(result_str.contains("approve(address,uint256)"));
1726 assert!(result_str.contains("transferFrom(address,address,uint256)"));
1727 assert!(result_str.contains("balanceOf(address)"));
1728 assert!(result_str.contains("allowance(address,address)"));
1729 }
1730}