1use std::collections::BTreeMap;
2use std::fmt;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::sync::atomic::AtomicU32;
6use std::time::Duration;
7use std::time::Instant;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use rocksdb::DB;
13use rocksdb::Direction;
14use rocksdb::Options;
15use rocksdb::WaitForCompactOptions;
16use rocksdb::WriteBatch;
17use rocksdb::WriteOptions;
18use serde::Deserialize;
19use serde::Serialize;
20use sugars::btmap;
21
22use super::cf_versions::CfAccountSlotsHistoryValue;
23use super::cf_versions::CfAccountSlotsValue;
24use super::cf_versions::CfAccountsHistoryValue;
25use super::cf_versions::CfAccountsValue;
26use super::cf_versions::CfBlocksByHashValue;
27use super::cf_versions::CfBlocksByNumberValue;
28use super::cf_versions::CfTransactionsValue;
29use super::rocks_cf::RocksCfRef;
30use super::rocks_config::CacheSetting;
31use super::rocks_config::DbConfig;
32use super::rocks_db::create_or_open_db;
33use super::types::AccountRocksdb;
34use super::types::AddressRocksdb;
35use super::types::BlockNumberRocksdb;
36use super::types::HashRocksdb;
37use super::types::SlotIndexRocksdb;
38use super::types::SlotValueRocksdb;
39use crate::eth::primitives::Account;
40use crate::eth::primitives::Address;
41use crate::eth::primitives::Block;
42use crate::eth::primitives::BlockFilter;
43use crate::eth::primitives::BlockNumber;
44#[cfg(feature = "dev")]
45use crate::eth::primitives::Bytes;
46use crate::eth::primitives::ExecutionChanges;
47use crate::eth::primitives::Hash;
48use crate::eth::primitives::LogFilter;
49use crate::eth::primitives::LogMined;
50#[cfg(feature = "dev")]
51use crate::eth::primitives::Nonce;
52use crate::eth::primitives::PointInTime;
53use crate::eth::primitives::Slot;
54use crate::eth::primitives::SlotIndex;
55use crate::eth::primitives::TransactionMined;
56#[cfg(feature = "dev")]
57use crate::eth::primitives::Wei;
58use crate::eth::storage::permanent::rocks::SerializeDeserializeWithContext;
59use crate::eth::storage::permanent::rocks::cf_versions::CfBlockChangesValue;
60use crate::eth::storage::permanent::rocks::types::AccountChangesRocksdb;
61use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
62use crate::ext::OptionExt;
63#[cfg(feature = "metrics")]
64use crate::infra::metrics;
65use crate::log_and_err;
66use crate::utils::GIGABYTE;
67
68cfg_if::cfg_if! {
69 if #[cfg(feature = "rocks_metrics")] {
70 use parking_lot::Mutex;
71
72 use rocksdb::statistics::Histogram;
73 use rocksdb::statistics::Ticker;
74 use std::collections::HashMap;
75
76
77 use crate::infra::metrics::{Count, HistogramInt, Sum};
78 }
79}
80
81pub fn generate_cf_options_map(cache_multiplier: Option<f32>) -> BTreeMap<&'static str, Options> {
82 let cache_multiplier = cache_multiplier.unwrap_or(1.0);
83
84 let cached_in_gigs_and_multiplied = |size_in_gbs: u64| -> CacheSetting {
86 let size = (size_in_gbs as f32) * cache_multiplier;
87 let size = GIGABYTE * size as usize;
88 CacheSetting::Enabled(size)
89 };
90
91 btmap! {
92 "accounts" => DbConfig::OptimizedPointLookUp.to_options(cached_in_gigs_and_multiplied(10)),
93 "accounts_history" => DbConfig::HistoricalData.to_options(cached_in_gigs_and_multiplied(2)),
94 "account_slots" => DbConfig::OptimizedPointLookUp.to_options(cached_in_gigs_and_multiplied(30)),
95 "account_slots_history" => DbConfig::HistoricalData.to_options(cached_in_gigs_and_multiplied(2)),
96 "transactions" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
97 "blocks_by_number" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
98 "blocks_by_hash" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
99 "block_changes" => DbConfig::Default.to_options(cached_in_gigs_and_multiplied(2)),
100 }
101}
102
103fn new_cf_ref<'a, K, V>(db: &'a Arc<DB>, column_family: &str, cf_options_map: &BTreeMap<&str, Options>) -> Result<RocksCfRef<'a, K, V>>
105where
106 K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
107 V: Serialize + for<'de> Deserialize<'de> + Debug + Clone + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
108{
109 tracing::debug!(column_family = column_family, "creating new column family");
110
111 cf_options_map
112 .get(column_family)
113 .with_context(|| format!("matching column_family `{column_family}` given to `new_cf_ref` wasn't found in configuration map"))?;
114
115 RocksCfRef::new(db, column_family)
117}
118
/// State of the RocksDB-backed storage: the database handle plus one typed reference per
/// column family.
pub struct RocksStorageState {
    /// Shared handle to the underlying RocksDB database.
    pub db: Arc<DB>,
    /// Path passed to [`RocksStorageState::new`]; also used to derive the metrics label.
    db_path: String,
    /// `accounts` column family, keyed by address.
    accounts: RocksCfRef<'static, AddressRocksdb, CfAccountsValue>,
    /// `accounts_history` column family, keyed by (address, block number).
    accounts_history: RocksCfRef<'static, (AddressRocksdb, BlockNumberRocksdb), CfAccountsHistoryValue>,
    /// `account_slots` column family, keyed by (address, slot index).
    account_slots: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb), CfAccountSlotsValue>,
    /// `account_slots_history` column family, keyed by (address, slot index, block number).
    account_slots_history: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb, BlockNumberRocksdb), CfAccountSlotsHistoryValue>,
    /// `transactions` column family, keyed by transaction hash; the value is used as the key
    /// to look up the containing block in `blocks_by_number`.
    pub transactions: RocksCfRef<'static, HashRocksdb, CfTransactionsValue>,
    /// `blocks_by_number` column family, keyed by block number.
    pub blocks_by_number: RocksCfRef<'static, BlockNumberRocksdb, CfBlocksByNumberValue>,
    /// `blocks_by_hash` column family, keyed by block hash; the value is used as the key to
    /// look up the block in `blocks_by_number`.
    blocks_by_hash: RocksCfRef<'static, HashRocksdb, CfBlocksByHashValue>,
    /// `block_changes` column family, keyed by block number.
    block_changes: RocksCfRef<'static, BlockNumberRocksdb, CfBlockChangesValue>,
    /// Last observed `(sum, count)` per histogram, used to compute per-interval averages.
    #[cfg(feature = "rocks_metrics")]
    prev_stats: Mutex<HashMap<HistogramInt, (Sum, Count)>>,
    /// Database options, kept so histogram and ticker statistics can be read from them.
    #[cfg(feature = "rocks_metrics")]
    db_options: Options,
    /// Timeout handed to the wait-for-compaction call performed when the state is dropped.
    shutdown_timeout: Duration,
    /// Whether batch writes are issued with the `sync` write option enabled.
    enable_sync_write: bool,
}
145
146impl RocksStorageState {
147 pub fn new(path: String, shutdown_timeout: Duration, cache_multiplier: Option<f32>, enable_sync_write: bool) -> Result<Self> {
148 tracing::debug!("creating (or opening an existing) database with the specified column families");
149
150 let cf_options_map = generate_cf_options_map(cache_multiplier);
151
152 #[cfg_attr(not(feature = "rocks_metrics"), allow(unused_variables))]
153 let (db, db_options) = create_or_open_db(&path, &cf_options_map).context("when trying to create (or open) rocksdb")?;
154
155 if db.path().to_str().is_none() {
156 bail!("db path doesn't isn't valid UTF-8: {:?}", db.path());
157 }
158 if db.path().file_name().is_none() {
159 bail!("db path doesn't have a file name or isn't valid UTF-8: {:?}", db.path());
160 }
161
162 let state = Self {
163 db_path: path,
164 accounts: new_cf_ref(db, "accounts", &cf_options_map)?,
165 accounts_history: new_cf_ref(db, "accounts_history", &cf_options_map)?,
166 account_slots: new_cf_ref(db, "account_slots", &cf_options_map)?,
167 account_slots_history: new_cf_ref(db, "account_slots_history", &cf_options_map)?,
168 transactions: new_cf_ref(db, "transactions", &cf_options_map)?,
169 blocks_by_number: new_cf_ref(db, "blocks_by_number", &cf_options_map)?,
170 blocks_by_hash: new_cf_ref(db, "blocks_by_hash", &cf_options_map)?,
171 block_changes: new_cf_ref(db, "block_changes", &cf_options_map)?,
172 #[cfg(feature = "rocks_metrics")]
173 prev_stats: Mutex::default(),
174 #[cfg(feature = "rocks_metrics")]
175 db_options,
176 db: Arc::clone(db),
177 shutdown_timeout,
178 enable_sync_write,
179 };
180
181 tracing::debug!("opened database successfully");
182 Ok(state)
183 }
184
185 #[cfg(test)]
186 #[track_caller]
187 pub fn new_in_testdir() -> anyhow::Result<(Self, tempfile::TempDir)> {
188 let test_dir = tempfile::tempdir()?;
189 let path = test_dir.as_ref().display().to_string();
190 let state = Self::new(path, Duration::ZERO, None, true)?;
191 Ok((state, test_dir))
192 }
193
194 #[cfg(feature = "metrics")]
196 fn db_path_filename(&self) -> &str {
197 self.db_path.rsplit('/').next().unwrap_or(&self.db_path)
198 }
199
200 pub fn preload_block_number(&self) -> Result<AtomicU32> {
201 let block_number = self.blocks_by_number.last_key()?.unwrap_or_default().0;
202 tracing::info!(%block_number, "preloaded block_number");
203 Ok(AtomicU32::new(block_number))
204 }
205
206 #[cfg(feature = "dev")]
207 pub fn reset(&self) -> Result<()> {
208 self.accounts.clear()?;
209 self.accounts_history.clear()?;
210 self.account_slots.clear()?;
211 self.account_slots_history.clear()?;
212 self.transactions.clear()?;
213 self.blocks_by_number.clear()?;
214 self.blocks_by_hash.clear()?;
215 Ok(())
216 }
217
218 fn prepare_batch_with_execution_changes(&self, changes: ExecutionChanges, block_number: BlockNumber, batch: &mut WriteBatch) -> Result<()> {
220 let mut block_changes = BlockChangesRocksdb::default();
221 let block_number = block_number.into();
222
223 for (address, change) in changes {
224 let address: AddressRocksdb = address.into();
225 let mut account_change_entry = AccountChangesRocksdb::default();
226
227 if change.is_account_modified() {
228 let mut account_info_entry = self.accounts.get(&address)?.unwrap_or(AccountRocksdb::default().into());
229
230 if let Some(nonce) = change.nonce.take_modified() {
231 account_info_entry.nonce = nonce.into();
232 account_change_entry.nonce = Some(nonce.into());
233 }
234 if let Some(balance) = change.balance.take_modified() {
235 account_info_entry.balance = balance.into();
236 account_change_entry.balance = Some(balance.into());
237 }
238 if let Some(bytecode) = change.bytecode.take_modified() {
239 account_info_entry.bytecode = bytecode.clone().map_into();
240 account_change_entry.bytecode = bytecode.map_into();
241 }
242
243 self.accounts.prepare_batch_insertion([(address, account_info_entry.clone())], batch)?;
244 self.accounts_history
245 .prepare_batch_insertion([((address, block_number), account_info_entry.into_inner().into())], batch)?;
246 }
247
248 for (&slot_index, slot_change) in &change.slots {
249 if let Some(slot) = slot_change.take_modified_ref() {
250 let slot_index = slot_index.into();
251 let slot_value: SlotValueRocksdb = slot.value.into();
252
253 account_change_entry.slot_changes.insert(slot_index, slot_value);
254 self.account_slots
255 .prepare_batch_insertion([((address, slot_index), slot_value.into())], batch)?;
256 self.account_slots_history
257 .prepare_batch_insertion([((address, slot_index, block_number), slot_value.into())], batch)?;
258 }
259 }
260
261 account_change_entry
262 .has_changes()
263 .then(|| block_changes.account_changes.insert(address, account_change_entry));
264 }
265
266 self.block_changes.prepare_batch_insertion([(block_number, block_changes.into())], batch)?;
267
268 Ok(())
269 }
270
271 pub fn read_transaction(&self, tx_hash: Hash) -> Result<Option<TransactionMined>> {
272 let Some(block_number) = self.transactions.get(&tx_hash.into())? else {
273 return Ok(None);
274 };
275
276 let Some(block) = self.blocks_by_number.get(&block_number)? else {
277 return log_and_err!("the block that the transaction was supposed to be in was not found")
278 .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}"));
279 };
280 let block = block.into_inner();
281
282 let transaction = block.transactions.into_iter().find(|tx| Hash::from(tx.input.hash) == tx_hash);
283
284 match transaction {
285 Some(tx) => {
286 tracing::trace!(%tx_hash, "transaction found");
287 Ok(Some(TransactionMined::from_rocks_primitives(tx, block_number.into_inner(), block.header.hash)))
288 }
289 None => log_and_err!("rocks error, transaction wasn't found in block where the index pointed at")
290 .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}")),
291 }
292 }
293
294 pub fn read_logs(&self, filter: &LogFilter) -> Result<Vec<LogMined>> {
295 let is_block_number_in_end_range = |number: BlockNumber| match filter.to_block.as_ref() {
296 Some(&last_block) => number <= last_block,
297 None => true,
298 };
299
300 let iter = self
301 .blocks_by_number
302 .iter_from(BlockNumberRocksdb::from(filter.from_block), Direction::Forward)?;
303
304 let mut logs_result = vec![];
305
306 for next in iter {
307 let (number, block) = next?;
308
309 if !is_block_number_in_end_range(number.into()) {
310 break;
311 }
312
313 let block = block.into_inner();
314 let logs = block.transactions.into_iter().enumerate().flat_map(|(tx_index, transaction)| {
315 transaction.logs.into_iter().map(move |log| {
316 LogMined::from_rocks_primitives(log.log, block.header.number, block.header.hash, tx_index, transaction.input.hash, log.index)
317 })
318 });
319
320 let filtered_logs = logs.filter(|log| filter.matches(log));
321 logs_result.extend(filtered_logs);
322 }
323 Ok(logs_result)
324 }
325
326 pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> Result<Option<Slot>> {
327 if address.is_coinbase() {
328 return Ok(None);
329 }
330
331 match point_in_time {
332 PointInTime::Mined | PointInTime::Pending => {
333 let query_params = (address.into(), index.into());
334
335 let Some(account_slot_value) = self.account_slots.get(&query_params)? else {
336 return Ok(None);
337 };
338
339 Ok(Some(Slot {
340 index,
341 value: account_slot_value.into_inner().into(),
342 }))
343 }
344 PointInTime::MinedPast(block_number) => {
345 tracing::debug!(?address, ?index, ?block_number, "searching slot");
346
347 let key = (address.into(), (index).into(), block_number.into());
348
349 if let Some(((rocks_address, rocks_index, block), value)) = self.account_slots_history.seek(key)?
350 && rocks_index == index.into()
351 && rocks_address == address.into()
352 {
353 tracing::debug!(?block, ?rocks_index, ?rocks_address, "slot found in rocksdb storage");
354 return Ok(Some(Slot {
355 index: rocks_index.into(),
356 value: value.into_inner().into(),
357 }));
358 }
359 Ok(None)
360 }
361 }
362 }
363
364 pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> Result<Option<Account>> {
365 if address.is_coinbase() || address.is_zero() {
366 return Ok(None);
367 }
368
369 match point_in_time {
370 PointInTime::Mined | PointInTime::Pending => {
371 let Some(inner_account) = self.accounts.get(&address.into())? else {
372 tracing::trace!(%address, "account not found");
373 return Ok(None);
374 };
375
376 let account = inner_account.to_account(address);
377 tracing::trace!(%address, ?account, "account found");
378 Ok(Some(account))
379 }
380 PointInTime::MinedPast(block_number) => {
381 tracing::debug!(?address, ?block_number, "searching account");
382
383 let key = (address.into(), block_number.into());
384
385 if let Some(((addr, block), account_info)) = self.accounts_history.seek(key)?
386 && addr == address.into()
387 {
388 tracing::debug!(?block, ?address, "account found in rocksdb storage");
389 return Ok(Some(account_info.to_account(address)));
390 }
391 Ok(None)
392 }
393 }
394 }
395
396 pub fn read_block(&self, selection: BlockFilter) -> Result<Option<Block>> {
397 tracing::debug!(?selection, "reading block");
398
399 let block = match selection {
400 BlockFilter::Latest | BlockFilter::Pending => self.blocks_by_number.last_value(),
401 BlockFilter::Earliest => self.blocks_by_number.first_value(),
402 BlockFilter::Number(block_number) => self.blocks_by_number.get(&block_number.into()),
403 BlockFilter::Hash(block_hash) =>
404 if let Some(block_number) = self.blocks_by_hash.get(&block_hash.into())? {
405 self.blocks_by_number.get(&block_number)
406 } else {
407 Ok(None)
408 },
409 };
410 block.map(|block_option| block_option.map(|block| block.into_inner().into()))
411 }
412
413 pub fn save_accounts(&self, accounts: Vec<Account>) -> Result<()> {
414 let mut write_batch = WriteBatch::default();
415
416 tracing::debug!(?accounts, "preparing accounts cf batch");
417 self.accounts.prepare_batch_insertion(
418 accounts.iter().cloned().map(|acc| {
419 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
420 (tup.0, tup.1.into())
421 }),
422 &mut write_batch,
423 )?;
424
425 tracing::debug!(?accounts, "preparing accounts history batch");
426 self.accounts_history.prepare_batch_insertion(
427 accounts.iter().cloned().map(|acc| {
428 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
429 ((tup.0, 0u32.into()), tup.1.into())
430 }),
431 &mut write_batch,
432 )?;
433
434 self.write_in_batch_for_multiple_cfs(write_batch)
435 }
436
437 pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> Result<()> {
438 let mut batch = WriteBatch::default();
439
440 let mut txs_batch = vec![];
441 for transaction in block.transactions.iter().cloned() {
442 txs_batch.push((transaction.input.hash.into(), transaction.block_number.into()));
443 }
444 self.transactions.prepare_batch_insertion(txs_batch, &mut batch)?;
445
446 let number = block.number();
447 let block_hash = block.hash();
448
449 let block_by_number = (number.into(), block.into());
450 self.blocks_by_number.prepare_batch_insertion([block_by_number], &mut batch)?;
451
452 let block_by_hash = (block_hash.into(), number.into());
453 self.blocks_by_hash.prepare_batch_insertion([block_by_hash], &mut batch)?;
454
455 self.prepare_batch_with_execution_changes(account_changes, number, &mut batch)?;
456
457 self.accounts.prepare_batch_insertion(
458 accounts.iter().cloned().map(|acc| {
459 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
460 (tup.0, tup.1.into())
461 }),
462 &mut batch,
463 )?;
464
465 self.accounts_history.prepare_batch_insertion(
466 accounts.iter().cloned().map(|acc| {
467 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
468 ((tup.0, 0u32.into()), tup.1.into())
469 }),
470 &mut batch,
471 )?;
472
473 self.write_in_batch_for_multiple_cfs(batch)
474 }
475
476 pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> Result<()> {
477 let mut batch = WriteBatch::default();
478 self.prepare_block_insertion(block, account_changes, &mut batch)?;
479 self.write_in_batch_for_multiple_cfs(batch)
480 }
481
482 pub fn prepare_block_insertion(&self, block: Block, account_changes: ExecutionChanges, batch: &mut WriteBatch) -> Result<()> {
483 let mut txs_batch = vec![];
484 for transaction in block.transactions.iter().cloned() {
485 txs_batch.push((transaction.input.hash.into(), transaction.block_number.into()));
486 }
487
488 self.transactions.prepare_batch_insertion(txs_batch, batch)?;
489
490 let number = block.number();
491 let block_hash = block.hash();
492
493 let block_by_number = (number.into(), block.into());
494 self.blocks_by_number.prepare_batch_insertion([block_by_number], batch)?;
495
496 let block_by_hash = (block_hash.into(), number.into());
497 self.blocks_by_hash.prepare_batch_insertion([block_by_hash], batch)?;
498
499 self.prepare_batch_with_execution_changes(account_changes, number, batch)?;
500
501 Ok(())
502 }
503
504 pub fn write_in_batch_for_multiple_cfs(&self, batch: WriteBatch) -> Result<()> {
506 tracing::debug!("writing batch");
507 let batch_len = batch.len();
508
509 let mut options = WriteOptions::default();
510 options.set_sync(self.enable_sync_write);
517
518 self.db
519 .write_opt(batch, &options)
520 .context("failed to write in batch to (possibly) multiple column families")
521 .inspect_err(|e| {
522 tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
523 })
524 }
525
526 #[cfg(feature = "dev")]
527 pub fn save_slot(&self, address: Address, slot: Slot) -> Result<()> {
528 let mut batch = WriteBatch::default();
529 self.account_slots
530 .prepare_batch_insertion([((address.into(), slot.index.into()), slot.value.into())], &mut batch)?;
531 self.account_slots.apply_batch_with_context(batch)
532 }
533
534 #[cfg(feature = "dev")]
535 pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> Result<()> {
536 let mut batch = WriteBatch::default();
537
538 let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());
540
541 account_info_entry.nonce = nonce.into();
543
544 self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
546 self.accounts.apply_batch_with_context(batch)
547 }
548
549 #[cfg(feature = "dev")]
550 pub fn save_account_balance(&self, address: Address, balance: Wei) -> Result<()> {
551 let mut batch = WriteBatch::default();
552
553 let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());
555
556 account_info_entry.balance = balance.into();
558
559 self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
561 self.accounts.apply_batch_with_context(batch)
562 }
563
564 #[cfg(feature = "dev")]
565 pub fn save_account_code(&self, address: Address, code: Bytes) -> Result<()> {
566 use crate::alias::RevmBytecode;
567
568 let mut batch = WriteBatch::default();
569
570 let mut account_info_entry = self.accounts.get(&address.into())?.unwrap_or(AccountRocksdb::default().into());
572
573 account_info_entry.bytecode = if code.0.is_empty() {
575 None
576 } else {
577 Some(RevmBytecode::new_raw(code.0.into()))
578 }
579 .map_into();
580
581 self.accounts.prepare_batch_insertion([(address.into(), account_info_entry)], &mut batch)?;
583 self.accounts.apply_batch_with_context(batch)
584 }
585
586 #[cfg(test)]
587 pub fn read_all_accounts(&self) -> Result<Vec<AccountRocksdb>> {
588 self.accounts.iter_start().map(|result| Ok(result?.1.into_inner())).collect()
589 }
590
591 #[cfg(test)]
592 pub fn read_all_historical_accounts(&self) -> Result<Vec<AccountRocksdb>> {
593 self.accounts_history.iter_start().map(|result| Ok(result?.1.into_inner())).collect()
594 }
595
596 #[cfg(feature = "dev")]
597 pub fn clear(&self) -> Result<()> {
598 self.accounts.clear().context("when clearing accounts")?;
599 self.accounts_history.clear().context("when clearing accounts_history")?;
600 self.account_slots.clear().context("when clearing account_slots")?;
601 self.account_slots_history.clear().context("when clearing account_slots_history")?;
602 self.transactions.clear().context("when clearing transactions")?;
603 self.blocks_by_hash.clear().context("when clearing blocks_by_hash")?;
604 self.blocks_by_number.clear().context("when clearing blocks_by_number")?;
605 Ok(())
606 }
607
608 pub fn read_block_with_changes(&self, selection: BlockFilter) -> Result<Option<(Block, ExecutionChanges)>> {
609 let Some(block_wo_changes) = self.read_block(selection)? else {
610 return Ok(None);
611 };
612 let changes = self.block_changes.get(&block_wo_changes.number().into())?;
613 Ok(Some((block_wo_changes, changes.map(|changes| changes.into_inner().into()).unwrap_or_default())))
614 }
615}
616
617#[cfg(feature = "rocks_metrics")]
618impl RocksStorageState {
619 pub fn export_metrics(&self) -> Result<()> {
620 let db_get = self.db_options.get_histogram_data(Histogram::DbGet);
621 let db_write = self.db_options.get_histogram_data(Histogram::DbWrite);
622
623 let wal_file_synced = self.db_options.get_ticker_count(Ticker::WalFileSynced);
624 let block_cache_miss = self.db_options.get_ticker_count(Ticker::BlockCacheMiss);
625 let block_cache_hit = self.db_options.get_ticker_count(Ticker::BlockCacheHit);
626 let bytes_written = self.db_options.get_ticker_count(Ticker::BytesWritten);
627 let bytes_read = self.db_options.get_ticker_count(Ticker::BytesRead);
628
629 let cur_size_active_mem_table = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ACTIVE_MEM_TABLE).unwrap_or_default();
630 let cur_size_all_mem_tables = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ALL_MEM_TABLES).unwrap_or_default();
631 let size_all_mem_tables = self.db.property_int_value(rocksdb::properties::SIZE_ALL_MEM_TABLES).unwrap_or_default();
632 let block_cache_usage = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_USAGE).unwrap_or_default();
633 let block_cache_capacity = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_CAPACITY).unwrap_or_default();
634 let background_errors = self.db.property_int_value(rocksdb::properties::BACKGROUND_ERRORS).unwrap_or_default();
635
636 let db_name = self.db_path_filename();
637
638 metrics::set_rocks_db_get(db_get.count(), db_name);
639 metrics::set_rocks_db_write(db_write.count(), db_name);
640 metrics::set_rocks_block_cache_miss(block_cache_miss, db_name);
641 metrics::set_rocks_block_cache_hit(block_cache_hit, db_name);
642 metrics::set_rocks_bytes_written(bytes_written, db_name);
643 metrics::set_rocks_bytes_read(bytes_read, db_name);
644 metrics::set_rocks_wal_file_synced(wal_file_synced, db_name);
645
646 metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime)?, db_name);
647 metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime)?, db_name);
648 metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime)?, db_name);
649
650 if let Some(cur_size_active_mem_table) = cur_size_active_mem_table {
651 metrics::set_rocks_cur_size_active_mem_table(cur_size_active_mem_table, db_name);
652 }
653
654 if let Some(cur_size_all_mem_tables) = cur_size_all_mem_tables {
655 metrics::set_rocks_cur_size_all_mem_tables(cur_size_all_mem_tables, db_name);
656 }
657
658 if let Some(size_all_mem_tables) = size_all_mem_tables {
659 metrics::set_rocks_size_all_mem_tables(size_all_mem_tables, db_name);
660 }
661
662 if let Some(block_cache_usage) = block_cache_usage {
663 metrics::set_rocks_block_cache_usage(block_cache_usage, db_name);
664 }
665 if let Some(block_cache_capacity) = block_cache_capacity {
666 metrics::set_rocks_block_cache_capacity(block_cache_capacity, db_name);
667 }
668 if let Some(background_errors) = background_errors {
669 metrics::set_rocks_background_errors(background_errors, db_name);
670 }
671
672 self.account_slots.export_metrics();
673 self.account_slots_history.export_metrics();
674 self.accounts.export_metrics();
675 self.accounts_history.export_metrics();
676 self.blocks_by_hash.export_metrics();
677 self.blocks_by_number.export_metrics();
678 self.transactions.export_metrics();
679 Ok(())
680 }
681
682 fn get_histogram_average_in_interval(&self, hist: Histogram) -> Result<u64> {
683 let mut prev_values = self.prev_stats.lock();
687
688 let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
689 let data = self.db_options.get_histogram_data(hist);
690 let data_count = data.count();
691 let data_sum = data.sum();
692
693 let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else {
694 return Ok(0);
695 };
696
697 prev_values.insert(hist as u32, (data_sum, data_count));
698 Ok(avg)
699 }
700}
701
#[cfg(feature = "metrics")]
impl RocksStorageState {
    /// Publishes the `rocksdb.total-sst-files-size` property of every column family listed for
    /// the database at `db_path`.
    ///
    /// Column families without a handle, or whose property cannot be read, are skipped.
    ///
    /// # Errors
    ///
    /// Fails when the column families cannot be listed.
    pub fn export_column_family_size_metrics(&self) -> Result<()> {
        let db_name = self.db_path_filename();

        for cf_name in DB::list_cf(&Options::default(), &self.db_path)? {
            let Some(cf_handle) = self.db.cf_handle(&cf_name) else {
                continue;
            };
            let Ok(Some(size)) = self.db.property_int_value_cf(&cf_handle, "rocksdb.total-sst-files-size") else {
                continue;
            };

            metrics::set_rocks_cf_size(size, db_name, &cf_name);
        }

        Ok(())
    }
}
720
721impl Drop for RocksStorageState {
722 fn drop(&mut self) {
723 let mut options = WaitForCompactOptions::default();
724 options.set_abort_on_pause(true);
726 options.set_flush(true);
728 options.set_timeout(self.shutdown_timeout.as_micros().try_into().unwrap_or_default());
730
731 tracing::info!("starting rocksdb shutdown");
732 let instant = Instant::now();
733
734 let result = self.db.wait_for_compact(&options);
737 let waited_for = instant.elapsed();
738
739 #[cfg(feature = "metrics")]
740 {
741 let db_name = self.db_path_filename();
742 metrics::set_rocks_last_shutdown_delay_millis(waited_for.as_millis() as u64, db_name);
743 }
744
745 if let Err(e) = result {
746 tracing::error!(reason = ?e, ?waited_for, "rocksdb shutdown compaction didn't finish in time, shutting it down anyways");
747 } else {
748 tracing::info!(?waited_for, "finished rocksdb shutdown");
749 }
750 }
751}
752
753impl fmt::Debug for RocksStorageState {
754 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
755 f.debug_struct("RocksStorageState").field("db_path", &self.db_path).finish()
756 }
757}
758
759#[cfg(test)]
760mod tests {
761
762 use fake::Fake;
763 use fake::Faker;
764
765 use super::*;
766 use crate::alias::RevmBytecode;
767 use crate::eth::primitives::BlockHeader;
768 use crate::eth::primitives::ExecutionAccountChanges;
769 use crate::eth::primitives::ExecutionValueChange;
770
771 #[test]
772 #[cfg(feature = "dev")]
773 fn test_rocks_multi_get() {
774 use std::collections::HashMap;
775 use std::collections::HashSet;
776 use std::iter;
777
778 type Key = (AddressRocksdb, SlotIndexRocksdb);
779 type Value = CfAccountSlotsValue;
780
781 let (mut state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
782 let account_slots = &mut state.account_slots;
783
784 let slots: HashMap<Key, Value> = (0..1000).map(|_| Faker.fake()).collect();
786 let extra_slots: HashMap<Key, Value> = iter::repeat_with(|| Faker.fake())
787 .filter(|(key, _)| !slots.contains_key(key))
788 .take(1000)
789 .collect();
790
791 let mut batch = WriteBatch::default();
792 account_slots.prepare_batch_insertion(slots.clone(), &mut batch).unwrap();
793 account_slots.prepare_batch_insertion(extra_slots.clone(), &mut batch).unwrap();
794 account_slots.apply_batch_with_context(batch).unwrap();
795
796 let extra_keys: HashSet<Key> = (0..1000)
798 .map(|_| Faker.fake())
799 .filter(|key| !extra_slots.contains_key(key) && !slots.contains_key(key))
800 .collect();
801
802 let keys: Vec<Key> = slots.keys().copied().chain(extra_keys).collect();
804 let result = account_slots.multi_get(keys).expect("this should not fail");
805
806 assert_eq!(result.len(), slots.keys().len());
809 for (idx, value) in result {
810 assert_eq!(value, *slots.get(&idx).expect("slot should be found"));
811 }
812 }
813
814 #[test]
815 fn regression_test_read_logs_without_providing_filter_address() {
816 let (state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
817
818 assert_eq!(state.read_logs(&LogFilter::default()).unwrap(), vec![]);
819
820 for number in 0..100 {
822 let block = Block {
823 header: BlockHeader {
824 number: number.into(),
825 ..Faker.fake()
826 },
827 transactions: vec![TransactionMined {
828 block_number: number.into(),
829 logs: vec![Faker.fake(), Faker.fake()],
830 ..Faker.fake()
831 }],
832 };
833
834 state.save_block(block, ExecutionChanges::default()).unwrap();
835 }
836
837 let filter = LogFilter {
838 addresses: vec![],
840 ..Default::default()
841 };
842
843 assert_eq!(state.read_logs(&filter).unwrap().len(), 200);
844 }
845
846 #[test]
847 fn regression_test_saving_account_changes_for_accounts_that_didnt_change() {
848 let (state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
849
850 let addr: Address = Faker.fake();
851 let change_base = ExecutionAccountChanges {
852 nonce: ExecutionValueChange::from_original(Faker.fake()),
853 balance: ExecutionValueChange::from_original(Faker.fake()),
854 bytecode: ExecutionValueChange::from_original(Some(RevmBytecode::new_raw(Faker.fake::<Vec<u8>>().into()))),
855 slots: BTreeMap::new(),
856 };
857
858 let change_1 = ExecutionChanges::from([(addr, ExecutionAccountChanges { ..change_base.clone() })]);
859 let change_2 = ExecutionChanges::from([(
860 addr,
861 ExecutionAccountChanges {
862 nonce: ExecutionValueChange::from_modified(Faker.fake()),
863 ..change_base.clone()
864 },
865 )]);
866 let change_3 = ExecutionChanges::from([(
867 addr,
868 ExecutionAccountChanges {
869 balance: ExecutionValueChange::from_modified(Faker.fake()),
870 ..change_base.clone()
871 },
872 )]);
873 let change_4 = ExecutionChanges::from([(
874 addr,
875 ExecutionAccountChanges {
876 bytecode: ExecutionValueChange::from_modified(Some(RevmBytecode::new_raw(Faker.fake::<Vec<u8>>().into()))),
877 ..change_base
878 },
879 )]);
880
881 let mut batch = WriteBatch::default();
882 state.prepare_batch_with_execution_changes(change_1, 1.into(), &mut batch).unwrap();
884 state.prepare_batch_with_execution_changes(change_2, 2.into(), &mut batch).unwrap();
885 state.prepare_batch_with_execution_changes(change_3, 3.into(), &mut batch).unwrap();
886 state.prepare_batch_with_execution_changes(change_4, 4.into(), &mut batch).unwrap();
887 state.write_in_batch_for_multiple_cfs(batch).unwrap();
888
889 let accounts = state.read_all_accounts().unwrap();
890 assert_eq!(accounts.len(), 1);
891
892 let history = state.read_all_historical_accounts().unwrap();
893 assert_eq!(history.len(), 3);
894 }
895
896 #[test]
897 fn test_column_families_creation_order_is_deterministic() {
898 let mut previous_order: Option<Vec<String>> = None;
899
900 for i in 0..10 {
902 let test_dir = tempfile::tempdir().unwrap();
903 let path = test_dir.as_ref().display().to_string();
904
905 let cf_options_map = generate_cf_options_map(None);
907 let (_db, _) = create_or_open_db(&path, &cf_options_map).unwrap();
908
909 let actual_cf_order = DB::list_cf(&Options::default(), &path).unwrap();
911
912 if let Some(prev_order) = &previous_order {
914 assert_eq!(
915 &actual_cf_order,
916 prev_order,
917 "RocksDB column family order changed between iterations {} and {}",
918 i - 1,
919 i
920 );
921 } else {
922 previous_order = Some(actual_cf_order);
923 }
924 }
925 }
926
927 #[test]
928 fn test_bincode_lexicographical_ordering() {
929 use super::super::types::BlockNumberRocksdb;
930 use crate::rocks_bincode_config;
931
932 let boundary_tests = vec![(250, 251), (255, 256), (511, 512), (1023, 1024), (2047, 2048)];
933
934 for (before, after) in boundary_tests {
935 let before_rocks = BlockNumberRocksdb::from(before as u32);
936 let after_rocks = BlockNumberRocksdb::from(after as u32);
937
938 let before_bytes = bincode::encode_to_vec(before_rocks, rocks_bincode_config()).unwrap();
939 let after_bytes = bincode::encode_to_vec(after_rocks, rocks_bincode_config()).unwrap();
940
941 let lexicographic_order_correct = before_bytes < after_bytes;
942
943 assert!(
944 lexicographic_order_correct,
945 "Block {before} should serialize to bytes < Block {after} for RocksDB ordering",
946 );
947 }
948 }
949}