1use std::collections::BTreeMap;
2use std::fmt;
3use std::fmt::Debug;
4use std::sync::Arc;
5use std::sync::atomic::AtomicU32;
6use std::time::Duration;
7use std::time::Instant;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use itertools::Itertools;
13use rocksdb::DB;
14use rocksdb::Direction;
15use rocksdb::Options;
16use rocksdb::WaitForCompactOptions;
17use rocksdb::WriteBatch;
18use rocksdb::WriteOptions;
19use serde::Deserialize;
20use serde::Serialize;
21use sugars::btmap;
22
23use super::cf_versions::CfAccountSlotsHistoryValue;
24use super::cf_versions::CfAccountSlotsValue;
25use super::cf_versions::CfAccountsHistoryValue;
26use super::cf_versions::CfAccountsValue;
27use super::cf_versions::CfBlocksByHashValue;
28use super::cf_versions::CfBlocksByNumberValue;
29use super::cf_versions::CfTransactionsValue;
30use super::rocks_cf::RocksCfRef;
31use super::rocks_cf_cache_config::RocksCfCacheConfig;
32use super::rocks_config::CacheSetting;
33use super::rocks_config::DbConfig;
34use super::rocks_db::create_or_open_db;
35use super::types::AccountRocksdb;
36use super::types::AddressRocksdb;
37use super::types::BlockNumberRocksdb;
38use super::types::HashRocksdb;
39use super::types::SlotIndexRocksdb;
40use super::types::SlotValueRocksdb;
41use super::types::UnixTimeRocksdb;
42use crate::eth::primitives::Account;
43use crate::eth::primitives::Address;
44use crate::eth::primitives::Block;
45use crate::eth::primitives::BlockFilter;
46use crate::eth::primitives::BlockNumber;
47#[cfg(feature = "dev")]
48use crate::eth::primitives::Bytes;
49use crate::eth::primitives::ExecutionChanges;
50use crate::eth::primitives::Hash;
51use crate::eth::primitives::LogFilter;
52use crate::eth::primitives::LogMessage;
53#[cfg(feature = "dev")]
54use crate::eth::primitives::Nonce;
55use crate::eth::primitives::PointInTime;
56use crate::eth::primitives::Slot;
57use crate::eth::primitives::SlotIndex;
58use crate::eth::primitives::TransactionMined;
59#[cfg(feature = "dev")]
60use crate::eth::primitives::Wei;
61use crate::eth::storage::permanent::rocks::SerializeDeserializeWithContext;
62use crate::eth::storage::permanent::rocks::cf_versions::CfBlockChangesValue;
63use crate::eth::storage::permanent::rocks::types::AccountChangesRocksdb;
64use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
65use crate::ext::OptionExt;
66#[cfg(feature = "metrics")]
67use crate::infra::metrics;
68use crate::log_and_err;
69
70cfg_if::cfg_if! {
71 if #[cfg(feature = "rocks_metrics")] {
72 use parking_lot::Mutex;
73
74 use rocksdb::statistics::Histogram;
75 use rocksdb::statistics::Ticker;
76 use std::collections::HashMap;
77
78
79 use crate::infra::metrics::{Count, HistogramInt, Sum};
80 }
81}
82
83pub fn generate_cf_options_map(cf_cache_config: &RocksCfCacheConfig) -> BTreeMap<&'static str, Options> {
84 let cache_setting_from_size = |size: usize| -> CacheSetting { CacheSetting::Enabled(size) };
86
87 btmap! {
88 "accounts" => DbConfig::OptimizedPointLookUp.to_options(cache_setting_from_size(cf_cache_config.accounts)),
89 "accounts_history" => DbConfig::HistoricalData.to_options(cache_setting_from_size(cf_cache_config.accounts_history)),
90 "account_slots" => DbConfig::OptimizedPointLookUp.to_options(cache_setting_from_size(cf_cache_config.account_slots)),
91 "account_slots_history" => DbConfig::HistoricalData.to_options(cache_setting_from_size(cf_cache_config.account_slots_history)),
92 "transactions" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.transactions)),
93 "blocks_by_number" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.blocks_by_number)),
94 "blocks_by_hash" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.blocks_by_hash)),
95 "blocks_by_timestamp" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.blocks_by_timestamp)),
96 "block_changes" => DbConfig::Default.to_options(cache_setting_from_size(cf_cache_config.block_changes)),
97 }
98}
99
100fn new_cf_ref<'a, K, V>(db: &'a Arc<DB>, column_family: &str, cf_options_map: &BTreeMap<&str, Options>) -> Result<RocksCfRef<'a, K, V>>
102where
103 K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
104 V: Serialize + for<'de> Deserialize<'de> + Debug + Clone + SerializeDeserializeWithContext + bincode::Encode + bincode::Decode<()>,
105{
106 tracing::debug!(column_family = column_family, "creating new column family");
107
108 cf_options_map
109 .get(column_family)
110 .with_context(|| format!("matching column_family `{column_family}` given to `new_cf_ref` wasn't found in configuration map"))?;
111
112 RocksCfRef::new(db, column_family)
114}
115
/// State of the RocksDB-backed storage: the database handle plus one typed reference per column family.
pub struct RocksStorageState {
    /// Shared handle to the underlying RocksDB database.
    pub db: Arc<DB>,
    /// Path given to [`RocksStorageState::new`] when the database was created or opened.
    db_path: String,
    /// Current account data, keyed by address.
    accounts: RocksCfRef<'static, AddressRocksdb, CfAccountsValue>,
    /// Account data per block, keyed by `(address, block number)`.
    accounts_history: RocksCfRef<'static, (AddressRocksdb, BlockNumberRocksdb), CfAccountsHistoryValue>,
    /// Current slot values, keyed by `(address, slot index)`.
    account_slots: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb), CfAccountSlotsValue>,
    /// Slot values per block, keyed by `(address, slot index, block number)`.
    account_slots_history: RocksCfRef<'static, (AddressRocksdb, SlotIndexRocksdb, BlockNumberRocksdb), CfAccountSlotsHistoryValue>,
    /// Index from transaction hash to the number of the block that contains the transaction.
    pub transactions: RocksCfRef<'static, HashRocksdb, CfTransactionsValue>,
    /// Blocks keyed by block number.
    pub blocks_by_number: RocksCfRef<'static, BlockNumberRocksdb, CfBlocksByNumberValue>,
    /// Index from block hash to block number.
    blocks_by_hash: RocksCfRef<'static, HashRocksdb, CfBlocksByHashValue>,
    /// Index from block timestamp to block number.
    blocks_by_timestamp: RocksCfRef<'static, UnixTimeRocksdb, CfBlocksByHashValue>,
    /// Account and slot changes recorded for each block, keyed by block number.
    block_changes: RocksCfRef<'static, BlockNumberRocksdb, CfBlockChangesValue>,
    /// Last `(sum, count)` observed per histogram, used to compute averages between two exports.
    #[cfg(feature = "rocks_metrics")]
    prev_stats: Mutex<HashMap<HistogramInt, (Sum, Count)>>,
    /// Options returned when the database was opened; queried for histogram and ticker statistics.
    #[cfg(feature = "rocks_metrics")]
    db_options: Options,
    /// Timeout handed to the wait-for-compaction call issued when this state is dropped.
    shutdown_timeout: Duration,
    /// Whether batch writes are issued with the `sync` write option enabled.
    enable_sync_write: bool,
}
143
144impl RocksStorageState {
145 pub fn new(path: String, shutdown_timeout: Duration, cf_cache_config: RocksCfCacheConfig, enable_sync_write: bool) -> Result<Self> {
146 tracing::debug!("creating (or opening an existing) database with the specified column families");
147
148 let cf_options_map = generate_cf_options_map(&cf_cache_config);
149
150 #[cfg_attr(not(feature = "rocks_metrics"), allow(unused_variables))]
151 let (db, db_options) = create_or_open_db(&path, &cf_options_map).context("when trying to create (or open) rocksdb")?;
152
153 if db.path().to_str().is_none() {
154 bail!("db path doesn't isn't valid UTF-8: {:?}", db.path());
155 }
156 if db.path().file_name().is_none() {
157 bail!("db path doesn't have a file name or isn't valid UTF-8: {:?}", db.path());
158 }
159
160 let state = Self {
161 db_path: path,
162 accounts: new_cf_ref(db, "accounts", &cf_options_map)?,
163 accounts_history: new_cf_ref(db, "accounts_history", &cf_options_map)?,
164 account_slots: new_cf_ref(db, "account_slots", &cf_options_map)?,
165 account_slots_history: new_cf_ref(db, "account_slots_history", &cf_options_map)?,
166 transactions: new_cf_ref(db, "transactions", &cf_options_map)?,
167 blocks_by_number: new_cf_ref(db, "blocks_by_number", &cf_options_map)?,
168 blocks_by_hash: new_cf_ref(db, "blocks_by_hash", &cf_options_map)?,
169 blocks_by_timestamp: new_cf_ref(db, "blocks_by_timestamp", &cf_options_map)?,
170 block_changes: new_cf_ref(db, "block_changes", &cf_options_map)?,
171 #[cfg(feature = "rocks_metrics")]
172 prev_stats: Mutex::default(),
173 #[cfg(feature = "rocks_metrics")]
174 db_options,
175 db: Arc::clone(db),
176 shutdown_timeout,
177 enable_sync_write,
178 };
179
180 tracing::debug!("opened database successfully");
181 Ok(state)
182 }
183
/// Test helper: opens a fresh state inside a new temporary directory.
///
/// The returned `TempDir` is handed back together with the state so the caller decides how long
/// the directory stays alive.
#[cfg(test)]
#[track_caller]
pub fn new_in_testdir() -> anyhow::Result<(Self, tempfile::TempDir)> {
    let test_dir = tempfile::tempdir()?;
    let db_path = test_dir.path().display().to_string();
    Self::new(db_path, Duration::ZERO, RocksCfCacheConfig::default(), true).map(|state| (state, test_dir))
}
192
/// Returns the last component of the database path, used as the database name in metrics.
///
/// The component is extracted with [`std::path::Path::file_name`], so a trailing separator does
/// not produce an empty name. Falls back to the whole path when no UTF-8 file name is available.
#[cfg(feature = "metrics")]
fn db_path_filename(&self) -> &str {
    std::path::Path::new(&self.db_path)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(&self.db_path)
}
198
199 pub fn preload_block_number(&self) -> Result<AtomicU32> {
200 let block_number = self.blocks_by_number.last_key()?.unwrap_or_default().0;
201 tracing::info!(%block_number, "preloaded block_number");
202 Ok(AtomicU32::new(block_number))
203 }
204
/// Clears every column family managed by this state, including `block_changes`.
///
/// # Errors
///
/// Stops at, and returns, the first failure reported while clearing a column family.
#[cfg(feature = "dev")]
pub fn reset(&self) -> Result<()> {
    self.accounts.clear()?;
    self.accounts_history.clear()?;
    self.account_slots.clear()?;
    self.account_slots_history.clear()?;
    self.transactions.clear()?;
    self.blocks_by_number.clear()?;
    self.blocks_by_hash.clear()?;
    self.blocks_by_timestamp.clear()?;
    // block changes are keyed by block number; leaving them behind would keep stale entries around
    self.block_changes.clear()?;
    Ok(())
}
217
218 fn prepare_batch_with_execution_changes(&self, changes: ExecutionChanges, block_number: BlockNumber, batch: &mut WriteBatch) -> Result<()> {
220 let mut block_changes = BlockChangesRocksdb::with_capacity(changes.accounts.len());
221 let block_number = block_number.into();
222
223 for (address, change) in changes.accounts {
224 let address: AddressRocksdb = address.into();
225 if change.is_modified() {
226 let mut account_change_entry = AccountChangesRocksdb::default();
227 let mut account_info_entry = self.accounts.get(&address)?.unwrap_or(AccountRocksdb::default().into());
228
229 if change.nonce.is_changed() {
230 let nonce = (*change.nonce.value()).into();
231 account_info_entry.nonce = nonce;
232 account_change_entry.nonce = Some(nonce);
233 }
234 if change.balance.is_changed() {
235 let balance = (*change.balance.value()).into();
236 account_info_entry.balance = balance;
237 account_change_entry.balance = Some(balance);
238 }
239 if change.bytecode.is_changed() {
240 let bytecode = change.bytecode.value().clone().map_into();
241 account_info_entry.bytecode = bytecode.clone();
242 account_change_entry.bytecode = Some(bytecode);
243 }
244
245 self.accounts.prepare_batch_insertion([(address, account_info_entry.clone())], batch)?;
246 self.accounts_history
247 .prepare_batch_insertion([((address, block_number), account_info_entry.into_inner().into())], batch)?;
248 block_changes.account_changes.insert(address, account_change_entry);
249 }
250 }
251
252 for ((address, slot_index), slot_value) in changes.slots {
253 let address = address.into();
254 let slot_index = slot_index.into();
255 let slot_value: SlotValueRocksdb = slot_value.into();
256
257 self.account_slots
258 .prepare_batch_insertion([((address, slot_index), slot_value.into())], batch)?;
259 self.account_slots_history
260 .prepare_batch_insertion([((address, slot_index, block_number), slot_value.into())], batch)?;
261
262 block_changes.slot_changes.insert((address, slot_index), slot_value);
263 }
264
265 self.block_changes.prepare_batch_insertion([(block_number, block_changes.into())], batch)?;
266
267 Ok(())
268 }
269
270 pub fn read_transaction(&self, tx_hash: Hash) -> Result<Option<TransactionMined>> {
271 let Some(block_number) = self.transactions.get(&tx_hash.into())? else {
272 return Ok(None);
273 };
274
275 let Some(block) = self.blocks_by_number.get(&block_number)? else {
276 return log_and_err!("the block that the transaction was supposed to be in was not found")
277 .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}"));
278 };
279 let block = block.into_inner();
280
281 let transaction = block.transactions.into_iter().find(|tx| Hash::from(tx.input.hash) == tx_hash);
282
283 match transaction {
284 Some(tx) => {
285 tracing::trace!(%tx_hash, "transaction found");
286 Ok(Some(TransactionMined::from_rocks_primitives(tx, block_number.into_inner(), block.header.hash)))
287 }
288 None => log_and_err!("rocks error, transaction wasn't found in block where the index pointed at")
289 .with_context(|| format!("block_number = {block_number:?} tx_hash = {tx_hash}")),
290 }
291 }
292
293 pub fn read_logs(&self, filter: &LogFilter) -> Result<Vec<LogMessage>> {
294 let is_block_number_in_end_range = |number: BlockNumber| match filter.to_block.as_ref() {
295 Some(&last_block) => number <= last_block,
296 None => true,
297 };
298
299 let iter = self
300 .blocks_by_number
301 .iter_from(BlockNumberRocksdb::from(filter.from_block), Direction::Forward)?;
302
303 let mut logs_result = vec![];
304
305 for next in iter {
306 let (number, block) = next?;
307
308 if !is_block_number_in_end_range(number.into()) {
309 break;
310 }
311
312 let block = block.into_inner();
313 let logs = block.transactions.into_iter().enumerate().flat_map(|(tx_index, transaction)| {
314 transaction
315 .logs
316 .into_iter()
317 .map(move |log| LogMessage::from_rocks_primitives(log, block.header.number, block.header.hash, tx_index, transaction.input.hash))
318 });
319
320 let filtered_logs = logs.filter(|log| filter.matches(&log.log, block.header.number.into()));
321 logs_result.extend(filtered_logs);
322 }
323 Ok(logs_result)
324 }
325
326 pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> Result<Option<Slot>> {
327 if address.is_coinbase() {
328 return Ok(None);
329 }
330
331 match point_in_time {
332 PointInTime::Mined | PointInTime::Pending => {
333 let query_params = (address.into(), index.into());
334
335 let Some(account_slot_value) = self.account_slots.get(&query_params)? else {
336 return Ok(None);
337 };
338
339 Ok(Some(Slot {
340 index,
341 value: account_slot_value.into_inner().into(),
342 }))
343 }
344 PointInTime::MinedPast(block_number) => {
345 tracing::debug!(?address, ?index, ?block_number, "searching slot");
346
347 let key = (address.into(), (index).into(), block_number.into());
348
349 if let Some(((rocks_address, rocks_index, block), value)) = self.account_slots_history.seek(key)?
350 && rocks_index == index.into()
351 && rocks_address == address.into()
352 {
353 tracing::debug!(?block, ?rocks_index, ?rocks_address, "slot found in rocksdb storage");
354 return Ok(Some(Slot {
355 index: rocks_index.into(),
356 value: value.into_inner().into(),
357 }));
358 }
359 Ok(None)
360 }
361 }
362 }
363
364 pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> Result<Option<Account>> {
365 if address.is_coinbase() || address.is_zero() {
366 return Ok(None);
367 }
368
369 match point_in_time {
370 PointInTime::Mined | PointInTime::Pending => {
371 let Some(inner_account) = self.accounts.get(&address.into())? else {
372 tracing::trace!(%address, "account not found");
373 return Ok(None);
374 };
375
376 let account = inner_account.to_account(address);
377 tracing::trace!(%address, ?account, "account found");
378 Ok(Some(account))
379 }
380 PointInTime::MinedPast(block_number) => {
381 tracing::debug!(?address, ?block_number, "searching account");
382
383 let key = (address.into(), block_number.into());
384
385 if let Some(((addr, block), account_info)) = self.accounts_history.seek(key)?
386 && addr == address.into()
387 {
388 tracing::debug!(?block, ?address, "account found in rocksdb storage");
389 return Ok(Some(account_info.to_account(address)));
390 }
391 Ok(None)
392 }
393 }
394 }
395
396 pub fn read_accounts(&self, addresses: Vec<Address>) -> Result<Vec<(Address, Account)>> {
397 self.accounts
398 .multi_get(addresses.into_iter().map_into())
399 .map(|vec| vec.into_iter().map(|(addr, acc)| (addr.into(), acc.to_account(addr.into()))).collect_vec())
400 }
401
402 pub fn read_block(&self, selection: BlockFilter) -> Result<Option<Block>> {
403 tracing::debug!(?selection, "reading block");
404
405 let block = match selection {
406 BlockFilter::Latest | BlockFilter::Pending => self.blocks_by_number.last_value(),
407 BlockFilter::Earliest => self.blocks_by_number.first_value(),
408 BlockFilter::Number(block_number) => self.blocks_by_number.get(&block_number.into()),
409 BlockFilter::Hash(block_hash) =>
410 if let Some(block_number) = self.blocks_by_hash.get(&block_hash.into())? {
411 self.blocks_by_number.get(&block_number)
412 } else {
413 Ok(None)
414 },
415 BlockFilter::Timestamp(timestamp) => self
416 .blocks_by_timestamp
417 .iter_from(timestamp.timestamp.into(), timestamp.mode.into())?
418 .next()
419 .transpose()?
420 .map(|inner| self.blocks_by_number.get(&inner.1))
421 .transpose()
422 .map(|nested_opt| nested_opt.flatten()),
423 };
424 block.map(|block_option| block_option.map(|block| block.into_inner().into()))
425 }
426
427 pub fn save_accounts(&self, accounts: Vec<Account>) -> Result<()> {
428 let mut write_batch = WriteBatch::default();
429
430 tracing::debug!(?accounts, "preparing accounts cf batch");
431 self.accounts.prepare_batch_insertion(
432 accounts.iter().cloned().map(|acc| {
433 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
434 (tup.0, tup.1.into())
435 }),
436 &mut write_batch,
437 )?;
438
439 tracing::debug!(?accounts, "preparing accounts history batch");
440 self.accounts_history.prepare_batch_insertion(
441 accounts.iter().cloned().map(|acc| {
442 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
443 ((tup.0, 0u32.into()), tup.1.into())
444 }),
445 &mut write_batch,
446 )?;
447
448 self.write_in_batch_for_multiple_cfs(write_batch)
449 }
450
451 pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> Result<()> {
452 let mut batch = WriteBatch::default();
453
454 let mut txs_batch = vec![];
455 for transaction in block.transactions.iter().cloned() {
456 txs_batch.push((transaction.info.hash.into(), transaction.evm_input.block_number.into()));
457 }
458 self.transactions.prepare_batch_insertion(txs_batch, &mut batch)?;
459
460 let number = block.number();
461 let block_hash = block.hash();
462 let timestamp = block.header.timestamp;
463
464 let block_by_number = (number.into(), block.into());
465 self.blocks_by_number.prepare_batch_insertion([block_by_number], &mut batch)?;
466
467 let block_by_hash = (block_hash.into(), number.into());
468 self.blocks_by_hash.prepare_batch_insertion([block_by_hash], &mut batch)?;
469
470 let block_by_timestamp = (timestamp.into(), number.into());
471 self.blocks_by_timestamp.prepare_batch_insertion([block_by_timestamp], &mut batch)?;
472
473 self.prepare_batch_with_execution_changes(account_changes, number, &mut batch)?;
474
475 self.accounts.prepare_batch_insertion(
476 accounts.iter().cloned().map(|acc| {
477 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
478 (tup.0, tup.1.into())
479 }),
480 &mut batch,
481 )?;
482
483 self.accounts_history.prepare_batch_insertion(
484 accounts.iter().cloned().map(|acc| {
485 let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
486 ((tup.0, 0u32.into()), tup.1.into())
487 }),
488 &mut batch,
489 )?;
490
491 self.write_in_batch_for_multiple_cfs(batch)
492 }
493
494 pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> Result<()> {
495 let mut batch = WriteBatch::default();
496 self.prepare_block_insertion(block, account_changes, &mut batch)?;
497 self.write_in_batch_for_multiple_cfs(batch)
498 }
499
500 pub fn prepare_block_insertion(&self, block: Block, account_changes: ExecutionChanges, batch: &mut WriteBatch) -> Result<()> {
501 let mut txs_batch = vec![];
502 for transaction in block.transactions.iter().cloned() {
503 txs_batch.push((transaction.info.hash.into(), transaction.evm_input.block_number.into()));
504 }
505
506 self.transactions.prepare_batch_insertion(txs_batch, batch)?;
507
508 let number = block.number();
509 let block_hash = block.hash();
510 let timestamp = block.header.timestamp;
511
512 let block_by_number = (number.into(), block.into());
513 self.blocks_by_number.prepare_batch_insertion([block_by_number], batch)?;
514
515 let block_by_hash = (block_hash.into(), number.into());
516 self.blocks_by_hash.prepare_batch_insertion([block_by_hash], batch)?;
517
518 let block_by_timestamp = (timestamp.into(), number.into());
519 self.blocks_by_timestamp.prepare_batch_insertion([block_by_timestamp], batch)?;
520
521 self.prepare_batch_with_execution_changes(account_changes, number, batch)?;
522
523 Ok(())
524 }
525
526 pub fn write_in_batch_for_multiple_cfs(&self, batch: WriteBatch) -> Result<()> {
528 tracing::debug!("writing batch");
529 let batch_len = batch.len();
530
531 let mut options = WriteOptions::default();
532 options.set_sync(self.enable_sync_write);
539
540 self.db
541 .write_opt(batch, &options)
542 .context("failed to write in batch to (possibly) multiple column families")
543 .inspect_err(|e| {
544 tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
545 })
546 }
547
/// Writes `slot` as the current value of `address` in `account_slots`.
///
/// Only `account_slots` is written by this method.
#[cfg(feature = "dev")]
pub fn save_slot(&self, address: Address, slot: Slot) -> Result<()> {
    let key = (address.into(), slot.index.into());
    let entry = (key, slot.value.into());

    let mut batch = WriteBatch::default();
    self.account_slots.prepare_batch_insertion([entry], &mut batch)?;
    self.account_slots.apply_batch_with_context(batch)
}
555
/// Overwrites the nonce of `address` in `accounts`, starting from a default account when none is stored.
///
/// Only `accounts` is written by this method.
#[cfg(feature = "dev")]
pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> Result<()> {
    let key: AddressRocksdb = address.into();

    let mut account_info_entry = self.accounts.get(&key)?.unwrap_or_else(|| AccountRocksdb::default().into());
    account_info_entry.nonce = nonce.into();

    let mut batch = WriteBatch::default();
    self.accounts.prepare_batch_insertion([(key, account_info_entry)], &mut batch)?;
    self.accounts.apply_batch_with_context(batch)
}
570
/// Overwrites the balance of `address` in `accounts`, starting from a default account when none is stored.
///
/// Only `accounts` is written by this method.
#[cfg(feature = "dev")]
pub fn save_account_balance(&self, address: Address, balance: Wei) -> Result<()> {
    let key: AddressRocksdb = address.into();

    let mut account_info_entry = self.accounts.get(&key)?.unwrap_or_else(|| AccountRocksdb::default().into());
    account_info_entry.balance = balance.into();

    let mut batch = WriteBatch::default();
    self.accounts.prepare_batch_insertion([(key, account_info_entry)], &mut batch)?;
    self.accounts.apply_batch_with_context(batch)
}
585
/// Overwrites the bytecode of `address` in `accounts`, starting from a default account when none is stored.
///
/// Empty `code` stores no bytecode; otherwise the bytes are wrapped with `RevmBytecode::new_raw`.
/// Only `accounts` is written by this method.
#[cfg(feature = "dev")]
pub fn save_account_code(&self, address: Address, code: Bytes) -> Result<()> {
    use crate::alias::RevmBytecode;

    let key: AddressRocksdb = address.into();
    let mut account_info_entry = self.accounts.get(&key)?.unwrap_or_else(|| AccountRocksdb::default().into());

    let bytecode = (!code.0.is_empty()).then(|| RevmBytecode::new_raw(code.0.into()));
    account_info_entry.bytecode = bytecode.map_into();

    let mut batch = WriteBatch::default();
    self.accounts.prepare_batch_insertion([(key, account_info_entry)], &mut batch)?;
    self.accounts.apply_batch_with_context(batch)
}
607
/// Test helper: returns every value stored in `accounts`, in iteration order.
///
/// Stops at the first entry that fails to be read.
#[cfg(test)]
pub fn read_all_accounts(&self) -> Result<Vec<AccountRocksdb>> {
    let mut all_accounts = Vec::new();
    for entry in self.accounts.iter_start() {
        let (_, value) = entry?;
        all_accounts.push(value.into_inner());
    }
    Ok(all_accounts)
}
612
/// Test helper: returns every value stored in `accounts_history`, in iteration order.
///
/// Stops at the first entry that fails to be read.
#[cfg(test)]
pub fn read_all_historical_accounts(&self) -> Result<Vec<AccountRocksdb>> {
    let mut all_accounts = Vec::new();
    for entry in self.accounts_history.iter_start() {
        let (_, value) = entry?;
        all_accounts.push(value.into_inner());
    }
    Ok(all_accounts)
}
617
/// Clears every column family managed by this state, including `block_changes`.
///
/// # Errors
///
/// Stops at the first failure; the error carries the name of the column family being cleared.
#[cfg(feature = "dev")]
pub fn clear(&self) -> Result<()> {
    self.accounts.clear().context("when clearing accounts")?;
    self.accounts_history.clear().context("when clearing accounts_history")?;
    self.account_slots.clear().context("when clearing account_slots")?;
    self.account_slots_history.clear().context("when clearing account_slots_history")?;
    self.transactions.clear().context("when clearing transactions")?;
    self.blocks_by_hash.clear().context("when clearing blocks_by_hash")?;
    self.blocks_by_number.clear().context("when clearing blocks_by_number")?;
    self.blocks_by_timestamp.clear().context("when clearing blocks_by_timestamp")?;
    // block changes are keyed by block number; leaving them behind would keep stale entries around
    self.block_changes.clear().context("when clearing block_changes")?;
    Ok(())
}
630
631 pub fn read_block_with_changes(&self, selection: BlockFilter) -> Result<Option<(Block, BlockChangesRocksdb)>> {
632 let Some(block_wo_changes) = self.read_block(selection)? else {
633 return Ok(None);
634 };
635 let changes = self.block_changes.get(&block_wo_changes.number().into())?;
636 Ok(Some((block_wo_changes, changes.map(|changes| changes.into_inner()).unwrap_or_default())))
637 }
638}
639
640#[cfg(feature = "rocks_metrics")]
641impl RocksStorageState {
642 pub fn export_metrics(&self) -> Result<()> {
643 let db_get = self.db_options.get_histogram_data(Histogram::DbGet);
644 let db_write = self.db_options.get_histogram_data(Histogram::DbWrite);
645
646 let wal_file_synced = self.db_options.get_ticker_count(Ticker::WalFileSynced);
647 let block_cache_miss = self.db_options.get_ticker_count(Ticker::BlockCacheMiss);
648 let block_cache_hit = self.db_options.get_ticker_count(Ticker::BlockCacheHit);
649 let bytes_written = self.db_options.get_ticker_count(Ticker::BytesWritten);
650 let bytes_read = self.db_options.get_ticker_count(Ticker::BytesRead);
651
652 let cur_size_active_mem_table = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ACTIVE_MEM_TABLE).unwrap_or_default();
653 let cur_size_all_mem_tables = self.db.property_int_value(rocksdb::properties::CUR_SIZE_ALL_MEM_TABLES).unwrap_or_default();
654 let size_all_mem_tables = self.db.property_int_value(rocksdb::properties::SIZE_ALL_MEM_TABLES).unwrap_or_default();
655 let block_cache_usage = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_USAGE).unwrap_or_default();
656 let block_cache_capacity = self.db.property_int_value(rocksdb::properties::BLOCK_CACHE_CAPACITY).unwrap_or_default();
657 let background_errors = self.db.property_int_value(rocksdb::properties::BACKGROUND_ERRORS).unwrap_or_default();
658
659 let db_name = self.db_path_filename();
660
661 metrics::set_rocks_db_get(db_get.count(), db_name);
662 metrics::set_rocks_db_write(db_write.count(), db_name);
663 metrics::set_rocks_block_cache_miss(block_cache_miss, db_name);
664 metrics::set_rocks_block_cache_hit(block_cache_hit, db_name);
665 metrics::set_rocks_bytes_written(bytes_written, db_name);
666 metrics::set_rocks_bytes_read(bytes_read, db_name);
667 metrics::set_rocks_wal_file_synced(wal_file_synced, db_name);
668
669 metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime)?, db_name);
670 metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime)?, db_name);
671 metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime)?, db_name);
672
673 if let Some(cur_size_active_mem_table) = cur_size_active_mem_table {
674 metrics::set_rocks_cur_size_active_mem_table(cur_size_active_mem_table, db_name);
675 }
676
677 if let Some(cur_size_all_mem_tables) = cur_size_all_mem_tables {
678 metrics::set_rocks_cur_size_all_mem_tables(cur_size_all_mem_tables, db_name);
679 }
680
681 if let Some(size_all_mem_tables) = size_all_mem_tables {
682 metrics::set_rocks_size_all_mem_tables(size_all_mem_tables, db_name);
683 }
684
685 if let Some(block_cache_usage) = block_cache_usage {
686 metrics::set_rocks_block_cache_usage(block_cache_usage, db_name);
687 }
688 if let Some(block_cache_capacity) = block_cache_capacity {
689 metrics::set_rocks_block_cache_capacity(block_cache_capacity, db_name);
690 }
691 if let Some(background_errors) = background_errors {
692 metrics::set_rocks_background_errors(background_errors, db_name);
693 }
694
695 self.account_slots.export_metrics();
696 self.account_slots_history.export_metrics();
697 self.accounts.export_metrics();
698 self.accounts_history.export_metrics();
699 self.blocks_by_hash.export_metrics();
700 self.blocks_by_number.export_metrics();
701 self.blocks_by_timestamp.export_metrics();
702 self.transactions.export_metrics();
703 Ok(())
704 }
705
706 fn get_histogram_average_in_interval(&self, hist: Histogram) -> Result<u64> {
707 let mut prev_values = self.prev_stats.lock();
711
712 let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
713 let data = self.db_options.get_histogram_data(hist);
714 let data_count = data.count();
715 let data_sum = data.sum();
716
717 let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else {
718 return Ok(0);
719 };
720
721 prev_values.insert(hist as u32, (data_sum, data_count));
722 Ok(avg)
723 }
724}
725
726#[cfg(feature = "metrics")]
727impl RocksStorageState {
728 pub fn export_column_family_size_metrics(&self) -> Result<()> {
729 let db_name = self.db_path_filename();
730
731 let cf_names = DB::list_cf(&Options::default(), &self.db_path)?;
732
733 for cf_name in cf_names {
734 if let Some(cf_handle) = self.db.cf_handle(&cf_name)
735 && let Ok(Some(size)) = self.db.property_int_value_cf(&cf_handle, "rocksdb.total-sst-files-size")
736 {
737 metrics::set_rocks_cf_size(size, db_name, &cf_name);
738 }
739 }
740
741 Ok(())
742 }
743}
744
745impl Drop for RocksStorageState {
746 fn drop(&mut self) {
747 let mut options = WaitForCompactOptions::default();
748 options.set_abort_on_pause(true);
750 options.set_flush(true);
752 options.set_timeout(self.shutdown_timeout.as_micros().try_into().unwrap_or_default());
754
755 tracing::info!("starting rocksdb shutdown");
756 let instant = Instant::now();
757
758 let result = self.db.wait_for_compact(&options);
761 let waited_for = instant.elapsed();
762
763 #[cfg(feature = "metrics")]
764 {
765 let db_name = self.db_path_filename();
766 metrics::set_rocks_last_shutdown_delay_millis(waited_for.as_millis() as u64, db_name);
767 }
768
769 if let Err(e) = result {
770 tracing::error!(reason = ?e, ?waited_for, "rocksdb shutdown compaction didn't finish in time, shutting it down anyways");
771 } else {
772 tracing::info!(?waited_for, "finished rocksdb shutdown");
773 }
774 }
775}
776
777impl fmt::Debug for RocksStorageState {
778 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
779 f.debug_struct("RocksStorageState").field("db_path", &self.db_path).finish()
780 }
781}
782
783#[cfg(test)]
784mod tests {
785
786 use fake::Fake;
787 use fake::Faker;
788
789 use super::*;
790 use crate::eth::executor::EvmExecutionResult;
791 use crate::eth::executor::EvmInput;
792 use crate::eth::primitives::BlockHeader;
793 use crate::eth::primitives::EvmExecution;
794 use crate::eth::primitives::TransactionExecution;
795
/// `multi_get` must return exactly the requested entries that exist, ignoring stored entries that
/// were not requested and requested keys that were never stored.
#[test]
#[cfg(feature = "dev")]
fn test_rocks_multi_get() {
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::iter;

    type Key = (AddressRocksdb, SlotIndexRocksdb);
    type Value = CfAccountSlotsValue;

    let (mut state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();
    let account_slots = &mut state.account_slots;

    // entries that are stored and requested
    let slots: HashMap<Key, Value> = (0..1000).map(|_| Faker.fake()).collect();
    // entries that are stored but never requested
    let extra_slots: HashMap<Key, Value> = iter::repeat_with(|| Faker.fake())
        .filter(|(key, _)| !slots.contains_key(key))
        .take(1000)
        .collect();

    let mut batch = WriteBatch::default();
    account_slots.prepare_batch_insertion(slots.clone(), &mut batch).unwrap();
    account_slots.prepare_batch_insertion(extra_slots.clone(), &mut batch).unwrap();
    account_slots.apply_batch_with_context(batch).unwrap();

    // keys that are requested but were never stored
    let missing_keys: HashSet<Key> = (0..1000)
        .map(|_| Faker.fake())
        .filter(|key| !extra_slots.contains_key(key) && !slots.contains_key(key))
        .collect();

    let requested_keys: Vec<Key> = slots.keys().copied().chain(missing_keys).collect();
    let found = account_slots.multi_get(requested_keys).expect("this should not fail");

    assert_eq!(found.len(), slots.keys().len());
    for (key, value) in found {
        let expected = slots.get(&key).expect("slot should be found");
        assert_eq!(value, *expected);
    }
}
838
/// A filter without addresses must match every stored log: 100 blocks with one transaction of two
/// logs each yield 200 logs.
#[test]
fn regression_test_read_logs_without_providing_filter_address() {
    let (state, _test_dir) = RocksStorageState::new_in_testdir().unwrap();

    // nothing stored yet, so nothing is returned
    assert_eq!(state.read_logs(&LogFilter::default()).unwrap(), vec![]);

    for number in 0..100 {
        let execution = TransactionExecution {
            evm_input: EvmInput {
                block_number: number.into(),
                ..Faker.fake()
            },
            result: EvmExecutionResult {
                execution: EvmExecution {
                    logs: vec![Faker.fake(), Faker.fake()],
                    ..Faker.fake()
                },
                ..Faker.fake()
            },
            ..Faker.fake()
        };
        let header = BlockHeader {
            number: number.into(),
            ..Faker.fake()
        };
        let block = Block {
            header,
            transactions: vec![TransactionMined { execution, ..Faker.fake() }],
        };

        state.save_block(block, ExecutionChanges::default()).unwrap();
    }

    let filter = LogFilter {
        addresses: vec![],
        ..Default::default()
    };

    let matching_logs = state.read_logs(&filter).unwrap();
    assert_eq!(matching_logs.len(), 200);
}
882
/// Creating the database several times must always list the column families in the same order as
/// the first creation.
#[test]
fn test_column_families_creation_order_is_deterministic() {
    let mut previous_order: Option<Vec<String>> = None;

    for i in 0..10 {
        let test_dir = tempfile::tempdir().unwrap();
        let path = test_dir.as_ref().display().to_string();

        let cf_options_map = generate_cf_options_map(&RocksCfCacheConfig::default());
        let (_db, _) = create_or_open_db(&path, &cf_options_map).unwrap();

        let actual_cf_order = DB::list_cf(&Options::default(), &path).unwrap();

        // the first iteration only records the reference order
        let Some(prev_order) = &previous_order else {
            previous_order = Some(actual_cf_order);
            continue;
        };

        assert_eq!(
            &actual_cf_order,
            prev_order,
            "RocksDB column family order changed between iterations {} and {}",
            i - 1,
            i
        );
    }
}
913
/// The encoded bytes of block numbers must compare in the same order as the numbers themselves,
/// checked around values where the encoded representation could change shape.
#[test]
fn test_bincode_lexicographical_ordering() {
    use super::super::types::BlockNumberRocksdb;
    use crate::rocks_bincode_config;

    let encode = |number: u32| bincode::encode_to_vec(BlockNumberRocksdb::from(number), rocks_bincode_config()).unwrap();

    let boundary_tests: [(u32, u32); 5] = [(250, 251), (255, 256), (511, 512), (1023, 1024), (2047, 2048)];

    for (before, after) in boundary_tests {
        let before_bytes = encode(before);
        let after_bytes = encode(after);

        assert!(
            before_bytes < after_bytes,
            "Block {before} should serialize to bytes < Block {after} for RocksDB ordering",
        );
    }
}
936}