stratus/eth/external_rpc/
postgres.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#![allow(clippy::panic)]

use std::time::Duration;

use log::LevelFilter;
use sqlx::postgres::PgConnectOptions;
use sqlx::postgres::PgPoolOptions;
use sqlx::types::BigDecimal;
use sqlx::ConnectOptions;
use sqlx::PgPool;

use crate::alias::JsonValue;
use crate::eth::external_rpc::ExternalBlockWithReceipts;
use crate::eth::external_rpc::ExternalRpc;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::Hash;
use crate::eth::primitives::Wei;
use crate::ext::to_json_value;
use crate::ext::traced_sleep;
use crate::ext::SleepReason;
use crate::log_and_err;

const MAX_RETRIES: u64 = 50;

pub struct PostgresExternalRpc {
    pool: PgPool,
}

#[derive(Debug)]
pub struct PostgresExternalRpcConfig {
    pub url: String,
    pub connections: u32,
    pub acquire_timeout: Duration,
    pub slow_query_warn_threshold: Duration,
}

impl PostgresExternalRpc {
    /// Creates a new [`PostgresExternalRpc`].
    pub async fn new(config: PostgresExternalRpcConfig) -> anyhow::Result<Self> {
        tracing::info!(?config, "creating postgres external rpc storage");

        let options = config
            .url
            .as_str()
            .parse::<PgConnectOptions>()?
            .log_slow_statements(LevelFilter::Warn, config.slow_query_warn_threshold);

        let result = PgPoolOptions::new()
            .min_connections(config.connections)
            .max_connections(config.connections)
            .acquire_timeout(config.acquire_timeout)
            .connect_with(options)
            .await;

        let pool = match result {
            Ok(pool) => pool,
            Err(e) => return log_and_err!(reason = e, "failed to create postgres external rpc storage"),
        };

        Ok(Self { pool })
    }
}

impl ExternalRpc for PostgresExternalRpc {
    async fn read_max_block_number_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
        tracing::debug!(%start, %end, "retrieving max external block");

        let result = sqlx::query_file_scalar!("src/eth/external_rpc/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
            .fetch_one(&self.pool)
            .await;

        match result {
            Ok(Some(max)) => Ok(Some(max.into())),
            Ok(None) => Ok(None),
            Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"),
        }
    }

    async fn read_block_and_receipts_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ExternalBlockWithReceipts>> {
        tracing::debug!(%start, %end, "retrieving external receipts in range");
        let mut attempt: u64 = 1;

        loop {
            let result = sqlx::query_file!(
                "src/eth/external_rpc/sql/select_external_blocks_and_receipts_in_range.sql",
                start.as_i64(),
                end.as_i64()
            )
            .fetch_all(&self.pool)
            .await;

            match result {
                Ok(rows) => {
                    let mut blocks_with_receipts: Vec<ExternalBlockWithReceipts> = Vec::with_capacity(rows.len());
                    for row in rows {
                        let block: ExternalBlock = row.block.try_into()?;
                        let receipts: Vec<ExternalReceipt> = row.receipts.into_iter().map(TryInto::try_into).collect::<Result<_, _>>()?;
                        blocks_with_receipts.push((block, receipts));
                    }
                    return Ok(blocks_with_receipts);
                }
                Err(e) =>
                    if attempt <= MAX_RETRIES {
                        tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now.");
                        attempt += 1;

                        let backoff = Duration::from_millis(attempt.pow(2));
                        traced_sleep(backoff, SleepReason::RetryBackoff).await;
                    } else {
                        return log_and_err!(reason = e, "failed to retrieve receipts");
                    },
            }
        }
    }

    async fn read_initial_accounts(&self) -> anyhow::Result<Vec<Account>> {
        tracing::debug!("retrieving external balances");

        let result = sqlx::query_file!("src/eth/external_rpc/sql/select_external_balances.sql")
            .fetch_all(&self.pool)
            .await;

        match result {
            Ok(rows) => {
                let mut accounts: Vec<Account> = Vec::with_capacity(rows.len());
                for row in rows {
                    let account = Account::new_with_balance(row.address.try_into()?, row.balance.try_into()?);
                    accounts.push(account);
                }
                Ok(accounts)
            }
            Err(e) => log_and_err!(reason = e, "failed to retrieve accounts with initial balances balances"),
        }
    }

    async fn save_initial_account(&self, address: Address, balance: Wei) -> anyhow::Result<()> {
        tracing::debug!(%address, %balance, "saving external balance");

        let result = sqlx::query_file!(
            "src/eth/external_rpc/sql/insert_external_balance.sql",
            address.as_ref(),
            TryInto::<BigDecimal>::try_into(balance)?
        )
        .execute(&self.pool)
        .await;

        match result {
            Ok(_) => Ok(()),
            Err(e) => log_and_err!(reason = e, "failed to insert external balance"),
        }
    }

    async fn save_block_and_receipts(&self, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, ExternalReceipt)>) -> anyhow::Result<()> {
        tracing::debug!(?block, ?receipts, "saving external block and receipts");

        let mut tx = match self.pool.begin().await {
            Ok(tx) => tx,
            Err(e) => return log_and_err!(reason = e, "failed to init postgres transaction"),
        };

        let receipts = receipts.iter().map(|(_, receipt)| to_json_value(receipt)).collect::<Vec<JsonValue>>();

        // insert block
        let result = sqlx::query_file!(
            "src/eth/external_rpc/sql/insert_external_block_and_receipts.sql",
            number.as_i64(),
            block,
            &receipts,
        )
        .execute(&mut *tx)
        .await;

        match result {
            Ok(_) => {}
            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
                tracing::warn!(reason = ?e, "block unique violation, skipping");
            }
            Err(e) => return log_and_err!(reason = e, "failed to insert block"),
        }

        match tx.commit().await {
            Ok(_) => Ok(()),
            Err(e) => log_and_err!(reason = e, "failed to commit postgres transaction"),
        }
    }
}