stratus/eth/storage/permanent/rocks/
rocks_state.rs

use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::time::Duration;
use std::time::Instant;

use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use rocksdb::DB;
use rocksdb::Direction;
use rocksdb::Options;
use rocksdb::WaitForCompactOptions;
use rocksdb::WriteBatch;
use rocksdb::WriteOptions;
use serde::Deserialize;
use serde::Serialize;
use sugars::btmap;

use super::cf_versions::CfAccountSlotsHistoryValue;
use super::cf_versions::CfAccountSlotsValue;
use super::cf_versions::CfAccountsHistoryValue;
use super::cf_versions::CfAccountsValue;
use super::cf_versions::CfBlocksByHashValue;
use super::cf_versions::CfBlocksByNumberValue;
use super::cf_versions::CfTransactionsValue;
use super::rocks_cf::RocksCfRef;
use super::rocks_config::CacheSetting;
use super::rocks_config::DbConfig;
use super::rocks_db::create_or_open_db;
use super::types::AccountRocksdb;
use super::types::AddressRocksdb;
use super::types::BlockNumberRocksdb;
use super::types::HashRocksdb;
use super::types::SlotIndexRocksdb;
use super::types::SlotValueRocksdb;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::Block;
use crate::eth::primitives::BlockFilter;
use crate::eth::primitives::BlockNumber;
#[cfg(feature = "dev")]
use crate::eth::primitives::Bytes;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogMined;
#[cfg(feature = "dev")]
use crate::eth::primitives::Nonce;
use crate::eth::primitives::PointInTime;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::TransactionMined;
#[cfg(feature = "dev")]
use crate::eth::primitives::Wei;
use crate::eth::storage::permanent::rocks::SerializeDeserializeWithContext;
use crate::eth::storage::permanent::rocks::cf_versions::CfBlockChangesValue;
use crate::eth::storage::permanent::rocks::types::AccountChangesRocksdb;
use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
use crate::ext::OptionExt;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::log_and_err;
use crate::utils::GIGABYTE;

cfg_if::cfg_if! {
    if #[cfg(feature = "rocks_metrics")] {
        use parking_lot::Mutex;

        use rocksdb::statistics::Histogram;
        use rocksdb::statistics::Ticker;
        use std::collections::HashMap;

        use crate::infra::metrics::{Count, HistogramInt, Sum};
    }
}

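/// Builds the RocksDB options for each column family, scaling every cache size by `cache_multiplier`.
///
/// A minimal sketch of the scaling behavior (hypothetical multiplier; the cache sizes are the ones
/// declared below):
/// ```ignore
/// // with a multiplier of 0.5, the 10 GB "accounts" cache becomes 5 GB
/// let options = generate_cf_options_map(Some(0.5));
/// assert!(options.contains_key("accounts"));
/// ```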
pub fn generate_cf_options_map(cache_multiplier: Option<f32>) -> BTreeMap<&'static str, Options> {
    let cache_multiplier = cache_multiplier.unwrap_or(1.0);

    // multiplies the given size in GBs by the cache multiplier
    let cached_in_gigs_and_multiplied = |size_in_gbs: u64| -> CacheSetting {
        let size = (size_in_gbs as f32) * cache_multiplier;
        let size = GIGABYTE * size as usize;
        CacheSetting::Enabled(size)
    };

    btmap! {
        "accounts" => DbConfig::OptimizedPointLookUp.to_options(cached_in_gigs_and_multiplied(10)),
        "accounts_history" => DbConfig::HistoricalData.to_options(cached_in_gigs_and_multiplied(2)),
        "account_slots" => DbConfig::OptimizedPointLookUp.to_options(cached_in_gigs_and_multiplied(30)),
        "account_slots_history" => DbConfig::HistoricalData.to_options(cached_in_gigs_and_multiplied(2)),
        "transactions" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
        "blocks_by_number" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
        "blocks_by_hash" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
        "block_changes" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
    }
}

/// Helper for creating a `RocksCfRef`, aborting if it wasn't declared in our option presets.
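///
/// A sketch of the failure mode (hypothetical CF name): calling `new_cf_ref(db, "typo_cf", &map)`
/// returns an error because "typo_cf" has no entry in the map from `generate_cf_options_map`.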
fn new_cf_ref<'a, K, V>(db: &'a Arc<DB>, column_family: &str, cf_options_map: &BTreeMap<&str, Options>) -> Result<RocksCfRef<'a, K, V>>
where
    K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
    V: Serialize + for<'de> Deserialize<'de> + Debug + Clone + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
{
    tracing::debug!(column_family = column_family, "creating new column family");

    cf_options_map
        .get(column_family)
        .with_context(|| format!("matching column_family `{column_family}` given to `new_cf_ref` wasn't found in configuration map"))?;

    // NOTE: this doesn't create the CFs in the database, read `RocksCfRef` docs for details
    RocksCfRef::new(db, column_family)
}

/// State handler for our RocksDB storage, separating "tables" by column families.
///
/// With data separated by column families, writing and reading should be done via the `RocksCfRef` fields.
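///
/// A minimal usage sketch (hypothetical path and arguments, not how the crate actually wires it up):
/// ```ignore
/// let state = RocksStorageState::new("data/rocksdb".to_string(), Duration::from_secs(240), None, true)?;
/// let latest = state.read_block(BlockFilter::Latest)?;
/// ```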
pub struct RocksStorageState {
    pub db: Arc<DB>,
    db_path: String,
    accounts: RocksCfRef<'static, AddressRocksdb, CfAccountsValue>,
    accounts_history: RocksCfRef<'static, (AddressRocksdb, BlockNumberRocksdb), CfAccountsHistoryValue>,
    account_slots: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb), CfAccountSlotsValue>,
    account_slots_history: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb, BlockNumberRocksdb), CfAccountSlotsHistoryValue>,
    pub transactions: RocksCfRef<'static, HashRocksdb, CfTransactionsValue>,
    pub blocks_by_number: RocksCfRef<'static, BlockNumberRocksdb, CfBlocksByNumberValue>,
    blocks_by_hash: RocksCfRef<'static, HashRocksdb, CfBlocksByHashValue>,
    block_changes: RocksCfRef<'static, BlockNumberRocksdb, CfBlockChangesValue>,
    /// Last collected stats for each histogram.
    #[cfg(feature = "rocks_metrics")]
    prev_stats: Mutex<HashMap<HistogramInt, (Sum, Count)>>,
    /// Options passed at DB creation, stored for metrics.
    ///
    /// A newly created `rocksdb::Options` object is unique, with an underlying pointer identifier
    /// inside of it, and is used to access the DB metrics. `Options` can be cloned, but two equal
    /// `Options` might not retrieve the same metrics.
    #[cfg(feature = "rocks_metrics")]
    db_options: Options,
    shutdown_timeout: Duration,
    enable_sync_write: bool,
}

impl RocksStorageState {
    pub fn new(path: String, shutdown_timeout: Duration, cache_multiplier: Option<f32>, enable_sync_write: bool) -> Result<Self> {
        tracing::debug!("creating (or opening an existing) database with the specified column families");

        let cf_options_map = generate_cf_options_map(cache_multiplier);

        #[cfg_attr(not(feature = "rocks_metrics"), allow(unused_variables))]
        let (db, db_options) = create_or_open_db(&path, &cf_options_map).context("when trying to create (or open) rocksdb")?;

        if db.path().to_str().is_none() {
            bail!("db path isn't valid UTF-8: {:?}", db.path());
        }
        if db.path().file_name().is_none() {
            bail!("db path doesn't have a file name: {:?}", db.path());
        }

        let state = Self {
            db_path: path,
            accounts: new_cf_ref(db, "accounts", &cf_options_map)?,
            accounts_history: new_cf_ref(db, "accounts_history", &cf_options_map)?,
            account_slots: new_cf_ref(db, "account_slots", &cf_options_map)?,
            account_slots_history: new_cf_ref(db, "account_slots_history", &cf_options_map)?,
            transactions: new_cf_ref(db, "transactions", &cf_options_map)?,
            blocks_by_number: new_cf_ref(db, "blocks_by_number", &cf_options_map)?,
            blocks_by_hash: new_cf_ref(db, "blocks_by_hash", &cf_options_map)?,
            block_changes: new_cf_ref(db, "block_changes", &cf_options_map)?,
            #[cfg(feature = "rocks_metrics")]
            prev_stats: Mutex::default(),
            #[cfg(feature = "rocks_metrics")]
            db_options,
            db: Arc::clone(db),
            shutdown_timeout,
            enable_sync_write,
        };

        tracing::debug!("opened database successfully");
        Ok(state)
    }

    #[cfg(test)]
    #[track_caller]
    pub fn new_in_testdir() -> anyhow::Result<(Self, tempfile::TempDir)> {
        let test_dir = tempfile::tempdir()?;
        let path = test_dir.as_ref().display().to_string();
        let state = Self::new(path, Duration::ZERO, None, true)?;
        Ok((state, test_dir))
    }

    /// Get the filename of the database path.
    #[cfg(feature = "metrics")]
    fn db_path_filename(&self) -> &str {
        self.db_path.rsplit('/').next().unwrap_or(&self.db_path)
    }

    pub fn preload_block_number(&self) -> Result<AtomicU32> {
        let block_number = self.blocks_by_number.last_key()?.unwrap_or_default().0;
        tracing::info!(%block_number, "preloaded block_number");
        Ok(AtomicU32::new(block_number))
    }

    #[cfg(feature = "dev")]
    pub fn reset(&self) -> Result<()> {
        self.accounts.clear()?;
        self.accounts_history.clear()?;
        self.account_slots.clear()?;
        self.account_slots_history.clear()?;
        self.transactions.clear()?;
        self.blocks_by_number.clear()?;
        self.blocks_by_hash.clear()?;
        Ok(())
    }

    /// Prepares a write batch with the state changes produced by transaction execution.
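    ///
    /// A sketch of the write fan-out (names from this function): a modified account lands in the
    /// latest-state `accounts` CF and, keyed by `(address, block_number)`, in `accounts_history`;
    /// modified slots do the same in their two CFs; and the per-block diff is accumulated into
    /// `block_changes` under the block number.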
    fn prepare_batch_with_execution_changes(&self, changes: ExecutionChanges, block_number: BlockNumber, batch: &mut WriteBatch) -> Result<()> {
        let mut block_changes = BlockChangesRocksdb::default();
        let block_number = block_number.into();

        for (address, change) in changes {
            let address: AddressRocksdb = address.into();
            let mut account_change_entry = AccountChangesRocksdb::default();

            if change.is_account_modified() {
                let mut account_info_entry = self.accounts.get(&address)?.unwrap_or(AccountRocksdb::default().into());

                if let Some(nonce) = change.nonce.take_modified() {
                    account_info_entry.nonce = nonce.into();
                    account_change_entry.nonce = Some(nonce.into());
                }
                if let Some(balance) = change.balance.take_modified() {
                    account_info_entry.balance = balance.into();
                    account_change_entry.balance = Some(balance.into());
                }
                if let Some(bytecode) = change.bytecode.take_modified() {
                    account_info_entry.bytecode = bytecode.clone().map_into();
                    account_change_entry.bytecode = bytecode.map_into();
                }

                self.accounts.prepare_batch_insertion([(address, account_info_entry.clone())], batch)?;
                self.accounts_history
                    .prepare_batch_insertion([((address, block_number), account_info_entry.into_inner().into())], batch)?;
            }

            for (&slot_index, slot_change) in &change.slots {
                if let Some(slot) = slot_change.take_modified_ref() {
                    let slot_index = slot_index.into();
                    let slot_value: SlotValueRocksdb = slot.value.into();

                    account_change_entry.slot_changes.insert(slot_index, slot_value);
                    self.account_slots
                        .prepare_batch_insertion([((address, slot_index), slot_value.into())], batch)?;
                    self.account_slots_history
                        .prepare_batch_insertion([((address, slot_index, block_number), slot_value.into())], batch)?;
                }
            }

            if account_change_entry.has_changes() {
                block_changes.account_changes.insert(address, account_change_entry);
            }
        }

        self.block_changes.prepare_batch_insertion([(block_number, block_changes.into())], batch)?;

        Ok(())
    }

    pub fn read_transaction(&self, tx_hash: Hash) -> Result<Option<TransactionMined>> {
        let Some(block_number) = self.transactions.get(&tx_hash.into())? else {
            return Ok(None);
        };

        let Some(block) = self.blocks_by_number.get(&block_number)? else {
            return log_and_err!("the block that the transaction was supposed to be in was not found")
                .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}"));
        };
        let block = block.into_inner();

        let transaction = block.transactions.into_iter().find(|tx| Hash::from(tx.input.hash) == tx_hash);

        match transaction {
            Some(tx) => {
                tracing::trace!(%tx_hash, "transaction found");
                Ok(Some(TransactionMined::from_rocks_primitives(tx, block_number.into_inner(), block.header.hash)))
            }
            None => log_and_err!("rocks error, transaction wasn't found in the block that the index pointed to")
                .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}")),
        }
    }

    pub fn read_logs(&self, filter: &LogFilter) -> Result<Vec<LogMined>> {
        let is_block_number_in_end_range = |number: BlockNumber| match filter.to_block.as_ref() {
            Some(&last_block) => number <= last_block,
            None => true,
        };

        let iter = self
            .blocks_by_number
            .iter_from(BlockNumberRocksdb::from(filter.from_block), Direction::Forward)?;

        let mut logs_result = vec![];

        for next in iter {
            let (number, block) = next?;

            if !is_block_number_in_end_range(number.into()) {
                break;
            }

            let block = block.into_inner();
            let logs = block.transactions.into_iter().enumerate().flat_map(|(tx_index, transaction)| {
                transaction.logs.into_iter().map(move |log| {
                    LogMined::from_rocks_primitives(log.log, block.header.number, block.header.hash, tx_index, transaction.input.hash, log.index)
                })
            });

            let filtered_logs = logs.filter(|log| filter.matches(log));
            logs_result.extend(filtered_logs);
        }
        Ok(logs_result)
    }

    pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> Result<Option<Slot>> {
        if address.is_coinbase() {
            return Ok(None);
        }

        match point_in_time {
            PointInTime::Mined | PointInTime::Pending => {
                let query_params = (address.into(), index.into());

                let Some(account_slot_value) = self.account_slots.get(&query_params)? else {
                    return Ok(None);
                };

                Ok(Some(Slot {
                    index,
                    value: account_slot_value.into_inner().into(),
                }))
            }
            PointInTime::MinedPast(block_number) => {
                tracing::debug!(?address, ?index, ?block_number, "searching slot");

                let key = (address.into(), index.into(), block_number.into());

                if let Some(((rocks_address, rocks_index, block), value)) = self.account_slots_history.seek(key)?
                    && rocks_index == index.into()
                    && rocks_address == address.into()
                {
                    tracing::debug!(?block, ?rocks_index, ?rocks_address, "slot found in rocksdb storage");
                    return Ok(Some(Slot {
                        index: rocks_index.into(),
                        value: value.into_inner().into(),
                    }));
                }
                Ok(None)
            }
        }
    }

    pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> Result<Option<Account>> {
        if address.is_coinbase() || address.is_zero() {
            return Ok(None);
        }

        match point_in_time {
            PointInTime::Mined | PointInTime::Pending => {
                let Some(inner_account) = self.accounts.get(&address.into())? else {
                    tracing::trace!(%address, "account not found");
                    return Ok(None);
                };

                let account = inner_account.to_account(address);
                tracing::trace!(%address, ?account, "account found");
                Ok(Some(account))
            }
            PointInTime::MinedPast(block_number) => {
                tracing::debug!(?address, ?block_number, "searching account");

                let key = (address.into(), block_number.into());

                if let Some(((addr, block), account_info)) = self.accounts_history.seek(key)?
                    && addr == address.into()
                {
                    tracing::debug!(?block, ?address, "account found in rocksdb storage");
                    return Ok(Some(account_info.to_account(address)));
                }
                Ok(None)
            }
        }
    }

    pub fn read_block(&self, selection: BlockFilter) -> Result<Option<Block>> {
        tracing::debug!(?selection, "reading block");

        let block = match selection {
            BlockFilter::Latest | BlockFilter::Pending => self.blocks_by_number.last_value(),
            BlockFilter::Earliest => self.blocks_by_number.first_value(),
            BlockFilter::Number(block_number) => self.blocks_by_number.get(&block_number.into()),
            BlockFilter::Hash(block_hash) =>
                if let Some(block_number) = self.blocks_by_hash.get(&block_hash.into())? {
                    self.blocks_by_number.get(&block_number)
                } else {
                    Ok(None)
                },
        };
        block.map(|block_option| block_option.map(|block| block.into_inner().into()))
    }

    pub fn save_accounts(&self, accounts: Vec<Account>) -> Result<()> {
        let mut write_batch = WriteBatch::default();

        tracing::debug!(?accounts, "preparing accounts cf batch");
        self.accounts.prepare_batch_insertion(
            accounts.iter().cloned().map(|acc| {
                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
                (tup.0, tup.1.into())
            }),
            &mut write_batch,
        )?;

        tracing::debug!(?accounts, "preparing accounts history batch");
        self.accounts_history.prepare_batch_insertion(
            accounts.iter().cloned().map(|acc| {
                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
                ((tup.0, 0u32.into()), tup.1.into())
            }),
            &mut write_batch,
        )?;

        self.write_in_batch_for_multiple_cfs(write_batch)
    }

    pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> Result<()> {
        let mut batch = WriteBatch::default();

        let mut txs_batch = vec![];
        for transaction in block.transactions.iter().cloned() {
            txs_batch.push((transaction.input.hash.into(), transaction.block_number.into()));
        }
        self.transactions.prepare_batch_insertion(txs_batch, &mut batch)?;

        let number = block.number();
        let block_hash = block.hash();

        let block_by_number = (number.into(), block.into());
        self.blocks_by_number.prepare_batch_insertion([block_by_number], &mut batch)?;

        let block_by_hash = (block_hash.into(), number.into());
        self.blocks_by_hash.prepare_batch_insertion([block_by_hash], &mut batch)?;

        self.prepare_batch_with_execution_changes(account_changes, number, &mut batch)?;

        self.accounts.prepare_batch_insertion(
            accounts.iter().cloned().map(|acc| {
                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
                (tup.0, tup.1.into())
            }),
            &mut batch,
        )?;

        self.accounts_history.prepare_batch_insertion(
            accounts.iter().cloned().map(|acc| {
                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
                ((tup.0, 0u32.into()), tup.1.into())
            }),
            &mut batch,
        )?;

        self.write_in_batch_for_multiple_cfs(batch)
    }

    pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> Result<()> {
        let mut batch = WriteBatch::default();
        self.prepare_block_insertion(block, account_changes, &mut batch)?;
        self.write_in_batch_for_multiple_cfs(batch)
    }

    pub fn prepare_block_insertion(&self, block: Block, account_changes: ExecutionChanges, batch: &mut WriteBatch) -> Result<()> {
        let mut txs_batch = vec![];
        for transaction in block.transactions.iter().cloned() {
            txs_batch.push((transaction.input.hash.into(), transaction.block_number.into()));
        }

        self.transactions.prepare_batch_insertion(txs_batch, batch)?;

        let number = block.number();
        let block_hash = block.hash();

        let block_by_number = (number.into(), block.into());
        self.blocks_by_number.prepare_batch_insertion([block_by_number], batch)?;

        let block_by_hash = (block_hash.into(), number.into());
        self.blocks_by_hash.prepare_batch_insertion([block_by_hash], batch)?;

        self.prepare_batch_with_execution_changes(account_changes, number, batch)?;

        Ok(())
    }

    /// Writes a batch, possibly spanning multiple column families, to the DB.
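    ///
    /// A minimal sketch of batching across CFs (hypothetical key/value names):
    /// ```ignore
    /// let mut batch = WriteBatch::default();
    /// state.transactions.prepare_batch_insertion([(tx_hash, block_number)], &mut batch)?;
    /// state.blocks_by_hash.prepare_batch_insertion([(block_hash, block_number)], &mut batch)?;
    /// state.write_in_batch_for_multiple_cfs(batch)?; // both CFs are committed in one atomic write
    /// ```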
    pub fn write_in_batch_for_multiple_cfs(&self, batch: WriteBatch) -> Result<()> {
        tracing::debug!("writing batch");
        let batch_len = batch.len();

        let mut options = WriteOptions::default();
        // By default, each write to rocksdb is asynchronous: it returns after pushing
        // the write from the process into the operating system (buffer cache).
        //
        // This option offers the trade-off of waiting after each write to
        // ensure data is persisted to disk before returning, preventing
        // potential data loss in case of system failure.
        options.set_sync(self.enable_sync_write);

        self.db
            .write_opt(batch, &options)
            .context("failed to write in batch to (possibly) multiple column families")
            .inspect_err(|e| {
                tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
            })
    }

    #[cfg(feature = "dev")]
    pub fn save_slot(&self, address: Address, slot: Slot) -> Result<()> {
        let mut batch = WriteBatch::default();
        self.account_slots
            .prepare_batch_insertion([((address.into(), slot.index.into()), slot.value.into())], &mut batch)?;
        self.account_slots.apply_batch_with_context(batch)
    }

    #[cfg(feature = "dev")]
    pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> Result<()> {
        let mut batch = WriteBatch::default();

        // Get the current account or create a new one
        let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());

        // Update the nonce
        account_info_entry.nonce = nonce.into();

        // Save the account
        self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
        self.accounts.apply_batch_with_context(batch)
    }

    #[cfg(feature = "dev")]
    pub fn save_account_balance(&self, address: Address, balance: Wei) -> Result<()> {
        let mut batch = WriteBatch::default();

        // Get the current account or create a new one
        let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());

        // Update the balance
        account_info_entry.balance = balance.into();

        // Save the account
        self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
        self.accounts.apply_batch_with_context(batch)
    }

    #[cfg(feature = "dev")]
    pub fn save_account_code(&self, address: Address, code: Bytes) -> Result<()> {
        use crate::alias::RevmBytecode;

        let mut batch = WriteBatch::default();

        // Get the current account or create a new one
        let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());

        // Update the bytecode
        account_info_entry.bytecode = if code.0.is_empty() {
            None
        } else {
            Some(RevmBytecode::new_raw(code.0.into()))
        }
        .map_into();

        // Save the account
        self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
        self.accounts.apply_batch_with_context(batch)
    }

    #[cfg(test)]
    pub fn read_all_accounts(&self) -> Result<Vec<AccountRocksdb>> {
        self.accounts.iter_start().map(|result| Ok(result?.1.into_inner())).collect()
    }

    #[cfg(test)]
    pub fn read_all_historical_accounts(&self) -> Result<Vec<AccountRocksdb>> {
        self.accounts_history.iter_start().map(|result| Ok(result?.1.into_inner())).collect()
    }

    #[cfg(feature = "dev")]
    pub fn clear(&self) -> Result<()> {
        self.accounts.clear().context("when clearing accounts")?;
        self.accounts_history.clear().context("when clearing accounts_history")?;
        self.account_slots.clear().context("when clearing account_slots")?;
        self.account_slots_history.clear().context("when clearing account_slots_history")?;
        self.transactions.clear().context("when clearing transactions")?;
        self.blocks_by_hash.clear().context("when clearing blocks_by_hash")?;
        self.blocks_by_number.clear().context("when clearing blocks_by_number")?;
        Ok(())
    }

    pub fn read_block_with_changes(&self, selection: BlockFilter) -> Result<Option<(Block, ExecutionChanges)>> {
        let Some(block_wo_changes) = self.read_block(selection)? else {
            return Ok(None);
        };
        let changes = self.block_changes.get(&block_wo_changes.number().into())?;
        Ok(Some((block_wo_changes, changes.map(|changes| changes.into_inner().into()).unwrap_or_default())))
    }
}

#[cfg(feature = "rocks_metrics")]
impl RocksStorageState {
    pub fn export_metrics(&self) -> Result<()> {
        let db_get = self.db_options.get_histogram_data(Histogram::DbGet);
        let db_write = self.db_options.get_histogram_data(Histogram::DbWrite);

        let wal_file_synced = self.db_options.get_ticker_count(Ticker::WalFileSynced);
        let block_cache_miss = self.db_options.get_ticker_count(Ticker::BlockCacheMiss);
        let block_cache_hit = self.db_options.get_ticker_count(Ticker::BlockCacheHit);
        let bytes_written = self.db_options.get_ticker_count(Ticker::BytesWritten);
        let bytes_read = self.db_options.get_ticker_count(Ticker::BytesRead);

        let cur_size_active_mem_table = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ACTIVE_MEM_TABLE).unwrap_or_default();
        let cur_size_all_mem_tables = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ALL_MEM_TABLES).unwrap_or_default();
        let size_all_mem_tables = self.db.property_int_value(rocksdb::properties::SIZE_ALL_MEM_TABLES).unwrap_or_default();
        let block_cache_usage = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_USAGE).unwrap_or_default();
        let block_cache_capacity = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_CAPACITY).unwrap_or_default();
        let background_errors = self.db.property_int_value(rocksdb::properties::BACKGROUND_ERRORS).unwrap_or_default();

        let db_name = self.db_path_filename();

        metrics::set_rocks_db_get(db_get.count(), db_name);
        metrics::set_rocks_db_write(db_write.count(), db_name);
        metrics::set_rocks_block_cache_miss(block_cache_miss, db_name);
        metrics::set_rocks_block_cache_hit(block_cache_hit, db_name);
        metrics::set_rocks_bytes_written(bytes_written, db_name);
        metrics::set_rocks_bytes_read(bytes_read, db_name);
        metrics::set_rocks_wal_file_synced(wal_file_synced, db_name);

        metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime)?, db_name);
        metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime)?, db_name);
        metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime)?, db_name);

        if let Some(cur_size_active_mem_table) = cur_size_active_mem_table {
            metrics::set_rocks_cur_size_active_mem_table(cur_size_active_mem_table, db_name);
        }

        if let Some(cur_size_all_mem_tables) = cur_size_all_mem_tables {
            metrics::set_rocks_cur_size_all_mem_tables(cur_size_all_mem_tables, db_name);
        }

        if let Some(size_all_mem_tables) = size_all_mem_tables {
            metrics::set_rocks_size_all_mem_tables(size_all_mem_tables, db_name);
        }

        if let Some(block_cache_usage) = block_cache_usage {
            metrics::set_rocks_block_cache_usage(block_cache_usage, db_name);
        }
        if let Some(block_cache_capacity) = block_cache_capacity {
            metrics::set_rocks_block_cache_capacity(block_cache_capacity, db_name);
        }
        if let Some(background_errors) = background_errors {
            metrics::set_rocks_background_errors(background_errors, db_name);
        }

        self.account_slots.export_metrics();
        self.account_slots_history.export_metrics();
        self.accounts.export_metrics();
        self.accounts_history.export_metrics();
        self.blocks_by_hash.export_metrics();
        self.blocks_by_number.export_metrics();
        self.transactions.export_metrics();
        Ok(())
    }

    fn get_histogram_average_in_interval(&self, hist: Histogram) -> Result<u64> {
        // The stats are cumulative since opening the DB, so we can get the average over the last
        // interval with: avg = (new_sum - prev_sum) / (new_count - prev_count).
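        //
        // Worked example (hypothetical numbers): prev = (sum 900, count 9) and now = (sum 1000,
        // count 10) give avg = (1000 - 900) / (10 - 9) = 100. When no new samples arrived the
        // divisor is 0, `checked_div` yields None, and we report 0 instead of dividing by zero.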

        let mut prev_values = self.prev_stats.lock();

        let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
        let data = self.db_options.get_histogram_data(hist);
        let data_count = data.count();
        let data_sum = data.sum();

        let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else {
            return Ok(0);
        };

        prev_values.insert(hist as u32, (data_sum, data_count));
        Ok(avg)
    }
}

#[cfg(feature = "metrics")]
impl RocksStorageState {
    pub fn export_column_family_size_metrics(&self) -> Result<()> {
        let db_name = self.db_path_filename();

        let cf_names = DB::list_cf(&Options::default(), &self.db_path)?;

        for cf_name in cf_names {
            if let Some(cf_handle) = self.db.cf_handle(&cf_name)
                && let Ok(Some(size)) = self.db.property_int_value_cf(&cf_handle, "rocksdb.total-sst-files-size")
            {
                metrics::set_rocks_cf_size(size, db_name, &cf_name);
            }
        }

        Ok(())
    }
}

impl Drop for RocksStorageState {
    fn drop(&mut self) {
        let mut options = WaitForCompactOptions::default();
        // if background jobs are paused, it makes no sense to keep waiting indefinitely
        options.set_abort_on_pause(true);
        // flush all write buffers before waiting
        options.set_flush(true);
        // wait for the duration passed to `--rocks-shutdown-timeout`, or the default value
        options.set_timeout(self.shutdown_timeout.as_micros().try_into().unwrap_or_default());

        tracing::info!("starting rocksdb shutdown");
        let instant = Instant::now();

        // Waiting here forces pending WAL logs to be processed, so that most of them don't need
        // replaying at the next boot, which was slowing startup down; we trade a longer shutdown
        // for a faster boot.
        let result = self.db.wait_for_compact(&options);
        let waited_for = instant.elapsed();

        #[cfg(feature = "metrics")]
        {
            let db_name = self.db_path_filename();
            metrics::set_rocks_last_shutdown_delay_millis(waited_for.as_millis() as u64, db_name);
        }

        if let Err(e) = result {
            tracing::error!(reason = ?e, ?waited_for, "rocksdb shutdown compaction didn't finish in time, shutting down anyway");
        } else {
            tracing::info!(?waited_for, "finished rocksdb shutdown");
        }
    }
}

impl fmt::Debug for RocksStorageState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RocksStorageState").field("db_path", &self.db_path).finish()
    }
}

#[cfg(test)]
mod tests {

    use fake::Fake;
    use fake::Faker;

    use super::*;
    use crate::alias::RevmBytecode;
    use crate::eth::primitives::BlockHeader;
    use crate::eth::primitives::ExecutionAccountChanges;
    use crate::eth::primitives::ExecutionValueChange;

    #[test]
    #[cfg(feature = "dev")]
    fn test_rocks_multi_get() {
        use std::collections::HashMap;
        use std::collections::HashSet;
        use std::iter;

        type Key = (AddressRocksdb, SlotIndexRocksdb);
        type Value = CfAccountSlotsValue;

        let (mut state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
        let account_slots = &mut state.account_slots;

        // slots and extra_slots to be inserted
        let slots: HashMap<Key, Value> = (0..1000).map(|_| Faker.fake()).collect();
        let extra_slots: HashMap<Key, Value> = iter::repeat_with(|| Faker.fake())
            .filter(|(key, _)| !slots.contains_key(key))
            .take(1000)
            .collect();

        let mut batch = WriteBatch::default();
        account_slots.prepare_batch_insertion(slots.clone(), &mut batch).unwrap();
        account_slots.prepare_batch_insertion(extra_slots.clone(), &mut batch).unwrap();
        account_slots.apply_batch_with_context(batch).unwrap();

        // keys that don't match any entry
        let extra_keys: HashSet<Key> = (0..1000)
            .map(|_| Faker.fake())
            .filter(|key| !extra_slots.contains_key(key) && !slots.contains_key(key))
            .collect();

        // concatenation of keys for inserted elements, and keys that aren't in the DB
        let keys: Vec<Key> = slots.keys().copied().chain(extra_keys).collect();
        let result = account_slots.multi_get(keys).expect("this should not fail");
        // check that, despite the extra slots inserted and the extra keys queried,
        // only the expected slots are returned
        assert_eq!(result.len(), slots.keys().len());
        for (idx, value) in result {
            assert_eq!(value, *slots.get(&idx).expect("slot should be found"));
        }
    }

    #[test]
    fn regression_test_read_logs_without_providing_filter_address() {
        let (state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();

        assert_eq!(state.read_logs(&LogFilter::default()).unwrap(), vec![]);

        // 100 blocks with 1 transaction each, each transaction with 2 logs; total: 200 logs
        for number in 0..100 {
            let block = Block {
                header: BlockHeader {
                    number: number.into(),
                    ..Faker.fake()
                },
                transactions: vec![TransactionMined {
                    block_number: number.into(),
                    logs: vec![Faker.fake(), Faker.fake()],
                    ..Faker.fake()
                }],
            };

            state.save_block(block, ExecutionChanges::default()).unwrap();
        }

        let filter = LogFilter {
            // address is empty
            addresses: vec![],
            ..Default::default()
        };

        assert_eq!(state.read_logs(&filter).unwrap().len(), 200);
    }

    #[test]
    fn regression_test_saving_account_changes_for_accounts_that_didnt_change() {
        let (state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();

        let addr: Address = Faker.fake();
        let change_base = ExecutionAccountChanges {
            nonce: ExecutionValueChange::from_original(Faker.fake()),
            balance: ExecutionValueChange::from_original(Faker.fake()),
            bytecode: ExecutionValueChange::from_original(Some(RevmBytecode::new_raw(Faker.fake::<Vec<u8>>().into()))),
            slots: BTreeMap::new(),
        };

        let change_1 = ExecutionChanges::from([(addr, ExecutionAccountChanges { ..change_base.clone() })]);
        let change_2 = ExecutionChanges::from([(
            addr,
            ExecutionAccountChanges {
                nonce: ExecutionValueChange::from_modified(Faker.fake()),
                ..change_base.clone()
            },
        )]);
        let change_3 = ExecutionChanges::from([(
            addr,
            ExecutionAccountChanges {
                balance: ExecutionValueChange::from_modified(Faker.fake()),
                ..change_base.clone()
            },
        )]);
        let change_4 = ExecutionChanges::from([(
            addr,
            ExecutionAccountChanges {
                bytecode: ExecutionValueChange::from_modified(Some(RevmBytecode::new_raw(Faker.fake::<Vec<u8>>().into()))),
                ..change_base
            },
        )]);

        let mut batch = WriteBatch::default();
        // add accounts in separate blocks so they show up in history
        state.prepare_batch_with_execution_changes(change_1, 1.into(), &mut batch).unwrap();
        state.prepare_batch_with_execution_changes(change_2, 2.into(), &mut batch).unwrap();
        state.prepare_batch_with_execution_changes(change_3, 3.into(), &mut batch).unwrap();
        state.prepare_batch_with_execution_changes(change_4, 4.into(), &mut batch).unwrap();
        state.write_in_batch_for_multiple_cfs(batch).unwrap();

        let accounts = state.read_all_accounts().unwrap();
        assert_eq!(accounts.len(), 1);

        let history = state.read_all_historical_accounts().unwrap();
        assert_eq!(history.len(), 3);
    }

    #[test]
    fn test_column_families_creation_order_is_deterministic() {
        let mut previous_order: Option<Vec<String>> = None;

        // Run the test multiple times to ensure RocksDB CF creation order is deterministic
        for i in 0..10 {
            let test_dir = tempfile::tempdir().unwrap();
            let path = test_dir.as_ref().display().to_string();

            // Create the database with the column families
            let cf_options_map = generate_cf_options_map(None);
            let (_db, _) = create_or_open_db(&path, &cf_options_map).unwrap();

            // Get the actual CF order from RocksDB
            let actual_cf_order = DB::list_cf(&Options::default(), &path).unwrap();

            // Compare with previous iteration
            if let Some(prev_order) = &previous_order {
                assert_eq!(
                    &actual_cf_order,
                    prev_order,
                    "RocksDB column family order changed between iterations {} and {}",
                    i - 1,
                    i
                );
            } else {
                previous_order = Some(actual_cf_order);
            }
        }
    }

    #[test]
    fn test_bincode_lexicographical_ordering() {
        use super::super::types::BlockNumberRocksdb;
        use crate::rocks_bincode_config;

        let boundary_tests = vec![(250, 251), (255, 256), (511, 512), (1023, 1024), (2047, 2048)];

        for (before, after) in boundary_tests {
            let before_rocks = BlockNumberRocksdb::from(before as u32);
            let after_rocks = BlockNumberRocksdb::from(after as u32);

            let before_bytes = bincode::encode_to_vec(before_rocks, rocks_bincode_config()).unwrap();
            let after_bytes = bincode::encode_to_vec(after_rocks, rocks_bincode_config()).unwrap();

            let lexicographic_order_correct = before_bytes < after_bytes;

            assert!(
                lexicographic_order_correct,
                "Block {before} should serialize to bytes < Block {after} for RocksDB ordering",
            );
        }
    }
}