stratus/infra/blockchain_client/
blockchain_client.rs1use std::time::Duration;
2
3use anyhow::Context;
4use anyhow::bail;
5use jsonrpsee::core::ClientError;
6use jsonrpsee::core::client::ClientT;
7use jsonrpsee::core::client::Subscription;
8use jsonrpsee::core::client::SubscriptionClientT;
9use jsonrpsee::http_client::HttpClient;
10use jsonrpsee::http_client::HttpClientBuilder;
11use jsonrpsee::ws_client::WsClient;
12use jsonrpsee::ws_client::WsClientBuilder;
13use tokio::sync::RwLock;
14use tokio::sync::RwLockReadGuard;
15
16use crate::GlobalState;
17use crate::alias::AlloyBytes;
18use crate::alias::AlloyTransaction;
19use crate::alias::JsonValue;
20use crate::eth::primitives::Address;
21use crate::eth::primitives::Block;
22use crate::eth::primitives::BlockNumber;
23use crate::eth::primitives::ExternalBlock;
24use crate::eth::primitives::ExternalBlockWithReceipts;
25use crate::eth::primitives::ExternalReceipt;
26use crate::eth::primitives::Hash;
27use crate::eth::primitives::StratusError;
28use crate::eth::primitives::TransactionError;
29use crate::eth::primitives::Wei;
30use crate::eth::rpc::RpcClientApp;
31use crate::eth::storage::permanent::rocks::types::BlockChangesRocksdb;
32use crate::ext::DisplayExt;
33use crate::ext::to_json_value;
34use crate::infra::tracing::TracingExt;
35use crate::log_and_err;
36
37#[derive(Debug)]
38pub struct BlockchainClient {
39 http: HttpClient,
40 pub http_url: String,
41 ws: Option<RwLock<WsClient>>,
42 ws_url: Option<String>,
43 timeout: Duration,
44 #[allow(dead_code)]
45 max_response_size_bytes: u32,
46}
47
48impl BlockchainClient {
49 pub async fn new_http(http_url: &str, timeout: Duration, max_response_size_bytes: u32) -> anyhow::Result<Self> {
51 Self::new_http_ws(http_url, None, timeout, max_response_size_bytes).await
52 }
53
54 pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>, timeout: Duration, max_response_size_bytes: u32) -> anyhow::Result<Self> {
56 tracing::info!(%http_url, "creating blockchain client");
57
58 let http = Self::build_http_client(http_url, timeout, max_response_size_bytes)?;
60
61 let ws = if let Some(ws_url) = ws_url {
63 Some(RwLock::new(Self::build_ws_client(ws_url, timeout).await?))
64 } else {
65 None
66 };
67
68 let client = Self {
69 http,
70 http_url: http_url.to_owned(),
71 ws,
72 ws_url: ws_url.map(|x| x.to_owned()),
73 timeout,
74 max_response_size_bytes,
75 };
76
77 client.fetch_listening().await?;
79
80 Ok(client)
81 }
82
83 fn build_http_client(url: &str, timeout: Duration, max_response_size_bytes: u32) -> anyhow::Result<HttpClient> {
84 tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain http client");
85 match HttpClientBuilder::default()
86 .request_timeout(timeout)
87 .max_response_size(max_response_size_bytes)
88 .build(url)
89 {
90 Ok(http) => {
91 tracing::info!(%url, timeout = %timeout.to_string_ext(), "created blockchain http client");
92 Ok(http)
93 }
94 Err(e) => {
95 tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain http client");
96 Err(e).context("failed to create blockchain http client")
97 }
98 }
99 }
100
101 async fn build_ws_client(url: &str, timeout: Duration) -> anyhow::Result<WsClient> {
102 tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain websocket client");
103 match WsClientBuilder::new().connection_timeout(timeout).build(url).await {
104 Ok(ws) => {
105 tracing::info!(%url, timeout = %timeout.to_string_ext(), "created blockchain websocket client");
106 Ok(ws)
107 }
108 Err(e) => {
109 tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain websocket client");
110 Err(e).context("failed to create blockchain websocket client")
111 }
112 }
113 }
114
115 pub fn supports_ws(&self) -> bool {
121 self.ws.is_some()
122 }
123
124 async fn require_ws(&self) -> anyhow::Result<RwLockReadGuard<'_, WsClient>> {
126 match &self.ws {
127 Some(ws) => Ok(ws.read().await),
128 None => log_and_err!("blockchain client not connected to websocket"),
129 }
130 }
131
132 pub async fn fetch_listening(&self) -> anyhow::Result<()> {
138 tracing::debug!("fetching listening status");
139
140 let result = self.http.request::<bool, _>("net_listening", [(); 0]).await;
141 match result {
142 Ok(_) => Ok(()),
143 Err(e) => log_and_err!(reason = e, "failed to fetch listening status"),
144 }
145 }
146
147 pub async fn fetch_block_number(&self) -> anyhow::Result<BlockNumber> {
149 tracing::debug!("fetching block number");
150
151 let result = self.http.request::<BlockNumber, _>("eth_blockNumber", [(); 0]).await;
152
153 match result {
154 Ok(number) => Ok(number),
155 Err(e) => log_and_err!(reason = e, "failed to fetch current block number"),
156 }
157 }
158
159 pub async fn fetch_block_and_receipts(&self, block_number: BlockNumber) -> anyhow::Result<Option<ExternalBlockWithReceipts>> {
161 tracing::debug!(%block_number, "fetching block");
162
163 let number = to_json_value(block_number);
164 let result = self
165 .http
166 .request::<Option<ExternalBlockWithReceipts>, _>("stratus_getBlockAndReceipts", [number])
167 .await;
168
169 match result {
170 Ok(block) => Ok(block),
171 Err(e) => log_and_err!(reason = e, "failed to fetch block with receipts"),
172 }
173 }
174
175 pub async fn fetch_block_with_changes(&self, block_number: BlockNumber) -> anyhow::Result<Option<(Block, BlockChangesRocksdb)>> {
177 tracing::debug!(%block_number, "fetching block with changes");
178
179 let number = to_json_value(block_number);
180 let result = self
181 .http
182 .request::<Option<(Block, BlockChangesRocksdb)>, _>("stratus_getBlockWithChanges", [number])
183 .await;
184
185 match result {
186 Ok(block) => Ok(block),
187 Err(e) => log_and_err!(reason = e, "failed to fetch block with changes"),
188 }
189 }
190
191 pub async fn fetch_block(&self, block_number: BlockNumber) -> anyhow::Result<Option<ExternalBlock>> {
193 tracing::debug!(%block_number, "fetching block");
194
195 let number = to_json_value(block_number);
196 let result = self
197 .http
198 .request::<Option<ExternalBlock>, _>("eth_getBlockByNumber", [number, JsonValue::Bool(true)])
199 .await;
200
201 match result {
202 Ok(block) => Ok(block),
203 Err(e) => log_and_err!(reason = e, "failed to fetch block by number"),
204 }
205 }
206
207 pub async fn fetch_transaction(&self, tx_hash: Hash) -> anyhow::Result<Option<AlloyTransaction>> {
209 tracing::debug!(%tx_hash, "fetching transaction");
210
211 let hash = to_json_value(tx_hash);
212 let result = self.http.request::<Option<AlloyTransaction>, _>("eth_getTransactionByHash", [hash]).await;
213
214 match result {
215 Ok(tx) => Ok(tx),
216 Err(e) => log_and_err!(reason = e, "failed to fetch transaction by hash"),
217 }
218 }
219
220 pub async fn fetch_receipt(&self, tx_hash: Hash) -> anyhow::Result<Option<ExternalReceipt>> {
222 tracing::debug!(%tx_hash, "fetching transaction receipt");
223
224 let hash = to_json_value(tx_hash);
225 let result = self.http.request::<Option<ExternalReceipt>, _>("eth_getTransactionReceipt", [hash]).await;
226
227 match result {
228 Ok(receipt) => Ok(receipt),
229 Err(e) => log_and_err!(reason = e, "failed to fetch transaction receipt by hash"),
230 }
231 }
232
233 pub async fn fetch_balance(&self, address: Address, block_number: Option<BlockNumber>) -> anyhow::Result<Wei> {
235 tracing::debug!(%address, block_number = %block_number.or_empty(), "fetching account balance");
236
237 let address = to_json_value(address);
238 let number = to_json_value(block_number);
239 let result = self.http.request::<Wei, _>("eth_getBalance", [address, number]).await;
240
241 match result {
242 Ok(receipt) => Ok(receipt),
243 Err(e) => log_and_err!(reason = e, "failed to fetch account balance"),
244 }
245 }
246
247 pub async fn send_raw_transaction_to_leader(&self, tx: AlloyBytes, rpc_client: &RpcClientApp) -> Result<Hash, StratusError> {
253 tracing::debug!("sending raw transaction to leader");
254
255 let tx = to_json_value(tx);
256 let rpc_client = to_json_value(rpc_client);
257 let result = self.http.request::<Hash, _>("eth_sendRawTransaction", [tx, rpc_client]).await;
258
259 match result {
260 Ok(hash) => Ok(hash),
261 Err(ClientError::Call(response)) => Err(TransactionError::LeaderFailed(response.into_owned()).into()),
262 Err(e) => {
263 tracing::error!(reason = ?e, "failed to send raw transaction to leader");
264 Err(TransactionError::ForwardToLeaderFailed.into())
265 }
266 }
267 }
268
269 pub async fn subscribe_new_heads(&self) -> anyhow::Result<Subscription<ExternalBlock>> {
274 const TASK_NAME: &str = "blockchain::subscribe_new_heads";
275 tracing::debug!("subscribing to newHeads event");
276
277 let mut first_attempt = true;
278 loop {
279 if GlobalState::is_shutdown_warn(TASK_NAME) {
280 bail!("shutdown warning");
281 };
282
283 let ws_read = self.require_ws().await?;
284 let result = ws_read
285 .subscribe::<ExternalBlock, _>("eth_subscribe", [JsonValue::String("newHeads".to_owned())], "eth_unsubscribe")
286 .await;
287
288 match result {
289 Ok(sub) => return Ok(sub),
291
292 e @ Err(ClientError::RestartNeeded(_)) => {
294 if first_attempt {
296 tracing::error!(reason = ?e, %first_attempt, "failed to subscribe to newHeads event. trying to reconnect websocket client now.");
297 } else {
298 tracing::error!(reason = ?e, %first_attempt, "failed to subscribe to newHeads event. will not try to reconnect websocket client.");
299 return e.context("failed to subscribe to newHeads event");
300 }
301 first_attempt = false;
302
303 let new_ws_client = Self::build_ws_client(self.ws_url.as_ref().unwrap(), self.timeout).await?;
305 drop(ws_read);
306 let mut ws_write = self.ws.as_ref().unwrap().write().await;
307 let _ = std::mem::replace(&mut *ws_write, new_ws_client);
308 }
309
310 Err(e) => return log_and_err!(reason = e, "failed to subscribe to newHeads event"),
312 }
313 }
314 }
315}