stratus/eth/storage/permanent/rocks/
rocks_state.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::sync::atomic::AtomicU32;
6use std::time::Duration;
7use std::time::Instant;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use itertools::Itertools;
13use rocksdb::DB;
14use rocksdb::Direction;
15use rocksdb::Options;
16use rocksdb::WaitForCompactOptions;
17use rocksdb::WriteBatch;
18use rocksdb::WriteOptions;
19use serde::Deserialize;
20use serde::Serialize;
21use sugars::btmap;
22
23use super::cf_versions::CfAccountSlotsHistoryValue;
24use super::cf_versions::CfAccountSlotsValue;
25use super::cf_versions::CfAccountsHistoryValue;
26use super::cf_versions::CfAccountsValue;
27use super::cf_versions::CfBlocksByHashValue;
28use super::cf_versions::CfBlocksByNumberValue;
29use super::cf_versions::CfTransactionsValue;
30use super::rocks_cf::RocksCfRef;
31use super::rocks_cf_cache_config::RocksCfCacheConfig;
32use super::rocks_config::CacheSetting;
33use super::rocks_config::DbConfig;
34use super::rocks_db::create_or_open_db;
35use super::types::AccountRocksdb;
36use super::types::AddressRocksdb;
37use super::types::BlockNumberRocksdb;
38use super::types::HashRocksdb;
39use super::types::SlotIndexRocksdb;
40use super::types::SlotValueRocksdb;
41use super::types::UnixTimeRocksdb;
42use crate::eth::primitives::Account;
43use crate::eth::primitives::Address;
44use crate::eth::primitives::Block;
45use crate::eth::primitives::BlockFilter;
46use crate::eth::primitives::BlockNumber;
47#[cfg(feature = "dev")]
48use crate::eth::primitives::Bytes;
49use crate::eth::primitives::ExecutionChanges;
50use crate::eth::primitives::Hash;
51use crate::eth::primitives::LogFilter;
52use crate::eth::primitives::LogMessage;
53#[cfg(feature = "dev")]
54use crate::eth::primitives::Nonce;
55use crate::eth::primitives::PointInTime;
56use crate::eth::primitives::Slot;
57use crate::eth::primitives::SlotIndex;
58use crate::eth::primitives::TransactionMined;
59#[cfg(feature = "dev")]
60use crate::eth::primitives::Wei;
61use crate::eth::storage::permanent::rocks::SerializeDeserializeWithContext;
62use crate::eth::storage::permanent::rocks::cf_versions::CfBlockChangesValue;
63use crate::eth::storage::permanent::rocks::types::AccountChangesRocksdb;
64use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
65use crate::ext::OptionExt;
66#[cfg(feature = "metrics")]
67use crate::infra::metrics;
68use crate::log_and_err;
69
70cfg_if::cfg_if! {
71    if #[cfg(feature = "rocks_metrics")] {
72        use parking_lot::Mutex;
73
74        use rocksdb::statistics::Histogram;
75        use rocksdb::statistics::Ticker;
76        use std::collections::HashMap;
77
78
79        use crate::infra::metrics::{Count, HistogramInt, Sum};
80    }
81}
82
83pub fn generate_cf_options_map(cf_cache_config: &RocksCfCacheConfig) -> BTreeMap<&'static str, Options> {
84    // Helper function to create cache setting from size
85    let cache_setting_from_size = |size: usize| -> CacheSetting { CacheSetting::Enabled(size) };
86
87    btmap! {
88        "accounts" => DbConfig::OptimizedPointLookUp.to_options(cache_setting_from_size(cf_cache_config.accounts)),
89        "accounts_history" => DbConfig::HistoricalData.to_options(cache_setting_from_size(cf_cache_config.accounts_history)),
90        "account_slots" => DbConfig::OptimizedPointLookUp.to_options(cache_setting_from_size(cf_cache_config.account_slots)),
91        "account_slots_history" => DbConfig::HistoricalData.to_options(cache_setting_from_size(cf_cache_config.account_slots_history)),
92        "transactions" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.transactions)),
93        "blocks_by_number" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.blocks_by_number)),
94        "blocks_by_hash" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.blocks_by_hash)),
95        "blocks_by_timestamp" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.blocks_by_timestamp)),
96        "block_changes" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.block_changes)),
97    }
98}
99
100/// Helper for creating a `RocksCfRef`, aborting if it wasn't declared in our option presets.
101fn new_cf_ref<'a, K, V>(db: &'a Arc<DB>, column_family: &str, cf_options_map: &BTreeMap<&str, Options>) -> Result<RocksCfRef<'a, K, V>>
102where
103    K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
104    V: Serialize + for<'de> Deserialize<'de> + Debug + Clone + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
105{
106    tracing::debug!(column_family = column_family, "creating new column family");
107
108    cf_options_map
109        .get(column_family)
110        .with_context(|| format!("matching column_family `{column_family}` given to `new_cf_ref` wasn't found in configuration map"))?;
111
112    // NOTE: this doesn't create the CFs in the database, read `RocksCfRef` docs for details
113    RocksCfRef::new(db, column_family)
114}
115
/// State handler for our RocksDB storage, separating "tables" by column families.
///
/// With data separated by column families, writing and reading should be done via the `RocksCfRef` fields.
pub struct RocksStorageState {
    /// Shared handle to the underlying RocksDB instance; all CF refs operate on it.
    pub db: Arc<DB>,
    /// Filesystem path the DB was opened at (also used as a metrics/log label).
    db_path: String,
    /// Latest state of every account, keyed by address.
    accounts: RocksCfRef<'static, AddressRocksdb, CfAccountsValue>,
    /// Historical account state, keyed by (address, block number).
    accounts_history: RocksCfRef<'static, (AddressRocksdb, BlockNumberRocksdb), CfAccountsHistoryValue>,
    /// Latest value of every storage slot, keyed by (address, slot index).
    account_slots: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb), CfAccountSlotsValue>,
    /// Historical slot values, keyed by (address, slot index, block number).
    account_slots_history: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb, BlockNumberRocksdb), CfAccountSlotsHistoryValue>,
    /// Index from transaction hash to the block number containing the transaction.
    pub transactions: RocksCfRef<'static, HashRocksdb, CfTransactionsValue>,
    /// Full block bodies keyed by block number (the primary block store).
    pub blocks_by_number: RocksCfRef<'static, BlockNumberRocksdb, CfBlocksByNumberValue>,
    /// Index from block hash to block number (resolved via `blocks_by_number`).
    blocks_by_hash: RocksCfRef<'static, HashRocksdb, CfBlocksByHashValue>,
    /// Index from block timestamp to block number (resolved via `blocks_by_number`).
    blocks_by_timestamp: RocksCfRef<'static, UnixTimeRocksdb, CfBlocksByHashValue>,
    /// Per-block change-sets (account/slot deltas) keyed by block number.
    block_changes: RocksCfRef<'static, BlockNumberRocksdb, CfBlockChangesValue>,
    /// Last collected stats for a histogram
    #[cfg(feature = "rocks_metrics")]
    prev_stats: Mutex<HashMap<HistogramInt, (Sum, Count)>>,
    /// Options passed at DB creation, stored for metrics
    ///
    /// a newly created `rocksdb::Options` object is unique, with an underlying pointer identifier inside of it, and is used to access
    /// the DB metrics, `Options` can be cloned, but two equal `Options` might not retrieve the same metrics
    #[cfg(feature = "rocks_metrics")]
    db_options: Options,
    // NOTE(review): presumably a bound on how long to wait for background work
    // during shutdown — not used in this chunk, confirm against the shutdown path.
    shutdown_timeout: Duration,
    /// When true, every batch write is fsync'd before returning
    /// (see `write_in_batch_for_multiple_cfs`).
    enable_sync_write: bool,
}
143
144impl RocksStorageState {
145    pub fn new(path: String, shutdown_timeout: Duration, cf_cache_config: RocksCfCacheConfig, enable_sync_write: bool) -> Result<Self> {
146        tracing::debug!("creating (or opening an existing) database with the specified column families");
147
148        let cf_options_map = generate_cf_options_map(&cf_cache_config);
149
150        #[cfg_attr(not(feature = "rocks_metrics"), allow(unused_variables))]
151        let (db, db_options) = create_or_open_db(&path, &cf_options_map).context("when trying to create (or open) rocksdb")?;
152
153        if db.path().to_str().is_none() {
154            bail!("db path doesn't isn't valid UTF-8: {:?}", db.path());
155        }
156        if db.path().file_name().is_none() {
157            bail!("db path doesn't have a file name or isn't valid UTF-8: {:?}", db.path());
158        }
159
160        let state = Self {
161            db_path: path,
162            accounts: new_cf_ref(db, "accounts", &cf_options_map)?,
163            accounts_history: new_cf_ref(db, "accounts_history", &cf_options_map)?,
164            account_slots: new_cf_ref(db, "account_slots", &cf_options_map)?,
165            account_slots_history: new_cf_ref(db, "account_slots_history", &cf_options_map)?,
166            transactions: new_cf_ref(db, "transactions", &cf_options_map)?,
167            blocks_by_number: new_cf_ref(db, "blocks_by_number", &cf_options_map)?,
168            blocks_by_hash: new_cf_ref(db, "blocks_by_hash", &cf_options_map)?,
169            blocks_by_timestamp: new_cf_ref(db, "blocks_by_timestamp", &cf_options_map)?,
170            block_changes: new_cf_ref(db, "block_changes", &cf_options_map)?,
171            #[cfg(feature = "rocks_metrics")]
172            prev_stats: Mutex::default(),
173            #[cfg(feature = "rocks_metrics")]
174            db_options,
175            db: Arc::clone(db),
176            shutdown_timeout,
177            enable_sync_write,
178        };
179
180        tracing::debug!("opened database successfully");
181        Ok(state)
182    }
183
184    #[cfg(test)]
185    #[track_caller]
186    pub fn new_in_testdir() -> anyhow::Result<(Self, tempfile::TempDir)> {
187        let test_dir = tempfile::tempdir()?;
188        let path = test_dir.as_ref().display().to_string();
189        let state = Self::new(path, Duration::ZERO, RocksCfCacheConfig::default(), true)?;
190        Ok((state, test_dir))
191    }
192
193    /// Get the filename of the database path.
194    #[cfg(feature = "metrics")]
195    fn db_path_filename(&self) -> &str {
196        self.db_path.rsplit('/').next().unwrap_or(&self.db_path)
197    }
198
199    pub fn preload_block_number(&self) -> Result<AtomicU32> {
200        let block_number = self.blocks_by_number.last_key()?.unwrap_or_default().0;
201        tracing::info!(%block_number, "preloaded block_number");
202        Ok(AtomicU32::new(block_number))
203    }
204
205    #[cfg(feature = "dev")]
206    pub fn reset(&self) -> Result<()> {
207        self.accounts.clear()?;
208        self.accounts_history.clear()?;
209        self.account_slots.clear()?;
210        self.account_slots_history.clear()?;
211        self.transactions.clear()?;
212        self.blocks_by_number.clear()?;
213        self.blocks_by_hash.clear()?;
214        self.blocks_by_timestamp.clear()?;
215        Ok(())
216    }
217
    /// Stages all account/slot changes from a block's execution into `batch`.
    ///
    /// For every modified account, both the latest-state CF (`accounts`) and
    /// the history CF (`accounts_history`, keyed by block) are updated, and
    /// the per-field delta is recorded into the block's `BlockChangesRocksdb`.
    /// Slots are handled analogously. Finally the assembled change-set is
    /// staged into the `block_changes` CF keyed by block number.
    ///
    /// Nothing is persisted here — the caller must write the batch.
    fn prepare_batch_with_execution_changes(&self, changes: ExecutionChanges, block_number: BlockNumber, batch: &mut WriteBatch) -> Result<()> {
        let mut block_changes = BlockChangesRocksdb::with_capacity(changes.accounts.len());
        // Shadow with the RocksDB key representation used everywhere below.
        let block_number = block_number.into();

        for (address, change) in changes.accounts {
            let address: AddressRocksdb = address.into();
            if change.is_modified() {
                let mut account_change_entry = AccountChangesRocksdb::default();
                // Read-modify-write: start from the currently stored account
                // (or an empty default for a new account).
                let mut account_info_entry = self.accounts.get(&address)?.unwrap_or(AccountRocksdb::default().into());

                // Apply only the fields marked changed; the delta entry keeps
                // `None` for untouched fields.
                if change.nonce.is_changed() {
                    let nonce = (*change.nonce.value()).into();
                    account_info_entry.nonce = nonce;
                    account_change_entry.nonce = Some(nonce);
                }
                if change.balance.is_changed() {
                    let balance = (*change.balance.value()).into();
                    account_info_entry.balance = balance;
                    account_change_entry.balance = Some(balance);
                }
                if change.bytecode.is_changed() {
                    let bytecode = change.bytecode.value().clone().map_into();
                    account_info_entry.bytecode = bytecode.clone();
                    account_change_entry.bytecode = Some(bytecode);
                }

                // Write latest state and the historical version at this block.
                self.accounts.prepare_batch_insertion([(address, account_info_entry.clone())], batch)?;
                self.accounts_history
                    .prepare_batch_insertion([((address, block_number), account_info_entry.into_inner().into())], batch)?;
                block_changes.account_changes.insert(address, account_change_entry);
            }
        }

        for ((address, slot_index), slot_value) in changes.slots {
            let address = address.into();
            let slot_index = slot_index.into();
            let slot_value: SlotValueRocksdb = slot_value.into();

            // Latest slot value plus its historical version at this block.
            self.account_slots
                .prepare_batch_insertion([((address, slot_index), slot_value.into())], batch)?;
            self.account_slots_history
                .prepare_batch_insertion([((address, slot_index, block_number), slot_value.into())], batch)?;

            block_changes.slot_changes.insert((address, slot_index), slot_value);
        }

        self.block_changes.prepare_batch_insertion([(block_number, block_changes.into())], batch)?;

        Ok(())
    }
269
    /// Looks up a mined transaction by its hash.
    ///
    /// The `transactions` CF maps tx hash -> block number; the transaction
    /// itself is then located inside the stored block. Returns `Ok(None)` for
    /// an unknown hash. An index pointing at a missing block, or at a block
    /// that doesn't actually contain the tx, is reported as an error since it
    /// means the index and block data are out of sync.
    pub fn read_transaction(&self, tx_hash: Hash) -> Result<Option<TransactionMined>> {
        let Some(block_number) = self.transactions.get(&tx_hash.into())? else {
            return Ok(None);
        };

        let Some(block) = self.blocks_by_number.get(&block_number)? else {
            return log_and_err!("the block that the transaction was supposed to be in was not found")
                .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}"));
        };
        let block = block.into_inner();

        // Linear scan over the block's transactions, matching by hash.
        let transaction = block.transactions.into_iter().find(|tx| Hash::from(tx.input.hash) == tx_hash);

        match transaction {
            Some(tx) => {
                tracing::trace!(%tx_hash, "transaction found");
                Ok(Some(TransactionMined::from_rocks_primitives(tx, block_number.into_inner(), block.header.hash)))
            }
            None => log_and_err!("rocks error, transaction wasn't found in block where the index pointed at")
                .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}")),
        }
    }
292
    /// Scans blocks from `filter.from_block` and returns all logs matching
    /// `filter`, in block order.
    ///
    /// Iterates `blocks_by_number` forward and stops at the first block past
    /// `filter.to_block` (or at the end of the CF when `to_block` is `None`).
    pub fn read_logs(&self, filter: &LogFilter) -> Result<Vec<LogMessage>> {
        // Open upper bound when `to_block` is unset.
        let is_block_number_in_end_range = |number: BlockNumber| match filter.to_block.as_ref() {
            Some(&last_block) => number <= last_block,
            None => true,
        };

        let iter = self
            .blocks_by_number
            .iter_from(BlockNumberRocksdb::from(filter.from_block), Direction::Forward)?;

        let mut logs_result = vec![];

        for next in iter {
            let (number, block) = next?;

            if !is_block_number_in_end_range(number.into()) {
                break;
            }

            let block = block.into_inner();
            // Expand each (tx index, tx) into its logs, tagging every log with
            // block/tx metadata needed by `LogMessage`.
            let logs = block.transactions.into_iter().enumerate().flat_map(|(tx_index, transaction)| {
                transaction
                    .logs
                    .into_iter()
                    .map(move |log| LogMessage::from_rocks_primitives(log, block.header.number, block.header.hash, tx_index, transaction.input.hash))
            });

            // Apply the topic/address filter before collecting.
            let filtered_logs = logs.filter(|log| filter.matches(&log.log, block.header.number.into()));
            logs_result.extend(filtered_logs);
        }
        Ok(logs_result)
    }
325
    /// Reads a storage slot, either at the latest state or at a past block.
    ///
    /// Returns `Ok(None)` for the coinbase address and for slots never
    /// written. For `MinedPast`, the history CF is consulted and the found
    /// key must still match the requested address and index — otherwise the
    /// seek landed on a different slot's history, meaning this slot did not
    /// exist at that block.
    pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> Result<Option<Slot>> {
        if address.is_coinbase() {
            return Ok(None);
        }

        match point_in_time {
            PointInTime::Mined | PointInTime::Pending => {
                let query_params = (address.into(), index.into());

                let Some(account_slot_value) = self.account_slots.get(&query_params)? else {
                    return Ok(None);
                };

                Ok(Some(Slot {
                    index,
                    value: account_slot_value.into_inner().into(),
                }))
            }
            PointInTime::MinedPast(block_number) => {
                tracing::debug!(?address, ?index, ?block_number, "searching slot");

                let key = (address.into(), (index).into(), block_number.into());

                // NOTE(review): assumes `seek` returns the closest history entry
                // at or before `key` — confirm against `RocksCfRef::seek` docs.
                if let Some(((rocks_address, rocks_index, block), value)) = self.account_slots_history.seek(key)?
                    && rocks_index == index.into()
                    && rocks_address == address.into()
                {
                    tracing::debug!(?block, ?rocks_index, ?rocks_address, "slot found in rocksdb storage");
                    return Ok(Some(Slot {
                        index: rocks_index.into(),
                        value: value.into_inner().into(),
                    }));
                }
                Ok(None)
            }
        }
    }
363
    /// Reads an account, either at the latest state or at a past block.
    ///
    /// The coinbase and zero addresses always yield `None`. For `MinedPast`,
    /// the history CF is consulted and the found key's address must still
    /// match the request — otherwise the account did not exist at that block.
    pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> Result<Option<Account>> {
        if address.is_coinbase() || address.is_zero() {
            return Ok(None);
        }

        match point_in_time {
            PointInTime::Mined | PointInTime::Pending => {
                let Some(inner_account) = self.accounts.get(&address.into())? else {
                    tracing::trace!(%address, "account not found");
                    return Ok(None);
                };

                let account = inner_account.to_account(address);
                tracing::trace!(%address, ?account, "account found");
                Ok(Some(account))
            }
            PointInTime::MinedPast(block_number) => {
                tracing::debug!(?address, ?block_number, "searching account");

                let key = (address.into(), block_number.into());

                // NOTE(review): assumes `seek` returns the closest history entry
                // at or before `key` — confirm against `RocksCfRef::seek` docs.
                if let Some(((addr, block), account_info)) = self.accounts_history.seek(key)?
                    && addr == address.into()
                {
                    tracing::debug!(?block, ?address, "account found in rocksdb storage");
                    return Ok(Some(account_info.to_account(address)));
                }
                Ok(None)
            }
        }
    }
395
396    pub fn read_accounts(&self, addresses: Vec<Address>) -> Result<Vec<(Address, Account)>> {
397        self.accounts
398            .multi_get(addresses.into_iter().map_into())
399            .map(|vec| vec.into_iter().map(|(addr, acc)| (addr.into(), acc.to_account(addr.into()))).collect_vec())
400    }
401
    /// Reads a block according to `selection`.
    ///
    /// `Latest`/`Pending` resolve to the highest stored block and `Earliest`
    /// to the lowest. Hash and timestamp filters go through their respective
    /// index CFs and then fetch the full block from `blocks_by_number`.
    pub fn read_block(&self, selection: BlockFilter) -> Result<Option<Block>> {
        tracing::debug!(?selection, "reading block");

        let block = match selection {
            BlockFilter::Latest | BlockFilter::Pending => self.blocks_by_number.last_value(),
            BlockFilter::Earliest => self.blocks_by_number.first_value(),
            BlockFilter::Number(block_number) => self.blocks_by_number.get(&block_number.into()),
            BlockFilter::Hash(block_hash) =>
                // Two-step lookup: hash -> number, then number -> block.
                if let Some(block_number) = self.blocks_by_hash.get(&block_hash.into())? {
                    self.blocks_by_number.get(&block_number)
                } else {
                    Ok(None)
                },
            // Take the first entry of the timestamp index in the direction
            // derived from the filter mode, then resolve its block number.
            BlockFilter::Timestamp(timestamp) => self
                .blocks_by_timestamp
                .iter_from(timestamp.timestamp.into(), timestamp.mode.into())?
                .next()
                .transpose()?
                .map(|inner| self.blocks_by_number.get(&inner.1))
                .transpose()
                .map(|nested_opt| nested_opt.flatten()),
        };
        // Unwrap the CF value wrapper and convert into the domain `Block`.
        block.map(|block_option| block_option.map(|block| block.into_inner().into()))
    }
426
427    pub fn save_accounts(&self, accounts: Vec<Account>) -> Result<()> {
428        let mut write_batch = WriteBatch::default();
429
430        tracing::debug!(?accounts, "preparing accounts cf batch");
431        self.accounts.prepare_batch_insertion(
432            accounts.iter().cloned().map(|acc| {
433                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
434                (tup.0, tup.1.into())
435            }),
436            &mut write_batch,
437        )?;
438
439        tracing::debug!(?accounts, "preparing accounts history batch");
440        self.accounts_history.prepare_batch_insertion(
441            accounts.iter().cloned().map(|acc| {
442                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
443                ((tup.0, 0u32.into()), tup.1.into())
444            }),
445            &mut write_batch,
446        )?;
447
448        self.write_in_batch_for_multiple_cfs(write_batch)
449    }
450
    /// Persists the genesis block together with its initial accounts and
    /// execution changes in a single atomic batch.
    ///
    /// Stages, in order: the tx-hash index, the block body, the hash and
    /// timestamp indexes, the execution change-set, and the initial accounts
    /// (written both as latest state and as history entries at block 0).
    pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> Result<()> {
        let mut batch = WriteBatch::default();

        // Index every transaction hash to its block number.
        let mut txs_batch = vec![];
        for transaction in block.transactions.iter().cloned() {
            txs_batch.push((transaction.info.hash.into(), transaction.evm_input.block_number.into()));
        }
        self.transactions.prepare_batch_insertion(txs_batch, &mut batch)?;

        // Capture identifying fields before the block is moved into the batch.
        let number = block.number();
        let block_hash = block.hash();
        let timestamp = block.header.timestamp;

        let block_by_number = (number.into(), block.into());
        self.blocks_by_number.prepare_batch_insertion([block_by_number], &mut batch)?;

        let block_by_hash = (block_hash.into(), number.into());
        self.blocks_by_hash.prepare_batch_insertion([block_by_hash], &mut batch)?;

        let block_by_timestamp = (timestamp.into(), number.into());
        self.blocks_by_timestamp.prepare_batch_insertion([block_by_timestamp], &mut batch)?;

        self.prepare_batch_with_execution_changes(account_changes, number, &mut batch)?;

        // Genesis accounts: latest state...
        self.accounts.prepare_batch_insertion(
            accounts.iter().cloned().map(|acc| {
                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
                (tup.0, tup.1.into())
            }),
            &mut batch,
        )?;

        // ...and their history entries, anchored at block 0.
        self.accounts_history.prepare_batch_insertion(
            accounts.iter().cloned().map(|acc| {
                let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
                ((tup.0, 0u32.into()), tup.1.into())
            }),
            &mut batch,
        )?;

        self.write_in_batch_for_multiple_cfs(batch)
    }
493
494    pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> Result<()> {
495        let mut batch = WriteBatch::default();
496        self.prepare_block_insertion(block, account_changes, &mut batch)?;
497        self.write_in_batch_for_multiple_cfs(batch)
498    }
499
    /// Stages a full block insertion into `batch` without writing anything.
    ///
    /// Adds: tx-hash -> block-number index entries, the block body under its
    /// number, the hash and timestamp index entries, and the execution
    /// change-set (latest + historical account/slot state).
    pub fn prepare_block_insertion(&self, block: Block, account_changes: ExecutionChanges, batch: &mut WriteBatch) -> Result<()> {
        // Index every transaction hash to its block number.
        let mut txs_batch = vec![];
        for transaction in block.transactions.iter().cloned() {
            txs_batch.push((transaction.info.hash.into(), transaction.evm_input.block_number.into()));
        }

        self.transactions.prepare_batch_insertion(txs_batch, batch)?;

        // Capture identifying fields before the block is moved into the batch.
        let number = block.number();
        let block_hash = block.hash();
        let timestamp = block.header.timestamp;

        let block_by_number = (number.into(), block.into());
        self.blocks_by_number.prepare_batch_insertion([block_by_number], batch)?;

        let block_by_hash = (block_hash.into(), number.into());
        self.blocks_by_hash.prepare_batch_insertion([block_by_hash], batch)?;

        let block_by_timestamp = (timestamp.into(), number.into());
        self.blocks_by_timestamp.prepare_batch_insertion([block_by_timestamp], batch)?;

        self.prepare_batch_with_execution_changes(account_changes, number, batch)?;

        Ok(())
    }
525
526    /// Write to DB in a batch
527    pub fn write_in_batch_for_multiple_cfs(&self, batch: WriteBatch) -> Result<()> {
528        tracing::debug!("writing batch");
529        let batch_len = batch.len();
530
531        let mut options = WriteOptions::default();
532        // By default, each write to rocksdb is asynchronous: it returns after pushing
533        // the write from the process into the operating system (buffer cache).
534        //
535        // This option offers the trade-off of waiting after each write to
536        // ensure data is persisted to disk before returning, preventing
537        // potential data loss in case of system failure.
538        options.set_sync(self.enable_sync_write);
539
540        self.db
541            .write_opt(batch, &options)
542            .context("failed to write in batch to (possibly) multiple column families")
543            .inspect_err(|e| {
544                tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
545            })
546    }
547
548    #[cfg(feature = "dev")]
549    pub fn save_slot(&self, address: Address, slot: Slot) -> Result<()> {
550        let mut batch = WriteBatch::default();
551        self.account_slots
552            .prepare_batch_insertion([((address.into(), slot.index.into()), slot.value.into())], &mut batch)?;
553        self.account_slots.apply_batch_with_context(batch)
554    }
555
556    #[cfg(feature = "dev")]
557    pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> Result<()> {
558        let mut batch = WriteBatch::default();
559
560        // Get the current account or create a new one
561        let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());
562
563        // Update the nonce
564        account_info_entry.nonce = nonce.into();
565
566        // Save the account
567        self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
568        self.accounts.apply_batch_with_context(batch)
569    }
570
571    #[cfg(feature = "dev")]
572    pub fn save_account_balance(&self, address: Address, balance: Wei) -> Result<()> {
573        let mut batch = WriteBatch::default();
574
575        // Get the current account or create a new one
576        let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());
577
578        // Update the balance
579        account_info_entry.balance = balance.into();
580
581        // Save the account
582        self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
583        self.accounts.apply_batch_with_context(batch)
584    }
585
586    #[cfg(feature = "dev")]
587    pub fn save_account_code(&self, address: Address, code: Bytes) -> Result<()> {
588        use crate::alias::RevmBytecode;
589
590        let mut batch = WriteBatch::default();
591
592        // Get the current account or create a new one
593        let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());
594
595        // Update the bytecode
596        account_info_entry.bytecode = if code.0.is_empty() {
597            None
598        } else {
599            Some(RevmBytecode::new_raw(code.0.into()))
600        }
601        .map_into();
602
603        // Save the account
604        self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
605        self.accounts.apply_batch_with_context(batch)
606    }
607
608    #[cfg(test)]
609    pub fn read_all_accounts(&self) -> Result<Vec<AccountRocksdb>> {
610        self.accounts.iter_start().map(|result| Ok(result?.1.into_inner())).collect()
611    }
612
613    #[cfg(test)]
614    pub fn read_all_historical_accounts(&self) -> Result<Vec<AccountRocksdb>> {
615        self.accounts_history.iter_start().map(|result| Ok(result?.1.into_inner())).collect()
616    }
617
618    #[cfg(feature = "dev")]
619    pub fn clear(&self) -> Result<()> {
620        self.accounts.clear().context("when clearing accounts")?;
621        self.accounts_history.clear().context("when clearing accounts_history")?;
622        self.account_slots.clear().context("when clearing account_slots")?;
623        self.account_slots_history.clear().context("when clearing account_slots_history")?;
624        self.transactions.clear().context("when clearing transactions")?;
625        self.blocks_by_hash.clear().context("when clearing blocks_by_hash")?;
626        self.blocks_by_number.clear().context("when clearing blocks_by_number")?;
627        self.blocks_by_timestamp.clear().context("when clearing blocks_by_timestamp")?;
628        Ok(())
629    }
630
631    pub fn read_block_with_changes(&self, selection: BlockFilter) -> Result<Option<(Block, BlockChangesRocksdb)>> {
632        let Some(block_wo_changes) = self.read_block(selection)? else {
633            return Ok(None);
634        };
635        let changes = self.block_changes.get(&block_wo_changes.number().into())?;
636        Ok(Some((block_wo_changes, changes.map(|changes| changes.into_inner()).unwrap_or_default())))
637    }
638}
639
640#[cfg(feature = "rocks_metrics")]
641impl RocksStorageState {
642    pub fn export_metrics(&self) -> Result<()> {
643        let db_get = self.db_options.get_histogram_data(Histogram::DbGet);
644        let db_write = self.db_options.get_histogram_data(Histogram::DbWrite);
645
646        let wal_file_synced = self.db_options.get_ticker_count(Ticker::WalFileSynced);
647        let block_cache_miss = self.db_options.get_ticker_count(Ticker::BlockCacheMiss);
648        let block_cache_hit = self.db_options.get_ticker_count(Ticker::BlockCacheHit);
649        let bytes_written = self.db_options.get_ticker_count(Ticker::BytesWritten);
650        let bytes_read = self.db_options.get_ticker_count(Ticker::BytesRead);
651
652        let cur_size_active_mem_table = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ACTIVE_MEM_TABLE).unwrap_or_default();
653        let cur_size_all_mem_tables = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ALL_MEM_TABLES).unwrap_or_default();
654        let size_all_mem_tables = self.db.property_int_value(rocksdb::properties::SIZE_ALL_MEM_TABLES).unwrap_or_default();
655        let block_cache_usage = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_USAGE).unwrap_or_default();
656        let block_cache_capacity = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_CAPACITY).unwrap_or_default();
657        let background_errors = self.db.property_int_value(rocksdb::properties::BACKGROUND_ERRORS).unwrap_or_default();
658
659        let db_name = self.db_path_filename();
660
661        metrics::set_rocks_db_get(db_get.count(), db_name);
662        metrics::set_rocks_db_write(db_write.count(), db_name);
663        metrics::set_rocks_block_cache_miss(block_cache_miss, db_name);
664        metrics::set_rocks_block_cache_hit(block_cache_hit, db_name);
665        metrics::set_rocks_bytes_written(bytes_written, db_name);
666        metrics::set_rocks_bytes_read(bytes_read, db_name);
667        metrics::set_rocks_wal_file_synced(wal_file_synced, db_name);
668
669        metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime)?, db_name);
670        metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime)?, db_name);
671        metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime)?, db_name);
672
673        if let Some(cur_size_active_mem_table) = cur_size_active_mem_table {
674            metrics::set_rocks_cur_size_active_mem_table(cur_size_active_mem_table, db_name);
675        }
676
677        if let Some(cur_size_all_mem_tables) = cur_size_all_mem_tables {
678            metrics::set_rocks_cur_size_all_mem_tables(cur_size_all_mem_tables, db_name);
679        }
680
681        if let Some(size_all_mem_tables) = size_all_mem_tables {
682            metrics::set_rocks_size_all_mem_tables(size_all_mem_tables, db_name);
683        }
684
685        if let Some(block_cache_usage) = block_cache_usage {
686            metrics::set_rocks_block_cache_usage(block_cache_usage, db_name);
687        }
688        if let Some(block_cache_capacity) = block_cache_capacity {
689            metrics::set_rocks_block_cache_capacity(block_cache_capacity, db_name);
690        }
691        if let Some(background_errors) = background_errors {
692            metrics::set_rocks_background_errors(background_errors, db_name);
693        }
694
695        self.account_slots.export_metrics();
696        self.account_slots_history.export_metrics();
697        self.accounts.export_metrics();
698        self.accounts_history.export_metrics();
699        self.blocks_by_hash.export_metrics();
700        self.blocks_by_number.export_metrics();
701        self.blocks_by_timestamp.export_metrics();
702        self.transactions.export_metrics();
703        Ok(())
704    }
705
706    fn get_histogram_average_in_interval(&self, hist: Histogram) -> Result<u64> {
707        // The stats are cumulative since opening the db
708        // we can get the average in the time interval with: avg = (new_sum - sum)/(new_count - count)
709
710        let mut prev_values = self.prev_stats.lock();
711
712        let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
713        let data = self.db_options.get_histogram_data(hist);
714        let data_count = data.count();
715        let data_sum = data.sum();
716
717        let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else {
718            return Ok(0);
719        };
720
721        prev_values.insert(hist as u32, (data_sum, data_count));
722        Ok(avg)
723    }
724}
725
#[cfg(feature = "metrics")]
impl RocksStorageState {
    /// Reports the total SST file size of every column family as a gauge metric.
    pub fn export_column_family_size_metrics(&self) -> Result<()> {
        let db_name = self.db_path_filename();

        for cf_name in DB::list_cf(&Options::default(), &self.db_path)? {
            // CFs listed on disk but without an open handle are skipped
            let Some(cf_handle) = self.db.cf_handle(&cf_name) else {
                continue;
            };
            if let Ok(Some(size)) = self.db.property_int_value_cf(&cf_handle, "rocksdb.total-sst-files-size") {
                metrics::set_rocks_cf_size(size, db_name, &cf_name);
            }
        }

        Ok(())
    }
}
744
745impl Drop for RocksStorageState {
746    fn drop(&mut self) {
747        let mut options = WaitForCompactOptions::default();
748        // if background jobs are paused, it makes no sense to keep waiting indefinitely
749        options.set_abort_on_pause(true);
750        // flush all write buffers before waiting
751        options.set_flush(true);
752        // wait for the duration passed to `--rocks-shutdown-timeout`, or the default value
753        options.set_timeout(self.shutdown_timeout.as_micros().try_into().unwrap_or_default());
754
755        tracing::info!("starting rocksdb shutdown");
756        let instant = Instant::now();
757
758        // here, waiting for is done to force WAL logs to be processed, to skip replaying most of them when booting
759        // up, which was slowing things down, we are sacrificing a long shutdown for a faster boot
760        let result = self.db.wait_for_compact(&options);
761        let waited_for = instant.elapsed();
762
763        #[cfg(feature = "metrics")]
764        {
765            let db_name = self.db_path_filename();
766            metrics::set_rocks_last_shutdown_delay_millis(waited_for.as_millis() as u64, db_name);
767        }
768
769        if let Err(e) = result {
770            tracing::error!(reason = ?e, ?waited_for, "rocksdb shutdown compaction didn't finish in time, shutting it down anyways");
771        } else {
772            tracing::info!(?waited_for, "finished rocksdb shutdown");
773        }
774    }
775}
776
777impl fmt::Debug for RocksStorageState {
778    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
779        f.debug_struct("RocksStorageState").field("db_path", &self.db_path).finish()
780    }
781}
782
783#[cfg(test)]
784mod tests {
785
786    use fake::Fake;
787    use fake::Faker;
788
789    use super::*;
790    use crate::eth::executor::EvmExecutionResult;
791    use crate::eth::executor::EvmInput;
792    use crate::eth::primitives::BlockHeader;
793    use crate::eth::primitives::EvmExecution;
794    use crate::eth::primitives::TransactionExecution;
795
796    #[test]
797    #[cfg(feature = "dev")]
798    fn test_rocks_multi_get() {
799        use std::collections::HashMap;
800        use std::collections::HashSet;
801        use std::iter;
802
803        type Key = (AddressRocksdb, SlotIndexRocksdb);
804        type Value = CfAccountSlotsValue;
805
806        let (mut state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
807        let account_slots = &mut state.account_slots;
808
809        // slots and extra_slots to be inserted
810        let slots: HashMap<Key, Value> = (0..1000).map(|_| Faker.fake()).collect();
811        let extra_slots: HashMap<Key, Value> = iter::repeat_with(|| Faker.fake())
812            .filter(|(key, _)| !slots.contains_key(key))
813            .take(1000)
814            .collect();
815
816        let mut batch = WriteBatch::default();
817        account_slots.prepare_batch_insertion(slots.clone(), &mut batch).unwrap();
818        account_slots.prepare_batch_insertion(extra_slots.clone(), &mut batch).unwrap();
819        account_slots.apply_batch_with_context(batch).unwrap();
820
821        // keys that don't match any entry
822        let extra_keys: HashSet<Key> = (0..1000)
823            .map(|_| Faker.fake())
824            .filter(|key| !extra_slots.contains_key(key) && !slots.contains_key(key))
825            .collect();
826
827        // concatenation of keys for inserted elements, and keys that aren't in the DB
828        let keys: Vec<Key> = slots.keys().copied().chain(extra_keys).collect();
829        let result = account_slots.multi_get(keys).expect("this should not fail");
830
831        // check that, besides having extra slots inserted, and extra keys when querying,
832        // only the expected slots are returned
833        assert_eq!(result.len(), slots.keys().len());
834        for (idx, value) in result {
835            assert_eq!(value, *slots.get(&idx).expect("slot should be found"));
836        }
837    }
838
839    #[test]
840    fn regression_test_read_logs_without_providing_filter_address() {
841        let (state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
842
843        assert_eq!(state.read_logs(&LogFilter::default()).unwrap(), vec![]);
844
845        // 100 blocks with 1 transaction, with 2 logs, total: 200 logs
846        for number in 0..100 {
847            let block = Block {
848                header: BlockHeader {
849                    number: number.into(),
850                    ..Faker.fake()
851                },
852                transactions: vec![TransactionMined {
853                    execution: TransactionExecution {
854                        evm_input: EvmInput {
855                            block_number: number.into(),
856                            ..Faker.fake()
857                        },
858                        result: EvmExecutionResult {
859                            execution: EvmExecution {
860                                logs: vec![Faker.fake(), Faker.fake()],
861                                ..Faker.fake()
862                            },
863                            ..Faker.fake()
864                        },
865                        ..Faker.fake()
866                    },
867                    ..Faker.fake()
868                }],
869            };
870
871            state.save_block(block, ExecutionChanges::default()).unwrap();
872        }
873
874        let filter = LogFilter {
875            // address is empty
876            addresses: vec![],
877            ..Default::default()
878        };
879
880        assert_eq!(state.read_logs(&filter).unwrap().len(), 200);
881    }
882
883    #[test]
884    fn test_column_families_creation_order_is_deterministic() {
885        let mut previous_order: Option<Vec<String>> = None;
886
887        // Run the test multiple times to ensure RocksDB CF creation order is deterministic
888        for i in 0..10 {
889            let test_dir = tempfile::tempdir().unwrap();
890            let path = test_dir.as_ref().display().to_string();
891
892            // Create the database with the column families
893            let cf_options_map = generate_cf_options_map(&RocksCfCacheConfig::default());
894            let (_db, _) = create_or_open_db(&path, &cf_options_map).unwrap();
895
896            // Get the actual CF order from RocksDB
897            let actual_cf_order = DB::list_cf(&Options::default(), &path).unwrap();
898
899            // Compare with previous iteration
900            if let Some(prev_order) = &previous_order {
901                assert_eq!(
902                    &actual_cf_order,
903                    prev_order,
904                    "RocksDB column family order changed between iterations {} and {}",
905                    i - 1,
906                    i
907                );
908            } else {
909                previous_order = Some(actual_cf_order);
910            }
911        }
912    }
913
914    #[test]
915    fn test_bincode_lexicographical_ordering() {
916        use super::super::types::BlockNumberRocksdb;
917        use crate::rocks_bincode_config;
918
919        let boundary_tests = vec![(250, 251), (255, 256), (511, 512), (1023, 1024), (2047, 2048)];
920
921        for (before, after) in boundary_tests {
922            let before_rocks = BlockNumberRocksdb::from(before as u32);
923            let after_rocks = BlockNumberRocksdb::from(after as u32);
924
925            let before_bytes = bincode::encode_to_vec(before_rocks, rocks_bincode_config()).unwrap();
926            let after_bytes = bincode::encode_to_vec(after_rocks, rocks_bincode_config()).unwrap();
927
928            let lexicographic_order_correct = before_bytes < after_bytes;
929
930            assert!(
931                lexicographic_order_correct,
932                "Block {before} should serialize to bytes < Block {after} for RocksDB ordering",
933            );
934        }
935    }
936}