stratus/eth/storage/permanent/rocks/
rocks_permanent.rs1use std::path::Path;
2use std::sync::Arc;
3use std::sync::atomic::AtomicU32;
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6
7use anyhow::bail;
8
9use super::rocks_state::RocksStorageState;
10use crate::GlobalState;
11use crate::eth::primitives::Account;
12use crate::eth::primitives::Address;
13use crate::eth::primitives::Block;
14use crate::eth::primitives::BlockFilter;
15use crate::eth::primitives::BlockNumber;
16#[cfg(feature = "dev")]
17use crate::eth::primitives::Bytes;
18use crate::eth::primitives::ExecutionChanges;
19use crate::eth::primitives::Hash;
20use crate::eth::primitives::LogFilter;
21use crate::eth::primitives::LogMined;
22#[cfg(feature = "dev")]
23use crate::eth::primitives::Nonce;
24use crate::eth::primitives::PointInTime;
25use crate::eth::primitives::Slot;
26use crate::eth::primitives::SlotIndex;
27use crate::eth::primitives::StorageError;
28use crate::eth::primitives::TransactionMined;
29#[cfg(feature = "dev")]
30use crate::eth::primitives::Wei;
31use crate::ext::SleepReason;
32use crate::ext::spawn;
33use crate::ext::traced_sleep;
34
35#[derive(Debug)]
36pub struct RocksPermanentStorage {
37 pub state: Arc<RocksStorageState>,
38 block_number: AtomicU32,
39}
40
41fn get_current_file_descriptor_limit() -> Result<libc::rlimit, StorageError> {
42 let mut rlimit = libc::rlimit { rlim_cur: 0, rlim_max: 0 };
43 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut rlimit as *mut libc::rlimit) };
44 if result != 0 {
45 let err = std::io::Error::last_os_error();
46 let msg = format!("Failed to get current file descriptor limit (error: {err}).");
47 return Err(StorageError::Unexpected { msg });
48 }
49 Ok(rlimit)
50}
51
52impl RocksPermanentStorage {
53 pub fn new(
54 db_path_prefix: Option<String>,
55 shutdown_timeout: Duration,
56 cache_size_multiplier: Option<f32>,
57 enable_sync_write: bool,
58 cf_size_metrics_interval: Option<Duration>,
59 file_descriptors_limit: u64,
60 ) -> anyhow::Result<Self> {
61 tracing::info!("setting up rocksdb storage");
62
63 Self::check_file_descriptor_limit(file_descriptors_limit).map_err(|err| anyhow::anyhow!("{}", err))?;
65
66 let path = if let Some(prefix) = db_path_prefix {
67 if prefix.is_empty() {
69 bail!("given prefix for RocksDB is empty, try not providing the flag");
70 }
71
72 if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 {
73 tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder");
74 }
75
76 let path = format!("{prefix}-rocksdb");
77 tracing::info!("starting rocksdb storage - at custom path: '{:?}'", path);
78 path
79 } else {
80 tracing::info!("starting rocksdb storage - at default path: 'data/rocksdb'");
81 "data/rocksdb".to_string()
82 };
83
84 let state = Arc::new(RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier, enable_sync_write)?);
85
86 let block_number = state.preload_block_number()?;
87
88 #[cfg(feature = "metrics")]
90 if let Some(interval) = cf_size_metrics_interval {
91 tracing::info!("starting column family size metrics collector with interval {:?}", interval);
92 spawn(
93 "rocks::cf_size_metrics_collector",
94 Self::start_cf_size_metrics_collector(Arc::clone(&state), interval),
95 );
96 };
97
98 Ok(Self { state, block_number })
99 }
100
101 fn check_file_descriptor_limit(limit_required: u64) -> Result<(), StorageError> {
105 let current_limit = get_current_file_descriptor_limit()?;
106
107 tracing::info!(
108 current_limit = current_limit.rlim_cur,
109 min_required = limit_required,
110 "checking file descriptor limit for RocksDB"
111 );
112
113 if current_limit.rlim_cur < limit_required {
114 tracing::warn!(
115 current_limit = current_limit.rlim_cur,
116 min_required = limit_required,
117 "File descriptor limit is below minimum."
118 );
119
120 let msg = format!(
121 "File descriptor limit (current: {}, max: {}) is below the recommended minimum ({limit_required}) for RocksDB.",
122 current_limit.rlim_cur, current_limit.rlim_max
123 );
124
125 return Err(StorageError::Unexpected { msg });
126 } else {
127 tracing::info!("File descriptor limit check passed: {} >= {}", current_limit.rlim_cur, limit_required);
128 }
129
130 Ok(())
131 }
132
133 pub fn read_mined_block_number(&self) -> BlockNumber {
138 self.block_number.load(Ordering::SeqCst).into()
139 }
140
141 pub fn set_mined_block_number(&self, number: BlockNumber) {
142 self.block_number.store(number.as_u32(), Ordering::SeqCst);
143 }
144
145 pub fn has_genesis(&self) -> Result<bool, StorageError> {
146 let genesis = self.read_block(BlockFilter::Number(BlockNumber::ZERO))?;
147 Ok(genesis.is_some())
148 }
149
150 pub fn read_account(&self, address: Address, point_in_time: PointInTime) -> anyhow::Result<Option<Account>, StorageError> {
155 self.state
156 .read_account(address, point_in_time)
157 .map_err(|err| StorageError::RocksError { err })
158 .inspect_err(|e| {
159 tracing::error!(reason = ?e, "failed to read account in RocksPermanent");
160 })
161 }
162
163 pub fn read_accounts(&self, addresses: Vec<Address>) -> anyhow::Result<Vec<(Address, Account)>, StorageError> {
164 self.state.read_accounts(addresses).map_err(|err| StorageError::RocksError { err })
165 }
166
167 pub fn read_slot(&self, address: Address, index: SlotIndex, point_in_time: PointInTime) -> anyhow::Result<Option<Slot>, StorageError> {
168 self.state
169 .read_slot(address, index, point_in_time)
170 .map_err(|err| StorageError::RocksError { err })
171 .inspect_err(|e| {
172 tracing::error!(reason = ?e, "failed to read slot in RocksPermanent");
173 })
174 }
175
176 pub fn read_block(&self, selection: BlockFilter) -> anyhow::Result<Option<Block>, StorageError> {
177 let block = self.state.read_block(selection).inspect_err(|e| {
178 tracing::error!(reason = ?e, "failed to read block in RocksPermanent");
179 });
180 if let Ok(Some(block)) = &block {
181 tracing::trace!(?selection, ?block, "block found");
182 }
183 block.map_err(|err| StorageError::RocksError { err })
184 }
185
186 pub fn read_block_with_changes(&self, selection: BlockFilter) -> anyhow::Result<Option<(Block, ExecutionChanges)>, StorageError> {
187 let result = self.state.read_block_with_changes(selection).inspect_err(|e| {
188 tracing::error!(reason = ?e, "failed to read block with changes in RocksPermanent");
189 });
190 if let Ok(Some(block)) = &result {
191 tracing::trace!(?selection, ?block, "block found");
192 }
193 result.map_err(|err| StorageError::RocksError { err })
194 }
195
196 pub fn read_transaction(&self, hash: Hash) -> anyhow::Result<Option<TransactionMined>, StorageError> {
197 self.state
198 .read_transaction(hash)
199 .map_err(|err| StorageError::RocksError { err })
200 .inspect_err(|e| {
201 tracing::error!(reason = ?e, "failed to read transaction in RocksPermanent");
202 })
203 }
204
205 pub fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>, StorageError> {
206 self.state.read_logs(filter).map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
207 tracing::error!(reason = ?e, "failed to read log in RocksPermanent");
208 })
209 }
210
211 pub fn save_genesis_block(&self, block: Block, accounts: Vec<Account>, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
212 #[cfg(feature = "rocks_metrics")]
213 {
214 self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
215 tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
216 })?;
217 }
218
219 self.state
220 .save_genesis_block(block, accounts, account_changes)
221 .map_err(|err| StorageError::RocksError { err })
222 .inspect_err(|e| {
223 tracing::error!(reason = ?e, "failed to save genesis block in RocksPermanent");
224 })
225 }
226
227 pub fn save_block(&self, block: Block, account_changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
228 #[cfg(feature = "rocks_metrics")]
229 {
230 self.state.export_metrics().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
231 tracing::error!(reason = ?e, "failed to export metrics in RocksPermanent");
232 })?;
233 }
234 self.state
235 .save_block(block, account_changes)
236 .map_err(|err| StorageError::RocksError { err })
237 .inspect_err(|e| {
238 tracing::error!(reason = ?e, "failed to save block in RocksPermanent");
239 })
240 }
241
242 pub fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<(), StorageError> {
243 self.state
244 .save_accounts(accounts)
245 .map_err(|err| StorageError::RocksError { err })
246 .inspect_err(|e| {
247 tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
248 })
249 }
250
251 #[cfg(feature = "dev")]
252 pub fn save_slot(&self, address: Address, slot: Slot) -> anyhow::Result<(), StorageError> {
253 self.state
254 .save_slot(address, slot)
255 .map_err(|err| StorageError::RocksError { err })
256 .inspect_err(|e| {
257 tracing::error!(reason = ?e, "failed to save slot in RocksPermanent");
258 })
259 }
260
261 #[cfg(feature = "dev")]
262 pub fn save_account_nonce(&self, address: Address, nonce: Nonce) -> anyhow::Result<(), StorageError> {
263 self.state
264 .save_account_nonce(address, nonce)
265 .map_err(|err| StorageError::RocksError { err })
266 .inspect_err(|e| {
267 tracing::error!(reason = ?e, "failed to save account nonce in RocksPermanent");
268 })
269 }
270
271 #[cfg(feature = "dev")]
272 pub fn save_account_balance(&self, address: Address, balance: Wei) -> anyhow::Result<(), StorageError> {
273 self.state
274 .save_account_balance(address, balance)
275 .map_err(|err| StorageError::RocksError { err })
276 .inspect_err(|e| {
277 tracing::error!(reason = ?e, "failed to save account balance in RocksPermanent");
278 })
279 }
280
281 #[cfg(feature = "metrics")]
282 async fn start_cf_size_metrics_collector(state: Arc<RocksStorageState>, interval: Duration) -> anyhow::Result<()> {
284 const TASK_NAME: &str = "rocks::cf_size_metrics";
285
286 loop {
287 if GlobalState::is_shutdown_warn(TASK_NAME) {
288 return Ok(());
289 }
290
291 if let Err(e) = state.export_column_family_size_metrics() {
292 tracing::warn!("failed to export column family metrics: {:?}", e);
293 }
294
295 traced_sleep(interval, SleepReason::Interval).await;
296 }
297 }
298
299 #[cfg(feature = "dev")]
300 pub fn save_account_code(&self, address: Address, code: Bytes) -> anyhow::Result<(), StorageError> {
301 self.state
302 .save_account_code(address, code)
303 .map_err(|err| StorageError::RocksError { err })
304 .inspect_err(|e| {
305 tracing::error!(reason = ?e, "failed to save account code in RocksPermanent");
306 })
307 }
308
309 #[cfg(feature = "dev")]
310 pub fn reset(&self) -> anyhow::Result<(), StorageError> {
311 self.block_number.store(0u32, Ordering::SeqCst);
312 self.state.reset().map_err(|err| StorageError::RocksError { err }).inspect_err(|e| {
313 tracing::error!(reason = ?e, "failed to reset in RocksPermanent");
314 })
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    /// A requirement of 1024 descriptors is expected to be met by the test environment.
    #[test]
    fn test_ulimit_check_with_no_change() {
        assert!(
            RocksPermanentStorage::check_file_descriptor_limit(1024).is_ok(),
            "ulimit check should succeed with reasonable low limit"
        );
    }

    /// Requiring `u64::MAX` descriptors is expected to be rejected.
    #[test]
    fn test_ulimit_check_with_max_requirement() {
        let outcome = RocksPermanentStorage::check_file_descriptor_limit(u64::MAX);
        let rejected = outcome.is_err();
        assert!(rejected, "ulimit check should fail because the required limit is above the system maximum");
    }
}