stratus/eth/storage/permanent/rocks/
rocks_permanent.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::sync::atomic::AtomicU32;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6
7use anyhow::bail;
8
9use super::rocks_cf_cache_config::RocksCfCacheConfig;
10use super::rocks_state::RocksStorageState;
11use crate::GlobalState;
12use crate::eth::primitives::Account;
13use crate::eth::primitives::Address;
14use crate::eth::primitives::Block;
15use crate::eth::primitives::BlockFilter;
16use crate::eth::primitives::BlockNumber;
17#[cfg(feature = "dev")]
18use crate::eth::primitives::Bytes;
19use crate::eth::primitives::ExecutionChanges;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::LogFilter;
22use crate::eth::primitives::LogMessage;
23#[cfg(feature = "dev")]
24use crate::eth::primitives::Nonce;
25use crate::eth::primitives::PointInTime;
26use crate::eth::primitives::Slot;
27use crate::eth::primitives::SlotIndex;
28use crate::eth::primitives::StorageError;
29use crate::eth::primitives::TransactionMined;
30#[cfg(feature = "dev")]
31use crate::eth::primitives::Wei;
32use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
33use crate::ext::SleepReason;
34use crate::ext::spawn;
35use crate::ext::traced_sleep;
36
37#[derive(Debug)]
38pub struct RocksPermanentStorage {
39    pub state: Arc<RocksStorageState>,
40    block_number: AtomicU32,
41}
42
43fn get_current_file_descriptor_limit() -> Result<libc::rlimit, StorageError> {
44    let mut rlimit = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
45    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlimit as *mut libc::rlimit) };
46    if result != 0 {
47        let err = std::io::Error::last_os_error();
48        let msg = format!("Failed to get current file descriptor limit (error: {err}).");
49        return Err(StorageError::Unexpected { msg });
50    }
51    Ok(rlimit)
52}
53
54impl RocksPermanentStorage {
55    pub fn new(
56        db_path_prefix: Option<String>,
57        shutdown_timeout: Duration,
58        cf_cache_config: RocksCfCacheConfig,
59        enable_sync_write: bool,
60        cf_size_metrics_interval: Option<Duration>,
61        file_descriptors_limit: u64,
62    ) -> anyhow::Result<Self> {
63        tracing::info!("setting up rocksdb storage");
64
65        // Check file descriptor limit before proceeding with RocksDB initialization
66        Self::check_file_descriptor_limit(file_descriptors_limit).map_err(|err| anyhow::anyhow!("{err}"))?;
67
68        let path = if let Some(prefix) = db_path_prefix {
69            // run some checks on the given prefix
70            if prefix.is_empty() {
71                bail!("given prefix for RocksDB is empty, try not providing the flag");
72            }
73
74            if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 {
75                tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder");
76            }
77
78            let path = format!("{prefix}-rocksdb");
79            tracing::info!("starting rocksdb storage - at custom path: '{:?}'", path);
80            path
81        } else {
82            tracing::info!("starting rocksdb storage - at default path: 'data/rocksdb'");
83            "data/rocksdb".to_string()
84        };
85
86        let state = Arc::new(RocksStorageState::new(path, shutdown_timeout, cf_cache_config, enable_sync_write)?);
87
88        let block_number = state.preload_block_number()?;
89
90        // spawn background task for collecting column family size metrics
91        #[cfg(feature = "metrics")]
92        if let Some(interval) = cf_size_metrics_interval {
93            tracing::info!("starting column family size metrics collector with interval {:?}", interval);
94            spawn(
95                "rocks::cf_size_metrics_collector",
96                Self::start_cf_size_metrics_collector(Arc::clone(&state), interval),
97            );
98        };
99
100        Ok(Self { state, block_number })
101    }
102
103    /// Checks the current file descriptor limit and validates it meets the minimum requirement.
104    ///
105    /// This prevents RocksDB from misbehaving or corrupting data due to insufficient file descriptors.
106    fn check_file_descriptor_limit(limit_required: u64) -> Result<(), StorageError> {
107        let current_limit = get_current_file_descriptor_limit()?;
108
109        tracing::info!(
110            current_limit = current_limit.rlim_cur,
111            min_required = limit_required,
112            "checking file descriptor limit for RocksDB"
113        );
114
115        if current_limit.rlim_cur < limit_required {
116            tracing::warn!(
117                current_limit = current_limit.rlim_cur,
118                min_required = limit_required,
119                "File descriptor limit is below minimum."
120            );
121
122            let msg = format!(
123                "File descriptor limit (current: {}, max: {}) is below the recommended minimum ({limit_required}) for RocksDB.",
124                current_limit.rlim_cur, current_limit.rlim_max
125            );
126
127            return Err(StorageError::Unexpected { msg });
128        } else {
129            tracing::info!("File descriptor limit check passed: {} >= {}", current_limit.rlim_cur, limit_required);
130        }
131
132        Ok(())
133    }
134
135    // -------------------------------------------------------------------------
136    // Block number operations
137    // -------------------------------------------------------------------------
138
139    pub fn read_mined_block_number(&self) -> BlockNumber {
140        self.block_number.load(Ordering::SeqCst).into()
141    }
142
143    pub fn set_mined_block_number(&self, number: BlockNumber) {
144        self.block_number.store(number.as_u32(), Ordering::SeqCst);
145    }
146
147    pub fn has_genesis(&self) -> Result<bool, StorageError> {
148        let genesis = self.read_block(BlockFilter::Number(BlockNumber::ZERO))?;
149        Ok(genesis.is_some())
150    }
151
152    // -------------------------------------------------------------------------
153    // State operations
154    // -------------------------------------------------------------------------
155
156    pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> anyhow::Result<Option<Account>, StorageError> {
157        self.state
158            .read_account(address, point_in_time)
159            .map_err(|err| StorageError::RocksError { err })
160            .inspect_err(|e| {
161                tracing::error!(reason = ?e, "failed to read account in RocksPermanent");
162            })
163    }
164
165    pub fn read_accounts(&self, addresses: Vec<Address>) -> anyhow::Result<Vec<(Address, Account)>, StorageError> {
166        self.state.read_accounts(addresses).map_err(|err| StorageError::RocksError { err })
167    }
168
169    pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> anyhow::Result<Option<Slot>, StorageError> {
170        self.state
171            .read_slot(address, index, point_in_time)
172            .map_err(|err| StorageError::RocksError { err })
173            .inspect_err(|e| {
174                tracing::error!(reason = ?e, "failed to read slot in RocksPermanent");
175            })
176    }
177
178    pub fn read_block(&self, selection: BlockFilter) -> anyhow::Result<Option<Block>, StorageError> {
179        let block = self.state.read_block(selection).inspect_err(|e| {
180            tracing::error!(reason = ?e, "failed to read block in RocksPermanent");
181        });
182        if let Ok(Some(block)) = &block {
183            tracing::trace!(?selection, ?block, "block found");
184        }
185        block.map_err(|err| StorageError::RocksError { err })
186    }
187
188    pub fn read_block_with_changes(&self, selection: BlockFilter) -> anyhow::Result<Option<(Block, BlockChangesRocksdb)>, StorageError> {
189        let result = self.state.read_block_with_changes(selection).inspect_err(|e| {
190            tracing::error!(reason = ?e, "failed to read block with changes in RocksPermanent");
191        });
192        if let Ok(Some(block)) = &result {
193            tracing::trace!(?selection, ?block, "block found");
194        }
195        result.map_err(|err| StorageError::RocksError { err })
196    }
197
198    pub fn read_transaction(&self, hash: Hash) -> anyhow::Result<Option<TransactionMined>, StorageError> {
199        self.state
200            .read_transaction(hash)
201            .map_err(|err| StorageError::RocksError { err })
202            .inspect_err(|e| {
203                tracing::error!(reason = ?e, "failed to read transaction in RocksPermanent");
204            })
205    }
206
207    pub fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMessage>, StorageError> {
208        self.state.read_logs(filter).map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
209            tracing::error!(reason = ?e, "failed to read log in RocksPermanent");
210        })
211    }
212
213    pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
214        #[cfg(feature = "rocks_metrics")]
215        {
216            self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
217                tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
218            })?;
219        }
220
221        self.state
222            .save_genesis_block(block, accounts, account_changes)
223            .map_err(|err| StorageError::RocksError { err })
224            .inspect_err(|e| {
225                tracing::error!(reason = ?e, "failed to save genesis block in RocksPermanent");
226            })
227    }
228
229    pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
230        #[cfg(feature = "rocks_metrics")]
231        {
232            self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
233                tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
234            })?;
235        }
236        self.state
237            .save_block(block, account_changes)
238            .map_err(|err| StorageError::RocksError { err })
239            .inspect_err(|e| {
240                tracing::error!(reason = ?e, "failed to save block in RocksPermanent");
241            })
242    }
243
244    pub fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<(), StorageError> {
245        self.state
246            .save_accounts(accounts)
247            .map_err(|err| StorageError::RocksError { err })
248            .inspect_err(|e| {
249                tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
250            })
251    }
252
253    #[cfg(feature = "dev")]
254    pub fn save_slot(&self, address: Address, slot: Slot) -> anyhow::Result<(), StorageError> {
255        self.state
256            .save_slot(address, slot)
257            .map_err(|err| StorageError::RocksError { err })
258            .inspect_err(|e| {
259                tracing::error!(reason = ?e, "failed to save slot in RocksPermanent");
260            })
261    }
262
263    #[cfg(feature = "dev")]
264    pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> anyhow::Result<(), StorageError> {
265        self.state
266            .save_account_nonce(address, nonce)
267            .map_err(|err| StorageError::RocksError { err })
268            .inspect_err(|e| {
269                tracing::error!(reason = ?e, "failed to save account nonce in RocksPermanent");
270            })
271    }
272
273    #[cfg(feature = "dev")]
274    pub fn save_account_balance(&self, address: Address, balance: Wei) -> anyhow::Result<(), StorageError> {
275        self.state
276            .save_account_balance(address, balance)
277            .map_err(|err| StorageError::RocksError { err })
278            .inspect_err(|e| {
279                tracing::error!(reason = ?e, "failed to save account balance in RocksPermanent");
280            })
281    }
282
283    #[cfg(feature = "metrics")]
284    /// Starts a background task that collects column family size metrics at regular intervals.
285    async fn start_cf_size_metrics_collector(state: Arc<RocksStorageState>, interval: Duration) -> anyhow::Result<()> {
286        const TASK_NAME: &str = "rocks::cf_size_metrics";
287
288        loop {
289            if GlobalState::is_shutdown_warn(TASK_NAME) {
290                return Ok(());
291            }
292
293            if let Err(e) = state.export_column_family_size_metrics() {
294                tracing::warn!("failed to export column family metrics: {:?}", e);
295            }
296
297            traced_sleep(interval, SleepReason::Interval).await;
298        }
299    }
300
301    #[cfg(feature = "dev")]
302    pub fn save_account_code(&self, address: Address, code: Bytes) -> anyhow::Result<(), StorageError> {
303        self.state
304            .save_account_code(address, code)
305            .map_err(|err| StorageError::RocksError { err })
306            .inspect_err(|e| {
307                tracing::error!(reason = ?e, "failed to save account code in RocksPermanent");
308            })
309    }
310
311    #[cfg(feature = "dev")]
312    pub fn reset(&self) -> anyhow::Result<(), StorageError> {
313        self.block_number.store(0u32, Ordering::SeqCst);
314        self.state.reset().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
315            tracing::error!(reason = ?e, "failed to reset in RocksPermanent");
316        })
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Requiring exactly the current soft limit must pass.
    ///
    /// The requirement is derived from the running process instead of a hard-coded
    /// number, so the outcome does not depend on how the host's `ulimit -n` is
    /// configured; it also pins the boundary (a limit equal to the requirement is accepted).
    #[test]
    fn test_ulimit_check_with_no_change() {
        let current = get_current_file_descriptor_limit().expect("querying RLIMIT_NOFILE should succeed in tests");

        let result = RocksPermanentStorage::check_file_descriptor_limit(current.rlim_cur);
        assert!(
            result.is_ok(),
            "ulimit check should succeed when the requirement equals the current limit"
        );
    }

    /// Requiring the largest representable limit must be rejected.
    #[test]
    fn test_ulimit_check_with_max_requirement() {
        let result = RocksPermanentStorage::check_file_descriptor_limit(u64::MAX);
        assert!(
            result.is_err(),
            "ulimit check should fail because the required limit is above the system maximum"
        );
    }
}