// stratus/eth/miner/miner.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3use std::sync::atomic::Ordering;
4use std::sync::mpsc;
5use std::time::Duration;
6
7use anyhow::anyhow;
8use parking_lot::Mutex;
9use parking_lot::RwLock;
10use tokio::sync::Mutex as AsyncMutex;
11use tokio::sync::broadcast;
12use tokio::task::JoinSet;
13use tokio_util::sync::CancellationToken;
14use tracing::Span;
15
16use crate::eth::miner::MinerMode;
17use crate::eth::primitives::Block;
18use crate::eth::primitives::BlockHeader;
19use crate::eth::primitives::ExecutionChanges;
20use crate::eth::primitives::ExternalBlock;
21use crate::eth::primitives::Hash;
22use crate::eth::primitives::LogMessage;
23use crate::eth::primitives::StorageError;
24use crate::eth::primitives::StratusError;
25use crate::eth::primitives::TransactionExecution;
26use crate::eth::storage::StratusStorage;
27use crate::ext::DisplayExt;
28use crate::ext::not;
29use crate::globals::STRATUS_SHUTDOWN_SIGNAL;
30use crate::infra::tracing::SpanExt;
31
32cfg_if::cfg_if! {
33    if #[cfg(feature = "tracing")] {
34        use tracing::field;
35        use tracing::info_span;
36    }
37}
38
39/// Represents different types of items that can be committed to storage
40#[allow(clippy::large_enum_variant)]
41pub enum CommitItem {
42    /// A block
43    Block(Block),
44    /// A block that wasn't executed in this node and instead contains all changes already pre-computed
45    ReplicationBlock(Block),
46}
47
48pub struct Miner {
49    pub locks: MinerLocks,
50
51    storage: Arc<StratusStorage>,
52
53    /// Miner is enabled by default, but can be disabled.
54    is_paused: AtomicBool,
55
56    /// Mode the block miner is running.
57    mode: RwLock<MinerMode>,
58
59    /// Broadcasts pending transactions events.
60    pub notifier_pending_txs: broadcast::Sender<Hash>,
61
62    /// Broadcasts new mined blocks events.
63    pub notifier_blocks: broadcast::Sender<BlockHeader>,
64
65    /// Broadcasts transaction logs events.
66    pub notifier_logs: broadcast::Sender<LogMessage>,
67
68    // -------------------------------------------------------------------------
69    // Shutdown
70    // -------------------------------------------------------------------------
71    /// Signal sent to tasks to shutdown.
72    shutdown_signal: Mutex<CancellationToken>,
73
74    /// Spawned tasks for interval miner, can be used to await for complete shutdown.
75    interval_joinset: AsyncMutex<Option<JoinSet<()>>>,
76}
77
78/// Locks used in operations that mutate state.
79#[derive(Default)]
80pub struct MinerLocks {
81    save_execution: Mutex<()>,
82    pub mine_and_commit: Mutex<()>,
83    mine: Mutex<()>,
84    commit: Mutex<()>,
85}
86
87impl Miner {
88    pub fn new(storage: Arc<StratusStorage>, mode: MinerMode) -> Self {
89        tracing::info!(?mode, "creating block miner");
90        Self {
91            locks: MinerLocks::default(),
92            storage,
93            is_paused: AtomicBool::new(false),
94            mode: mode.into(),
95            notifier_pending_txs: broadcast::channel(u16::MAX as usize).0,
96            notifier_blocks: broadcast::channel(u16::MAX as usize).0,
97            notifier_logs: broadcast::channel(u16::MAX as usize).0,
98            shutdown_signal: Mutex::new(STRATUS_SHUTDOWN_SIGNAL.child_token()),
99            interval_joinset: AsyncMutex::new(None),
100        }
101    }
102
103    /// Spawns a new thread that keep mining blocks in the specified interval.
104    ///
105    /// Also unpauses `Miner` if it was paused.
106    pub async fn start_interval_mining(self: &Arc<Self>, block_time: Duration) {
107        if self.is_interval_miner_running() {
108            tracing::warn!(block_time = ?block_time.to_string_ext(), "tried to start interval mining, but it's already running, skipping");
109            return;
110        };
111
112        tracing::info!(block_time = ?block_time.to_string_ext(), "spawning interval miner");
113        self.set_mode(MinerMode::Interval(block_time));
114        self.unpause();
115
116        // spawn miner and ticker
117        let (ticks_tx, ticks_rx) = mpsc::channel();
118        let new_shutdown_signal = STRATUS_SHUTDOWN_SIGNAL.child_token();
119        let mut joinset = JoinSet::new();
120
121        joinset.spawn_blocking({
122            let shutdown = new_shutdown_signal.clone();
123            let miner_clone = Arc::clone(self);
124            || interval_miner::run(miner_clone, ticks_rx, shutdown)
125        });
126
127        joinset.spawn(interval_miner_ticker::run(block_time, ticks_tx, new_shutdown_signal.clone()));
128
129        *self.shutdown_signal.lock() = new_shutdown_signal;
130        *self.interval_joinset.lock().await = Some(joinset);
131    }
132
133    /// Shuts down interval miner, set miner mode to External.
134    pub async fn switch_to_external_mode(self: &Arc<Self>) {
135        if self.mode().is_external() {
136            tracing::warn!("trying to change mode to external, but it's already set, skipping");
137            return;
138        }
139        self.shutdown_and_wait().await;
140        self.set_mode(MinerMode::External);
141        self.unpause();
142    }
143
144    // Unpause interval miner (if in interval mode)
145    pub fn unpause(&self) {
146        self.is_paused.store(false, Ordering::Relaxed);
147    }
148
149    // Pause interval miner (if in interval mode)
150    pub fn pause(&self) {
151        self.is_paused.store(true, Ordering::Relaxed);
152    }
153
154    // Whether or not interval miner is paused (means nothing if not in interval mode)
155    pub fn is_paused(&self) -> bool {
156        self.is_paused.load(Ordering::Relaxed)
157    }
158
159    pub fn mode(&self) -> MinerMode {
160        *self.mode.read()
161    }
162
163    fn set_mode(&self, new_mode: MinerMode) {
164        *self.mode.write() = new_mode;
165    }
166
167    pub fn is_interval_miner_running(&self) -> bool {
168        match self.interval_joinset.try_lock() {
169            // check if the joinset of tasks has futures running
170            Ok(joinset) => joinset.as_ref().is_some_and(|joinset| not(joinset.is_empty())),
171            // if the joinset is locked, it's either trying to shutdown or turning on, so yes
172            Err(_) => true,
173        }
174    }
175
176    /// Shutdown if miner is interval miner.
177    async fn shutdown_and_wait(&self) {
178        // Note: we are intentionally holding this mutex till the end of the function, so that
179        // subsequent calls wait for the first to finish, and `is_interval_miner_running` works too
180        let mut joinset_lock = self.interval_joinset.lock().await;
181
182        let Some(mut joinset) = joinset_lock.take() else {
183            return;
184        };
185
186        tracing::warn!("shutting down interval miner to switch to external mode");
187
188        self.shutdown_signal.lock().cancel();
189
190        // wait for all tasks to end
191        while let Some(result) = joinset.join_next().await {
192            if let Err(e) = result {
193                tracing::error!(reason = ?e, "miner task failed");
194            }
195        }
196    }
197
198    /// Persists a transaction execution.
199    pub fn save_execution(&self, tx_execution: TransactionExecution) -> Result<(), StratusError> {
200        let tx_hash = tx_execution.info.hash;
201
202        // track
203        #[cfg(feature = "tracing")]
204        let _span = info_span!("miner::save_execution", %tx_hash).entered();
205
206        // Check if automine is enabled
207        let is_automine = self.mode().is_automine();
208
209        // if automine is enabled, only one transaction can enter the block at a time.
210        let _save_execution_lock = if is_automine { Some(self.locks.save_execution.lock()) } else { None };
211
212        // save execution to temporary storage
213        self.storage.save_execution(tx_execution)?;
214
215        // notify
216        if self.has_pending_tx_subscribers() {
217            self.send_pending_tx_notification(&Some(tx_hash));
218        }
219
220        // if automine is enabled, automatically mines a block
221        if is_automine {
222            self.mine_local_and_commit()?;
223        }
224
225        Ok(())
226    }
227
228    /// Mines external block and external transactions.
229    ///
230    /// Local transactions are not allowed to be part of the block.
231    pub fn mine_external(&self, external_block: ExternalBlock) -> anyhow::Result<(Block, ExecutionChanges)> {
232        // track
233        #[cfg(feature = "tracing")]
234        let _span = info_span!("miner::mine_external", block_number = field::Empty).entered();
235
236        // lock
237        let _mine_lock = self.locks.mine.lock();
238
239        // mine block
240        let (pending_block, changes) = self.storage.finish_pending_block()?;
241        let mut block: Block = pending_block.into();
242
243        Span::with(|s| s.rec_str("block_number", &block.header.number));
244        block.apply_external(&external_block);
245
246        match external_block == block {
247            true => Ok((block, changes)),
248            false => Err(anyhow!(
249                "mismatching block info:\n\tlocal:\n\t\tnumber: {:?}\n\t\ttimestamp: {:?}\n\t\thash: {:?}\n\texternal:\n\t\tnumber: {:?}\n\t\ttimestamp: {:?}\n\t\thash: {:?}",
250                block.number(),
251                block.header.timestamp,
252                block.hash(),
253                external_block.number(),
254                external_block.timestamp(),
255                external_block.hash()
256            )),
257        }
258    }
259
260    /// Same as [`Self::mine_local`], but automatically commits the block instead of returning it.
261    /// mainly used when is_automine is enabled.
262    pub fn mine_local_and_commit(&self) -> anyhow::Result<(), StorageError> {
263        let _mine_and_commit_lock = self.locks.mine_and_commit.lock();
264
265        let (block, changes) = self.mine_local()?;
266        self.commit(CommitItem::Block(block), changes)
267    }
268
269    /// Mines local transactions.
270    ///
271    /// External transactions are not allowed to be part of the block.
272    pub fn mine_local(&self) -> anyhow::Result<(Block, ExecutionChanges), StorageError> {
273        #[cfg(feature = "tracing")]
274        let _span = info_span!("miner::mine_local", block_number = field::Empty).entered();
275
276        // lock
277        let _mine_lock = self.locks.mine.lock();
278
279        // mine block
280        let (block, changes) = self.storage.finish_pending_block()?;
281        Span::with(|s| s.rec_str("block_number", &block.header.number));
282
283        Ok((block.into(), changes))
284    }
285
286    pub fn commit(&self, item: CommitItem, changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
287        match item {
288            CommitItem::Block(block) => self.commit_block(block, changes),
289            CommitItem::ReplicationBlock(block) => {
290                self.storage.finish_pending_block()?;
291                self.commit_block(block, changes)
292            }
293        }
294    }
295
296    /// Persists a mined block to permanent storage and prepares new block.
297    pub fn commit_block(&self, block: Block, changes: ExecutionChanges) -> anyhow::Result<(), StorageError> {
298        let block_number = block.number();
299
300        // track
301        #[cfg(feature = "tracing")]
302        let _span = info_span!("miner::commit", %block_number).entered();
303        tracing::info!(%block_number, transactions_len = %block.transactions.len(), "commiting block");
304
305        // lock
306        let _commit_lock = self.locks.commit.lock();
307
308        tracing::info!(%block_number, "miner acquired commit lock");
309
310        // extract fields to use in notifications if have subscribers
311        let block_header = if self.has_block_header_subscribers() {
312            Some(block.header.clone())
313        } else {
314            None
315        };
316        let block_logs = self.has_log_subscribers().then(|| block.create_log_messages());
317
318        // save storage
319        self.storage.save_block(block, changes)?;
320
321        // Send notifications after saving the block
322        self.send_log_notifications(&block_logs);
323        self.send_block_header_notification(&block_header);
324
325        Ok(())
326    }
327
    // -----------------------------------------------------------------------------
    // Notification methods
    // -----------------------------------------------------------------------------
331
332    /// Checks if there are any subscribers for block header notifications
333    fn has_block_header_subscribers(&self) -> bool {
334        self.notifier_blocks.receiver_count() > 0
335    }
336
337    /// Checks if there are any subscribers for log notifications
338    fn has_log_subscribers(&self) -> bool {
339        self.notifier_logs.receiver_count() > 0
340    }
341
342    /// Checks if there are any subscribers for pending transaction notifications
343    fn has_pending_tx_subscribers(&self) -> bool {
344        self.notifier_pending_txs.receiver_count() > 0
345    }
346
347    /// Sends a notification for a block header
348    fn send_block_header_notification(&self, block_header: &Option<BlockHeader>) {
349        if let Some(block_header) = block_header {
350            let _ = self.notifier_blocks.send(block_header.clone());
351        }
352    }
353
354    /// Sends notifications for logs
355    fn send_log_notifications(&self, logs: &Option<Vec<LogMessage>>) {
356        if let Some(logs) = logs {
357            for log in logs {
358                let _ = self.notifier_logs.send(log.clone());
359            }
360        }
361    }
362
363    /// Sends notifications for pending transactions
364    fn send_pending_tx_notification(&self, tx_hash: &Option<Hash>) {
365        if let Some(tx_hash) = tx_hash {
366            let _ = self.notifier_pending_txs.send(*tx_hash);
367        }
368    }
369}
370
// -----------------------------------------------------------------------------
// Interval miner
// -----------------------------------------------------------------------------
374pub mod interval_miner {
375    use std::sync::Arc;
376    use std::sync::mpsc;
377    use std::sync::mpsc::RecvTimeoutError;
378    use std::time::Duration;
379
380    use tokio::time::Instant;
381    use tokio_util::sync::CancellationToken;
382
383    use crate::eth::miner::Miner;
384    use crate::eth::miner::miner::CommitItem;
385    use crate::infra::tracing::warn_task_cancellation;
386    use crate::infra::tracing::warn_task_rx_closed;
387
388    pub fn run(miner: Arc<Miner>, ticks_rx: mpsc::Receiver<Instant>, cancellation: CancellationToken) {
389        const TASK_NAME: &str = "interval-miner-ticker";
390
391        loop {
392            if cancellation.is_cancelled() {
393                warn_task_cancellation(TASK_NAME);
394                break;
395            }
396
397            let tick = match ticks_rx.recv_timeout(Duration::from_secs(2)) {
398                Ok(tick) => tick,
399                Err(RecvTimeoutError::Disconnected) => break,
400                Err(RecvTimeoutError::Timeout) => {
401                    tracing::warn!(timeout = "2s", "timeout reading miner channel, expected 1 block per second");
402                    continue;
403                }
404            };
405
406            if miner.is_paused() {
407                tracing::warn!("skipping mining block because block mining is paused");
408                continue;
409            }
410
411            // mine
412            tracing::info!(lag_us = %tick.elapsed().as_micros(), "interval mining block");
413            mine_and_commit(&miner);
414        }
415        warn_task_rx_closed(TASK_NAME);
416    }
417
418    #[inline(always)]
419    pub fn mine_and_commit(miner: &Miner) {
420        let _mine_and_commit_lock = miner.locks.mine_and_commit.lock();
421
422        // mine
423        let (block, changes) = loop {
424            match miner.mine_local() {
425                Ok(block) => break block,
426                Err(e) => {
427                    tracing::error!(reason = ?e, "failed to mine block");
428                }
429            }
430        };
431
432        // commit
433        loop {
434            match miner.commit(CommitItem::Block(block.clone()), changes.clone()) {
435                Ok(_) => break,
436                Err(e) => {
437                    tracing::error!(reason = ?e, "failed to commit block");
438                    continue;
439                }
440            }
441        }
442    }
443}
444
445mod interval_miner_ticker {
446    use std::sync::mpsc;
447    use std::thread;
448    use std::time::Duration;
449
450    use chrono::Timelike;
451    use chrono::Utc;
452    use tokio::time::Instant;
453    use tokio_util::sync::CancellationToken;
454
455    use crate::infra::tracing::warn_task_cancellation;
456    use crate::infra::tracing::warn_task_rx_closed;
457
458    pub async fn run(block_time: Duration, ticks_tx: mpsc::Sender<Instant>, cancellation: CancellationToken) {
459        const TASK_NAME: &str = "interval-miner-ticker";
460
461        // sync to next second
462        #[allow(clippy::expect_used)]
463        let next_second = (Utc::now() + Duration::from_secs(1))
464            .with_nanosecond(0)
465            .expect("nanosecond above is set to `0`, which is always less than 2 billion");
466
467        let time_to_sleep = (next_second - Utc::now()).to_std().unwrap_or_default();
468        thread::sleep(time_to_sleep);
469
470        // prepare ticker
471        let mut ticker = tokio::time::interval(block_time);
472        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);
473
474        loop {
475            if cancellation.is_cancelled() {
476                warn_task_cancellation(TASK_NAME);
477                return;
478            }
479
480            let tick = ticker.tick().await;
481            if ticks_tx.send(tick).is_err() {
482                warn_task_rx_closed(TASK_NAME);
483                break;
484            };
485        }
486    }
487}