stratus/eth/storage/permanent/rocks/
rocks_permanent.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::sync::atomic::AtomicU32;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6
7use anyhow::bail;
8
9use super::rocks_state::RocksStorageState;
10use crate::GlobalState;
11use crate::eth::primitives::Account;
12use crate::eth::primitives::Address;
13use crate::eth::primitives::Block;
14use crate::eth::primitives::BlockFilter;
15use crate::eth::primitives::BlockNumber;
16#[cfg(feature = "dev")]
17use crate::eth::primitives::Bytes;
18use crate::eth::primitives::ExecutionChanges;
19use crate::eth::primitives::Hash;
20use crate::eth::primitives::LogFilter;
21use crate::eth::primitives::LogMined;
22#[cfg(feature = "dev")]
23use crate::eth::primitives::Nonce;
24use crate::eth::primitives::PointInTime;
25use crate::eth::primitives::Slot;
26use crate::eth::primitives::SlotIndex;
27use crate::eth::primitives::StorageError;
28use crate::eth::primitives::TransactionMined;
29#[cfg(feature = "dev")]
30use crate::eth::primitives::Wei;
31use crate::ext::SleepReason;
32use crate::ext::spawn;
33use crate::ext::traced_sleep;
34
/// Permanent storage backed by RocksDB.
///
/// Wraps a shared [`RocksStorageState`], to which the read/save methods delegate, and keeps
/// the mined block number in an in-memory atomic.
#[derive(Debug)]
pub struct RocksPermanentStorage {
    /// Shared handle to the underlying RocksDB state.
    pub state: Arc<RocksStorageState>,
    /// Mined block number held in memory; initialized in `new` from `state.preload_block_number()`.
    block_number: AtomicU32,
}
40
41fn get_current_file_descriptor_limit() -> Result<libc::rlimit, StorageError> {
42    let mut rlimit = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
43    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlimit as *mut libc::rlimit) };
44    if result != 0 {
45        let err = std::io::Error::last_os_error();
46        let msg = format!("Failed to get current file descriptor limit (error: {err}).");
47        return Err(StorageError::Unexpected { msg });
48    }
49    Ok(rlimit)
50}
51
52impl RocksPermanentStorage {
53    pub fn new(
54        db_path_prefix: Option<String>,
55        shutdown_timeout: Duration,
56        cache_size_multiplier: Option<f32>,
57        enable_sync_write: bool,
58        cf_size_metrics_interval: Option<Duration>,
59        file_descriptors_limit: u64,
60    ) -> anyhow::Result<Self> {
61        tracing::info!("setting up rocksdb storage");
62
63        // Check file descriptor limit before proceeding with RocksDB initialization
64        Self::check_file_descriptor_limit(file_descriptors_limit).map_err(|err| anyhow::anyhow!("{}", err))?;
65
66        let path = if let Some(prefix) = db_path_prefix {
67            // run some checks on the given prefix
68            if prefix.is_empty() {
69                bail!("given prefix for RocksDB is empty, try not providing the flag");
70            }
71
72            if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 {
73                tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder");
74            }
75
76            let path = format!("{prefix}-rocksdb");
77            tracing::info!("starting rocksdb storage - at custom path: '{:?}'", path);
78            path
79        } else {
80            tracing::info!("starting rocksdb storage - at default path: 'data/rocksdb'");
81            "data/rocksdb".to_string()
82        };
83
84        let state = Arc::new(RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier, enable_sync_write)?);
85
86        let block_number = state.preload_block_number()?;
87
88        // spawn background task for collecting column family size metrics
89        #[cfg(feature = "metrics")]
90        if let Some(interval) = cf_size_metrics_interval {
91            tracing::info!("starting column family size metrics collector with interval {:?}", interval);
92            spawn(
93                "rocks::cf_size_metrics_collector",
94                Self::start_cf_size_metrics_collector(Arc::clone(&state), interval),
95            );
96        };
97
98        Ok(Self { state, block_number })
99    }
100
101    /// Checks the current file descriptor limit and validates it meets the minimum requirement.
102    ///
103    /// This prevents RocksDB from misbehaving or corrupting data due to insufficient file descriptors.
104    fn check_file_descriptor_limit(limit_required: u64) -> Result<(), StorageError> {
105        let current_limit = get_current_file_descriptor_limit()?;
106
107        tracing::info!(
108            current_limit = current_limit.rlim_cur,
109            min_required = limit_required,
110            "checking file descriptor limit for RocksDB"
111        );
112
113        if current_limit.rlim_cur < limit_required {
114            tracing::warn!(
115                current_limit = current_limit.rlim_cur,
116                min_required = limit_required,
117                "File descriptor limit is below minimum."
118            );
119
120            let msg = format!(
121                "File descriptor limit (current: {}, max: {}) is below the recommended minimum ({limit_required}) for RocksDB.",
122                current_limit.rlim_cur, current_limit.rlim_max
123            );
124
125            return Err(StorageError::Unexpected { msg });
126        } else {
127            tracing::info!("File descriptor limit check passed: {} >= {}", current_limit.rlim_cur, limit_required);
128        }
129
130        Ok(())
131    }
132
133    // -------------------------------------------------------------------------
134    // Block number operations
135    // -------------------------------------------------------------------------
136
137    pub fn read_mined_block_number(&self) -> BlockNumber {
138        self.block_number.load(Ordering::SeqCst).into()
139    }
140
141    pub fn set_mined_block_number(&self, number: BlockNumber) {
142        self.block_number.store(number.as_u32(), Ordering::SeqCst);
143    }
144
145    pub fn has_genesis(&self) -> Result<bool, StorageError> {
146        let genesis = self.read_block(BlockFilter::Number(BlockNumber::ZERO))?;
147        Ok(genesis.is_some())
148    }
149
150    // -------------------------------------------------------------------------
151    // State operations
152    // -------------------------------------------------------------------------
153
154    pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> anyhow::Result<Option<Account>, StorageError> {
155        self.state
156            .read_account(address, point_in_time)
157            .map_err(|err| StorageError::RocksError { err })
158            .inspect_err(|e| {
159                tracing::error!(reason = ?e, "failed to read account in RocksPermanent");
160            })
161    }
162
163    pub fn read_accounts(&self, addresses: Vec<Address>) -> anyhow::Result<Vec<(Address, Account)>, StorageError> {
164        self.state.read_accounts(addresses).map_err(|err| StorageError::RocksError { err })
165    }
166
167    pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> anyhow::Result<Option<Slot>, StorageError> {
168        self.state
169            .read_slot(address, index, point_in_time)
170            .map_err(|err| StorageError::RocksError { err })
171            .inspect_err(|e| {
172                tracing::error!(reason = ?e, "failed to read slot in RocksPermanent");
173            })
174    }
175
176    pub fn read_block(&self, selection: BlockFilter) -> anyhow::Result<Option<Block>, StorageError> {
177        let block = self.state.read_block(selection).inspect_err(|e| {
178            tracing::error!(reason = ?e, "failed to read block in RocksPermanent");
179        });
180        if let Ok(Some(block)) = &block {
181            tracing::trace!(?selection, ?block, "block found");
182        }
183        block.map_err(|err| StorageError::RocksError { err })
184    }
185
186    pub fn read_block_with_changes(&self, selection: BlockFilter) -> anyhow::Result<Option<(Block, ExecutionChanges)>, StorageError> {
187        let result = self.state.read_block_with_changes(selection).inspect_err(|e| {
188            tracing::error!(reason = ?e, "failed to read block with changes in RocksPermanent");
189        });
190        if let Ok(Some(block)) = &result {
191            tracing::trace!(?selection, ?block, "block found");
192        }
193        result.map_err(|err| StorageError::RocksError { err })
194    }
195
196    pub fn read_transaction(&self, hash: Hash) -> anyhow::Result<Option<TransactionMined>, StorageError> {
197        self.state
198            .read_transaction(hash)
199            .map_err(|err| StorageError::RocksError { err })
200            .inspect_err(|e| {
201                tracing::error!(reason = ?e, "failed to read transaction in RocksPermanent");
202            })
203    }
204
205    pub fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>, StorageError> {
206        self.state.read_logs(filter).map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
207            tracing::error!(reason = ?e, "failed to read log in RocksPermanent");
208        })
209    }
210
211    pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
212        #[cfg(feature = "rocks_metrics")]
213        {
214            self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
215                tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
216            })?;
217        }
218
219        self.state
220            .save_genesis_block(block, accounts, account_changes)
221            .map_err(|err| StorageError::RocksError { err })
222            .inspect_err(|e| {
223                tracing::error!(reason = ?e, "failed to save genesis block in RocksPermanent");
224            })
225    }
226
227    pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
228        #[cfg(feature = "rocks_metrics")]
229        {
230            self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
231                tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
232            })?;
233        }
234        self.state
235            .save_block(block, account_changes)
236            .map_err(|err| StorageError::RocksError { err })
237            .inspect_err(|e| {
238                tracing::error!(reason = ?e, "failed to save block in RocksPermanent");
239            })
240    }
241
242    pub fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<(), StorageError> {
243        self.state
244            .save_accounts(accounts)
245            .map_err(|err| StorageError::RocksError { err })
246            .inspect_err(|e| {
247                tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
248            })
249    }
250
251    #[cfg(feature = "dev")]
252    pub fn save_slot(&self, address: Address, slot: Slot) -> anyhow::Result<(), StorageError> {
253        self.state
254            .save_slot(address, slot)
255            .map_err(|err| StorageError::RocksError { err })
256            .inspect_err(|e| {
257                tracing::error!(reason = ?e, "failed to save slot in RocksPermanent");
258            })
259    }
260
261    #[cfg(feature = "dev")]
262    pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> anyhow::Result<(), StorageError> {
263        self.state
264            .save_account_nonce(address, nonce)
265            .map_err(|err| StorageError::RocksError { err })
266            .inspect_err(|e| {
267                tracing::error!(reason = ?e, "failed to save account nonce in RocksPermanent");
268            })
269    }
270
271    #[cfg(feature = "dev")]
272    pub fn save_account_balance(&self, address: Address, balance: Wei) -> anyhow::Result<(), StorageError> {
273        self.state
274            .save_account_balance(address, balance)
275            .map_err(|err| StorageError::RocksError { err })
276            .inspect_err(|e| {
277                tracing::error!(reason = ?e, "failed to save account balance in RocksPermanent");
278            })
279    }
280
281    #[cfg(feature = "metrics")]
282    /// Starts a background task that collects column family size metrics at regular intervals.
283    async fn start_cf_size_metrics_collector(state: Arc<RocksStorageState>, interval: Duration) -> anyhow::Result<()> {
284        const TASK_NAME: &str = "rocks::cf_size_metrics";
285
286        loop {
287            if GlobalState::is_shutdown_warn(TASK_NAME) {
288                return Ok(());
289            }
290
291            if let Err(e) = state.export_column_family_size_metrics() {
292                tracing::warn!("failed to export column family metrics: {:?}", e);
293            }
294
295            traced_sleep(interval, SleepReason::Interval).await;
296        }
297    }
298
299    #[cfg(feature = "dev")]
300    pub fn save_account_code(&self, address: Address, code: Bytes) -> anyhow::Result<(), StorageError> {
301        self.state
302            .save_account_code(address, code)
303            .map_err(|err| StorageError::RocksError { err })
304            .inspect_err(|e| {
305                tracing::error!(reason = ?e, "failed to save account code in RocksPermanent");
306            })
307    }
308
309    #[cfg(feature = "dev")]
310    pub fn reset(&self) -> anyhow::Result<(), StorageError> {
311        self.block_number.store(0u32, Ordering::SeqCst);
312        self.state.reset().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
313            tracing::error!(reason = ?e, "failed to reset in RocksPermanent");
314        })
315    }
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    /// A requirement of 1024 descriptors is expected to be met by the process running the tests.
    #[test]
    fn test_ulimit_check_with_no_change() {
        let outcome = RocksPermanentStorage::check_file_descriptor_limit(1024);

        assert!(outcome.is_ok(), "ulimit check should succeed with reasonable low limit");
    }

    /// Requiring `u64::MAX` descriptors is expected to be rejected.
    #[test]
    fn test_ulimit_check_with_max_requirement() {
        let outcome = RocksPermanentStorage::check_file_descriptor_limit(u64::MAX);

        assert!(
            outcome.is_err(),
            "ulimit check should fail because the required limit is above the system maximum"
        );
    }
}
336}