// stratus/eth/storage/permanent/rocks/rocks_permanent.rs
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::time::Duration;

use anyhow::bail;

use super::rocks_cf_cache_config::RocksCfCacheConfig;
use super::rocks_state::RocksStorageState;
use crate::GlobalState;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::Block;
use crate::eth::primitives::BlockFilter;
use crate::eth::primitives::BlockNumber;
#[cfg(feature = "dev")]
use crate::eth::primitives::Bytes;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogMessage;
#[cfg(feature = "dev")]
use crate::eth::primitives::Nonce;
use crate::eth::primitives::PointInTime;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::StorageError;
use crate::eth::primitives::TransactionMined;
#[cfg(feature = "dev")]
use crate::eth::primitives::Wei;
use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
use crate::ext::SleepReason;
use crate::ext::spawn;
use crate::ext::traced_sleep;

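/// RocksDB-backed permanent storage.
///
/// Wraps the shared [`RocksStorageState`] and keeps the latest mined block number in an
/// in-memory atomic so it can be read and updated without touching the database.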
#[derive(Debug)]
pub struct RocksPermanentStorage {
    pub state: Arc<RocksStorageState>,
    block_number: AtomicU32,
}

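/// Reads the process file descriptor limit (`RLIMIT_NOFILE`) via `libc::getrlimit`.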
fn get_current_file_descriptor_limit() -> Result<libc::rlimit, StorageError> {
    let mut rlimit = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlimit as *mut libc::rlimit) };
    if result != 0 {
        let err = std::io::Error::last_os_error();
        let msg = format!("Failed to get current file descriptor limit (error: {err}).");
        return Err(StorageError::Unexpected { msg });
    }
    Ok(rlimit)
}

impl RocksPermanentStorage {
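    /// Opens (or creates) the RocksDB database and preloads the latest mined block number.
    ///
    /// Fails if the process file descriptor limit is below `file_descriptors_limit`. When
    /// `db_path_prefix` is `None`, the database is opened at the default path `data/rocksdb`;
    /// otherwise it is opened at `"{prefix}-rocksdb"`.
    ///
    /// Illustrative call, assuming `RocksCfCacheConfig` implements `Default` and that the
    /// chosen values suit the deployment:
    ///
    /// ```ignore
    /// let storage = RocksPermanentStorage::new(
    ///     None,                          // use the default path "data/rocksdb"
    ///     Duration::from_secs(240),      // shutdown timeout
    ///     RocksCfCacheConfig::default(), // column family cache configuration
    ///     false,                         // enable_sync_write
    ///     None,                          // no column family size metrics collection
    ///     65_536,                        // required file descriptor limit
    /// )?;
    /// ```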
    pub fn new(
        db_path_prefix: Option<String>,
        shutdown_timeout: Duration,
        cf_cache_config: RocksCfCacheConfig,
        enable_sync_write: bool,
        cf_size_metrics_interval: Option<Duration>,
        file_descriptors_limit: u64,
    ) -> anyhow::Result<Self> {
        tracing::info!("setting up rocksdb storage");

        Self::check_file_descriptor_limit(file_descriptors_limit).map_err(|err| anyhow::anyhow!("{err}"))?;

        let path = if let Some(prefix) = db_path_prefix {
            if prefix.is_empty() {
                bail!("given prefix for RocksDB is empty, try not providing the flag");
            }

            if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 {
                tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder");
            }

            let path = format!("{prefix}-rocksdb");
            tracing::info!("starting rocksdb storage - at custom path: '{:?}'", path);
            path
        } else {
            tracing::info!("starting rocksdb storage - at default path: 'data/rocksdb'");
            "data/rocksdb".to_string()
        };

        let state = Arc::new(RocksStorageState::new(path, shutdown_timeout, cf_cache_config, enable_sync_write)?);

        let block_number = state.preload_block_number()?;

        #[cfg(feature = "metrics")]
        if let Some(interval) = cf_size_metrics_interval {
            tracing::info!("starting column family size metrics collector with interval {:?}", interval);
            spawn(
                "rocks::cf_size_metrics_collector",
                Self::start_cf_size_metrics_collector(Arc::clone(&state), interval),
            );
        };

        Ok(Self { state, block_number })
    }

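    /// Checks that the current `RLIMIT_NOFILE` soft limit is at least `limit_required`,
    /// returning an error describing the current and maximum limits when it is not.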
    fn check_file_descriptor_limit(limit_required: u64) -> Result<(), StorageError> {
        let current_limit = get_current_file_descriptor_limit()?;

        tracing::info!(
            current_limit = current_limit.rlim_cur,
            min_required = limit_required,
            "checking file descriptor limit for RocksDB"
        );

        if current_limit.rlim_cur < limit_required {
            tracing::warn!(
                current_limit = current_limit.rlim_cur,
                min_required = limit_required,
                "File descriptor limit is below minimum."
            );

            let msg = format!(
                "File descriptor limit (current: {}, max: {}) is below the recommended minimum ({limit_required}) for RocksDB.",
                current_limit.rlim_cur, current_limit.rlim_max
            );

            return Err(StorageError::Unexpected { msg });
        } else {
            tracing::info!("File descriptor limit check passed: {} >= {}", current_limit.rlim_cur, limit_required);
        }

        Ok(())
    }

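    /// Returns the latest mined block number from the in-memory atomic counter.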
    pub fn read_mined_block_number(&self) -> BlockNumber {
        self.block_number.load(Ordering::SeqCst).into()
    }

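    /// Updates the in-memory mined block number counter.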
    pub fn set_mined_block_number(&self, number: BlockNumber) {
        self.block_number.store(number.as_u32(), Ordering::SeqCst);
    }

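    /// Returns whether the genesis block (block number zero) is already persisted.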
    pub fn has_genesis(&self) -> Result<bool, StorageError> {
        let genesis = self.read_block(BlockFilter::Number(BlockNumber::ZERO))?;
        Ok(genesis.is_some())
    }

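    /// Reads an account at the given point in time, logging and wrapping any RocksDB error.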
    pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> anyhow::Result<Option<Account>, StorageError> {
        self.state
            .read_account(address, point_in_time)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to read account in RocksPermanent");
            })
    }

    pub fn read_accounts(&self, addresses: Vec<Address>) -> anyhow::Result<Vec<(Address, Account)>, StorageError> {
        self.state.read_accounts(addresses).map_err(|err| StorageError::RocksError { err })
    }

    pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> anyhow::Result<Option<Slot>, StorageError> {
        self.state
            .read_slot(address, index, point_in_time)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to read slot in RocksPermanent");
            })
    }

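    /// Reads a block matching the given filter, tracing the block when one is found.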
    pub fn read_block(&self, selection: BlockFilter) -> anyhow::Result<Option<Block>, StorageError> {
        let block = self.state.read_block(selection).inspect_err(|e| {
            tracing::error!(reason = ?e, "failed to read block in RocksPermanent");
        });
        if let Ok(Some(block)) = &block {
            tracing::trace!(?selection, ?block, "block found");
        }
        block.map_err(|err| StorageError::RocksError { err })
    }

    pub fn read_block_with_changes(&self, selection: BlockFilter) -> anyhow::Result<Option<(Block, BlockChangesRocksdb)>, StorageError> {
        let result = self.state.read_block_with_changes(selection).inspect_err(|e| {
            tracing::error!(reason = ?e, "failed to read block with changes in RocksPermanent");
        });
        if let Ok(Some(block)) = &result {
            tracing::trace!(?selection, ?block, "block found");
        }
        result.map_err(|err| StorageError::RocksError { err })
    }

    pub fn read_transaction(&self, hash: Hash) -> anyhow::Result<Option<TransactionMined>, StorageError> {
        self.state
            .read_transaction(hash)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to read transaction in RocksPermanent");
            })
    }

    pub fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMessage>, StorageError> {
        self.state.read_logs(filter).map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
            tracing::error!(reason = ?e, "failed to read log in RocksPermanent");
        })
    }

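    /// Persists the genesis block together with its initial accounts and execution changes.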
    pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
        #[cfg(feature = "rocks_metrics")]
        {
            self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
            })?;
        }

        self.state
            .save_genesis_block(block, accounts, account_changes)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save genesis block in RocksPermanent");
            })
    }

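    /// Persists a mined block and its execution changes, exporting RocksDB metrics first when
    /// the `rocks_metrics` feature is enabled.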
    pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
        #[cfg(feature = "rocks_metrics")]
        {
            self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
            })?;
        }
        self.state
            .save_block(block, account_changes)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save block in RocksPermanent");
            })
    }

    pub fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<(), StorageError> {
        self.state
            .save_accounts(accounts)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
            })
    }

    #[cfg(feature = "dev")]
    pub fn save_slot(&self, address: Address, slot: Slot) -> anyhow::Result<(), StorageError> {
        self.state
            .save_slot(address, slot)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save slot in RocksPermanent");
            })
    }

    #[cfg(feature = "dev")]
    pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> anyhow::Result<(), StorageError> {
        self.state
            .save_account_nonce(address, nonce)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save account nonce in RocksPermanent");
            })
    }

    #[cfg(feature = "dev")]
    pub fn save_account_balance(&self, address: Address, balance: Wei) -> anyhow::Result<(), StorageError> {
        self.state
            .save_account_balance(address, balance)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save account balance in RocksPermanent");
            })
    }

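    /// Periodically exports column family size metrics until a global shutdown is signalled.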
    #[cfg(feature = "metrics")]
    async fn start_cf_size_metrics_collector(state: Arc<RocksStorageState>, interval: Duration) -> anyhow::Result<()> {
        const TASK_NAME: &str = "rocks::cf_size_metrics";

        loop {
            if GlobalState::is_shutdown_warn(TASK_NAME) {
                return Ok(());
            }

            if let Err(e) = state.export_column_family_size_metrics() {
                tracing::warn!("failed to export column family metrics: {:?}", e);
            }

            traced_sleep(interval, SleepReason::Interval).await;
        }
    }

    #[cfg(feature = "dev")]
    pub fn save_account_code(&self, address: Address, code: Bytes) -> anyhow::Result<(), StorageError> {
        self.state
            .save_account_code(address, code)
            .map_err(|err| StorageError::RocksError { err })
            .inspect_err(|e| {
                tracing::error!(reason = ?e, "failed to save account code in RocksPermanent");
            })
    }

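    /// Resets the in-memory block number to zero and clears the underlying RocksDB state.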
    #[cfg(feature = "dev")]
    pub fn reset(&self) -> anyhow::Result<(), StorageError> {
        self.block_number.store(0u32, Ordering::SeqCst);
        self.state.reset().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
            tracing::error!(reason = ?e, "failed to reset in RocksPermanent");
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ulimit_check_with_no_change() {
        let result = RocksPermanentStorage::check_file_descriptor_limit(1024);
        assert!(result.is_ok(), "ulimit check should succeed with a reasonably low limit");
    }

    #[test]
    fn test_ulimit_check_with_max_requirement() {
        let result = RocksPermanentStorage::check_file_descriptor_limit(u64::MAX);
        assert!(
            result.is_err(),
            "ulimit check should fail because the required limit is above the system maximum"
        );
    }
}