1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3use std::sync::atomic::Ordering;
4use std::sync::mpsc;
5use std::time::Duration;
6
7use anyhow::anyhow;
8use parking_lot::Mutex;
9use parking_lot::RwLock;
10use tokio::sync::Mutex as AsyncMutex;
11use tokio::sync::broadcast;
12use tokio::task::JoinSet;
13use tokio_util::sync::CancellationToken;
14use tracing::Span;
15
16use crate::eth::miner::MinerMode;
17use crate::eth::primitives::Block;
18use crate::eth::primitives::BlockHeader;
19use crate::eth::primitives::ExecutionChanges;
20use crate::eth::primitives::ExternalBlock;
21use crate::eth::primitives::Hash;
22use crate::eth::primitives::LogMessage;
23use crate::eth::primitives::StorageError;
24use crate::eth::primitives::StratusError;
25use crate::eth::primitives::TransactionExecution;
26use crate::eth::storage::StratusStorage;
27use crate::ext::DisplayExt;
28use crate::ext::not;
29use crate::globals::STRATUS_SHUTDOWN_SIGNAL;
30use crate::infra::tracing::SpanExt;
31
32cfg_if::cfg_if! {
33 if #[cfg(feature = "tracing")] {
34 use tracing::field;
35 use tracing::info_span;
36 }
37}
38
39#[allow(clippy::large_enum_variant)]
41pub enum CommitItem {
42 Block(Block),
44 ReplicationBlock(Block),
46}
47
48pub struct Miner {
49 pub locks: MinerLocks,
50
51 storage: Arc<StratusStorage>,
52
53 is_paused: AtomicBool,
55
56 mode: RwLock<MinerMode>,
58
59 pub notifier_pending_txs: broadcast::Sender<Hash>,
61
62 pub notifier_blocks: broadcast::Sender<BlockHeader>,
64
65 pub notifier_logs: broadcast::Sender<LogMessage>,
67
68 shutdown_signal: Mutex<CancellationToken>,
73
74 interval_joinset: AsyncMutex<Option<JoinSet<()>>>,
76}
77
78#[derive(Default)]
80pub struct MinerLocks {
81 save_execution: Mutex<()>,
82 pub mine_and_commit: Mutex<()>,
83 mine: Mutex<()>,
84 commit: Mutex<()>,
85}
86
87impl Miner {
88 pub fn new(storage: Arc<StratusStorage>, mode: MinerMode) -> Self {
89 tracing::info!(?mode, "creating block miner");
90 Self {
91 locks: MinerLocks::default(),
92 storage,
93 is_paused: AtomicBool::new(false),
94 mode: mode.into(),
95 notifier_pending_txs: broadcast::channel(u16::MAX as usize).0,
96 notifier_blocks: broadcast::channel(u16::MAX as usize).0,
97 notifier_logs: broadcast::channel(u16::MAX as usize).0,
98 shutdown_signal: Mutex::new(STRATUS_SHUTDOWN_SIGNAL.child_token()),
99 interval_joinset: AsyncMutex::new(None),
100 }
101 }
102
103 pub async fn start_interval_mining(self: &Arc<Self>, block_time: Duration) {
107 if self.is_interval_miner_running() {
108 tracing::warn!(block_time = ?block_time.to_string_ext(), "tried to start interval mining, but it's already running, skipping");
109 return;
110 };
111
112 tracing::info!(block_time = ?block_time.to_string_ext(), "spawning interval miner");
113 self.set_mode(MinerMode::Interval(block_time));
114 self.unpause();
115
116 let (ticks_tx, ticks_rx) = mpsc::channel();
118 let new_shutdown_signal = STRATUS_SHUTDOWN_SIGNAL.child_token();
119 let mut joinset = JoinSet::new();
120
121 joinset.spawn_blocking({
122 let shutdown = new_shutdown_signal.clone();
123 let miner_clone = Arc::clone(self);
124 || interval_miner::run(miner_clone, ticks_rx, shutdown)
125 });
126
127 joinset.spawn(interval_miner_ticker::run(block_time, ticks_tx, new_shutdown_signal.clone()));
128
129 *self.shutdown_signal.lock() = new_shutdown_signal;
130 *self.interval_joinset.lock().await = Some(joinset);
131 }
132
133 pub async fn switch_to_external_mode(self: &Arc<Self>) {
135 if self.mode().is_external() {
136 tracing::warn!("trying to change mode to external, but it's already set, skipping");
137 return;
138 }
139 self.shutdown_and_wait().await;
140 self.set_mode(MinerMode::External);
141 self.unpause();
142 }
143
144 pub fn unpause(&self) {
146 self.is_paused.store(false, Ordering::Relaxed);
147 }
148
149 pub fn pause(&self) {
151 self.is_paused.store(true, Ordering::Relaxed);
152 }
153
154 pub fn is_paused(&self) -> bool {
156 self.is_paused.load(Ordering::Relaxed)
157 }
158
159 pub fn mode(&self) -> MinerMode {
160 *self.mode.read()
161 }
162
163 fn set_mode(&self, new_mode: MinerMode) {
164 *self.mode.write() = new_mode;
165 }
166
167 pub fn is_interval_miner_running(&self) -> bool {
168 match self.interval_joinset.try_lock() {
169 Ok(joinset) => joinset.as_ref().is_some_and(|joinset| not(joinset.is_empty())),
171 Err(_) => true,
173 }
174 }
175
176 async fn shutdown_and_wait(&self) {
178 let mut joinset_lock = self.interval_joinset.lock().await;
181
182 let Some(mut joinset) = joinset_lock.take() else {
183 return;
184 };
185
186 tracing::warn!("shutting down interval miner to switch to external mode");
187
188 self.shutdown_signal.lock().cancel();
189
190 while let Some(result) = joinset.join_next().await {
192 if let Err(e) = result {
193 tracing::error!(reason = ?e, "miner task failed");
194 }
195 }
196 }
197
198 pub fn save_execution(&self, tx_execution: TransactionExecution) -> Result<(), StratusError> {
200 let tx_hash = tx_execution.info.hash;
201
202 #[cfg(feature = "tracing")]
204 let _span = info_span!("miner::save_execution", %tx_hash).entered();
205
206 let is_automine = self.mode().is_automine();
208
209 let _save_execution_lock = if is_automine { Some(self.locks.save_execution.lock()) } else { None };
211
212 self.storage.save_execution(tx_execution)?;
214
215 if self.has_pending_tx_subscribers() {
217 self.send_pending_tx_notification(&Some(tx_hash));
218 }
219
220 if is_automine {
222 self.mine_local_and_commit()?;
223 }
224
225 Ok(())
226 }
227
228 pub fn mine_external(&self, external_block: ExternalBlock) -> anyhow::Result<(Block, ExecutionChanges)> {
232 #[cfg(feature = "tracing")]
234 let _span = info_span!("miner::mine_external", block_number = field::Empty).entered();
235
236 let _mine_lock = self.locks.mine.lock();
238
239 let (pending_block, changes) = self.storage.finish_pending_block()?;
241 let mut block: Block = pending_block.into();
242
243 Span::with(|s| s.rec_str("block_number", &block.header.number));
244 block.apply_external(&external_block);
245
246 match external_block == block {
247 true => Ok((block, changes)),
248 false => Err(anyhow!(
249 "mismatching block info:\n\tlocal:\n\t\tnumber: {:?}\n\t\ttimestamp: {:?}\n\t\thash: {:?}\n\texternal:\n\t\tnumber: {:?}\n\t\ttimestamp: {:?}\n\t\thash: {:?}",
250 block.number(),
251 block.header.timestamp,
252 block.hash(),
253 external_block.number(),
254 external_block.timestamp(),
255 external_block.hash()
256 )),
257 }
258 }
259
260 pub fn mine_local_and_commit(&self) -> anyhow::Result<(), StorageError> {
263 let _mine_and_commit_lock = self.locks.mine_and_commit.lock();
264
265 let (block, changes) = self.mine_local()?;
266 self.commit(CommitItem::Block(block), changes)
267 }
268
269 pub fn mine_local(&self) -> anyhow::Result<(Block, ExecutionChanges), StorageError> {
273 #[cfg(feature = "tracing")]
274 let _span = info_span!("miner::mine_local", block_number = field::Empty).entered();
275
276 let _mine_lock = self.locks.mine.lock();
278
279 let (block, changes) = self.storage.finish_pending_block()?;
281 Span::with(|s| s.rec_str("block_number", &block.header.number));
282
283 Ok((block.into(), changes))
284 }
285
286 pub fn commit(&self, item: CommitItem, changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
287 match item {
288 CommitItem::Block(block) => self.commit_block(block, changes),
289 CommitItem::ReplicationBlock(block) => {
290 self.storage.finish_pending_block()?;
291 self.commit_block(block, changes)
292 }
293 }
294 }
295
296 pub fn commit_block(&self, block: Block, changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
298 let block_number = block.number();
299
300 #[cfg(feature = "tracing")]
302 let _span = info_span!("miner::commit", %block_number).entered();
303 tracing::info!(%block_number, transactions_len = %block.transactions.len(), "commiting block");
304
305 let _commit_lock = self.locks.commit.lock();
307
308 tracing::info!(%block_number, "miner acquired commit lock");
309
310 let block_header = if self.has_block_header_subscribers() {
312 Some(block.header.clone())
313 } else {
314 None
315 };
316 let block_logs = self.has_log_subscribers().then(|| block.create_log_messages());
317
318 self.storage.save_block(block, changes)?;
320
321 self.send_log_notifications(&block_logs);
323 self.send_block_header_notification(&block_header);
324
325 Ok(())
326 }
327
328 fn has_block_header_subscribers(&self) -> bool {
334 self.notifier_blocks.receiver_count() > 0
335 }
336
337 fn has_log_subscribers(&self) -> bool {
339 self.notifier_logs.receiver_count() > 0
340 }
341
342 fn has_pending_tx_subscribers(&self) -> bool {
344 self.notifier_pending_txs.receiver_count() > 0
345 }
346
347 fn send_block_header_notification(&self, block_header: &Option<BlockHeader>) {
349 if let Some(block_header) = block_header {
350 let _ = self.notifier_blocks.send(block_header.clone());
351 }
352 }
353
354 fn send_log_notifications(&self, logs: &Option<Vec<LogMessage>>) {
356 if let Some(logs) = logs {
357 for log in logs {
358 let _ = self.notifier_logs.send(log.clone());
359 }
360 }
361 }
362
363 fn send_pending_tx_notification(&self, tx_hash: &Option<Hash>) {
365 if let Some(tx_hash) = tx_hash {
366 let _ = self.notifier_pending_txs.send(*tx_hash);
367 }
368 }
369}
370
371pub mod interval_miner {
375 use std::sync::Arc;
376 use std::sync::mpsc;
377 use std::sync::mpsc::RecvTimeoutError;
378 use std::time::Duration;
379
380 use tokio::time::Instant;
381 use tokio_util::sync::CancellationToken;
382
383 use crate::eth::miner::Miner;
384 use crate::eth::miner::miner::CommitItem;
385 use crate::infra::tracing::warn_task_cancellation;
386 use crate::infra::tracing::warn_task_rx_closed;
387
388 pub fn run(miner: Arc<Miner>, ticks_rx: mpsc::Receiver<Instant>, cancellation: CancellationToken) {
389 const TASK_NAME: &str = "interval-miner-ticker";
390
391 loop {
392 if cancellation.is_cancelled() {
393 warn_task_cancellation(TASK_NAME);
394 break;
395 }
396
397 let tick = match ticks_rx.recv_timeout(Duration::from_secs(2)) {
398 Ok(tick) => tick,
399 Err(RecvTimeoutError::Disconnected) => break,
400 Err(RecvTimeoutError::Timeout) => {
401 tracing::warn!(timeout = "2s", "timeout reading miner channel, expected 1 block per second");
402 continue;
403 }
404 };
405
406 if miner.is_paused() {
407 tracing::warn!("skipping mining block because block mining is paused");
408 continue;
409 }
410
411 tracing::info!(lag_us = %tick.elapsed().as_micros(), "interval mining block");
413 mine_and_commit(&miner);
414 }
415 warn_task_rx_closed(TASK_NAME);
416 }
417
418 #[inline(always)]
419 pub fn mine_and_commit(miner: &Miner) {
420 let _mine_and_commit_lock = miner.locks.mine_and_commit.lock();
421
422 let (block, changes) = loop {
424 match miner.mine_local() {
425 Ok(block) => break block,
426 Err(e) => {
427 tracing::error!(reason = ?e, "failed to mine block");
428 }
429 }
430 };
431
432 loop {
434 match miner.commit(CommitItem::Block(block.clone()), changes.clone()) {
435 Ok(_) => break,
436 Err(e) => {
437 tracing::error!(reason = ?e, "failed to commit block");
438 continue;
439 }
440 }
441 }
442 }
443}
444
445mod interval_miner_ticker {
446 use std::sync::mpsc;
447 use std::thread;
448 use std::time::Duration;
449
450 use chrono::Timelike;
451 use chrono::Utc;
452 use tokio::time::Instant;
453 use tokio_util::sync::CancellationToken;
454
455 use crate::infra::tracing::warn_task_cancellation;
456 use crate::infra::tracing::warn_task_rx_closed;
457
458 pub async fn run(block_time: Duration, ticks_tx: mpsc::Sender<Instant>, cancellation: CancellationToken) {
459 const TASK_NAME: &str = "interval-miner-ticker";
460
461 #[allow(clippy::expect_used)]
463 let next_second = (Utc::now() + Duration::from_secs(1))
464 .with_nanosecond(0)
465 .expect("nanosecond above is set to `0`, which is always less than 2 billion");
466
467 let time_to_sleep = (next_second - Utc::now()).to_std().unwrap_or_default();
468 thread::sleep(time_to_sleep);
469
470 let mut ticker = tokio::time::interval(block_time);
472 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
473
474 loop {
475 if cancellation.is_cancelled() {
476 warn_task_cancellation(TASK_NAME);
477 return;
478 }
479
480 let tick = ticker.tick().await;
481 if ticks_tx.send(tick).is_err() {
482 warn_task_rx_closed(TASK_NAME);
483 break;
484 };
485 }
486 }
487}