stratus/infra/blockchain_client/
blockchain_client.rs

1use std::time::Duration;
2
3use anyhow::Context;
4use anyhow::bail;
5use jsonrpsee::core::ClientError;
6use jsonrpsee::core::client::ClientT;
7use jsonrpsee::core::client::Subscription;
8use jsonrpsee::core::client::SubscriptionClientT;
9use jsonrpsee::http_client::HttpClient;
10use jsonrpsee::http_client::HttpClientBuilder;
11use jsonrpsee::ws_client::WsClient;
12use jsonrpsee::ws_client::WsClientBuilder;
13use tokio::sync::RwLock;
14use tokio::sync::RwLockReadGuard;
15
16use crate::GlobalState;
17use crate::alias::AlloyBytes;
18use crate::alias::AlloyTransaction;
19use crate::alias::JsonValue;
20use crate::eth::primitives::Address;
21use crate::eth::primitives::Block;
22use crate::eth::primitives::BlockNumber;
23use crate::eth::primitives::ExternalBlock;
24use crate::eth::primitives::ExternalBlockWithReceipts;
25use crate::eth::primitives::ExternalReceipt;
26use crate::eth::primitives::Hash;
27use crate::eth::primitives::StratusError;
28use crate::eth::primitives::TransactionError;
29use crate::eth::primitives::Wei;
30use crate::eth::rpc::RpcClientApp;
31use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
32use crate::ext::DisplayExt;
33use crate::ext::to_json_value;
34use crate::infra::tracing::TracingExt;
35use crate::log_and_err;
36
37#[derive(Debug)]
38pub struct BlockchainClient {
39    http: HttpClient,
40    pub http_url: String,
41    ws: Option<RwLock<WsClient>>,
42    ws_url: Option<String>,
43    timeout: Duration,
44    #[allow(dead_code)]
45    max_response_size_bytes: u32,
46}
47
48impl BlockchainClient {
49    /// Creates a new RPC client connected only to HTTP.
50    pub async fn new_http(http_url: &str, timeout: Duration, max_response_size_bytes: u32) -> anyhow::Result<Self> {
51        Self::new_http_ws(http_url, None, timeout, max_response_size_bytes).await
52    }
53
54    /// Creates a new RPC client connected to HTTP and optionally to WS.
55    pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>, timeout: Duration, max_response_size_bytes: u32) -> anyhow::Result<Self> {
56        tracing::info!(%http_url, "creating blockchain client");
57
58        // build http provider
59        let http = Self::build_http_client(http_url, timeout, max_response_size_bytes)?;
60
61        // build ws provider
62        let ws = if let Some(ws_url) = ws_url {
63            Some(RwLock::new(Self::build_ws_client(ws_url, timeout).await?))
64        } else {
65            None
66        };
67
68        let client = Self {
69            http,
70            http_url: http_url.to_owned(),
71            ws,
72            ws_url: ws_url.map(|x| x.to_owned()),
73            timeout,
74            max_response_size_bytes,
75        };
76
77        // check health before assuming it is ok
78        client.fetch_listening().await?;
79
80        Ok(client)
81    }
82
83    fn build_http_client(url: &str, timeout: Duration, max_response_size_bytes: u32) -> anyhow::Result<HttpClient> {
84        tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain http client");
85        match HttpClientBuilder::default()
86            .request_timeout(timeout)
87            .max_response_size(max_response_size_bytes)
88            .build(url)
89        {
90            Ok(http) => {
91                tracing::info!(%url, timeout = %timeout.to_string_ext(), "created blockchain http client");
92                Ok(http)
93            }
94            Err(e) => {
95                tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain http client");
96                Err(e).context("failed to create blockchain http client")
97            }
98        }
99    }
100
101    async fn build_ws_client(url: &str, timeout: Duration) -> anyhow::Result<WsClient> {
102        tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain websocket client");
103        match WsClientBuilder::new().connection_timeout(timeout).build(url).await {
104            Ok(ws) => {
105                tracing::info!(%url, timeout = %timeout.to_string_ext(), "created blockchain websocket client");
106                Ok(ws)
107            }
108            Err(e) => {
109                tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain websocket client");
110                Err(e).context("failed to create blockchain websocket client")
111            }
112        }
113    }
114
115    // -------------------------------------------------------------------------
116    // Websocket
117    // -------------------------------------------------------------------------
118
119    /// Checks if the supports websocket connection.
120    pub fn supports_ws(&self) -> bool {
121        self.ws.is_some()
122    }
123
124    /// Validates it is connected to websocket and returns a reference to the websocket client.
125    async fn require_ws(&self) -> anyhow::Result<RwLockReadGuard<'_, WsClient>> {
126        match &self.ws {
127            Some(ws) => Ok(ws.read().await),
128            None => log_and_err!("blockchain client not connected to websocket"),
129        }
130    }
131
132    // -------------------------------------------------------------------------
133    // RPC queries
134    // -------------------------------------------------------------------------
135
136    /// Checks if the blockchain is listening.
137    pub async fn fetch_listening(&self) -> anyhow::Result<()> {
138        tracing::debug!("fetching listening status");
139
140        let result = self.http.request::<bool, _>("net_listening", [(); 0]).await;
141        match result {
142            Ok(_) => Ok(()),
143            Err(e) => log_and_err!(reason = e, "failed to fetch listening status"),
144        }
145    }
146
147    /// Fetches the current block number.
148    pub async fn fetch_block_number(&self) -> anyhow::Result<BlockNumber> {
149        tracing::debug!("fetching block number");
150
151        let result = self.http.request::<BlockNumber, _>("eth_blockNumber", [(); 0]).await;
152
153        match result {
154            Ok(number) => Ok(number),
155            Err(e) => log_and_err!(reason = e, "failed to fetch current block number"),
156        }
157    }
158
159    /// Fetches a block by number with receipts.
160    pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result<Option<ExternalBlockWithReceipts>> {
161        tracing::debug!(%block_number, "fetching block");
162
163        let number = to_json_value(block_number);
164        let result = self
165            .http
166            .request::<Option<ExternalBlockWithReceipts>, _>("stratus_getBlockAndReceipts", [number])
167            .await;
168
169        match result {
170            Ok(block) => Ok(block),
171            Err(e) => log_and_err!(reason = e, "failed to fetch block with receipts"),
172        }
173    }
174
175    /// Fetches a block by number with changes.
176    pub async fn fetch_block_with_changes(&self, block_number: BlockNumber) -> anyhow::Result<Option<(Block, BlockChangesRocksdb)>> {
177        tracing::debug!(%block_number, "fetching block with changes");
178
179        let number = to_json_value(block_number);
180        let result = self
181            .http
182            .request::<Option<(Block, BlockChangesRocksdb)>, _>("stratus_getBlockWithChanges", [number])
183            .await;
184
185        match result {
186            Ok(block) => Ok(block),
187            Err(e) => log_and_err!(reason = e, "failed to fetch block with changes"),
188        }
189    }
190
191    /// Fetches a block by number.
192    pub async fn fetch_block(&self, block_number: BlockNumber) -> anyhow::Result<Option<ExternalBlock>> {
193        tracing::debug!(%block_number, "fetching block");
194
195        let number = to_json_value(block_number);
196        let result = self
197            .http
198            .request::<Option<ExternalBlock>, _>("eth_getBlockByNumber", [number, JsonValue::Bool(true)])
199            .await;
200
201        match result {
202            Ok(block) => Ok(block),
203            Err(e) => log_and_err!(reason = e, "failed to fetch block by number"),
204        }
205    }
206
207    /// Fetches a transaction by hash.
208    pub async fn fetch_transaction(&self, tx_hash: Hash) -> anyhow::Result<Option<AlloyTransaction>> {
209        tracing::debug!(%tx_hash, "fetching transaction");
210
211        let hash = to_json_value(tx_hash);
212        let result = self.http.request::<Option<AlloyTransaction>, _>("eth_getTransactionByHash", [hash]).await;
213
214        match result {
215            Ok(tx) => Ok(tx),
216            Err(e) => log_and_err!(reason = e, "failed to fetch transaction by hash"),
217        }
218    }
219
220    /// Fetches a receipt by hash.
221    pub async fn fetch_receipt(&self, tx_hash: Hash) -> anyhow::Result<Option<ExternalReceipt>> {
222        tracing::debug!(%tx_hash, "fetching transaction receipt");
223
224        let hash = to_json_value(tx_hash);
225        let result = self.http.request::<Option<ExternalReceipt>, _>("eth_getTransactionReceipt", [hash]).await;
226
227        match result {
228            Ok(receipt) => Ok(receipt),
229            Err(e) => log_and_err!(reason = e, "failed to fetch transaction receipt by hash"),
230        }
231    }
232
233    /// Fetches account balance by address and block number.
234    pub async fn fetch_balance(&self, address: Address, block_number: Option<BlockNumber>) -> anyhow::Result<Wei> {
235        tracing::debug!(%address, block_number = %block_number.or_empty(), "fetching account balance");
236
237        let address = to_json_value(address);
238        let number = to_json_value(block_number);
239        let result = self.http.request::<Wei, _>("eth_getBalance", [address, number]).await;
240
241        match result {
242            Ok(receipt) => Ok(receipt),
243            Err(e) => log_and_err!(reason = e, "failed to fetch account balance"),
244        }
245    }
246
247    // -------------------------------------------------------------------------
248    // RPC mutations
249    // -------------------------------------------------------------------------
250
251    /// Forwards a transaction to leader.
252    pub async fn send_raw_transaction_to_leader(&self, tx: AlloyBytes, rpc_client: &RpcClientApp) -> Result<Hash, StratusError> {
253        tracing::debug!("sending raw transaction to leader");
254
255        let tx = to_json_value(tx);
256        let rpc_client = to_json_value(rpc_client);
257        let result = self.http.request::<Hash, _>("eth_sendRawTransaction", [tx, rpc_client]).await;
258
259        match result {
260            Ok(hash) => Ok(hash),
261            Err(ClientError::Call(response)) => Err(TransactionError::LeaderFailed(response.into_owned()).into()),
262            Err(e) => {
263                tracing::error!(reason = ?e, "failed to send raw transaction to leader");
264                Err(TransactionError::ForwardToLeaderFailed.into())
265            }
266        }
267    }
268
269    // -------------------------------------------------------------------------
270    // RPC subscriptions
271    // -------------------------------------------------------------------------
272
273    pub async fn subscribe_new_heads(&self) -> anyhow::Result<Subscription<ExternalBlock>> {
274        const TASK_NAME: &str = "blockchain::subscribe_new_heads";
275        tracing::debug!("subscribing to newHeads event");
276
277        let mut first_attempt = true;
278        loop {
279            if GlobalState::is_shutdown_warn(TASK_NAME) {
280                bail!("shutdown warning");
281            };
282
283            let ws_read = self.require_ws().await?;
284            let result = ws_read
285                .subscribe::<ExternalBlock, _>("eth_subscribe", [JsonValue::String("newHeads".to_owned())], "eth_unsubscribe")
286                .await;
287
288            match result {
289                // subscribed
290                Ok(sub) => return Ok(sub),
291
292                // failed and need to reconnect
293                e @ Err(ClientError::RestartNeeded(_)) => {
294                    // will try to reconnect websocket client only in first attempt
295                    if first_attempt {
296                        tracing::error!(reason = ?e, %first_attempt, "failed to subscribe to newHeads event. trying to reconnect websocket client now.");
297                    } else {
298                        tracing::error!(reason = ?e, %first_attempt, "failed to subscribe to newHeads event. will not try to reconnect websocket client.");
299                        return e.context("failed to subscribe to newHeads event");
300                    }
301                    first_attempt = false;
302
303                    // reconnect websocket client
304                    let new_ws_client = Self::build_ws_client(self.ws_url.as_ref().unwrap(), self.timeout).await?;
305                    drop(ws_read);
306                    let mut ws_write = self.ws.as_ref().unwrap().write().await;
307                    let _ = std::mem::replace(&mut *ws_write, new_ws_client);
308                }
309
310                // failed and cannot do anything
311                Err(e) => return log_and_err!(reason = e, "failed to subscribe to newHeads event"),
312            }
313        }
314    }
315}