stratus/eth/storage/
stratus_storage.rs

1use parking_lot::RwLockReadGuard;
2use tracing::Span;
3
4use super::InMemoryTemporaryStorage;
5use super::RocksPermanentStorage;
6use super::StorageCache;
7#[cfg(feature = "dev")]
8use crate::eth::genesis::GenesisConfig;
9use crate::eth::primitives::Account;
10use crate::eth::primitives::Address;
11use crate::eth::primitives::Block;
12use crate::eth::primitives::BlockFilter;
13use crate::eth::primitives::BlockNumber;
14#[cfg(feature = "dev")]
15use crate::eth::primitives::Bytes;
16use crate::eth::primitives::ExecutionChanges;
17use crate::eth::primitives::ExternalBlock;
18use crate::eth::primitives::Hash;
19use crate::eth::primitives::LogFilter;
20use crate::eth::primitives::LogMessage;
21#[cfg(feature = "dev")]
22use crate::eth::primitives::Nonce;
23use crate::eth::primitives::PendingBlock;
24use crate::eth::primitives::PendingBlockHeader;
25use crate::eth::primitives::PointInTime;
26use crate::eth::primitives::Slot;
27use crate::eth::primitives::SlotIndex;
28#[cfg(feature = "dev")]
29use crate::eth::primitives::SlotValue;
30use crate::eth::primitives::StorageError;
31use crate::eth::primitives::TransactionExecution;
32use crate::eth::primitives::TransactionStage;
33#[cfg(feature = "dev")]
34use crate::eth::primitives::Wei;
35#[cfg(feature = "dev")]
36use crate::eth::primitives::test_accounts;
37use crate::eth::storage::ReadKind;
38use crate::eth::storage::TxCount;
39use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
40use crate::ext::not;
41use crate::infra::metrics;
42use crate::infra::metrics::timed;
43use crate::infra::tracing::SpanExt;
44
45mod label {
46    pub(super) const TEMP: &str = "temporary";
47    pub(super) const PERM: &str = "permanent";
48    pub(super) const CACHE: &str = "cache";
49}
50
51/// Proxy that simplifies interaction with permanent and temporary storages.
52///
53/// Additionaly it tracks metrics that are independent of the storage implementation.
54pub struct StratusStorage {
55    temp: InMemoryTemporaryStorage,
56    cache: StorageCache,
57    pub perm: RocksPermanentStorage,
58    // CONTRACT: Always acquire a lock when reading slots or accounts from latest (cache OR perm) and when saving a block
59    transient_state_lock: parking_lot::RwLock<()>,
60    #[cfg(feature = "dev")]
61    perm_config: crate::eth::storage::permanent::PermanentStorageConfig,
62}
63
64impl StratusStorage {
65    /// Creates a new storage with the specified temporary and permanent implementations.
66    pub fn new(
67        temp: InMemoryTemporaryStorage,
68        perm: RocksPermanentStorage,
69        cache: StorageCache,
70        #[cfg(feature = "dev")] perm_config: crate::eth::storage::permanent::PermanentStorageConfig,
71    ) -> Result<Self, StorageError> {
72        let this = Self {
73            temp,
74            cache,
75            perm,
76            transient_state_lock: parking_lot::RwLock::new(()),
77            #[cfg(feature = "dev")]
78            perm_config,
79        };
80
81        // create genesis block and accounts if necessary
82        #[cfg(feature = "dev")]
83        if !this.has_genesis()? {
84            this.reset_to_genesis()?;
85        }
86
87        Ok(this)
88    }
89
90    /// Returns whether the genesis block exists
91    pub fn has_genesis(&self) -> Result<bool, StorageError> {
92        self.perm.has_genesis()
93    }
94
95    /// Clears the storage cache.
96    pub fn clear_cache(&self) {
97        tracing::info!("clearing storage cache");
98        self.cache.clear();
99    }
100
101    #[cfg(test)]
102    pub fn new_test() -> Result<Self, StorageError> {
103        use tempfile::tempdir;
104
105        use super::cache::CacheConfig;
106
107        let temp = InMemoryTemporaryStorage::new(0.into());
108
109        // Create a temporary directory for RocksDB
110        let rocks_dir = tempdir().expect("Failed to create temporary directory for tests");
111        let rocks_path_prefix = rocks_dir.path().to_str().unwrap().to_string();
112
113        let perm = RocksPermanentStorage::new(
114            Some(rocks_path_prefix.clone()),
115            std::time::Duration::from_secs(240),
116            super::permanent::RocksCfCacheConfig::default(),
117            true,
118            None,
119            1024,
120        )
121        .expect("Failed to create RocksPermanentStorage for tests");
122
123        let cache = CacheConfig {
124            slot_cache_capacity: 100000,
125            account_cache_capacity: 20000,
126            account_history_cache_capacity: 20000,
127            slot_history_cache_capacity: 100000,
128        }
129        .init();
130
131        Self::new(
132            temp,
133            perm,
134            cache,
135            #[cfg(feature = "dev")]
136            super::permanent::PermanentStorageConfig {
137                rocks_path_prefix: Some(rocks_path_prefix),
138                rocks_shutdown_timeout: std::time::Duration::from_secs(240),
139                rocks_cf_cache: super::permanent::RocksCfCacheConfig::default(),
140                rocks_disable_sync_write: false,
141                rocks_cf_size_metrics_interval: None,
142                genesis_file: crate::config::GenesisFileConfig::default(),
143                rocks_file_descriptors_limit: 1024,
144            },
145        )
146    }
147
148    pub fn read_block_number_to_resume_import(&self) -> Result<BlockNumber, StorageError> {
149        #[cfg(feature = "tracing")]
150        let _span = tracing::info_span!("storage::read_block_number_to_resume_import").entered();
151
152        let number = self.read_pending_block_header().0.number;
153        tracing::info!(?number, "got block number to resume import");
154
155        Ok(number)
156    }
157
158    pub fn read_pending_block_header(&self) -> (PendingBlockHeader, TxCount) {
159        #[cfg(feature = "tracing")]
160        let _span = tracing::info_span!("storage::read_pending_block_number").entered();
161        tracing::debug!(storage = %label::TEMP, "reading pending block number");
162
163        timed(|| self.temp.read_pending_block_header()).with(|m| {
164            metrics::inc_storage_read_pending_block_number(m.elapsed, label::TEMP, true);
165        })
166    }
167
168    pub fn read_mined_block_number(&self) -> BlockNumber {
169        #[cfg(feature = "tracing")]
170        let _span = tracing::info_span!("storage::read_mined_block_number").entered();
171        tracing::debug!(storage = %label::PERM, "reading mined block number");
172
173        timed(|| self.perm.read_mined_block_number()).with(|m| {
174            metrics::inc_storage_read_mined_block_number(m.elapsed, label::PERM, true);
175        })
176    }
177
178    pub fn set_pending_from_external(&self, block: &ExternalBlock) {
179        self.temp.set_pending_from_external(block);
180    }
181
182    pub fn set_mined_block_number(&self, block_number: BlockNumber) {
183        #[cfg(feature = "tracing")]
184        let _span = tracing::info_span!("storage::set_mined_block_number", %block_number).entered();
185        tracing::debug!(storage = %label::PERM, %block_number, "setting mined block number");
186
187        timed(|| self.perm.set_mined_block_number(block_number)).with(|m| {
188            metrics::inc_storage_set_mined_block_number(m.elapsed, label::PERM, true);
189        });
190    }
191
192    // -------------------------------------------------------------------------
193    // Accounts and slots
194    // -------------------------------------------------------------------------
195
196    pub fn save_accounts(&self, accounts: Vec<Account>) -> Result<(), StorageError> {
197        #[cfg(feature = "tracing")]
198        let _span = tracing::info_span!("storage::save_accounts").entered();
199
200        // keep only accounts that does not exist in permanent storage
201        let mut missing_accounts = Vec::new();
202        for account in accounts {
203            let perm_account = self.perm.read_account(account.address, PointInTime::Mined)?;
204            if perm_account.is_none() {
205                missing_accounts.push(account);
206            }
207        }
208
209        tracing::debug!(storage = %label::PERM, accounts = ?missing_accounts, "saving initial accounts");
210        timed(|| self.perm.save_accounts(missing_accounts)).with(|m| {
211            metrics::inc_storage_save_accounts(m.elapsed, label::PERM, m.result.is_ok());
212            if let Err(ref e) = m.result {
213                tracing::error!(reason = ?e, "failed to save accounts");
214            }
215        })
216    }
217
218    fn _read_account_pending_cache(&self, address: Address) -> Option<Account> {
219        timed(|| self.cache.get_account(address)).with(|m| {
220            if m.result.is_some() {
221                tracing::debug!(storage = %label::CACHE, %address, "account found in cache");
222                metrics::inc_storage_read_account(m.elapsed, label::CACHE, PointInTime::Pending);
223            }
224        })
225    }
226
227    fn _read_account_latest_cache(&self, address: Address) -> Option<Account> {
228        timed(|| self.cache.get_account_latest(address)).with(|m| {
229            if m.result.is_some() {
230                tracing::debug!(storage = %label::CACHE, %address, "account found in cache");
231                metrics::inc_storage_read_account(m.elapsed, label::CACHE, PointInTime::Mined);
232            }
233        })
234    }
235
236    fn _read_account_temp(&self, address: Address, kind: ReadKind) -> Result<Option<Account>, StorageError> {
237        tracing::debug!(storage = %label::TEMP, %address, "reading account");
238        timed(|| self.temp.read_account(address, kind)).with(|m| {
239            if m.result.as_ref().is_ok_and(|opt| opt.is_some()) {
240                metrics::inc_storage_read_account(m.elapsed, label::TEMP, PointInTime::Pending);
241            }
242            if let Err(ref e) = m.result {
243                tracing::error!(reason = ?e, "failed to read account from temporary storage");
244            }
245        })
246    }
247
248    fn _read_account_perm(&self, address: Address, point_in_time: PointInTime) -> Result<Account, StorageError> {
249        tracing::debug!(storage = %label::PERM, %address, "reading account");
250        let account = timed(|| self.perm.read_account(address, point_in_time)).with(|m| {
251            if m.result.as_ref().is_ok_and(|opt| opt.is_some()) {
252                metrics::inc_storage_read_account(m.elapsed, label::PERM, point_in_time);
253            }
254            if let Err(ref e) = m.result {
255                tracing::error!(reason = ?e, "failed to read account from permanent storage");
256            }
257        })?;
258        Ok(match account {
259            Some(account) => {
260                tracing::debug!(storage = %label::PERM, %address, ?account, "account found in permanent storage");
261                account
262            }
263            None => {
264                tracing::debug!(storage = %label::PERM, %address, "account not found, assuming default value");
265                Account::new_empty(address)
266            }
267        })
268    }
269
270    // For calls, this function returns a guard if latest is safe to read
271    fn _latest_is_valid(&self, point_in_time: PointInTime, kind: ReadKind) -> (bool, Option<RwLockReadGuard<'_, ()>>) {
272        if matches!(point_in_time, PointInTime::MinedPast(_)) {
273            return (false, None);
274        }
275        match kind {
276            ReadKind::Call((block_number, _)) => {
277                let guard = self.transient_state_lock.read();
278                // Check if the provided block number is less than or equal to the mined block number
279                let is_valid = block_number <= self.read_mined_block_number();
280                (is_valid, (is_valid).then_some(guard))
281            }
282            _ => (true, None),
283        }
284    }
285
286    pub fn read_account(&self, address: Address, mut point_in_time: PointInTime, kind: ReadKind) -> Result<Account, StorageError> {
287        #[cfg(feature = "tracing")]
288        let _span = tracing::debug_span!("storage::read_account", %address, %point_in_time).entered();
289
290        let (account, found_in_perm) = 'query: {
291            if point_in_time == PointInTime::Pending {
292                if matches!(kind, ReadKind::Transaction)
293                    && let Some(account) = self._read_account_pending_cache(address)
294                {
295                    return Ok(account);
296                }
297
298                if let Some(account) = self._read_account_temp(address, kind)? {
299                    tracing::debug!(storage = %label::TEMP, %address, ?account, "account found in temporary storage");
300                    break 'query (account, false);
301                }
302            }
303
304            let (is_valid, guard) = self._latest_is_valid(point_in_time, kind);
305            if is_valid {
306                if let Some(account) = self._read_account_latest_cache(address) {
307                    return Ok(account);
308                }
309            } else if let ReadKind::Call((block_number, _)) = kind
310                && !matches!(point_in_time, PointInTime::MinedPast(_))
311            {
312                point_in_time = PointInTime::MinedPast(block_number);
313            }
314
315            // always read from perm if necessary
316            let ret = (self._read_account_perm(address, point_in_time)?, true);
317            if let Some(inner) = guard {
318                RwLockReadGuard::unlock_fair(inner);
319            }
320            ret
321        };
322
323        match (point_in_time, found_in_perm) {
324            // Pending accounts found in the permanent storage (or not found in any storage) are always mined already
325            (PointInTime::Pending, true) => {
326                self.cache.cache_account_if_missing(account.clone());
327                self.cache.cache_account_latest_if_missing(address, account.clone());
328            }
329            (PointInTime::Pending, false) => {
330                self.cache.cache_account_if_missing(account.clone());
331            }
332            (PointInTime::Mined, _) => {
333                self.cache.cache_account_latest_if_missing(address, account.clone());
334            }
335            _ => {}
336        }
337        Ok(account)
338    }
339
340    fn _read_slot_pending_cache(&self, address: Address, index: SlotIndex) -> Option<Slot> {
341        timed(|| self.cache.get_slot(address, index)).with(|m| {
342            if m.result.is_some() {
343                tracing::debug!(storage = %label::CACHE, %address, slot = ?m.result, "slot found in cache");
344                metrics::inc_storage_read_slot(m.elapsed, label::CACHE, PointInTime::Pending);
345            }
346        })
347    }
348
349    fn _read_slot_latest_cache(&self, address: Address, index: SlotIndex) -> Option<Slot> {
350        timed(|| self.cache.get_slot_latest(address, index)).with(|m| {
351            if m.result.is_some() {
352                tracing::debug!(storage = %label::CACHE, %address, slot = ?m.result, "slot found in cache");
353                metrics::inc_storage_read_slot(m.elapsed, label::CACHE, PointInTime::Mined);
354            }
355        })
356    }
357
358    fn _read_slot_temp(&self, address: Address, index: SlotIndex, kind: ReadKind) -> Result<Option<Slot>, StorageError> {
359        tracing::debug!(storage = %label::TEMP, %address, %index, "reading slot");
360        timed(|| self.temp.read_slot(address, index, kind)).with(|m| {
361            if m.result.as_ref().is_ok_and(|opt| opt.is_some()) {
362                metrics::inc_storage_read_slot(m.elapsed, label::TEMP, PointInTime::Pending);
363            }
364            if let Err(ref e) = m.result {
365                tracing::error!(reason = ?e, "failed to read slot from temporary storage");
366            }
367        })
368    }
369
370    fn _read_slot_perm(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> Result<Slot, StorageError> {
371        tracing::debug!(storage = %label::PERM, %address, %index, %point_in_time, "reading slot");
372        let slot = timed(|| self.perm.read_slot(address, index, point_in_time)).with(|m| {
373            if m.result.as_ref().is_ok_and(|opt| opt.is_some()) {
374                metrics::inc_storage_read_slot(m.elapsed, label::PERM, point_in_time);
375            }
376            if let Err(ref e) = m.result {
377                tracing::error!(reason = ?e, "failed to read slot from permanent storage");
378            }
379        })?;
380        Ok(match slot {
381            Some(slot) => {
382                tracing::debug!(storage = %label::PERM, %address, %index, ?slot, "slot found in permanent storage");
383                slot
384            }
385            None => {
386                tracing::debug!(storage = %label::PERM, %address, %index, "slot not found, assuming default value");
387                Slot::new_empty(index)
388            }
389        })
390    }
391
392    pub fn read_slot(&self, address: Address, index: SlotIndex, mut point_in_time: PointInTime, kind: ReadKind) -> Result<Slot, StorageError> {
393        #[cfg(feature = "tracing")]
394        let _span = tracing::debug_span!("storage::read_slot", %address, %index, %point_in_time).entered();
395
396        let (slot, found_in_perm) = 'query: {
397            if point_in_time == PointInTime::Pending {
398                // tx can read from pending cache
399                if matches!(kind, ReadKind::Transaction)
400                    && let Some(slot) = self._read_slot_pending_cache(address, index)
401                {
402                    return Ok(slot);
403                }
404
405                if let Some(slot) = self._read_slot_temp(address, index, kind)? {
406                    tracing::debug!(storage = %label::TEMP, %address, %index, value = %slot.value, "slot found in temporary storage");
407                    break 'query (slot, false);
408                }
409            }
410
411            let (is_valid, guard) = self._latest_is_valid(point_in_time, kind);
412            if is_valid {
413                if let Some(slot) = self._read_slot_latest_cache(address, index) {
414                    return Ok(slot);
415                }
416            } else if let ReadKind::Call((block_number, _)) = kind
417                && !matches!(point_in_time, PointInTime::MinedPast(_))
418            {
419                point_in_time = PointInTime::MinedPast(block_number);
420            }
421
422            // always read from perm if necessary
423            let ret = (self._read_slot_perm(address, index, point_in_time)?, true);
424            if let Some(inner) = guard {
425                RwLockReadGuard::unlock_fair(inner);
426            }
427            ret
428        };
429
430        match (point_in_time, found_in_perm) {
431            // Pending slots found in the permanent storage (or not found in any storage) are always mined already
432            (PointInTime::Pending, true) => {
433                self.cache.cache_slot_if_missing(address, slot);
434                self.cache.cache_slot_latest_if_missing(address, slot);
435            }
436            (PointInTime::Pending, false) => {
437                self.cache.cache_slot_if_missing(address, slot);
438            }
439            (PointInTime::Mined, _) => {
440                self.cache.cache_slot_latest_if_missing(address, slot);
441            }
442            _ => {}
443        }
444        Ok(slot)
445    }
446
447    // -------------------------------------------------------------------------
448    // Blocks
449    // -------------------------------------------------------------------------
450
451    pub fn save_execution(&self, tx: TransactionExecution) -> Result<(), StorageError> {
452        let changes = tx.result.execution.changes.clone();
453
454        #[cfg(feature = "tracing")]
455        let _span = tracing::info_span!("storage::save_execution", tx_hash = %tx.info.hash).entered();
456        tracing::debug!(storage = %label::TEMP, tx_hash = %tx.info.hash, changes = ?tx.result.execution.changes, "saving execution");
457
458        // Log warning if a failed transaction has slot changes
459        if !tx.result.execution.result.is_success() {
460            let total_slot_changes: usize = changes.slots.len();
461
462            if total_slot_changes > 0 {
463                tracing::warn!(?tx, "Failed transaction contains {} slot change(s)", total_slot_changes);
464            }
465        }
466
467        timed(|| self.temp.save_pending_execution(tx))
468            .with(|m| {
469                metrics::inc_storage_save_execution(m.elapsed, label::TEMP, m.result.is_ok());
470                match &m.result {
471                    Err(StorageError::EvmInputMismatch { .. }) => {
472                        tracing::warn!("failed to save execution due to mismatch, will retry");
473                    }
474                    Err(e) => tracing::error!(reason = ?e, "failed to save execution"),
475                    _ => (),
476                }
477            })
478            .inspect(|_| self.cache.cache_account_and_slots_from_changes(changes))
479    }
480
481    /// Retrieves pending transactions being mined.
482    pub fn pending_transactions(&self) -> Vec<TransactionExecution> {
483        self.temp.read_pending_executions()
484    }
485
486    pub fn finish_pending_block(&self) -> Result<(PendingBlock, ExecutionChanges), StorageError> {
487        #[cfg(feature = "tracing")]
488        let _span = tracing::info_span!("storage::finish_pending_block", block_number = tracing::field::Empty).entered();
489        tracing::debug!(storage = %label::TEMP, "finishing pending block");
490
491        let result = timed(|| self.temp.finish_pending_block()).with(|m| {
492            metrics::inc_storage_finish_pending_block(m.elapsed, label::TEMP, m.result.is_ok());
493            if let Err(ref e) = m.result {
494                tracing::error!(reason = ?e, "failed to finish pending block");
495            }
496        });
497
498        if let Ok((ref block, _)) = result {
499            Span::with(|s| s.rec_str("block_number", &block.header.number));
500        }
501
502        result
503    }
504
505    pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, changes: ExecutionChanges) -> Result<(), StorageError> {
506        let block_number = block.number();
507
508        #[cfg(feature = "tracing")]
509        let _span = tracing::info_span!("storage::save_genesis_block", block_number = %block_number).entered();
510        tracing::debug!(storage = %label::PERM, "saving genesis block");
511
512        timed(|| self.perm.save_genesis_block(block, accounts, changes)).with(|m| {
513            metrics::inc_storage_save_block(m.elapsed, label::PERM, "genesis", "genesis", m.result.is_ok());
514            if let Err(ref e) = m.result {
515                tracing::error!(reason = ?e, "failed to save genesis block");
516            }
517        })
518    }
519
520    pub fn save_block(&self, block: Block, changes: ExecutionChanges) -> Result<(), StorageError> {
521        let block_number = block.number();
522
523        #[cfg(feature = "tracing")]
524        let _span = tracing::info_span!("storage::save_block", block_number = %block.number()).entered();
525        tracing::debug!(storage = %label::PERM, block_number = %block_number, transactions_len = %block.transactions.len(), ?changes, "saving block");
526
527        // check mined number
528        let mined_number = self.read_mined_block_number();
529        if not(block_number.is_zero()) && block_number != mined_number.next_block_number() {
530            tracing::error!(%block_number, %mined_number, "failed to save block because mismatch with mined block number");
531            return Err(StorageError::MinedNumberConflict {
532                new: block_number,
533                mined: mined_number,
534            });
535        }
536
537        // check pending number
538        let pending_header = self.read_pending_block_header();
539        if block_number >= pending_header.0.number {
540            tracing::error!(%block_number, pending_number = %pending_header.0.number, "failed to save block because mismatch with pending block number");
541            return Err(StorageError::PendingNumberConflict {
542                new: block_number,
543                pending: pending_header.0.number,
544            });
545        }
546
547        // check mined block
548        let existing_block = self.read_block(BlockFilter::Number(block_number))?;
549        if existing_block.is_some() {
550            tracing::error!(%block_number, %mined_number, "failed to save block because block with the same number already exists in the permanent storage");
551            return Err(StorageError::BlockConflict { number: block_number });
552        }
553
554        // save block
555        let (label_size_by_tx, label_size_by_gas) = (block.label_size_by_transactions(), block.label_size_by_gas());
556
557        timed(|| {
558            let guard = self.transient_state_lock.write();
559            self.perm.save_block(block, changes.clone())?;
560            self.cache.cache_account_and_slots_latest_from_changes(changes);
561            drop(guard);
562            Ok(())
563        })
564        .with(|m| {
565            metrics::inc_storage_save_block(m.elapsed, label::PERM, label_size_by_tx, label_size_by_gas, m.result.is_ok());
566            if let Err(ref e) = m.result {
567                tracing::error!(reason = ?e, %block_number, "failed to save block");
568            }
569        })?;
570
571        self.set_mined_block_number(block_number);
572
573        Ok(())
574    }
575
576    pub fn read_block(&self, filter: BlockFilter) -> Result<Option<Block>, StorageError> {
577        #[cfg(feature = "tracing")]
578        let _span = tracing::info_span!("storage::read_block", %filter).entered();
579        tracing::debug!(storage = %label::PERM, ?filter, "reading block");
580
581        timed(|| self.perm.read_block(filter)).with(|m| {
582            metrics::inc_storage_read_block(m.elapsed, label::PERM, m.result.is_ok());
583            if let Err(ref e) = m.result {
584                tracing::error!(reason = ?e, "failed to read block");
585            }
586        })
587    }
588
589    pub fn read_block_with_changes(&self, filter: BlockFilter) -> Result<Option<(Block, BlockChangesRocksdb)>, StorageError> {
590        #[cfg(feature = "tracing")]
591        let _span = tracing::info_span!("storage::read_block_with_changes", %filter).entered();
592        tracing::debug!(storage = %label::PERM, ?filter, "reading block with changes");
593
594        timed(|| self.perm.read_block_with_changes(filter)).with(|m| {
595            metrics::inc_storage_read_block_with_changes(m.elapsed, label::PERM, m.result.is_ok());
596            if let Err(ref e) = m.result {
597                tracing::error!(reason = ?e, "failed to read block with changes");
598            }
599        })
600    }
601
602    pub fn read_transaction(&self, tx_hash: Hash) -> Result<Option<TransactionStage>, StorageError> {
603        #[cfg(feature = "tracing")]
604        let _span = tracing::info_span!("storage::read_transaction", %tx_hash).entered();
605
606        // read from temp
607        tracing::debug!(storage = %label::TEMP, %tx_hash, "reading transaction");
608        let temp_tx = timed(|| self.temp.read_pending_execution(tx_hash)).with(|m| {
609            metrics::inc_storage_read_transaction(m.elapsed, label::TEMP, m.result.is_ok());
610            if let Err(ref e) = m.result {
611                tracing::error!(reason = ?e, "failed to read transaction from temporary storage");
612            }
613        })?;
614        if let Some(tx_temp) = temp_tx {
615            return Ok(Some(TransactionStage::Pending(tx_temp)));
616        }
617
618        // read from perm
619        tracing::debug!(storage = %label::PERM, %tx_hash, "reading transaction");
620        let perm_tx = timed(|| self.perm.read_transaction(tx_hash)).with(|m| {
621            metrics::inc_storage_read_transaction(m.elapsed, label::PERM, m.result.is_ok());
622            if let Err(ref e) = m.result {
623                tracing::error!(reason = ?e, "failed to read transaction from permanent storage");
624            }
625        })?;
626        Ok(perm_tx.map(TransactionStage::Mined))
627    }
628
629    pub fn read_logs(&self, filter: &LogFilter) -> Result<Vec<LogMessage>, StorageError> {
630        #[cfg(feature = "tracing")]
631        let _span = tracing::info_span!("storage::read_logs", ?filter).entered();
632        tracing::debug!(storage = %label::PERM, ?filter, "reading logs");
633
634        timed(|| self.perm.read_logs(filter)).with(|m| {
635            metrics::inc_storage_read_logs(m.elapsed, label::PERM, m.result.is_ok());
636            if let Err(ref e) = m.result {
637                tracing::error!(reason = ?e, "failed to read logs");
638            }
639        })
640    }
641
642    // -------------------------------------------------------------------------
643    // Direct state manipulation (for testing)
644    // -------------------------------------------------------------------------
645
646    #[cfg(feature = "dev")]
647    pub fn set_storage_at(&self, address: Address, index: SlotIndex, value: SlotValue) -> Result<(), StorageError> {
648        // Create a slot with the given index and value
649        let slot = Slot::new(index, value);
650        self.cache.clear();
651
652        // Update permanent storage
653        self.perm.save_slot(address, slot)?;
654
655        // Update temporary storage
656        self.temp.save_slot(address, slot)?;
657
658        Ok(())
659    }
660
661    #[cfg(feature = "dev")]
662    pub fn set_nonce(&self, address: Address, nonce: Nonce) -> Result<(), StorageError> {
663        self.cache.clear();
664
665        // Update permanent storage
666        self.perm.save_account_nonce(address, nonce)?;
667
668        // Update temporary storage
669        self.temp.save_account_nonce(address, nonce)?;
670
671        Ok(())
672    }
673
674    #[cfg(feature = "dev")]
675    pub fn set_balance(&self, address: Address, balance: Wei) -> Result<(), StorageError> {
676        self.cache.clear();
677
678        // Update permanent storage
679        self.perm.save_account_balance(address, balance)?;
680
681        // Update temporary storage
682        self.temp.save_account_balance(address, balance)?;
683
684        Ok(())
685    }
686
687    #[cfg(feature = "dev")]
688    pub fn set_code(&self, address: Address, code: Bytes) -> Result<(), StorageError> {
689        self.cache.clear();
690        // Update permanent storage
691        self.perm.save_account_code(address, code.clone())?;
692
693        // Update temporary storage
694        self.temp.save_account_code(address, code)?;
695
696        Ok(())
697    }
698
699    // -------------------------------------------------------------------------
700    // General state
701    // -------------------------------------------------------------------------
702
703    #[cfg(feature = "dev")]
704    /// Resets the storage to the genesis state.
705    /// If a genesis.json file is available, it will be used.
706    /// Otherwise, it will use the default genesis configuration.
707    pub fn reset_to_genesis(&self) -> Result<(), StorageError> {
708        tracing::info!("resetting storage to genesis state");
709
710        self.cache.clear();
711
712        #[cfg(feature = "tracing")]
713        let _span = tracing::info_span!("storage::reset").entered();
714
715        // reset perm
716        tracing::debug!(storage = %label::PERM, "resetting permanent storage");
717        timed(|| self.perm.reset()).with(|m| {
718            metrics::inc_storage_reset(m.elapsed, label::PERM, m.result.is_ok());
719            if let Err(ref e) = m.result {
720                tracing::error!(reason = ?e, "failed to reset permanent storage");
721            }
722        })?;
723
724        // reset temp
725        tracing::debug!(storage = %label::TEMP, "reseting temporary storage");
726        timed(|| self.temp.reset()).with(|m| {
727            metrics::inc_storage_reset(m.elapsed, label::TEMP, m.result.is_ok());
728            if let Err(ref e) = m.result {
729                tracing::error!(reason = ?e, "failed to reset temporary storage");
730            }
731        })?;
732
733        // Try to load genesis block from the genesis file or use default
734        let genesis_block = if let Some(genesis_path) = &self.perm_config.genesis_file.genesis_path {
735            if std::path::Path::new(genesis_path).exists() {
736                match GenesisConfig::load_from_file(genesis_path) {
737                    Ok(genesis_config) => match genesis_config.to_genesis_block() {
738                        Ok(block) => {
739                            tracing::info!("using genesis block from file: {:?}", genesis_path);
740                            block
741                        }
742                        Err(e) => {
743                            tracing::error!("failed to create genesis block from file: {:?}", e);
744                            Block::genesis()
745                        }
746                    },
747                    Err(e) => {
748                        tracing::error!("failed to load genesis file: {:?}", e);
749                        Block::genesis()
750                    }
751                }
752            } else {
753                tracing::error!("genesis file not found at: {:?}", genesis_path);
754                Block::genesis()
755            }
756        } else {
757            tracing::info!("using default genesis block");
758            Block::genesis()
759        };
760        // Try to load genesis.json from the path specified in GenesisFileConfig
761        // or use default genesis configuration
762        let (genesis_accounts, genesis_slots) = if let Some(genesis_path) = &self.perm_config.genesis_file.genesis_path {
763            if std::path::Path::new(genesis_path).exists() {
764                tracing::info!("found genesis file at: {:?}", genesis_path);
765                match GenesisConfig::load_from_file(genesis_path) {
766                    Ok(genesis) => match genesis.to_stratus_accounts_and_slots() {
767                        Ok((accounts, slots)) => {
768                            tracing::info!("loaded {} accounts from genesis.json", accounts.len());
769                            if !slots.is_empty() {
770                                tracing::info!("loaded {} storage slots from genesis.json", slots.len());
771                            }
772                            (accounts, slots)
773                        }
774                        Err(e) => {
775                            tracing::error!("failed to convert genesis accounts: {:?}", e);
776                            // Fallback to test accounts
777                            (test_accounts(), vec![])
778                        }
779                    },
780                    Err(e) => {
781                        tracing::error!("failed to load genesis file: {:?}", e);
782                        // Fallback to test accounts
783                        (test_accounts(), vec![])
784                    }
785                }
786            } else {
787                tracing::error!("genesis file not found at: {:?}", genesis_path);
788                // Fallback to test accounts
789                (test_accounts(), vec![])
790            }
791        } else {
792            // No genesis path specified, use default genesis configuration
793            match GenesisConfig::default().to_stratus_accounts_and_slots() {
794                Ok((accounts, slots)) => {
795                    tracing::info!("using default genesis configuration with {} accounts", accounts.len());
796                    (accounts, slots)
797                }
798                Err(e) => {
799                    tracing::error!("failed to convert default genesis accounts: {:?}", e);
800                    // Fallback to test accounts
801                    (test_accounts(), vec![])
802                }
803            }
804        };
805        // Save the genesis block
806        self.save_block(genesis_block, ExecutionChanges::default())?;
807
808        // accounts
809        self.save_accounts(genesis_accounts)?;
810
811        // Save slots if any
812        if !genesis_slots.is_empty() {
813            tracing::info!("saving {} storage slots from genesis", genesis_slots.len());
814            for (address, slot) in genesis_slots {
815                self.perm.save_slot(address, slot)?;
816            }
817        }
818
819        // block number
820        self.set_mined_block_number(BlockNumber::ZERO);
821
822        Ok(())
823    }
824
825    // -------------------------------------------------------------------------
826    // Utils
827    // -------------------------------------------------------------------------
828
829    /// Translates a block filter to a specific storage point-in-time indicator.
830    pub fn translate_to_point_in_time(&self, block_filter: BlockFilter) -> Result<PointInTime, StorageError> {
831        match block_filter {
832            BlockFilter::Pending => Ok(PointInTime::Pending),
833            BlockFilter::Latest => Ok(PointInTime::Mined),
834            BlockFilter::Earliest => Ok(PointInTime::MinedPast(BlockNumber::ZERO)),
835            BlockFilter::Number(number) => Ok(PointInTime::MinedPast(number)),
836            BlockFilter::Hash(_) | BlockFilter::Timestamp(_) => match self.read_block(block_filter)? {
837                Some(block) => Ok(PointInTime::MinedPast(block.header.number)),
838                None => Err(StorageError::BlockNotFound { filter: block_filter }),
839            },
840        }
841    }
842}