1use std::collections::HashMap;
4use std::ops::Deref;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::sync::LazyLock;
8use std::time::Duration;
9
10use alloy_primitives::U256;
11use alloy_rpc_types_trace::geth::GethDebugTracingOptions;
12use alloy_rpc_types_trace::geth::GethTrace;
13use anyhow::Result;
14use futures::join;
15use http::Method;
16use itertools::Itertools;
17use jsonrpsee::Extensions;
18use jsonrpsee::IntoSubscriptionCloseResponse;
19use jsonrpsee::PendingSubscriptionSink;
20use jsonrpsee::server::BatchRequestConfig;
21use jsonrpsee::server::RandomStringIdProvider;
22use jsonrpsee::server::RpcModule;
23use jsonrpsee::server::Server as RpcServer;
24use jsonrpsee::server::ServerConfig;
25use jsonrpsee::server::ServerHandle;
26use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
27use jsonrpsee::types::Params;
28use jsonrpsee::ws_client::RpcServiceBuilder;
29use parking_lot::RwLock;
30use serde_json::json;
31use tokio::runtime::Handle;
32use tokio::select;
33use tokio::sync::Semaphore;
34use tokio::sync::SemaphorePermit;
35use tower_http::cors::Any;
36use tower_http::cors::CorsLayer;
37use tracing::Instrument;
38use tracing::Span;
39use tracing::field;
40use tracing::info_span;
41
42use crate::GlobalState;
43use crate::NodeMode;
44use crate::alias::AlloyReceipt;
45use crate::alias::JsonValue;
46use crate::config::StratusConfig;
47use crate::eth::codegen;
48use crate::eth::codegen::CONTRACTS;
49use crate::eth::decode;
50use crate::eth::executor::Executor;
51use crate::eth::follower::consensus::Consensus;
52use crate::eth::follower::importer::ImporterConfig;
53use crate::eth::follower::importer::ImporterConsensus;
54use crate::eth::follower::importer::send_block_to_kafka;
55use crate::eth::miner::Miner;
56use crate::eth::miner::MinerMode;
57use crate::eth::primitives::Address;
58use crate::eth::primitives::BlockFilter;
59use crate::eth::primitives::Bytes;
60use crate::eth::primitives::CallInput;
61use crate::eth::primitives::ChainId;
62use crate::eth::primitives::ConsensusError;
63use crate::eth::primitives::DecodeInputError;
64use crate::eth::primitives::EvmExecution;
65use crate::eth::primitives::Hash;
66use crate::eth::primitives::ImporterError;
67use crate::eth::primitives::LogFilterInput;
68#[cfg(feature = "dev")]
69use crate::eth::primitives::Nonce;
70use crate::eth::primitives::PointInTime;
71use crate::eth::primitives::RpcError;
72use crate::eth::primitives::SlotIndex;
73#[cfg(feature = "dev")]
74use crate::eth::primitives::SlotValue;
75use crate::eth::primitives::StateError;
76use crate::eth::primitives::StorageError;
77use crate::eth::primitives::StratusError;
78use crate::eth::primitives::TransactionError;
79use crate::eth::primitives::TransactionInput;
80use crate::eth::primitives::TransactionStage;
81#[cfg(feature = "dev")]
82use crate::eth::primitives::Wei;
83use crate::eth::rpc::RpcContext;
84use crate::eth::rpc::RpcHttpMiddleware;
85use crate::eth::rpc::RpcMiddleware;
86use crate::eth::rpc::RpcServerConfig;
87use crate::eth::rpc::RpcSubscriptions;
88use crate::eth::rpc::next_rpc_param;
89use crate::eth::rpc::next_rpc_param_or_default;
90use crate::eth::rpc::rpc_parser::RpcExtensionsExt;
91use crate::eth::rpc::rpc_subscriptions::RpcSubscriptionsHandles;
92use crate::eth::storage::ReadKind;
93use crate::eth::storage::StratusStorage;
94use crate::ext::InfallibleExt;
95use crate::ext::WatchReceiverExt;
96use crate::ext::not;
97use crate::ext::parse_duration;
98use crate::ext::to_json_string;
99use crate::ext::to_json_value;
100use crate::infra::build_info;
101use crate::infra::kafka::KafkaConfig;
102use crate::infra::metrics;
103use crate::infra::tracing::SpanExt;
104use crate::log_and_err;
/// State shared by the JSON-RPC server and every RPC handler (handlers reach it through `ctx.server`).
///
/// The constructor is generated by `derive_new::new`, so the field order below is also the
/// constructor's argument order.
#[derive(Clone, derive_new::new)]
pub struct Server {
    /// Storage handle; handlers in this file use it to read blocks, list pending transactions and clear caches.
    pub storage: Arc<StratusStorage>,
    /// Executor handle (not referenced by the handlers visible in this part of the file).
    pub executor: Arc<Executor>,
    /// Miner handle; used to mine, pause/unpause and switch the mining mode.
    pub miner: Arc<Miner>,
    /// Currently installed importer, if any. Read with `read_importer` and replaced with `set_importer`.
    pub importer: Arc<RwLock<Option<Arc<ImporterConsensus>>>>,

    /// Full application configuration; returned by `stratus_config` and consulted for the Kafka settings.
    pub app_config: StratusConfig,
    /// RPC server settings (bind address, connection/response/batch limits, health-check interval).
    pub rpc_config: RpcServerConfig,
    /// Chain id reported by `net_version` and `eth_chainId`.
    pub chain_id: ChainId,
}
122
123impl Server {
124 pub async fn serve(self) -> Result<()> {
125 let this = Arc::new(self);
126 this.update_health().await;
127
128 const TASK_NAME: &str = "rpc-server";
129 let health_worker_handle = tokio::task::spawn({
130 let this = Arc::clone(&this);
131 async move {
132 this.health_worker().await;
133 }
134 });
135 this.update_health().await;
136 let mut health = GlobalState::get_health_receiver();
137 health.mark_unchanged();
138 let (server_handle, subscriptions) = loop {
139 let (server_handle, subscriptions) = this._serve().await?;
140 let server_handle_watch = server_handle.clone();
141
142 select! {
144 _ = server_handle_watch.stopped() => {
146 GlobalState::shutdown_from(TASK_NAME, "finished unexpectedly");
147 break (server_handle, subscriptions);
148 },
149 _ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
151 let _ = server_handle.stop();
152 break (server_handle, subscriptions);
153 },
154 _ = health.wait_for_change(|healthy| GlobalState::restart_on_unhealthy() && !healthy) => {
156 tracing::info!("health state changed to unhealthy, restarting the rpc server");
157 let _ = server_handle.stop();
158 subscriptions.abort();
159 join!(server_handle.stopped(), subscriptions.stopped());
160 }
161 }
162 };
163 let res = join!(server_handle.stopped(), subscriptions.stopped(), health_worker_handle);
164 res.2?;
165 Ok(())
166 }
167
168 async fn _serve(&self) -> anyhow::Result<(ServerHandle, RpcSubscriptionsHandles)> {
170 let this = self.clone();
171 const TASK_NAME: &str = "rpc-server";
172 tracing::info!(%this.rpc_config.rpc_address, %this.rpc_config.rpc_max_connections, "creating {}", TASK_NAME);
173
174 let subs = RpcSubscriptions::spawn(
176 this.miner.notifier_pending_txs.subscribe(),
177 this.miner.notifier_blocks.subscribe(),
178 this.miner.notifier_logs.subscribe(),
179 );
180
181 let ctx = RpcContext {
183 server: Arc::new(this.clone()),
184 client_version: "stratus",
185 subs: Arc::clone(&subs.connected),
186 };
187
188 let mut module = RpcModule::<RpcContext>::new(ctx);
190 module = register_methods(module)?;
191
192 let cors = CorsLayer::new().allow_methods([Method::POST]).allow_origin(Any).allow_headers(Any);
194 let rpc_middleware = RpcServiceBuilder::new().layer_fn(RpcMiddleware::new);
195 let http_middleware = tower::ServiceBuilder::new().layer(cors).layer_fn(RpcHttpMiddleware::new).layer(
196 ProxyGetRequestLayer::new([
197 ("/health", "stratus_health"),
198 ("/version", "stratus_version"),
199 ("/config", "stratus_config"),
200 ("/state", "stratus_state"),
201 ])
202 .unwrap(),
203 );
204
205 let server_config = ServerConfig::builder()
206 .set_id_provider(RandomStringIdProvider::new(8))
207 .max_connections(this.rpc_config.rpc_max_connections)
208 .max_response_body_size(this.rpc_config.rpc_max_response_size_bytes)
209 .set_batch_request_config(BatchRequestConfig::Limit(this.rpc_config.batch_request_limit))
210 .build();
211
212 let server = RpcServer::builder()
214 .set_rpc_middleware(rpc_middleware)
215 .set_http_middleware(http_middleware)
216 .set_config(server_config)
217 .build(this.rpc_config.rpc_address)
218 .await?;
219
220 let handle_rpc_server = server.start(module);
221 Ok((handle_rpc_server, subs.handles))
222 }
223
224 pub async fn health_worker(&self) {
225 let mut interval = tokio::time::interval(std::time::Duration::from_millis(self.rpc_config.health_check_interval_ms));
227
228 loop {
230 self.update_health().await;
232
233 interval.tick().await;
235 if GlobalState::is_shutdown() {
236 break;
237 }
238 }
239 }
240
241 async fn update_health(&self) {
242 let is_healthy = self.health().await;
243 GlobalState::set_health(is_healthy);
244 metrics::set_consensus_is_ready(if is_healthy { 1u64 } else { 0u64 });
245 }
246
247 fn read_importer(&self) -> Option<Arc<ImporterConsensus>> {
248 self.importer.read().as_ref().map(Arc::clone)
249 }
250
251 pub fn set_importer(&self, importer: Option<Arc<ImporterConsensus>>) {
252 *self.importer.write() = importer;
253 }
254
255 async fn health(&self) -> bool {
256 match GlobalState::get_node_mode() {
257 NodeMode::Leader | NodeMode::FakeLeader => true,
258 NodeMode::Follower =>
259 if GlobalState::is_importer_shutdown() {
260 tracing::warn!("stratus is unhealthy because importer is shutdown");
261 false
262 } else {
263 match self.read_importer() {
264 Some(importer) => importer.should_serve().await,
265 None => {
266 tracing::warn!("stratus is unhealthy because importer is not available");
267 false
268 }
269 }
270 },
271 }
272 }
273}
274
/// Registers every RPC method and the `eth_subscribe` subscription on `module`.
///
/// Returns the module with all handlers attached, or the first registration error.
fn register_methods(mut module: RpcModule<RpcContext>) -> anyhow::Result<RpcModule<RpcContext>> {
    // dev-only helpers; each `hardhat_*` name is an alias of the matching `stratus_*` handler
    #[cfg(feature = "dev")]
    {
        module.register_blocking_method("evm_setNextBlockTimestamp", evm_set_next_block_timestamp)?;
        module.register_blocking_method("evm_mine", evm_mine)?;
        module.register_blocking_method("hardhat_reset", stratus_reset)?;
        module.register_blocking_method("stratus_reset", stratus_reset)?;
        module.register_blocking_method("hardhat_setStorageAt", stratus_set_storage_at)?;
        module.register_blocking_method("stratus_setStorageAt", stratus_set_storage_at)?;
        module.register_blocking_method("hardhat_setNonce", stratus_set_nonce)?;
        module.register_blocking_method("stratus_setNonce", stratus_set_nonce)?;
        module.register_blocking_method("hardhat_setBalance", stratus_set_balance)?;
        module.register_blocking_method("stratus_setBalance", stratus_set_balance)?;
        module.register_blocking_method("hardhat_setCode", stratus_set_code)?;
        module.register_blocking_method("stratus_setCode", stratus_set_code)?;
        module.register_blocking_method("stratus_clearCache", stratus_clear_cache)?;
    }

    // health
    module.register_async_method("stratus_health", stratus_health)?;

    // admin: runtime switches and node/miner/importer mode changes
    module.register_method("stratus_enableTransactions", stratus_enable_transactions)?;
    module.register_method("stratus_disableTransactions", stratus_disable_transactions)?;
    module.register_method("stratus_enableMiner", stratus_enable_miner)?;
    module.register_method("stratus_disableMiner", stratus_disable_miner)?;
    module.register_method("stratus_enableUnknownClients", stratus_enable_unknown_clients)?;
    module.register_method("stratus_disableUnknownClients", stratus_disable_unknown_clients)?;
    module.register_method("stratus_enableRestartOnUnhealthy", stratus_enable_restart_on_unhealthy)?;
    module.register_method("stratus_disableRestartOnUnhealthy", stratus_disable_restart_on_unhealthy)?;
    module.register_async_method("stratus_changeToLeader", stratus_change_to_leader)?;
    module.register_async_method("stratus_changeToFollower", stratus_change_to_follower)?;
    module.register_async_method("stratus_initImporter", stratus_init_importer)?;
    module.register_method("stratus_shutdownImporter", stratus_shutdown_importer)?;
    module.register_async_method("stratus_changeMinerMode", stratus_change_miner_mode)?;
    module.register_async_method("stratus_emitBlockEvents", stratus_emit_block_events)?;

    // node information
    module.register_method("stratus_version", stratus_version)?;
    module.register_method("stratus_config", stratus_config)?;
    module.register_method("stratus_state", stratus_state)?;

    // introspection
    module.register_async_method("stratus_getSubscriptions", stratus_get_subscriptions)?;
    module.register_method("stratus_pendingTransactionsCount", stratus_pending_transactions_count)?;

    // network
    module.register_method("net_version", net_version)?;
    module.register_async_method("net_listening", net_listening)?;
    module.register_method("eth_chainId", eth_chain_id)?;
    module.register_method("web3_clientVersion", web3_client_version)?;

    // gas
    module.register_method("eth_gasPrice", eth_gas_price)?;

    // stratus block queries
    module.register_blocking_method("stratus_getBlockAndReceipts", stratus_get_block_and_receipts)?;
    module.register_blocking_method("stratus_getBlockWithChanges", stratus_get_block_with_changes)?;

    // blocks
    module.register_blocking_method("eth_blockNumber", eth_block_number)?;
    module.register_blocking_method("eth_getBlockByNumber", eth_get_block_by_number)?;
    module.register_blocking_method("eth_getBlockByHash", eth_get_block_by_hash)?;
    module.register_blocking_method("stratus_getBlockByTimestamp", stratus_get_block_by_timestamp)?;
    module.register_method("eth_getUncleByBlockHashAndIndex", eth_get_uncle_by_block_hash_and_index)?;

    // transactions
    module.register_blocking_method("eth_getTransactionByHash", eth_get_transaction_by_hash)?;
    module.register_blocking_method("eth_getTransactionReceipt", eth_get_transaction_receipt)?;
    module.register_blocking_method("eth_estimateGas", eth_estimate_gas)?;
    module.register_blocking_method("eth_call", eth_call)?;
    module.register_blocking_method("eth_sendRawTransaction", eth_send_raw_transaction)?;
    module.register_blocking_method("stratus_call", stratus_call)?;
    module.register_blocking_method("stratus_getTransactionResult", stratus_get_transaction_result)?;
    module.register_blocking_method("debug_traceTransaction", debug_trace_transaction)?;

    // logs
    module.register_blocking_method("eth_getLogs", eth_get_logs)?;

    // accounts
    module.register_method("eth_accounts", eth_accounts)?;
    module.register_blocking_method("eth_getTransactionCount", eth_get_transaction_count)?;
    module.register_blocking_method("eth_getBalance", eth_get_balance)?;
    module.register_blocking_method("eth_getCode", eth_get_code)?;

    // storage
    module.register_blocking_method("eth_getStorageAt", eth_get_storage_at)?;

    // subscriptions
    module.register_subscription("eth_subscribe", "eth_subscription", "eth_unsubscribe", eth_subscribe)?;

    Ok(module)
}
368
/// Dev-only `evm_mine` handler: asks the miner to mine locally and commit, then answers `true`.
#[cfg(feature = "dev")]
fn evm_mine(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let miner = &ctx.server.miner;
    miner.mine_local_and_commit()?;

    let mined = to_json_value(true);
    Ok(mined)
}
378
/// Dev-only `stratus_clearCache` handler: clears the storage cache and answers `true`.
#[cfg(feature = "dev")]
fn stratus_clear_cache(_params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let storage = &ctx.server.storage;
    storage.clear_cache();

    let cleared = to_json_value(true);
    Ok(cleared)
}
384
/// Dev-only `evm_setNextBlockTimestamp` handler.
///
/// Reads one `UnixTime` parameter and sets the time offset relative to the timestamp of the
/// latest block. Answers with the requested timestamp.
///
/// # Errors
/// Fails when the parameter cannot be parsed, when the latest block cannot be read or does not
/// exist, or when the offset cannot be set.
#[cfg(feature = "dev")]
fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    use crate::eth::primitives::UnixTime;
    use crate::log_and_err;

    let (_, timestamp) = next_rpc_param::<UnixTime>(params.sequence())?;

    let Some(block) = ctx.server.storage.read_block(BlockFilter::Latest)? else {
        // Same error conversion style as the other `log_and_err!` call sites in this file.
        return log_and_err!("reading latest block returned None").map_err(Into::into);
    };
    UnixTime::set_offset(block.header.timestamp, timestamp)?;

    Ok(to_json_value(timestamp))
}
398
399#[allow(unused_variables)]
404async fn stratus_health(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
405 #[cfg(feature = "dev")]
406 ctx.server.update_health().await;
407
408 if GlobalState::is_shutdown() {
409 tracing::warn!("liveness check failed because of shutdown");
410 return Err(StateError::StratusShutdown.into());
411 }
412
413 if GlobalState::is_healthy() {
414 Ok(json!(true))
415 } else {
416 Err(StateError::StratusNotReady.into())
417 }
418}
419
/// Dev-only `stratus_reset` / `hardhat_reset` handler: resets storage to genesis and answers `true`.
#[cfg(feature = "dev")]
fn stratus_reset(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let storage = &ctx.server.storage;
    storage.reset_to_genesis()?;

    let reset = to_json_value(true);
    Ok(reset)
}
429
/// Dev-only `stratus_setStorageAt` / `hardhat_setStorageAt` handler.
///
/// Parameters, in order: address, slot index, slot value. Writes the slot and answers `true`.
#[cfg(feature = "dev")]
fn stratus_set_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let rest = params.sequence();
    let (rest, address) = next_rpc_param::<Address>(rest)?;
    let (rest, index) = next_rpc_param::<SlotIndex>(rest)?;
    let (_, value) = next_rpc_param::<SlotValue>(rest)?;

    tracing::info!(%address, %index, %value, "setting storage at address and index");

    let storage = &ctx.server.storage;
    storage.set_storage_at(address, index, value)?;
    Ok(to_json_value(true))
}
442
/// Dev-only `stratus_setNonce` / `hardhat_setNonce` handler.
///
/// Parameters, in order: address, nonce. Stores the nonce and answers `true`.
#[cfg(feature = "dev")]
fn stratus_set_nonce(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let rest = params.sequence();
    let (rest, address) = next_rpc_param::<Address>(rest)?;
    let (_, nonce) = next_rpc_param::<Nonce>(rest)?;

    tracing::info!(%address, %nonce, "setting nonce for address");

    let storage = &ctx.server.storage;
    storage.set_nonce(address, nonce)?;
    Ok(to_json_value(true))
}
454
/// Dev-only `stratus_setBalance` / `hardhat_setBalance` handler.
///
/// Parameters, in order: address, balance. Stores the balance and answers `true`.
#[cfg(feature = "dev")]
fn stratus_set_balance(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let rest = params.sequence();
    let (rest, address) = next_rpc_param::<Address>(rest)?;
    let (_, balance) = next_rpc_param::<Wei>(rest)?;

    tracing::info!(%address, %balance, "setting balance for address");

    let storage = &ctx.server.storage;
    storage.set_balance(address, balance)?;
    Ok(to_json_value(true))
}
466
/// Dev-only `stratus_setCode` / `hardhat_setCode` handler.
///
/// Parameters, in order: address, code bytes. Stores the code and answers `true`.
#[cfg(feature = "dev")]
fn stratus_set_code(params: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
    let rest = params.sequence();
    let (rest, address) = next_rpc_param::<Address>(rest)?;
    let (_, code) = next_rpc_param::<Bytes>(rest)?;

    // Only the size is logged, never the code itself.
    tracing::info!(%address, code_size = %code.0.len(), "setting code for address");

    let storage = &ctx.server.storage;
    storage.set_code(address, code)?;
    Ok(to_json_value(true))
}
478
/// Single-permit semaphore taken by `stratus_change_to_leader` and `stratus_change_to_follower`;
/// a handler that cannot get the permit fails with `ModeChangeInProgress` instead of waiting.
static MODE_CHANGE_SEMAPHORE: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
480
481async fn stratus_change_to_leader(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
482 ext.authentication().auth_admin()?;
483 let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
484 let _permit: SemaphorePermit = match permit {
485 Ok(permit) => permit,
486 Err(_) => return Err(StateError::ModeChangeInProgress.into()),
487 };
488
489 const LEADER_MINER_INTERVAL: Duration = Duration::from_secs(1);
490 tracing::info!("starting process to change node to leader");
491
492 if GlobalState::get_node_mode() == NodeMode::Leader {
493 tracing::info!("node is already in leader mode, no changes made");
494 return Ok(json!(false));
495 }
496
497 if GlobalState::is_transactions_enabled() {
498 tracing::error!("transactions are currently enabled, cannot change node mode");
499 return Err(StateError::TransactionsEnabled.into());
500 }
501
502 tracing::info!("shutting down importer");
503 let shutdown_importer_result = stratus_shutdown_importer(Params::new(None), &ctx, &ext);
504 match shutdown_importer_result {
505 Ok(_) => tracing::info!("importer shutdown successfully"),
506 Err(StratusError::Importer(ImporterError::AlreadyShutdown)) => {
507 tracing::warn!("importer is already shutdown, continuing");
508 }
509 Err(e) => {
510 tracing::error!(reason = ?e, "failed to shutdown importer");
511 return Err(e);
512 }
513 }
514
515 tracing::info!("wait for importer to shutdown");
516 GlobalState::wait_for_importer_to_finish().await;
517
518 let change_miner_mode_result = change_miner_mode(MinerMode::Interval(LEADER_MINER_INTERVAL), &ctx).await;
519 if let Err(e) = change_miner_mode_result {
520 tracing::error!(reason = ?e, "failed to change miner mode");
521 return Err(e);
522 }
523 tracing::info!("miner mode changed to interval(1s) successfully");
524
525 GlobalState::set_node_mode(NodeMode::Leader);
526 ctx.server.storage.clear_cache();
527 tracing::info!("node mode changed to leader successfully, cache cleared");
528
529 Ok(json!(true))
530}
531
532async fn stratus_change_to_follower(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
533 ext.authentication().auth_admin()?;
534 let permit = MODE_CHANGE_SEMAPHORE.try_acquire();
535 let _permit: SemaphorePermit = match permit {
536 Ok(permit) => permit,
537 Err(_) => return Err(StateError::ModeChangeInProgress.into()),
538 };
539
540 tracing::info!("starting process to change node to follower");
541
542 if GlobalState::get_node_mode() == NodeMode::Follower {
543 tracing::info!("node is already in follower mode, no changes made");
544 return Ok(json!(false));
545 }
546
547 if GlobalState::is_transactions_enabled() {
548 tracing::error!("transactions are currently enabled, cannot change node mode");
549 return Err(StateError::TransactionsEnabled.into());
550 }
551
552 let pending_txs = ctx.server.storage.pending_transactions();
553 if not(pending_txs.is_empty()) {
554 tracing::error!(pending_txs = ?pending_txs.len(), "cannot change to follower mode with pending transactions");
555 return Err(StorageError::PendingTransactionsExist {
556 pending_txs: pending_txs.len(),
557 }
558 .into());
559 }
560
561 let change_miner_mode_result = change_miner_mode(MinerMode::External, &ctx).await;
562 if let Err(e) = change_miner_mode_result {
563 tracing::error!(reason = ?e, "failed to change miner mode");
564 return Err(e);
565 }
566 tracing::info!("miner mode changed to external successfully");
567
568 GlobalState::set_node_mode(NodeMode::Follower);
569 ctx.server.storage.clear_cache();
570 tracing::info!("storage cache cleared");
571
572 tracing::info!("initializing importer");
573 let init_importer_result = stratus_init_importer(params, Arc::clone(&ctx), ext).await;
574 match init_importer_result {
575 Ok(_) => tracing::info!("importer initialized successfully"),
576 Err(StratusError::Importer(ImporterError::AlreadyRunning)) => {
577 tracing::warn!("importer is already running, continuing");
578 }
579 Err(e) => {
580 tracing::error!(reason = ?e, "failed to initialize importer, reverting node mode to leader");
581 GlobalState::set_node_mode(NodeMode::Leader);
582 return Err(e);
583 }
584 }
585 tracing::info!("node mode changed to follower successfully");
586
587 Ok(json!(true))
588}
589
590async fn stratus_init_importer(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
591 ext.authentication().auth_admin()?;
592 let (params, external_rpc) = next_rpc_param::<String>(params.sequence())?;
593 let (params, external_rpc_ws) = next_rpc_param::<String>(params)?;
594 let (params, raw_external_rpc_timeout) = next_rpc_param::<String>(params)?;
595 let (params, raw_sync_interval) = next_rpc_param::<String>(params)?;
596 let (_, raw_external_rpc_max_response_size_bytes) = next_rpc_param::<String>(params)?;
597
598 let external_rpc_timeout = parse_duration(&raw_external_rpc_timeout).map_err(|e| {
599 tracing::error!(reason = ?e, "failed to parse external_rpc_timeout");
600 ImporterError::ConfigParseError
601 })?;
602
603 let sync_interval = parse_duration(&raw_sync_interval).map_err(|e| {
604 tracing::error!(reason = ?e, "failed to parse sync_interval");
605 ImporterError::ConfigParseError
606 })?;
607
608 let external_rpc_max_response_size_bytes = raw_external_rpc_max_response_size_bytes.parse::<u32>().map_err(|e| {
609 tracing::error!(reason = ?e, "failed to parse external_rpc_max_response_size_bytes");
610 ImporterError::ConfigParseError
611 })?;
612
613 let importer_config = ImporterConfig {
614 external_rpc,
615 external_rpc_ws: Some(external_rpc_ws),
616 external_rpc_timeout,
617 sync_interval,
618 external_rpc_max_response_size_bytes,
619 enable_block_changes_replication: std::env::var("ENABLE_BLOCK_CHANGES_REPLICATION")
620 .ok()
621 .is_some_and(|val| val == "1" || val == "true"),
622 };
623
624 importer_config.init_follower_importer(ctx).await
625}
626
627fn stratus_shutdown_importer(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<JsonValue, StratusError> {
628 ext.authentication().auth_admin()?;
629 if GlobalState::get_node_mode() != NodeMode::Follower {
630 tracing::error!("node is currently not a follower");
631 return Err(StateError::StratusNotFollower.into());
632 }
633
634 if GlobalState::is_importer_shutdown() && ctx.server.read_importer().is_none() {
635 tracing::error!("importer is already shut down");
636 return Err(ImporterError::AlreadyShutdown.into());
637 }
638
639 ctx.server.set_importer(None);
640
641 const TASK_NAME: &str = "rpc-server::importer-shutdown";
642 GlobalState::shutdown_importer_from(TASK_NAME, "received importer shutdown request");
643
644 Ok(json!(true))
645}
646
647async fn stratus_change_miner_mode(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
648 ext.authentication().auth_admin()?;
649 let (_, mode_str) = next_rpc_param::<String>(params.sequence())?;
650
651 let mode = MinerMode::from_str(&mode_str).map_err(|e| {
652 tracing::error!(reason = ?e, "failed to parse miner mode");
653 RpcError::MinerModeParamInvalid
654 })?;
655
656 change_miner_mode(mode, &ctx).await
657}
658
659async fn change_miner_mode(new_mode: MinerMode, ctx: &RpcContext) -> Result<JsonValue, StratusError> {
663 if GlobalState::is_transactions_enabled() {
664 tracing::error!("cannot change miner mode while transactions are enabled");
665 return Err(StateError::TransactionsEnabled.into());
666 }
667
668 let previous_mode = ctx.server.miner.mode();
669
670 if previous_mode == new_mode {
671 tracing::warn!(?new_mode, current = ?new_mode, "miner mode already set, skipping");
672 return Ok(json!(false));
673 }
674
675 if not(ctx.server.miner.is_paused()) && previous_mode.is_interval() {
676 return log_and_err!("can't change miner mode from Interval without pausing it first").map_err(Into::into);
677 }
678
679 match new_mode {
680 MinerMode::External => {
681 tracing::info!("changing miner mode to External");
682
683 let pending_txs = ctx.server.storage.pending_transactions();
684 if not(pending_txs.is_empty()) {
685 tracing::error!(pending_txs = ?pending_txs.len(), "cannot change miner mode to External with pending transactions");
686 return Err(StorageError::PendingTransactionsExist {
687 pending_txs: pending_txs.len(),
688 }
689 .into());
690 }
691
692 ctx.server.miner.switch_to_external_mode().await;
693 }
694 MinerMode::Interval(duration) => {
695 tracing::info!(duration = ?duration, "changing miner mode to Interval");
696
697 if ctx.server.read_importer().is_some() {
698 tracing::error!("cannot change miner mode to Interval with consensus set");
699 return Err(ConsensusError::Set.into());
700 }
701
702 ctx.server.miner.start_interval_mining(duration).await;
703 }
704 MinerMode::Automine => {
705 return log_and_err!("Miner mode change to 'automine' is unsupported.").map_err(Into::into);
706 }
707 }
708
709 Ok(json!(true))
710}
711
712fn stratus_enable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
713 ext.authentication().auth_admin()?;
714 GlobalState::set_unknown_client_enabled(true);
715 Ok(GlobalState::is_unknown_client_enabled())
716}
717
718fn stratus_disable_unknown_clients(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
719 ext.authentication().auth_admin()?;
720 GlobalState::set_unknown_client_enabled(false);
721 Ok(GlobalState::is_unknown_client_enabled())
722}
723
724fn stratus_enable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
725 ext.authentication().auth_admin()?;
726 GlobalState::set_transactions_enabled(true);
727 Ok(GlobalState::is_transactions_enabled())
728}
729
730fn stratus_disable_transactions(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
731 ext.authentication().auth_admin()?;
732 GlobalState::set_transactions_enabled(false);
733 Ok(GlobalState::is_transactions_enabled())
734}
735
736fn stratus_enable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
737 ext.authentication().auth_admin()?;
738 ctx.server.miner.unpause();
739 Ok(true)
740}
741
742fn stratus_disable_miner(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
743 ext.authentication().auth_admin()?;
744 ctx.server.miner.pause();
745 Ok(false)
746}
747
748fn stratus_pending_transactions_count(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> usize {
750 ctx.server.storage.pending_transactions().len()
751}
752
753fn stratus_disable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
754 ext.authentication().auth_admin()?;
755 GlobalState::set_restart_on_unhealthy(false);
756 Ok(GlobalState::restart_on_unhealthy())
757}
758
759fn stratus_enable_restart_on_unhealthy(_: Params<'_>, _: &RpcContext, ext: &Extensions) -> Result<bool, StratusError> {
760 ext.authentication().auth_admin()?;
761 GlobalState::set_restart_on_unhealthy(true);
762 Ok(GlobalState::restart_on_unhealthy())
763}
764
765async fn stratus_emit_block_events(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<(), StratusError> {
767 ext.authentication().auth_admin()?;
768 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
769 let (_, kafka_config_override) = next_rpc_param_or_default::<Option<KafkaConfig>>(params)?;
770
771 let Some(block) = ctx.server.storage.read_block(filter)? else {
772 return Err(StorageError::BlockNotFound { filter }.into());
773 };
774
775 let kafka_config = kafka_config_override.as_ref().or(ctx.server.app_config.kafka_config.as_ref());
776 let Some(kafka_connector) = kafka_config.map(|inner| inner.init()).transpose()? else {
777 return Err(StateError::Misconfigured {
778 details: "kafka configuration is not set",
779 }
780 .into());
781 };
782
783 send_block_to_kafka(&Some(kafka_connector), &block).await?;
784
785 Ok(())
786}
787
788fn stratus_version(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
793 Ok(build_info::as_json())
794}
795
796fn stratus_config(_: Params<'_>, ctx: &RpcContext, ext: &Extensions) -> Result<StratusConfig, StratusError> {
797 ext.authentication().auth_admin()?;
798 Ok(ctx.server.app_config.clone())
799}
800
801fn stratus_state(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
802 Ok(GlobalState::get_global_state_as_json(ctx))
803}
804
805async fn stratus_get_subscriptions(_: Params<'_>, ctx: Arc<RpcContext>, _: Extensions) -> Result<JsonValue, StratusError> {
806 let pending_txs = serde_json::to_value(ctx.subs.pending_txs.read().await.values().collect_vec()).expect_infallible();
808 let new_heads = serde_json::to_value(ctx.subs.new_heads.read().await.values().collect_vec()).expect_infallible();
809 let logs = serde_json::to_value(ctx.subs.logs.read().await.values().flat_map(HashMap::values).collect_vec()).expect_infallible();
810
811 let response = json!({
812 "newPendingTransactions": pending_txs,
813 "newHeads": new_heads,
814 "logs": logs,
815 });
816 Ok(response)
817}
818
819async fn net_listening(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
824 let net_listening = stratus_health(params, ctx, ext).await;
825
826 tracing::info!(net_listening = ?net_listening, "network listening status");
827
828 net_listening
829}
830
831fn net_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
832 ctx.server.chain_id.to_string()
833}
834
835fn eth_chain_id(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
836 hex_num(ctx.server.chain_id)
837}
838
839fn web3_client_version(_: Params<'_>, ctx: &RpcContext, _: &Extensions) -> String {
840 ctx.client_version.to_owned()
841}
842
/// `eth_gasPrice` handler: always answers with the value produced by `hex_zero`.
fn eth_gas_price(_: Params<'_>, _: &RpcContext, _: &Extensions) -> String {
    hex_zero()
}
850
851fn eth_block_number(_params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
856 let _middleware_enter = ext.enter_middleware_span();
858 let _method_enter = info_span!("rpc::eth_blockNumber", block_number = field::Empty).entered();
859
860 let block_number = ctx.server.storage.read_mined_block_number();
862 Span::with(|s| s.rec_str("block_number", &block_number));
863
864 Ok(to_json_value(block_number))
865}
866
867fn stratus_get_block_and_receipts(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
868 let _middleware_enter = ext.enter_middleware_span();
870 let _method_enter = info_span!("rpc::stratus_getBlockAndReceipts").entered();
871
872 let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
874
875 tracing::info!(%filter, "reading block and receipts");
877
878 let Some(block) = ctx.server.storage.read_block(filter)? else {
879 tracing::info!(%filter, "block not found");
880 return Ok(JsonValue::Null);
881 };
882
883 tracing::info!(%filter, "block with transactions found");
884 let receipts = block.transactions.iter().cloned().map(AlloyReceipt::from).collect::<Vec<_>>();
885
886 Ok(json!({
887 "block": block.to_json_rpc_with_full_transactions(),
888 "receipts": receipts,
889 }))
890}
891
892fn stratus_get_block_with_changes(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
893 let _middleware_enter = ext.enter_middleware_span();
895 let _method_enter = info_span!("rpc::stratus_getBlockWithChanges").entered();
896
897 let (_, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
899
900 tracing::info!(%filter, "reading block and changes");
902
903 let Some(block) = ctx.server.storage.read_block_with_changes(filter)? else {
904 tracing::info!(%filter, "block not found");
905 return Ok(JsonValue::Null);
906 };
907
908 tracing::info!(%filter, "block with changes found");
909
910 Ok(json!(block))
911}
912
913fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
914 let _middleware_enter = ext.enter_middleware_span();
915
916 let _method_enter = info_span!(
917 "rpc::eth_getBlockByHash",
918 filter = field::Empty,
919 block_number = field::Empty,
920 found = field::Empty
921 )
922 .entered();
923
924 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
925 let (_, full_transactions) = next_rpc_param::<bool>(params)?;
926
927 match &filter {
928 BlockFilter::Hash(_) => (),
929 _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
930 }
931
932 eth_get_block_by_selector(filter, full_transactions, ctx)
933}
934
935fn eth_get_block_by_number(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
936 let _middleware_enter = ext.enter_middleware_span();
937
938 let _method_enter = info_span!(
939 "rpc::eth_getBlockByNumber",
940 filter = field::Empty,
941 block_number = field::Empty,
942 found = field::Empty
943 )
944 .entered();
945
946 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
947 let (_, full_transactions) = next_rpc_param::<bool>(params)?;
948
949 match &filter {
950 BlockFilter::Earliest | BlockFilter::Latest | BlockFilter::Number(_) | BlockFilter::Pending => (),
951 _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
952 }
953
954 eth_get_block_by_selector(filter, full_transactions, ctx)
955}
956
957#[inline(always)]
958fn eth_get_block_by_selector(filter: BlockFilter, full_transactions: bool, ctx: Arc<RpcContext>) -> Result<JsonValue, StratusError> {
959 Span::with(|s| s.rec_str("filter", &filter));
961 tracing::info!(%filter, %full_transactions, "reading block");
962
963 let block = ctx.server.storage.read_block(filter)?;
965 Span::with(|s| {
966 s.record("found", block.is_some());
967 if let Some(ref block) = block {
968 s.rec_str("block_number", &block.number());
969 }
970 });
971 match (block, full_transactions) {
972 (Some(block), true) => {
973 tracing::info!(%filter, "block with full transactions found");
974 Ok(block.to_json_rpc_with_full_transactions())
975 }
976 (Some(block), false) => {
977 tracing::info!(%filter, "block with only hashes found");
978 Ok(block.to_json_rpc_with_transactions_hashes())
979 }
980 (None, _) => {
981 tracing::info!(%filter, "block not found");
982 Ok(JsonValue::Null)
983 }
984 }
985}
986
/// Handles `eth_getUncleByBlockHashAndIndex`.
///
/// Ignores every argument and always answers with JSON `null`.
fn eth_get_uncle_by_block_hash_and_index(_: Params<'_>, _: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
    Ok(JsonValue::Null)
}
990
991fn stratus_get_block_by_timestamp(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
992 let _middleware_enter = ext.enter_middleware_span();
993 let _method_enter = info_span!(
994 "rpc::stratus_getBlockByTimestamp",
995 timestamp = field::Empty,
996 block_number = field::Empty,
997 found = field::Empty
998 )
999 .entered();
1000
1001 let (params, filter) = next_rpc_param::<BlockFilter>(params.sequence())?;
1002 let (_, full_transactions) = next_rpc_param::<bool>(params)?;
1003
1004 match &filter {
1005 BlockFilter::Timestamp { .. } => (),
1006 _ => return Err(StratusError::RPC(RpcError::ParameterInvalid)),
1007 }
1008
1009 eth_get_block_by_selector(filter, full_transactions, ctx)
1010}
1011
1012fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1017 let _middleware_enter = ext.enter_middleware_span();
1019 let _method_enter = info_span!("rpc::eth_getTransactionByHash", tx_hash = field::Empty, found = field::Empty).entered();
1020
1021 let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1023
1024 Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1026 tracing::info!(%tx_hash, "reading transaction");
1027
1028 let tx = ctx.server.storage.read_transaction(tx_hash)?;
1030 Span::with(|s| {
1031 s.record("found", tx.is_some());
1032 });
1033
1034 match tx {
1035 Some(tx) => {
1036 tracing::info!(%tx_hash, "transaction found");
1037 Ok(tx.to_json_rpc_transaction())
1038 }
1039 None => {
1040 tracing::info!(%tx_hash, "transaction not found");
1041 Ok(JsonValue::Null)
1042 }
1043 }
1044}
1045
1046fn rpc_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<Option<TransactionStage>, StratusError> {
1047 let (_, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1049
1050 Span::with(|s| s.rec_str("tx_hash", &tx_hash));
1052 tracing::info!("reading transaction receipt");
1053
1054 let tx = ctx.server.storage.read_transaction(tx_hash)?;
1056 Span::with(|s| {
1057 s.record("found", tx.is_some());
1058 });
1059
1060 Ok(tx)
1061}
1062
1063fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1064 let _middleware_enter = ext.enter_middleware_span();
1066 let _method_enter = info_span!("rpc::eth_getTransactionReceipt", tx_hash = field::Empty, found = field::Empty).entered();
1067
1068 match rpc_get_transaction_receipt(params, ctx)? {
1069 Some(tx) => {
1070 tracing::info!("transaction receipt found");
1071 Ok(tx.to_json_rpc_receipt())
1072 }
1073 None => {
1074 tracing::info!("transaction receipt not found");
1075 Ok(JsonValue::Null)
1076 }
1077 }
1078}
1079
1080fn stratus_get_transaction_result(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1081 let _middleware_enter = ext.enter_middleware_span();
1083 let _method_enter = info_span!("rpc::stratus_get_transaction_result", tx_hash = field::Empty, found = field::Empty).entered();
1084
1085 match rpc_get_transaction_receipt(params, ctx)? {
1086 Some(tx) => {
1087 tracing::info!("transaction receipt found");
1088 Ok(to_json_value(tx.to_result().execution.result))
1089 }
1090 None => {
1091 tracing::info!("transaction receipt not found");
1092 Ok(JsonValue::Null)
1093 }
1094 }
1095}
1096
1097fn eth_estimate_gas(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1098 let _middleware_enter = ext.enter_middleware_span();
1100 let _method_enter = info_span!("rpc::eth_estimateGas", tx_from = field::Empty, tx_to = field::Empty).entered();
1101
1102 let (_, call) = next_rpc_param::<CallInput>(params.sequence())?;
1104
1105 Span::with(|s| {
1107 s.rec_opt("tx_from", &call.from);
1108 s.rec_opt("tx_to", &call.to);
1109 });
1110 tracing::info!("executing eth_estimateGas");
1111
1112 match ctx.server.executor.execute_local_call(call, PointInTime::Mined) {
1114 Ok(result) if result.is_success() => {
1116 tracing::info!(tx_output = %result.output, "executed eth_estimateGas with success");
1117 let overestimated_gas = (result.gas_used.as_u64()) as f64 * 1.1;
1118 Ok(hex_num(U256::from(overestimated_gas as u64)))
1119 }
1120
1121 Ok(result) => {
1123 tracing::warn!(tx_output = %result.output, "executed eth_estimateGas with failure");
1124 Err(TransactionError::RevertedCall { output: result.output }.into())
1125 }
1126
1127 Err(e) => {
1129 tracing::warn!(reason = ?e, "failed to execute eth_estimateGas");
1130 Err(e)
1131 }
1132 }
1133}
1134
1135fn rpc_call(params: Params<'_>, ctx: Arc<RpcContext>) -> Result<EvmExecution, StratusError> {
1136 let (params, call) = next_rpc_param::<CallInput>(params.sequence())?;
1138 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1139
1140 Span::with(|s| {
1142 s.rec_opt("tx_from", &call.from);
1143 s.rec_opt("tx_to", &call.to);
1144 s.rec_str("filter", &filter);
1145 });
1146 tracing::info!(%filter, "executing eth_call");
1147
1148 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1150 ctx.server.executor.execute_local_call(call, point_in_time)
1151}
1152
1153fn eth_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1154 let _middleware_enter = ext.enter_middleware_span();
1156 let _method_enter = info_span!("rpc::eth_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1157
1158 match rpc_call(params, ctx) {
1159 Ok(result) if result.is_success() => {
1161 tracing::info!(tx_output = %result.output, "executed eth_call with success");
1162 Ok(hex_data(result.output))
1163 }
1164 Ok(result) => {
1166 tracing::warn!(tx_output = %result.output, "executed eth_call with failure");
1167 Err(TransactionError::RevertedCall { output: result.output }.into())
1168 }
1169 Err(e) => {
1171 tracing::warn!(reason = ?e, "failed to execute eth_call");
1172 Err(e)
1173 }
1174 }
1175}
1176
1177fn debug_trace_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1178 let _middleware_enter = ext.enter_middleware_span();
1180 let _method_enter = info_span!("rpc::debug_traceTransaction", tx_hash = field::Empty,).entered();
1181
1182 let (params, tx_hash) = next_rpc_param::<Hash>(params.sequence())?;
1183 let (_, opts) = next_rpc_param_or_default::<Option<GethDebugTracingOptions>>(params)?;
1184 let trace_unsuccessful_only = ctx
1185 .server
1186 .rpc_config
1187 .rpc_debug_trace_unsuccessful_only
1188 .as_ref()
1189 .is_some_and(|inner| inner.contains(ext.rpc_client()));
1190
1191 match ctx.server.executor.trace_transaction(tx_hash, opts, trace_unsuccessful_only) {
1192 Ok(result) => {
1193 tracing::info!(?tx_hash, "executed debug_traceTransaction successfully");
1194
1195 let enhanced_response = enhance_trace_with_decoded_info(&result);
1197
1198 Ok(enhanced_response)
1199 }
1200 Err(err) => {
1201 tracing::warn!(?err, "error executing debug_traceTransaction");
1202 Err(err)
1203 }
1204 }
1205}
1206
1207fn stratus_call(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1208 let _middleware_enter = ext.enter_middleware_span();
1210 let _method_enter = info_span!("rpc::stratus_call", tx_from = field::Empty, tx_to = field::Empty, filter = field::Empty).entered();
1211
1212 match rpc_call(params, ctx) {
1213 Ok(result) if result.is_success() => {
1215 tracing::info!(tx_output = %result.output, "executed stratus_call with success");
1216 Ok(hex_data(result.output))
1217 }
1218 Ok(result) => {
1220 tracing::warn!(tx_output = %result.output, "executed stratus_call with failure");
1221 Err(TransactionError::RevertedCallWithReason {
1222 reason: (&result.output).into(),
1223 }
1224 .into())
1225 }
1226 Err(e) => {
1228 tracing::warn!(reason = ?e, "failed to execute stratus_call");
1229 Err(e)
1230 }
1231 }
1232}
1233
1234fn eth_send_raw_transaction(_: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1235 let _middleware_enter = ext.enter_middleware_span();
1237 let _method_enter = info_span!(
1238 "rpc::eth_sendRawTransaction",
1239 tx_hash = field::Empty,
1240 tx_from = field::Empty,
1241 tx_to = field::Empty,
1242 tx_nonce = field::Empty
1243 )
1244 .entered();
1245
1246 let (tx, tx_data) = match (ext.get::<TransactionInput>(), ext.get::<Bytes>()) {
1248 (Some(tx), Some(data)) => (tx.clone(), data.clone()),
1249 _ => {
1250 tracing::error!("failed to execute eth_sendRawTransaction because transaction input is not available");
1251 return Err(RpcError::TransactionInvalid {
1252 decode_error: "transaction input is not available".to_string(),
1253 }
1254 .into());
1255 }
1256 };
1257 let tx_hash = tx.transaction_info.hash;
1258
1259 Span::with(|s| {
1261 s.rec_str("tx_hash", &tx_hash);
1262 s.rec_str("tx_from", &tx.execution_info.signer);
1263 s.rec_opt("tx_to", &tx.execution_info.to);
1264 s.rec_str("tx_nonce", &tx.execution_info.nonce);
1265 });
1266
1267 if not(GlobalState::is_transactions_enabled()) {
1268 tracing::warn!(%tx_hash, "failed to execute eth_sendRawTransaction because transactions are disabled");
1269 return Err(StateError::TransactionsDisabled.into());
1270 }
1271
1272 if tx.transaction_info.tx_type.is_some_and(|t| t > 3) {
1274 tracing::warn!(%tx_hash, "rejecting unsuported transaction type");
1275 return Err(RpcError::ParameterInvalid.into());
1276 }
1277
1278 match GlobalState::get_node_mode() {
1280 NodeMode::Leader | NodeMode::FakeLeader => match ctx.server.executor.execute_local_transaction(tx) {
1281 Ok(_) => Ok(hex_data(tx_hash)),
1282 Err(e) => {
1283 tracing::warn!(reason = ?e, ?tx_hash, "failed to execute eth_sendRawTransaction");
1284 Err(e)
1285 }
1286 },
1287 NodeMode::Follower => match &ctx.server.read_importer() {
1288 Some(importer) => match Handle::current().block_on(importer.forward_to_leader(tx_hash, tx_data, ext.rpc_client())) {
1289 Ok(hash) => Ok(hex_data(hash)),
1290 Err(e) => Err(e),
1291 },
1292 None => {
1293 tracing::error!("unable to forward transaction because consensus is temporarily unavailable for follower node");
1294 Err(ConsensusError::Unavailable.into())
1295 }
1296 },
1297 }
1298}
1299
1300fn eth_get_logs(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<JsonValue, StratusError> {
1305 const MAX_BLOCK_RANGE: u64 = 5_000;
1306
1307 let _middleware_enter = ext.enter_middleware_span();
1309 let _method_enter = info_span!(
1310 "rpc::eth_getLogs",
1311 filter = field::Empty,
1312 filter_from = field::Empty,
1313 filter_to = field::Empty,
1314 filter_range = field::Empty,
1315 filter_event = field::Empty
1316 )
1317 .entered();
1318
1319 let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params.sequence())?;
1321 let mut filter = filter_input.parse(&ctx.server.storage)?;
1322
1323 let to_block = match filter.to_block {
1325 Some(block) => block,
1326 None => {
1327 let block = ctx.server.storage.read_mined_block_number();
1328 filter.to_block = Some(block);
1329 block
1330 }
1331 };
1332 let blocks_in_range = filter.from_block.count_to(to_block);
1333
1334 let event_name = codegen::event_names_from_filter(&filter);
1335
1336 Span::with(|s| {
1338 s.rec_str("filter", &to_json_string(&filter));
1339 s.rec_str("filter_from", &filter.from_block);
1340 s.rec_str("filter_to", &to_block);
1341 s.rec_str("filter_range", &blocks_in_range);
1342 s.rec_str("filter_event", &event_name);
1343 });
1344 tracing::info!(?filter, filter_event = %event_name, "reading logs");
1345
1346 if blocks_in_range > MAX_BLOCK_RANGE {
1348 return Err(RpcError::BlockRangeInvalid {
1349 actual: blocks_in_range,
1350 max: MAX_BLOCK_RANGE,
1351 }
1352 .into());
1353 }
1354
1355 let logs = ctx.server.storage.read_logs(&filter)?;
1357 Ok(JsonValue::Array(logs.into_iter().map(|x| x.to_json_rpc_log()).collect()))
1358}
1359
1360fn eth_accounts(_: Params<'_>, _ctx: &RpcContext, _: &Extensions) -> Result<JsonValue, StratusError> {
1365 Ok(json!([]))
1366}
1367
1368fn eth_get_transaction_count(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1369 let _middleware_enter = ext.enter_middleware_span();
1371 let _method_enter = info_span!("rpc::eth_getTransactionCount", address = field::Empty, filter = field::Empty).entered();
1372
1373 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1375 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1376
1377 Span::with(|s| {
1379 s.rec_str("address", &address);
1380 s.rec_str("address", &filter);
1381 });
1382 tracing::info!(%address, %filter, "reading account nonce");
1383
1384 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1385 let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1386 Ok(hex_num(account.nonce))
1387}
1388
1389fn eth_get_balance(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1390 let _middleware_enter = ext.enter_middleware_span();
1392 let _method_enter = info_span!("rpc::eth_getBalance", address = field::Empty, filter = field::Empty).entered();
1393
1394 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1396 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1397
1398 Span::with(|s| {
1400 s.rec_str("address", &address);
1401 s.rec_str("filter", &filter);
1402 });
1403 tracing::info!(%address, %filter, "reading account native balance");
1404
1405 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1407 let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1408 Ok(hex_num(account.balance))
1409}
1410
1411fn eth_get_code(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1412 let _middleware_enter = ext.enter_middleware_span();
1414 let _method_enter = info_span!("rpc::eth_getCode", address = field::Empty, filter = field::Empty).entered();
1415
1416 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1418 let (_, filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1419
1420 Span::with(|s| {
1422 s.rec_str("address", &address);
1423 s.rec_str("filter", &filter);
1424 });
1425
1426 let point_in_time = ctx.server.storage.translate_to_point_in_time(filter)?;
1428 let account = ctx.server.storage.read_account(address, point_in_time, ReadKind::RPC)?;
1429
1430 Ok(account.bytecode.map(|bytecode| hex_data(bytecode.original_bytes())).unwrap_or_else(hex_null))
1431}
1432
/// Handles `eth_subscribe`.
///
/// The first parameter is the event name; supported values are
/// `"newPendingTransactions"`, `"newHeads"` and `"logs"`. For `"logs"` a
/// `LogFilterInput` is additionally parsed (via `next_rpc_param_or_default`)
/// and resolved against storage.
///
/// The pending subscription is explicitly rejected when:
/// * the event name cannot be parsed,
/// * `check_client_subscriptions` fails for this client, or
/// * the event name is not one of the supported values.
///
/// Errors from parsing the log filter or from `pending.accept()` are
/// propagated with `?` instead of going through `pending.reject`.
async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc<RpcContext>, ext: Extensions) -> impl IntoSubscriptionCloseResponse {
    // The method span is created while the middleware span is entered, and the
    // middleware guard is dropped before any `.await`. The method span is then
    // attached to the future with `.instrument` rather than being entered.
    let middleware_enter = ext.enter_middleware_span();
    let method_span = info_span!("rpc::eth_subscribe", subscription = field::Empty, subscription_event = field::Empty);
    drop(middleware_enter);

    async move {
        let client = ext.rpc_client();
        // Parse the event name; a malformed request rejects the subscription.
        let (params, event) = match next_rpc_param::<String>(params.sequence()) {
            Ok((params, event)) => (params, event),
            Err(e) => {
                pending.reject(StratusError::from(e)).await;
                return Ok(());
            }
        };

        // Check this client against the configured `rpc_max_subscriptions`.
        if let Err(e) = ctx.subs.check_client_subscriptions(ctx.server.rpc_config.rpc_max_subscriptions, client).await {
            pending.reject(e).await;
            return Ok(());
        }

        Span::with(|s| s.rec_str("subscription", &event));
        tracing::info!(%event, "subscribing to rpc event");

        match event.deref() {
            "newPendingTransactions" => {
                ctx.subs.add_new_pending_txs_subscription(client, pending.accept().await?).await;
            }

            "newHeads" => {
                ctx.subs.add_new_heads_subscription(client, pending.accept().await?).await;
            }

            "logs" => {
                let (_, filter_input) = next_rpc_param_or_default::<LogFilterInput>(params)?;
                let filter = filter_input.parse(&ctx.server.storage)?;

                let event_name = codegen::event_names_from_filter(&filter).to_string();
                Span::with(|s| s.rec_str("subscription_event", &event_name));
                tracing::info!(subscription_event = %event_name, "subscribing to logs with event filter");

                ctx.subs.add_logs_subscription(client, filter, pending.accept().await?).await;
            }

            // Any other event name is unsupported.
            event => {
                pending
                    .reject(StratusError::from(RpcError::SubscriptionInvalid { event: event.to_string() }))
                    .await;
            }
        }

        Ok(())
    }
    .instrument(method_span)
    .await
}
1498
1499fn eth_get_storage_at(params: Params<'_>, ctx: Arc<RpcContext>, ext: Extensions) -> Result<String, StratusError> {
1504 let _middleware_enter = ext.enter_middleware_span();
1506 let _method_enter = info_span!("rpc::eth_getStorageAt", address = field::Empty, index = field::Empty).entered();
1507
1508 let (params, address) = next_rpc_param::<Address>(params.sequence())?;
1510 let (params, index) = next_rpc_param::<SlotIndex>(params)?;
1511 let (_, block_filter) = next_rpc_param_or_default::<BlockFilter>(params)?;
1512
1513 Span::with(|s| {
1514 s.rec_str("address", &address);
1515 s.rec_str("index", &index);
1516 });
1517
1518 let point_in_time = ctx.server.storage.translate_to_point_in_time(block_filter)?;
1520 let slot = ctx.server.storage.read_slot(address, index, point_in_time, ReadKind::RPC)?;
1521
1522 Ok(hex_num_zero_padded(slot.value.as_u256()))
1524}
1525
/// Encodes arbitrary bytes as a hex string using `const_hex::encode_prefixed`.
#[inline(always)]
fn hex_data<T: AsRef<[u8]>>(value: T) -> String {
    const_hex::encode_prefixed(value)
}
1534
1535#[inline(always)]
1536fn hex_num(value: impl Into<U256>) -> String {
1537 format!("{:#x}", value.into())
1538}
1539
1540#[inline(always)]
1541fn hex_num_zero_padded(value: impl Into<U256>) -> String {
1542 let width = 64 + 2; format!("{:#0width$x}", value.into(), width = width)
1544}
1545
/// Returns the hex representation of the number zero (`"0x0"`).
fn hex_zero() -> String {
    String::from("0x0")
}
1549
/// Returns the hex representation of empty data (`"0x"`).
fn hex_null() -> String {
    String::from("0x")
}
1553
1554fn enhance_trace_with_decoded_info(trace: &GethTrace) -> JsonValue {
1556 match trace {
1557 GethTrace::CallTracer(call_frame) => {
1558 let mut json = to_json_value(call_frame);
1560 enhance_serialized_call_frame(&mut json);
1561 json
1562 }
1563 _ => {
1564 to_json_value(trace)
1566 }
1567 }
1568}
1569
1570fn enhance_serialized_call_frame(json: &mut JsonValue) {
1571 if let Some(json_obj) = json.as_object_mut() {
1572 json_obj
1573 .get("from")
1574 .and_then(|v| v.as_str())
1575 .and_then(|s| s.parse::<Address>().ok())
1576 .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1577 .and_then(|s| json_obj.insert("decodedFromContract".to_string(), json!(s)));
1578
1579 json_obj
1580 .get("to")
1581 .and_then(|v| if v.is_null() { None } else { v.as_str() })
1582 .and_then(|s| s.parse::<Address>().ok())
1583 .and_then(|addr| CONTRACTS.get(addr.as_slice()).map(|s| s.to_string()))
1584 .and_then(|s| json_obj.insert("decodedToContract".to_string(), json!(s)));
1585
1586 json_obj
1587 .get("input")
1588 .and_then(|v| v.as_str())
1589 .and_then(|s| const_hex::decode(s.trim_start_matches("0x")).ok())
1590 .inspect(|input| {
1591 if let Some(signature) = codegen::function_sig_opt(input) {
1592 json_obj.insert("decodedFunctionSignature".to_string(), json!(signature));
1593 }
1594
1595 match decode::decode_input_arguments(input) {
1596 Ok(args) => {
1597 json_obj.insert("decodedFunctionArguments".to_string(), json!(args));
1598 }
1599 Err(DecodeInputError::InvalidAbi { message }) => {
1600 tracing::warn!(
1601 message = %message,
1602 "Invalid ABI stored"
1603 );
1604 }
1605 _ => (),
1606 }
1607 });
1608 if let Some(calls) = json_obj.get_mut("calls").and_then(|v| v.as_array_mut()) {
1609 for call in calls {
1610 enhance_serialized_call_frame(call);
1611 }
1612 }
1613 }
1614}
1615
#[cfg(test)]
mod tests {
    use alloy_primitives::Address;
    use alloy_primitives::Bytes;
    use alloy_primitives::U256;
    use alloy_rpc_types_trace::geth::CallFrame;

    use super::*;

    /// ABI-encoded `true` / `1`, the output shared by several fixture calls.
    const OUTPUT_ONE: &str = "0000000000000000000000000000000000000000000000000000000000000001";

    /// Builds a call frame without nested calls, value, error or logs.
    ///
    /// Addresses and payloads are given as hex strings; nested calls are
    /// attached by the caller through struct-update syntax.
    fn leaf_call(from: &str, to: &str, input_hex: &str, output_hex: &str, gas: u64, gas_used: u64, typ: &str) -> CallFrame {
        CallFrame {
            from: from.parse::<Address>().unwrap(),
            to: Some(to.parse::<Address>().unwrap()),
            input: Bytes::from(const_hex::decode(input_hex).unwrap()),
            output: Some(Bytes::from(const_hex::decode(output_hex).unwrap())),
            gas: U256::from(gas),
            gas_used: U256::from(gas_used),
            value: None,
            typ: typ.to_string(),
            error: None,
            revert_reason: None,
            calls: Vec::new(),
            logs: Vec::new(),
        }
    }

    /// Builds a three-level call tree: a root call with two nested calls,
    /// each of which wraps one static call.
    fn create_simple_test_call_structure() -> CallFrame {
        let deep_call_1 = leaf_call(
            "0x562689c910361ae21d12eadafbfca727b3bcbc24",
            "0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53",
            "70a08231000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e",
            "0000000000000000000000000000000000000000000000000de0b6b3a7640000",
            5000,
            3000,
            "STATICCALL",
        );

        let deep_call_2 = leaf_call(
            "0x3181ab023a4d4788754258be5a3b8cf3d8276b98",
            "0x6d8da3c039d1d78622f27d4739e1e00b324afaaa",
            "dd62ed3e000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e1",
            "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
            8000,
            5000,
            "STATICCALL",
        );

        let nested_call_1 = CallFrame {
            calls: vec![deep_call_1],
            ..leaf_call(
                "0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53",
                "0x6d8da3c039d1d78622f27d4739e1e00b324afaaa",
                "a9059cbb000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000",
                OUTPUT_ONE,
                21000,
                20000,
                "CALL",
            )
        };

        let nested_call_2 = CallFrame {
            calls: vec![deep_call_2],
            ..leaf_call(
                "0x6d8da3c039d1d78622f27d4739e1e00b324afaaa",
                "0x3181ab023a4d4788754258be5a3b8cf3d8276b98",
                "095ea7b3000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e0000000000000000000000000000000000000000000000000de0b6b3a7640000",
                OUTPUT_ONE,
                30000,
                25000,
                "CALL",
            )
        };

        CallFrame {
            calls: vec![nested_call_1, nested_call_2],
            ..leaf_call(
                "0x742d35Cc6634C0532925a3b8D7C9be8813eeb02e",
                "0xa9a55a81a4c085ec0c31585aed4cfb09d78dfd53",
                "23b872dd000000000000000000000000742d35cc6634c0532925a3b8d7c9be8813eeb02e000000000000000000000000a0b86a33e6441366ac2ed2e3a8da88e61c66a5e10000000000000000000000000000000000000000000000000de0b6b3a7640000",
                OUTPUT_ONE,
                100000,
                85000,
                "CALL",
            )
        }
    }

    /// The enhanced trace must mention every expected contract name and
    /// function signature somewhere in its serialized form.
    #[test]
    fn test_contract_name_decoding() {
        let geth_trace = GethTrace::CallTracer(create_simple_test_call_structure());

        let enhanced = enhance_trace_with_decoded_info(&geth_trace);
        let result_str = serde_json::to_string_pretty(&enhanced).unwrap();

        let expected_fragments = [
            "Cashier_BRLC_v2",
            "BRLCToken",
            "USJIMToken",
            "Compound_Agent_4",
            "transfer(address,uint256)",
            "approve(address,uint256)",
            "transferFrom(address,address,uint256)",
            "balanceOf(address)",
            "allowance(address,address)",
        ];
        for fragment in expected_fragments {
            assert!(result_str.contains(fragment));
        }
    }
}