stratus/eth/external_rpc/
postgres.rs1#![allow(clippy::panic)]
2
3use std::time::Duration;
4
5use log::LevelFilter;
6use sqlx::ConnectOptions;
7use sqlx::PgPool;
8use sqlx::postgres::PgConnectOptions;
9use sqlx::postgres::PgPoolOptions;
10use sqlx::types::BigDecimal;
11
12use crate::alias::JsonValue;
13use crate::eth::external_rpc::ExternalBlockWithReceipts;
14use crate::eth::external_rpc::ExternalRpc;
15use crate::eth::primitives::Account;
16use crate::eth::primitives::Address;
17use crate::eth::primitives::BlockNumber;
18use crate::eth::primitives::ExternalBlock;
19use crate::eth::primitives::ExternalReceipt;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::Wei;
22use crate::ext::SleepReason;
23use crate::ext::to_json_value;
24use crate::ext::traced_sleep;
25use crate::log_and_err;
26
27const MAX_RETRIES: u64 = 50;
28
29pub struct PostgresExternalRpc {
30 pool: PgPool,
31}
32
33#[derive(Debug)]
34pub struct PostgresExternalRpcConfig {
35 pub url: String,
36 pub connections: u32,
37 pub acquire_timeout: Duration,
38 pub slow_query_warn_threshold: Duration,
39}
40
41impl PostgresExternalRpc {
42 pub async fn new(config: PostgresExternalRpcConfig) -> anyhow::Result<Self> {
44 tracing::info!(?config, "creating postgres external rpc storage");
45
46 let options = config
47 .url
48 .as_str()
49 .parse::<PgConnectOptions>()?
50 .log_slow_statements(LevelFilter::Warn, config.slow_query_warn_threshold);
51
52 let result = PgPoolOptions::new()
53 .min_connections(config.connections)
54 .max_connections(config.connections)
55 .acquire_timeout(config.acquire_timeout)
56 .connect_with(options)
57 .await;
58
59 let pool = match result {
60 Ok(pool) => pool,
61 Err(e) => return log_and_err!(reason = e, "failed to create postgres external rpc storage"),
62 };
63
64 Ok(Self { pool })
65 }
66}
67
68impl ExternalRpc for PostgresExternalRpc {
69 async fn read_max_block_number_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
70 tracing::debug!(%start, %end, "retrieving max external block");
71
72 let result = sqlx::query_file_scalar!("src/eth/external_rpc/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
73 .fetch_one(&self.pool)
74 .await;
75
76 match result {
77 Ok(Some(max)) => Ok(Some(max.into())),
78 Ok(None) => Ok(None),
79 Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"),
80 }
81 }
82
83 async fn read_block_and_receipts_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ExternalBlockWithReceipts>> {
84 tracing::debug!(%start, %end, "retrieving external receipts in range");
85 let mut attempt: u64 = 1;
86
87 loop {
88 let result = sqlx::query_file!(
89 "src/eth/external_rpc/sql/select_external_blocks_and_receipts_in_range.sql",
90 start.as_i64(),
91 end.as_i64()
92 )
93 .fetch_all(&self.pool)
94 .await;
95
96 match result {
97 Ok(rows) => {
98 let mut blocks_with_receipts: Vec<ExternalBlockWithReceipts> = Vec::with_capacity(rows.len());
99 for row in rows {
100 let block: ExternalBlock = row.block.try_into()?;
101 let receipts: Vec<ExternalReceipt> = row.receipts.into_iter().map(TryInto::try_into).collect::<Result<_, _>>()?;
102 blocks_with_receipts.push((block, receipts));
103 }
104 return Ok(blocks_with_receipts);
105 }
106 Err(e) =>
107 if attempt <= MAX_RETRIES {
108 tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now.");
109 attempt += 1;
110
111 let backoff = Duration::from_millis(attempt.pow(2));
112 traced_sleep(backoff, SleepReason::RetryBackoff).await;
113 } else {
114 return log_and_err!(reason = e, "failed to retrieve receipts");
115 },
116 }
117 }
118 }
119
120 async fn read_initial_accounts(&self) -> anyhow::Result<Vec<Account>> {
121 tracing::debug!("retrieving external balances");
122
123 let result = sqlx::query_file!("src/eth/external_rpc/sql/select_external_balances.sql")
124 .fetch_all(&self.pool)
125 .await;
126
127 match result {
128 Ok(rows) => {
129 let mut accounts: Vec<Account> = Vec::with_capacity(rows.len());
130 for row in rows {
131 let account = Account::new_with_balance(row.address.try_into()?, row.balance.try_into()?);
132 accounts.push(account);
133 }
134 Ok(accounts)
135 }
136 Err(e) => log_and_err!(reason = e, "failed to retrieve accounts with initial balances balances"),
137 }
138 }
139
140 async fn save_initial_account(&self, address: Address, balance: Wei) -> anyhow::Result<()> {
141 tracing::debug!(%address, %balance, "saving external balance");
142
143 let result = sqlx::query_file!(
144 "src/eth/external_rpc/sql/insert_external_balance.sql",
145 address.as_ref() as &[u8],
146 TryInto::<BigDecimal>::try_into(balance)?
147 )
148 .execute(&self.pool)
149 .await;
150
151 match result {
152 Ok(_) => Ok(()),
153 Err(e) => log_and_err!(reason = e, "failed to insert external balance"),
154 }
155 }
156
157 async fn save_block_and_receipts(&self, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, ExternalReceipt)>) -> anyhow::Result<()> {
158 tracing::debug!(?block, ?receipts, "saving external block and receipts");
159
160 let mut tx = match self.pool.begin().await {
161 Ok(tx) => tx,
162 Err(e) => return log_and_err!(reason = e, "failed to init postgres transaction"),
163 };
164
165 let receipts = receipts.iter().map(|(_, receipt)| to_json_value(receipt)).collect::<Vec<JsonValue>>();
166
167 let result = sqlx::query_file!(
169 "src/eth/external_rpc/sql/insert_external_block_and_receipts.sql",
170 number.as_i64(),
171 block,
172 &receipts,
173 )
174 .execute(&mut *tx)
175 .await;
176
177 match result {
178 Ok(_) => {}
179 Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
180 tracing::warn!(reason = ?e, "block unique violation, skipping");
181 }
182 Err(e) => return log_and_err!(reason = e, "failed to insert block"),
183 }
184
185 match tx.commit().await {
186 Ok(_) => Ok(()),
187 Err(e) => log_and_err!(reason = e, "failed to commit postgres transaction"),
188 }
189 }
190}