stratus/eth/external_rpc/
postgres.rs

1#![allow(clippy::panic)]
2
3use std::time::Duration;
4
5use log::LevelFilter;
6use sqlx::ConnectOptions;
7use sqlx::PgPool;
8use sqlx::postgres::PgConnectOptions;
9use sqlx::postgres::PgPoolOptions;
10use sqlx::types::BigDecimal;
11
12use crate::alias::JsonValue;
13use crate::eth::external_rpc::ExternalBlockWithReceipts;
14use crate::eth::external_rpc::ExternalRpc;
15use crate::eth::primitives::Account;
16use crate::eth::primitives::Address;
17use crate::eth::primitives::BlockNumber;
18use crate::eth::primitives::ExternalBlock;
19use crate::eth::primitives::ExternalReceipt;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::Wei;
22use crate::ext::SleepReason;
23use crate::ext::to_json_value;
24use crate::ext::traced_sleep;
25use crate::log_and_err;
26
/// Upper bound on how many times `read_block_and_receipts_in_range` re-runs its query after a
/// failure before giving up and returning the error.
const MAX_RETRIES: u64 = 50;
28
/// [`ExternalRpc`] implementation that reads and writes external blocks, receipts and balances
/// through a PostgreSQL connection pool.
pub struct PostgresExternalRpc {
    /// Connection pool shared by every query issued by this type.
    pool: PgPool,
}
32
/// Settings consumed by [`PostgresExternalRpc::new`] to build its connection pool.
#[derive(Debug)]
pub struct PostgresExternalRpcConfig {
    /// Connection string; parsed into `PgConnectOptions`.
    pub url: String,
    /// Pool size; applied as both the minimum and the maximum number of connections.
    pub connections: u32,
    /// Forwarded to the pool's `acquire_timeout` setting.
    pub acquire_timeout: Duration,
    /// Threshold handed to `log_slow_statements` together with the `Warn` log level.
    pub slow_query_warn_threshold: Duration,
}
40
41impl PostgresExternalRpc {
42    /// Creates a new [`PostgresExternalRpc`].
43    pub async fn new(config: PostgresExternalRpcConfig) -> anyhow::Result<Self> {
44        tracing::info!(?config, "creating postgres external rpc storage");
45
46        let options = config
47            .url
48            .as_str()
49            .parse::<PgConnectOptions>()?
50            .log_slow_statements(LevelFilter::Warn, config.slow_query_warn_threshold);
51
52        let result = PgPoolOptions::new()
53            .min_connections(config.connections)
54            .max_connections(config.connections)
55            .acquire_timeout(config.acquire_timeout)
56            .connect_with(options)
57            .await;
58
59        let pool = match result {
60            Ok(pool) => pool,
61            Err(e) => return log_and_err!(reason = e, "failed to create postgres external rpc storage"),
62        };
63
64        Ok(Self { pool })
65    }
66}
67
68impl ExternalRpc for PostgresExternalRpc {
69    async fn read_max_block_number_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
70        tracing::debug!(%start, %end, "retrieving max external block");
71
72        let result = sqlx::query_file_scalar!("src/eth/external_rpc/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
73            .fetch_one(&self.pool)
74            .await;
75
76        match result {
77            Ok(Some(max)) => Ok(Some(max.into())),
78            Ok(None) => Ok(None),
79            Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"),
80        }
81    }
82
83    async fn read_block_and_receipts_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ExternalBlockWithReceipts>> {
84        tracing::debug!(%start, %end, "retrieving external receipts in range");
85        let mut attempt: u64 = 1;
86
87        loop {
88            let result = sqlx::query_file!(
89                "src/eth/external_rpc/sql/select_external_blocks_and_receipts_in_range.sql",
90                start.as_i64(),
91                end.as_i64()
92            )
93            .fetch_all(&self.pool)
94            .await;
95
96            match result {
97                Ok(rows) => {
98                    let mut blocks_with_receipts: Vec<ExternalBlockWithReceipts> = Vec::with_capacity(rows.len());
99                    for row in rows {
100                        let block: ExternalBlock = row.block.try_into()?;
101                        let receipts: Vec<ExternalReceipt> = row.receipts.into_iter().map(TryInto::try_into).collect::<Result<_, _>>()?;
102                        blocks_with_receipts.push((block, receipts));
103                    }
104                    return Ok(blocks_with_receipts);
105                }
106                Err(e) =>
107                    if attempt <= MAX_RETRIES {
108                        tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now.");
109                        attempt += 1;
110
111                        let backoff = Duration::from_millis(attempt.pow(2));
112                        traced_sleep(backoff, SleepReason::RetryBackoff).await;
113                    } else {
114                        return log_and_err!(reason = e, "failed to retrieve receipts");
115                    },
116            }
117        }
118    }
119
120    async fn read_initial_accounts(&self) -> anyhow::Result<Vec<Account>> {
121        tracing::debug!("retrieving external balances");
122
123        let result = sqlx::query_file!("src/eth/external_rpc/sql/select_external_balances.sql")
124            .fetch_all(&self.pool)
125            .await;
126
127        match result {
128            Ok(rows) => {
129                let mut accounts: Vec<Account> = Vec::with_capacity(rows.len());
130                for row in rows {
131                    let account = Account::new_with_balance(row.address.try_into()?, row.balance.try_into()?);
132                    accounts.push(account);
133                }
134                Ok(accounts)
135            }
136            Err(e) => log_and_err!(reason = e, "failed to retrieve accounts with initial balances balances"),
137        }
138    }
139
140    async fn save_initial_account(&self, address: Address, balance: Wei) -> anyhow::Result<()> {
141        tracing::debug!(%address, %balance, "saving external balance");
142
143        let result = sqlx::query_file!(
144            "src/eth/external_rpc/sql/insert_external_balance.sql",
145            address.as_ref() as &[u8],
146            TryInto::<BigDecimal>::try_into(balance)?
147        )
148        .execute(&self.pool)
149        .await;
150
151        match result {
152            Ok(_) => Ok(()),
153            Err(e) => log_and_err!(reason = e, "failed to insert external balance"),
154        }
155    }
156
157    async fn save_block_and_receipts(&self, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, ExternalReceipt)>) -> anyhow::Result<()> {
158        tracing::debug!(?block, ?receipts, "saving external block and receipts");
159
160        let mut tx = match self.pool.begin().await {
161            Ok(tx) => tx,
162            Err(e) => return log_and_err!(reason = e, "failed to init postgres transaction"),
163        };
164
165        let receipts = receipts.iter().map(|(_, receipt)| to_json_value(receipt)).collect::<Vec<JsonValue>>();
166
167        // insert block
168        let result = sqlx::query_file!(
169            "src/eth/external_rpc/sql/insert_external_block_and_receipts.sql",
170            number.as_i64(),
171            block,
172            &receipts,
173        )
174        .execute(&mut *tx)
175        .await;
176
177        match result {
178            Ok(_) => {}
179            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
180                tracing::warn!(reason = ?e, "block unique violation, skipping");
181            }
182            Err(e) => return log_and_err!(reason = e, "failed to insert block"),
183        }
184
185        match tx.commit().await {
186            Ok(_) => Ok(()),
187            Err(e) => log_and_err!(reason = e, "failed to commit postgres transaction"),
188        }
189    }
190}