stratus/ledger/
events.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::fmt::Debug;
5
6use alloy_primitives::B256;
7use alloy_primitives::U256;
8use chrono::DateTime;
9use chrono::Utc;
10use display_json::DebugAsJson;
11use hex_literal::hex;
12use itertools::Itertools;
13use serde::Deserialize;
14use serde::Serialize;
15use serde::ser::SerializeStruct;
16use uuid::Uuid;
17
18use crate::eth::primitives::Address;
19use crate::eth::primitives::BlockNumber;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::LogTopic;
22use crate::eth::primitives::TransactionMined;
23use crate::eth::primitives::UnixTime;
24use crate::if_else;
25
/// Represents token transfers (debits and credits) associated with a specific Ethereum account within a single transaction.
///
/// The `account_address` field identifies the primary account involved in these transfers.
///
/// An event is generated for each account involved in a transaction, meaning that if an account is not the primary in this event,
/// in another event it will be treated as the primary, with credit and debit operations adjusted accordingly.
///
/// A single event can contain multiple token transfers (e.g., a customer is debited for a card payment but receives a credit as cashback).
#[derive(DebugAsJson)]
pub struct AccountTransfers {
    /// ID of the event publication.
    ///
    /// If the event is republished, a new ID will be generated for the publication.
    ///
    /// Format: UUID v7 (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
    pub publication_id: Uuid,

    /// Datetime of the event publication.
    ///
    /// Format: ISO 8601
    pub publication_datetime: DateTime<Utc>,

    /// Address of the account that is part of all transfers. Also referred to as the primary account address.
    ///
    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
    pub account_address: Address,

    /// Hash of the Ethereum transaction that originated the transfers.
    ///
    /// Format: Prefixed transaction hash - 32 bytes - 0x1234567890123456789012345678901234567890123456789012345678901234
    pub transaction_hash: Hash,

    /// Index of the transaction in the block where it was generated.
    ///
    /// Used for ordering multiple events from the same user that happened in the same block.
    ///
    /// Format: Integer in base 10 - Range: 0 to [`u64::MAX`]
    pub transaction_index: u64,

    /// Address of the contract that originated the transfers.
    ///
    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
    pub contract_address: Address,

    /// Identifier of the Ethereum function that originated the transfers.
    ///
    /// Format: Prefixed function signature - 4 bytes - 0x12345678
    pub function_id: [u8; 4],

    /// Number of the block that originated the transfers.
    ///
    /// Format: Integer in base 10 - Range: 0 to [`u64::MAX`]
    pub block_number: BlockNumber,

    /// Datetime of the Ethereum block that originated the transfers.
    ///
    /// Format: ISO 8601
    pub block_datetime: DateTime<Utc>,

    /// List of transfers the `account_address` is part of.
    pub transfers: Vec<AccountTransfer>,
}
88
89impl AccountTransfers {
90    /// Idempotency key of the event payload.
91    ///
92    /// It is unique for each distinct event, but consistent across retries or republications of the same event payload.
93    ///
94    /// Format: String - transaction_hash::account_address - 0x1234567890123456789012345678901234567890123456789012345678901234::0x1234567890123456789012345678901234567890
95    pub fn idempotency_key(&self) -> String {
96        format!("{}::{}", self.transaction_hash, self.account_address)
97    }
98}
99
/// Represents a token transfer between a debit party and a credit party that happened inside a transaction.
///
/// The transfer is always described from the debit party to the credit party; `direction` tells which of the two
/// is the primary account of the enclosing event.
#[derive(DebugAsJson)]
pub struct AccountTransfer {
    /// Address of the token contract that executed the transfer between `debit_party_address` and `credit_party_address`.
    ///
    /// It may differ from the `contract_address` because any contract can execute transfers in token contracts.
    ///
    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
    pub token_address: Address,

    /// Address of the account from which the token was subtracted.
    ///
    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
    pub debit_party_address: Address,

    /// Address of the account to which the token was added.
    ///
    /// Format: Prefixed account address - 20 bytes - 0x1234567890123456789012345678901234567890
    pub credit_party_address: Address,

    /// Direction of the transfer relative to the primary account address (credit or debit).
    ///
    /// Format: [debit, credit]
    pub direction: AccountTransferDirection,

    /// Amount transferred from the debit party to the credit party.
    ///
    /// Format: Integer in base 10 - Formatted as String to avoid losing precision - Range: 0 to [`U256::MAX`].
    pub amount: U256,
}
130
/// Direction of a transfer relative to the primary account address.
///
/// Serialized and deserialized in snake_case, i.e. as `"credit"` or `"debit"`.
#[derive(DebugAsJson, strum::EnumIs, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AccountTransferDirection {
    /// `account_address` is being credited.
    Credit,

    /// `account_address` is being debited.
    Debit,
}
141
142// -----------------------------------------------------------------------------
143// Serializers
144// -----------------------------------------------------------------------------
145
146impl Serialize for AccountTransfers {
147    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148    where
149        S: serde::Serializer,
150    {
151        let mut state = serializer.serialize_struct("AccountTransfersEvent", 11)?;
152
153        state.serialize_field("publication_id", &self.publication_id.to_string())?;
154        state.serialize_field("publication_datetime", &self.publication_datetime.to_rfc3339())?;
155        state.serialize_field("idempotency_key", &self.idempotency_key())?;
156        state.serialize_field("account_address", &self.account_address)?;
157        state.serialize_field("transaction_hash", &self.transaction_hash)?;
158        state.serialize_field("contract_address", &self.contract_address)?;
159        state.serialize_field("function_id", &const_hex::encode_prefixed(self.function_id))?;
160        state.serialize_field("block_number", &self.block_number.as_u64())?;
161        state.serialize_field("block_datetime", &self.block_datetime.to_rfc3339())?;
162        state.serialize_field("transaction_index", &self.transaction_index)?;
163        state.serialize_field("transfers", &self.transfers)?;
164        state.end()
165    }
166}
167
168impl Serialize for AccountTransfer {
169    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170    where
171        S: serde::Serializer,
172    {
173        let mut state = serializer.serialize_struct("AccountTransfer", 5)?;
174        state.serialize_field("token_address", &self.token_address)?;
175        state.serialize_field("debit_party_address", &self.debit_party_address)?;
176        state.serialize_field("credit_party_address", &self.credit_party_address)?;
177        state.serialize_field("amount", &self.amount.to_string())?;
178        state.serialize_field("direction", &self.direction)?;
179        state.end()
180    }
181}
182
183// -----------------------------------------------------------------------------
184// Marker Trait
185// -----------------------------------------------------------------------------
186
187/// Struct is an event that can be published to external systems.
188pub trait Event: Serialize + Sized + Debug {
189    /// Returns the partition key component of the event.
190    fn event_key(&self) -> anyhow::Result<String>;
191
192    /// Returns the headers component of the event.
193    ///
194    /// By default, it returns empty headers.
195    fn event_headers(&self) -> anyhow::Result<HashMap<String, String>> {
196        Ok(HashMap::default())
197    }
198
199    /// Returns the payload component of the event.
200    ///
201    /// By default, it serializes the implementing struct as JSON.
202    fn event_payload(&self) -> anyhow::Result<String> {
203        Ok(serde_json::to_string(self)?)
204    }
205}
206
207impl Event for AccountTransfers {
208    fn event_key(&self) -> anyhow::Result<String> {
209        Ok(self.account_address.to_string())
210    }
211}
212
213// -----------------------------------------------------------------------------
214// Conversions
215// -----------------------------------------------------------------------------
216
/// ERC-20 transfer event hash.
///
/// Compared against `topic0` of each transaction log to identify the logs that represent token transfers.
const TRANSFER_EVENT: LogTopic = LogTopic(B256::new(hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")));
219
220/// Converts a mined transaction into multiple account transfers events to be published.
221pub fn transaction_to_events(block_timestamp: UnixTime, tx: Cow<TransactionMined>) -> Vec<AccountTransfers> {
222    let hash = tx.info.hash;
223    // identify token transfers in transaction
224    let transfers = tx
225        .as_ref()
226        .result
227        .execution
228        .logs
229        .iter()
230        .filter(|log| log.topic0.is_some_and(|topic0| topic0 == TRANSFER_EVENT))
231        .filter_map(|log| {
232            let amount_bytes: [u8; 32] = match log.data.0.clone().try_into() {
233                Ok(amount_bytes) => amount_bytes,
234                Err(_) => {
235                    tracing::error!(
236                        ?hash,
237                        "bug: event identified as ERC-20 transfer should have the amount as 32 bytes in the data field"
238                    );
239                    return None;
240                }
241            };
242
243            let token = log.address;
244            let from: Address = log.topic1?.into();
245            let to: Address = log.topic2?.into();
246            let amount = U256::from_be_bytes(amount_bytes); // TODO: review
247
248            Some((token, from, to, amount))
249        })
250        .collect_vec();
251
252    // identify accounts involved in transfers
253    let mut accounts = HashSet::new();
254    for (_, from, to, _) in &transfers {
255        accounts.insert(from);
256        accounts.insert(to);
257    }
258
259    // for each account, generate an event
260    let mut events = Vec::with_capacity(accounts.len());
261    for account in accounts {
262        // generate base event
263        let mut event = AccountTransfers {
264            publication_id: Uuid::now_v7(),
265            publication_datetime: Utc::now(),
266            account_address: *account,
267            transaction_hash: tx.info.hash,
268            transaction_index: tx.mined_data.index.0,
269            contract_address: tx.evm_input.to.unwrap_or_else(|| {
270                tracing::error!(?tx.info.hash, "bug: transaction emitting transfers must have the contract address");
271                Address::ZERO
272            }),
273            function_id: tx.evm_input.data[0..4].try_into().unwrap_or_else(|_| {
274                tracing::error!(?tx.info.hash, "bug: transaction emitting transfers must have the 4-byte signature");
275                [0; 4]
276            }),
277            block_number: tx.evm_input.block_number,
278            block_datetime: block_timestamp.into(),
279            transfers: vec![],
280        };
281
282        // generate transfers
283        for (token, from, to, amount) in &transfers {
284            if account != from && account != to {
285                continue;
286            }
287            let direction = if_else!(account == from, AccountTransferDirection::Debit, AccountTransferDirection::Credit);
288            let transfer = AccountTransfer {
289                token_address: *token,
290                debit_party_address: *from,
291                credit_party_address: *to,
292                direction,
293                amount: *amount,
294            };
295            event.transfers.push(transfer);
296        }
297        events.push(event);
298    }
299
300    events
301}
302
303// -----------------------------------------------------------------------------
304// Tests
305// -----------------------------------------------------------------------------
306
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use alloy_primitives::U256;
    use chrono::DateTime;
    use chrono::Utc;
    use fake::Fake;
    use fake::Faker;
    use serde_json::json;
    use uuid::Uuid;

    use crate::eth::primitives::Address;
    use crate::eth::primitives::BlockNumber;
    use crate::eth::primitives::Bytes;
    use crate::eth::primitives::Hash;
    use crate::eth::primitives::Log;
    use crate::eth::primitives::TransactionMined;
    use crate::eth::primitives::UnixTime;
    use crate::eth::primitives::test_accounts;
    use crate::ext::to_json_value;
    use crate::ledger::events::AccountTransfer;
    use crate::ledger::events::AccountTransferDirection;
    use crate::ledger::events::AccountTransfers;
    use crate::ledger::events::TRANSFER_EVENT;
    use crate::ledger::events::transaction_to_events;

    /// Checks the exact JSON produced for an `AccountTransfers` event, including the derived `idempotency_key`
    /// and the amount rendered as a string.
    #[test]
    fn ledger_events_serde_account_transfers() {
        let event = AccountTransfers {
            publication_id: Uuid::nil(),
            publication_datetime: "2024-10-16T19:47:50Z".parse().unwrap(),
            account_address: Address::ZERO,
            transaction_hash: Hash::ZERO,
            transaction_index: 123,
            block_datetime: "2024-10-16T19:47:50Z".parse().unwrap(),
            contract_address: Address::ZERO,
            function_id: [0, 0, 0, 0],
            block_number: BlockNumber::ZERO,
            transfers: vec![AccountTransfer {
                token_address: Address::ZERO,
                debit_party_address: Address::ZERO,
                credit_party_address: Address::ZERO,
                amount: U256::MAX,
                direction: AccountTransferDirection::Credit,
            }],
        };
        let expected = json!(
            {
                "publication_id": "00000000-0000-0000-0000-000000000000",
                "publication_datetime": "2024-10-16T19:47:50+00:00",
                "idempotency_key": "0x0000000000000000000000000000000000000000000000000000000000000000::0x0000000000000000000000000000000000000000",
                "account_address": "0x0000000000000000000000000000000000000000",
                "transaction_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
                "transaction_index": 123,
                "contract_address":"0x0000000000000000000000000000000000000000",
                "function_id": "0x00000000",
                "block_number": 0,
                "block_datetime": "2024-10-16T19:47:50+00:00",
                "transfers": [{
                    "token_address": "0x0000000000000000000000000000000000000000",
                    "debit_party_address": "0x0000000000000000000000000000000000000000",
                    "credit_party_address": "0x0000000000000000000000000000000000000000",
                    "direction": "credit",
                    "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639935"
                }],
            }
        );
        assert_eq!(to_json_value(&event), expected);
    }

    /// Checks that transfer directions are serialized in snake_case.
    #[test]
    fn ledger_events_serde_event_account_transfer_direction() {
        assert_eq!(to_json_value(&AccountTransferDirection::Credit), json!("credit"));
        assert_eq!(to_json_value(&AccountTransferDirection::Debit), json!("debit"));
    }

    /// Checks that a transaction with two transfer logs (alice -> bob, bob -> charlie) and one unrelated log
    /// produces one event per account, each with the transfers that account is part of.
    #[test]
    fn ledger_events_parse_transfer_events() {
        // reference values
        let accounts = test_accounts();
        let alice = &accounts[0];
        let bob = &accounts[1];
        let charlie = &accounts[2];
        let token_address = Address::BRLC;
        // 32-byte big-endian encoding of 65535
        let amount_bytes = [
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255,
        ];
        let amount_u256 = U256::from_be_slice(&amount_bytes);

        // 1. generate fake block data
        let block_timestamp: UnixTime = 1729108070.into();

        // 2. generate fake tx data
        let mut tx: TransactionMined = Fake::fake(&Faker);
        tx.execution.evm_input.to = Some(Faker::fake(&Faker));
        tx.execution.evm_input.data = Bytes(vec![1, 2, 3, 4, 5, 6, 7, 8]);

        // transfer: alice -> bob
        let mut log_transfer1: Log = Fake::fake(&Faker);
        log_transfer1.address = token_address;
        log_transfer1.topic0 = Some(TRANSFER_EVENT);
        log_transfer1.topic1 = Some(alice.address.into());
        log_transfer1.topic2 = Some(bob.address.into());
        log_transfer1.data = Bytes(amount_bytes.to_vec());

        // transfer: bob -> charlie
        let mut log_transfer2: Log = Fake::fake(&Faker);
        log_transfer2.address = token_address;
        log_transfer2.topic0 = Some(TRANSFER_EVENT);
        log_transfer2.topic1 = Some(bob.address.into());
        log_transfer2.topic2 = Some(charlie.address.into());
        log_transfer2.data = Bytes(amount_bytes.to_vec());

        // fully random log placed between the two transfer logs
        let log_random: Log = Fake::fake(&Faker);

        tx.execution.result.execution.logs.push(log_transfer1);
        tx.execution.result.execution.logs.push(log_random);
        tx.execution.result.execution.logs.push(log_transfer2);

        // 3. parse events
        let events = transaction_to_events(block_timestamp, Cow::Borrowed(&tx));

        // 4. assert events
        assert_eq!(events.len(), 3); // one event per distinct account involved in the transfers
        for event in events {
            assert_eq!(&event.transaction_hash, &tx.info.hash);
            assert_eq!(&event.contract_address, &tx.evm_input.to.unwrap());
            assert_eq!(&event.function_id[0..], &tx.evm_input.data.0[0..4]);
            assert_eq!(&event.block_number, &tx.evm_input.block_number);
            assert_eq!(&event.block_datetime, &DateTime::<Utc>::from(block_timestamp));

            // assert transfers: bob takes part in both transfers, alice and charlie in one each
            match event.account_address {
                a if a == alice.address => assert_eq!(event.transfers.len(), 1),
                a if a == bob.address => assert_eq!(event.transfers.len(), 2),
                a if a == charlie.address => assert_eq!(event.transfers.len(), 1),
                _ => panic!("invalid account"),
            }
            for transfer in event.transfers {
                assert_eq!(transfer.token_address, token_address);

                // the direction must agree with the side of the transfer the primary account is on
                assert!(event.account_address == transfer.credit_party_address || event.account_address == transfer.debit_party_address);
                if transfer.direction.is_credit() {
                    assert_eq!(event.account_address, transfer.credit_party_address);
                } else {
                    assert_eq!(event.account_address, transfer.debit_party_address);
                }
                assert_eq!(transfer.amount, amount_u256);

                // assert json format
                let transfer_json = serde_json::to_value(transfer).unwrap();
                assert_eq!(*transfer_json.get("amount").unwrap(), json!("65535"));
            }
        }
    }
}