stratus/ledger/
events.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::fmt::Debug;
5
6use alloy_primitives::B256;
7use alloy_primitives::U256;
8use chrono::DateTime;
9use chrono::Utc;
10use display_json::DebugAsJson;
11use hex_literal::hex;
12use itertools::Itertools;
13use serde::Deserialize;
14use serde::Serialize;
15use serde::ser::SerializeStruct;
16use uuid::Uuid;
17
18use crate::eth::primitives::Address;
19use crate::eth::primitives::BlockNumber;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::LogTopic;
22use crate::eth::primitives::TransactionMined;
23use crate::eth::primitives::UnixTime;
24use crate::if_else;
25
26/// Represents token transfers (debits and credits) associated with a specific Ethereum account within a single transaction.
27///
28/// The `account_address` field identifies the primary account involved in these transfers.
29///
30/// An event will be generated for each account involved in a transaction, meaning if a transaction is not the primary in this event,
31/// in another event it will treated as the primary and credit and debit operations adjusted accordingly.
32///
33/// A single event can contain multiple token transfers (e.g., a customer is debited for a card payment but receives a credit as cashback)
34#[derive(DebugAsJson)]
35pub struct AccountTransfers {
36    /// ID of the event publication.
37    ///
38    /// If the event is republished, a new ID will be generated for the publication.
39    ///
40    /// Format: UUID v7 (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
41    pub publication_id: Uuid,
42
43    /// Datetime of the event publication.
44    ///
45    /// Format: ISO 8601
46    pub publication_datetime: DateTime<Utc>,
47
48    /// Address of the account that is part of all transfers. Also referenced as primary account address.
49    ///
50    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
51    pub account_address: Address,
52
53    /// Hash of the Ethereum transaction that originated transfers.
54    ///
55    /// Format: Prefixed transaction hash - 32 bytes - 0x1234567890123456789012345678901234567890123456789012345678901234
56    pub transaction_hash: Hash,
57
58    /// Index of the transaction in the block where it was generated.
59    ///
60    /// Used for ordering multiple events from the same user that happened in the same block.
61    ///
62    /// Format: Integer in base 10 - Range: 0 to [`u64::MAX`]
63    pub transaction_index: u64,
64
65    /// Address of the contract that originated transfers.
66    ///
67    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
68    pub contract_address: Address,
69
70    /// Identifier of the Ethereum function that originated transfers.
71    ///
72    /// Format: Prefixed function signature - 4 bytes - 0x12345678
73    pub function_id: [u8; 4],
74
75    /// Number of the block that originated transfers.
76    ///
77    /// Format: Integer in base 10 - Range: 0 to [`u64::MAX`]
78    pub block_number: BlockNumber,
79
80    /// Datetime of the Ethereum block that originated transfers.
81    ///
82    /// Format: ISO 8601
83    pub block_datetime: DateTime<Utc>,
84
85    /// List of transfers the `account_address` is part of.
86    pub transfers: Vec<AccountTransfer>,
87}
88
89impl AccountTransfers {
90    /// Idempotency key of the event payload.
91    ///
92    /// It is unique for each distinct event, but consistent across retries or republications of the same event payload.
93    ///
94    /// Format: String - transaction_hash::account_address - 0x1234567890123456789012345678901234567890123456789012345678901234::0x1234567890123456789012345678901234567890
95    pub fn idempotency_key(&self) -> String {
96        format!("{}::{}", self.transaction_hash, self.account_address)
97    }
98}
99
100/// Represents a token transfer between a debit party and a credit party that happened inside a transaction.
101#[derive(DebugAsJson)]
102pub struct AccountTransfer {
103    /// Address of the token contract that executed the transfer between `debit_party_address` and `credit_party_address`.
104    ///
105    /// It may differ from the `contract_address` because any contract can execute transfers in token contracts.
106    ///
107    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
108    pub token_address: Address,
109
110    /// Address of the account from which the token was subtracted.
111    ///
112    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
113    pub debit_party_address: Address,
114
115    /// Address of the account to which the token was added.
116    ///
117    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
118    pub credit_party_address: Address,
119
120    /// Direction of the transfer relative to the primary account address (credit or debit).
121    ///
122    /// Format: [debit, credit]
123    pub direction: AccountTransferDirection,
124
125    /// Amount transferred from debit party to credit party.
126    ///
127    /// Format: Integer in base 10 - Formatted as String to avoid losing precision - Range: 0 to [`U256::MAX`].
128    pub amount: U256,
129}
130
131/// Direction of a transfer relative to the primary account address.
132#[derive(DebugAsJson, strum::EnumIs, Serialize, Deserialize)]
133#[serde(rename_all = "snake_case")]
134pub enum AccountTransferDirection {
135    /// `account_address` is being credited.
136    Credit,
137
138    /// `account_address` is being debited.
139    Debit,
140}
141
142// -----------------------------------------------------------------------------
143// Serializers
144// -----------------------------------------------------------------------------
145
146impl Serialize for AccountTransfers {
147    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148    where
149        S: serde::Serializer,
150    {
151        let mut state = serializer.serialize_struct("AccountTransfersEvent", 11)?;
152
153        state.serialize_field("publication_id", &self.publication_id.to_string())?;
154        state.serialize_field("publication_datetime", &self.publication_datetime.to_rfc3339())?;
155        state.serialize_field("idempotency_key", &self.idempotency_key())?;
156        state.serialize_field("account_address", &self.account_address)?;
157        state.serialize_field("transaction_hash", &self.transaction_hash)?;
158        state.serialize_field("contract_address", &self.contract_address)?;
159        state.serialize_field("function_id", &const_hex::encode_prefixed(self.function_id))?;
160        state.serialize_field("block_number", &self.block_number.as_u64())?;
161        state.serialize_field("block_datetime", &self.block_datetime.to_rfc3339())?;
162        state.serialize_field("transaction_index", &self.transaction_index)?;
163        state.serialize_field("transfers", &self.transfers)?;
164        state.end()
165    }
166}
167
168impl Serialize for AccountTransfer {
169    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170    where
171        S: serde::Serializer,
172    {
173        let mut state = serializer.serialize_struct("AccountTransfer", 5)?;
174        state.serialize_field("token_address", &self.token_address)?;
175        state.serialize_field("debit_party_address", &self.debit_party_address)?;
176        state.serialize_field("credit_party_address", &self.credit_party_address)?;
177        state.serialize_field("amount", &self.amount.to_string())?;
178        state.serialize_field("direction", &self.direction)?;
179        state.end()
180    }
181}
182
183// -----------------------------------------------------------------------------
184// Marker Trait
185// -----------------------------------------------------------------------------
186
187/// Struct is an event that can be published to external systems.
188pub trait Event: Serialize + Sized + Debug {
189    /// Returns the partition key component of the event.
190    fn event_key(&self) -> anyhow::Result<String>;
191
192    /// Returns the headers component of the event.
193    ///
194    /// By default, it returns empty headers.
195    fn event_headers(&self) -> anyhow::Result<HashMap<String, String>> {
196        Ok(HashMap::default())
197    }
198
199    /// Returns the payload component of the event.
200    ///
201    /// By default, it serializes the implementing struct as JSON.
202    fn event_payload(&self) -> anyhow::Result<String> {
203        Ok(serde_json::to_string(self)?)
204    }
205}
206
207impl Event for AccountTransfers {
208    fn event_key(&self) -> anyhow::Result<String> {
209        Ok(self.account_address.to_string())
210    }
211}
212
213// -----------------------------------------------------------------------------
214// Conversions
215// -----------------------------------------------------------------------------
216
217/// ERC-20 transfer event hash.
218const TRANSFER_EVENT: LogTopic = LogTopic(B256::new(hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")));
219
220/// Converts a mined transaction into multiple account transfers events to be published.
221pub fn transaction_to_events(block_timestamp: UnixTime, tx: Cow<TransactionMined>) -> Vec<AccountTransfers> {
222    // identify token transfers in transaction
223    let transfers = tx
224        .as_ref()
225        .logs
226        .iter()
227        .filter(|log| log.log.topic0.is_some_and(|topic0| topic0 == TRANSFER_EVENT))
228        .filter_map(|log| {
229            let amount_bytes: [u8; 32] = match log.log.data.0.clone().try_into() {
230                Ok(amount_bytes) => amount_bytes,
231                Err(_) => {
232                    tracing::error!(?log.transaction_hash, "bug: event identified as ERC-20 transfer should have the amount as 32 bytes in the data field");
233                    return None;
234                }
235            };
236
237            let token = log.log.address;
238            let from: Address = log.log.topic1?.into();
239            let to: Address = log.log.topic2?.into();
240            let amount = U256::from_be_bytes(amount_bytes); // TODO: review
241
242            Some((token, from, to, amount))
243        })
244        .collect_vec();
245
246    // identify accounts involved in transfers
247    let mut accounts = HashSet::new();
248    for (_, from, to, _) in &transfers {
249        accounts.insert(from);
250        accounts.insert(to);
251    }
252
253    // for each account, generate an event
254    let mut events = Vec::with_capacity(accounts.len());
255    for account in accounts {
256        // generate base event
257        let mut event = AccountTransfers {
258            publication_id: Uuid::now_v7(),
259            publication_datetime: Utc::now(),
260            account_address: *account,
261            transaction_hash: tx.input.hash,
262            transaction_index: tx.transaction_index.0,
263            contract_address: tx.input.to.unwrap_or_else(|| {
264                tracing::error!(?tx.input.hash, "bug: transaction emitting transfers must have the contract address");
265                Address::ZERO
266            }),
267            function_id: tx.input.input[0..4].try_into().unwrap_or_else(|_| {
268                tracing::error!(?tx.input.hash, "bug: transaction emitting transfers must have the 4-byte signature");
269                [0; 4]
270            }),
271            block_number: tx.block_number,
272            block_datetime: block_timestamp.into(),
273            transfers: vec![],
274        };
275
276        // generate transfers
277        for (token, from, to, amount) in &transfers {
278            if account != from && account != to {
279                continue;
280            }
281            let direction = if_else!(account == from, AccountTransferDirection::Debit, AccountTransferDirection::Credit);
282            let transfer = AccountTransfer {
283                token_address: *token,
284                debit_party_address: *from,
285                credit_party_address: *to,
286                direction,
287                amount: *amount,
288            };
289            event.transfers.push(transfer);
290        }
291        events.push(event);
292    }
293
294    events
295}
296
297// -----------------------------------------------------------------------------
298// Tests
299// -----------------------------------------------------------------------------
300
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use alloy_primitives::U256;
    use chrono::DateTime;
    use chrono::Utc;
    use fake::Fake;
    use fake::Faker;
    use serde_json::json;
    use uuid::Uuid;

    use crate::eth::primitives::Address;
    use crate::eth::primitives::BlockNumber;
    use crate::eth::primitives::Bytes;
    use crate::eth::primitives::Hash;
    use crate::eth::primitives::LogMined;
    use crate::eth::primitives::TransactionMined;
    use crate::eth::primitives::UnixTime;
    use crate::eth::primitives::test_accounts;
    use crate::ext::to_json_value;
    use crate::ledger::events::AccountTransfer;
    use crate::ledger::events::AccountTransferDirection;
    use crate::ledger::events::AccountTransfers;
    use crate::ledger::events::TRANSFER_EVENT;
    use crate::ledger::events::transaction_to_events;

    /// Checks the exact JSON produced for an event with a single transfer.
    #[test]
    fn ledger_events_serde_account_transfers() {
        // same instant is used for publication and block datetimes
        let datetime: DateTime<Utc> = "2024-10-16T19:47:50Z".parse().unwrap();

        let transfer = AccountTransfer {
            token_address: Address::ZERO,
            debit_party_address: Address::ZERO,
            credit_party_address: Address::ZERO,
            amount: U256::MAX,
            direction: AccountTransferDirection::Credit,
        };
        let event = AccountTransfers {
            publication_id: Uuid::nil(),
            publication_datetime: datetime,
            account_address: Address::ZERO,
            transaction_hash: Hash::ZERO,
            transaction_index: 123,
            block_datetime: datetime,
            contract_address: Address::ZERO,
            function_id: [0; 4],
            block_number: BlockNumber::ZERO,
            transfers: vec![transfer],
        };

        let expected = json!(
            {
                "publication_id": "00000000-0000-0000-0000-000000000000",
                "publication_datetime": "2024-10-16T19:47:50+00:00",
                "idempotency_key": "0x0000000000000000000000000000000000000000000000000000000000000000::0x0000000000000000000000000000000000000000",
                "account_address": "0x0000000000000000000000000000000000000000",
                "transaction_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                "transaction_index": 123,
                "contract_address":"0x0000000000000000000000000000000000000000",
                "function_id": "0x00000000",
                "block_number": 0,
                "block_datetime": "2024-10-16T19:47:50+00:00",
                "transfers": [{
                    "token_address": "0x0000000000000000000000000000000000000000",
                    "debit_party_address": "0x0000000000000000000000000000000000000000",
                    "credit_party_address": "0x0000000000000000000000000000000000000000",
                    "direction": "credit",
                    "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639935"
                }],
            }
        );
        let actual = to_json_value(&event);
        assert_eq!(actual, expected);
    }

    /// Checks directions are serialized in snake case.
    #[test]
    fn ledger_events_serde_event_account_transfer_direction() {
        let credit = to_json_value(&AccountTransferDirection::Credit);
        let debit = to_json_value(&AccountTransferDirection::Debit);
        assert_eq!(credit, json!("credit"));
        assert_eq!(debit, json!("debit"));
    }

    /// Checks a transaction with two transfers (alice -> bob -> charlie) generates one event per account.
    #[test]
    fn ledger_events_parse_transfer_events() {
        // reference values
        let accounts = test_accounts();
        let (alice, bob, charlie) = (&accounts[0], &accounts[1], &accounts[2]);
        let token_address = Address::BRLC;

        // 65535 encoded as a 32-byte big-endian integer
        let mut amount_bytes = [0u8; 32];
        amount_bytes[30] = 255;
        amount_bytes[31] = 255;
        let amount_u256 = U256::from_be_slice(&amount_bytes);

        // 1. generate fake block data
        let block_timestamp: UnixTime = 1729108070.into();

        // 2. generate fake tx data
        let mut tx: TransactionMined = Fake::fake(&Faker);
        tx.input.input = Bytes(vec![1, 2, 3, 4, 5, 6, 7, 8]);

        // builds a fake log shaped as a token transfer between the two addresses
        let transfer_log = |from: Address, to: Address| {
            let mut log: LogMined = Fake::fake(&Faker);
            log.log.address = token_address;
            log.log.topic0 = Some(TRANSFER_EVENT);
            log.log.topic1 = Some(from.into());
            log.log.topic2 = Some(to.into());
            log.log.data = Bytes(amount_bytes.to_vec());
            log
        };
        let log_transfer1 = transfer_log(alice.address, bob.address);
        let log_transfer2 = transfer_log(bob.address, charlie.address);
        let log_random: LogMined = Fake::fake(&Faker);

        // a log that is not a transfer sits between the two transfers
        tx.logs.extend([log_transfer1, log_random, log_transfer2]);

        // 3. parse events
        let events = transaction_to_events(block_timestamp, Cow::Borrowed(&tx));

        // 4. assert events
        assert_eq!(events.len(), 3); // number of accounts involved in all transfers
        for event in events {
            assert_eq!(&event.transaction_hash, &tx.input.hash);
            assert_eq!(&event.contract_address, &tx.input.to.unwrap());
            assert_eq!(&event.function_id[0..], &tx.input.input.0[0..4]);
            assert_eq!(&event.block_number, &tx.block_number);
            assert_eq!(&event.block_datetime, &DateTime::<Utc>::from(block_timestamp));

            // assert transfers
            let primary = event.account_address;
            let expected_transfers = if primary == alice.address {
                1
            } else if primary == bob.address {
                2
            } else if primary == charlie.address {
                1
            } else {
                panic!("invalid account")
            };
            assert_eq!(event.transfers.len(), expected_transfers);

            for transfer in event.transfers {
                assert_eq!(transfer.token_address, token_address);

                // primary account must be one of the parties, on the side given by the direction
                let is_credit_party = primary == transfer.credit_party_address;
                let is_debit_party = primary == transfer.debit_party_address;
                assert!(is_credit_party || is_debit_party);
                if transfer.direction.is_credit() {
                    assert_eq!(primary, transfer.credit_party_address);
                } else {
                    assert_eq!(primary, transfer.debit_party_address);
                }
                assert_eq!(transfer.amount, amount_u256);

                // assert json format
                let transfer_json = serde_json::to_value(transfer).unwrap();
                assert_eq!(transfer_json["amount"], json!("65535"));
            }
        }
    }
}