1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::fmt::Debug;
5
6use alloy_primitives::B256;
7use alloy_primitives::U256;
8use chrono::DateTime;
9use chrono::Utc;
10use display_json::DebugAsJson;
11use hex_literal::hex;
12use itertools::Itertools;
13use serde::Deserialize;
14use serde::Serialize;
15use serde::ser::SerializeStruct;
16use uuid::Uuid;
17
18use crate::eth::primitives::Address;
19use crate::eth::primitives::BlockNumber;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::LogTopic;
22use crate::eth::primitives::TransactionMined;
23use crate::eth::primitives::UnixTime;
24use crate::if_else;
25
26#[derive(DebugAsJson)]
35pub struct AccountTransfers {
36 pub publication_id: Uuid,
42
43 pub publication_datetime: DateTime<Utc>,
47
48 pub account_address: Address,
52
53 pub transaction_hash: Hash,
57
58 pub transaction_index: u64,
64
65 pub contract_address: Address,
69
70 pub function_id: [u8; 4],
74
75 pub block_number: BlockNumber,
79
80 pub block_datetime: DateTime<Utc>,
84
85 pub transfers: Vec<AccountTransfer>,
87}
88
89impl AccountTransfers {
90 pub fn idempotency_key(&self) -> String {
96 format!("{}::{}", self.transaction_hash, self.account_address)
97 }
98}
99
100#[derive(DebugAsJson)]
102pub struct AccountTransfer {
103 pub token_address: Address,
109
110 pub debit_party_address: Address,
114
115 pub credit_party_address: Address,
119
120 pub direction: AccountTransferDirection,
124
125 pub amount: U256,
129}
130
131#[derive(DebugAsJson, strum::EnumIs, Serialize, Deserialize)]
133#[serde(rename_all = "snake_case")]
134pub enum AccountTransferDirection {
135 Credit,
137
138 Debit,
140}
141
142impl Serialize for AccountTransfers {
147 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148 where
149 S: serde::Serializer,
150 {
151 let mut state = serializer.serialize_struct("AccountTransfersEvent", 11)?;
152
153 state.serialize_field("publication_id", &self.publication_id.to_string())?;
154 state.serialize_field("publication_datetime", &self.publication_datetime.to_rfc3339())?;
155 state.serialize_field("idempotency_key", &self.idempotency_key())?;
156 state.serialize_field("account_address", &self.account_address)?;
157 state.serialize_field("transaction_hash", &self.transaction_hash)?;
158 state.serialize_field("contract_address", &self.contract_address)?;
159 state.serialize_field("function_id", &const_hex::encode_prefixed(self.function_id))?;
160 state.serialize_field("block_number", &self.block_number.as_u64())?;
161 state.serialize_field("block_datetime", &self.block_datetime.to_rfc3339())?;
162 state.serialize_field("transaction_index", &self.transaction_index)?;
163 state.serialize_field("transfers", &self.transfers)?;
164 state.end()
165 }
166}
167
168impl Serialize for AccountTransfer {
169 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170 where
171 S: serde::Serializer,
172 {
173 let mut state = serializer.serialize_struct("AccountTransfer", 5)?;
174 state.serialize_field("token_address", &self.token_address)?;
175 state.serialize_field("debit_party_address", &self.debit_party_address)?;
176 state.serialize_field("credit_party_address", &self.credit_party_address)?;
177 state.serialize_field("amount", &self.amount.to_string())?;
178 state.serialize_field("direction", &self.direction)?;
179 state.end()
180 }
181}
182
183pub trait Event: Serialize + Sized + Debug {
189 fn event_key(&self) -> anyhow::Result<String>;
191
192 fn event_headers(&self) -> anyhow::Result<HashMap<String, String>> {
196 Ok(HashMap::default())
197 }
198
199 fn event_payload(&self) -> anyhow::Result<String> {
203 Ok(serde_json::to_string(self)?)
204 }
205}
206
207impl Event for AccountTransfers {
208 fn event_key(&self) -> anyhow::Result<String> {
209 Ok(self.account_address.to_string())
210 }
211}
212
213const TRANSFER_EVENT: LogTopic = LogTopic(B256::new(hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")));
219
220pub fn transaction_to_events(block_timestamp: UnixTime, tx: Cow<TransactionMined>) -> Vec<AccountTransfers> {
222 let transfers = tx
224 .as_ref()
225 .logs
226 .iter()
227 .filter(|log| log.log.topic0.is_some_and(|topic0| topic0 == TRANSFER_EVENT))
228 .filter_map(|log| {
229 let amount_bytes: [u8; 32] = match log.log.data.0.clone().try_into() {
230 Ok(amount_bytes) => amount_bytes,
231 Err(_) => {
232 tracing::error!(?log.transaction_hash, "bug: event identified as ERC-20 transfer should have the amount as 32 bytes in the data field");
233 return None;
234 }
235 };
236
237 let token = log.log.address;
238 let from: Address = log.log.topic1?.into();
239 let to: Address = log.log.topic2?.into();
240 let amount = U256::from_be_bytes(amount_bytes); Some((token, from, to, amount))
243 })
244 .collect_vec();
245
246 let mut accounts = HashSet::new();
248 for (_, from, to, _) in &transfers {
249 accounts.insert(from);
250 accounts.insert(to);
251 }
252
253 let mut events = Vec::with_capacity(accounts.len());
255 for account in accounts {
256 let mut event = AccountTransfers {
258 publication_id: Uuid::now_v7(),
259 publication_datetime: Utc::now(),
260 account_address: *account,
261 transaction_hash: tx.input.hash,
262 transaction_index: tx.transaction_index.0,
263 contract_address: tx.input.to.unwrap_or_else(|| {
264 tracing::error!(?tx.input.hash, "bug: transaction emitting transfers must have the contract address");
265 Address::ZERO
266 }),
267 function_id: tx.input.input[0..4].try_into().unwrap_or_else(|_| {
268 tracing::error!(?tx.input.hash, "bug: transaction emitting transfers must have the 4-byte signature");
269 [0; 4]
270 }),
271 block_number: tx.block_number,
272 block_datetime: block_timestamp.into(),
273 transfers: vec![],
274 };
275
276 for (token, from, to, amount) in &transfers {
278 if account != from && account != to {
279 continue;
280 }
281 let direction = if_else!(account == from, AccountTransferDirection::Debit, AccountTransferDirection::Credit);
282 let transfer = AccountTransfer {
283 token_address: *token,
284 debit_party_address: *from,
285 credit_party_address: *to,
286 direction,
287 amount: *amount,
288 };
289 event.transfers.push(transfer);
290 }
291 events.push(event);
292 }
293
294 events
295}
296
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use alloy_primitives::U256;
    use chrono::DateTime;
    use chrono::Utc;
    use fake::Fake;
    use fake::Faker;
    use serde_json::json;
    use uuid::Uuid;

    use crate::eth::primitives::Address;
    use crate::eth::primitives::BlockNumber;
    use crate::eth::primitives::Bytes;
    use crate::eth::primitives::Hash;
    use crate::eth::primitives::LogMined;
    use crate::eth::primitives::TransactionMined;
    use crate::eth::primitives::UnixTime;
    use crate::eth::primitives::test_accounts;
    use crate::ext::to_json_value;
    use crate::ledger::events::AccountTransfer;
    use crate::ledger::events::AccountTransferDirection;
    use crate::ledger::events::AccountTransfers;
    use crate::ledger::events::TRANSFER_EVENT;
    use crate::ledger::events::transaction_to_events;

    /// Creates a random mined log and overwrites the parts that make it an ERC-20 transfer
    /// of `amount_bytes` from `from` to `to` on `token`.
    fn fake_transfer_log(token: Address, from: Address, to: Address, amount_bytes: &[u8]) -> LogMined {
        let mut mined: LogMined = Fake::fake(&Faker);
        mined.log.address = token;
        mined.log.topic0 = Some(TRANSFER_EVENT);
        mined.log.topic1 = Some(from.into());
        mined.log.topic2 = Some(to.into());
        mined.log.data = Bytes(amount_bytes.to_vec());
        mined
    }

    /// The JSON form of a fully populated event matches the expected payload.
    #[test]
    fn ledger_events_serde_account_transfers() {
        let datetime: DateTime<Utc> = "2024-10-16T19:47:50Z".parse().unwrap();

        let single_transfer = AccountTransfer {
            token_address: Address::ZERO,
            debit_party_address: Address::ZERO,
            credit_party_address: Address::ZERO,
            amount: U256::MAX,
            direction: AccountTransferDirection::Credit,
        };
        let transfers_event = AccountTransfers {
            publication_id: Uuid::nil(),
            publication_datetime: datetime,
            account_address: Address::ZERO,
            transaction_hash: Hash::ZERO,
            transaction_index: 123,
            block_datetime: datetime,
            contract_address: Address::ZERO,
            function_id: [0, 0, 0, 0],
            block_number: BlockNumber::ZERO,
            transfers: vec![single_transfer],
        };

        let expected_json = json!({
            "publication_id": "00000000-0000-0000-0000-000000000000",
            "publication_datetime": "2024-10-16T19:47:50+00:00",
            "idempotency_key": "0x0000000000000000000000000000000000000000000000000000000000000000::0x0000000000000000000000000000000000000000",
            "account_address": "0x0000000000000000000000000000000000000000",
            "transaction_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
            "transaction_index": 123,
            "contract_address": "0x0000000000000000000000000000000000000000",
            "function_id": "0x00000000",
            "block_number": 0,
            "block_datetime": "2024-10-16T19:47:50+00:00",
            "transfers": [{
                "token_address": "0x0000000000000000000000000000000000000000",
                "debit_party_address": "0x0000000000000000000000000000000000000000",
                "credit_party_address": "0x0000000000000000000000000000000000000000",
                "direction": "credit",
                "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639935"
            }],
        });

        assert_eq!(to_json_value(&transfers_event), expected_json);
    }

    /// Each direction serializes to its snake-case name.
    #[test]
    fn ledger_events_serde_event_account_transfer_direction() {
        let cases = [(AccountTransferDirection::Credit, "credit"), (AccountTransferDirection::Debit, "debit")];
        for (direction, expected) in cases {
            assert_eq!(to_json_value(&direction), json!(expected));
        }
    }

    /// A transaction with two transfer logs (alice -> bob, bob -> charlie) and one unrelated
    /// log yields one event per participant, each holding only that participant's transfers.
    #[test]
    fn ledger_events_parse_transfer_events() {
        // participants and token
        let accounts = test_accounts();
        let (alice, bob, charlie) = (&accounts[0], &accounts[1], &accounts[2]);
        let token_address = Address::BRLC;

        // 32-byte big-endian amount with the two lowest bytes set (65535)
        let mut amount_bytes = [0u8; 32];
        amount_bytes[30..].fill(255);
        let amount_u256 = U256::from_be_slice(&amount_bytes);

        let block_timestamp: UnixTime = 1729108070.into();

        // fake transaction carrying: transfer, unrelated log, transfer
        let mut tx: TransactionMined = Fake::fake(&Faker);
        tx.input.input = Bytes(vec![1, 2, 3, 4, 5, 6, 7, 8]);

        let alice_to_bob = fake_transfer_log(token_address, alice.address, bob.address, &amount_bytes);
        let bob_to_charlie = fake_transfer_log(token_address, bob.address, charlie.address, &amount_bytes);
        let unrelated: LogMined = Fake::fake(&Faker);

        tx.logs.push(alice_to_bob);
        tx.logs.push(unrelated);
        tx.logs.push(bob_to_charlie);

        let events = transaction_to_events(block_timestamp, Cow::Borrowed(&tx));

        // one event per participant
        assert_eq!(events.len(), 3);

        for event in events {
            // transaction-level data is copied into every event
            assert_eq!(event.transaction_hash, tx.input.hash);
            assert_eq!(event.contract_address, tx.input.to.unwrap());
            assert_eq!(event.function_id.as_slice(), &tx.input.input.0[..4]);
            assert_eq!(event.block_number, tx.block_number);
            assert_eq!(event.block_datetime, DateTime::<Utc>::from(block_timestamp));

            // bob is in both transfers, alice and charlie in one each
            let account = event.account_address;
            let expected_transfers = if account == alice.address {
                1
            } else if account == bob.address {
                2
            } else if account == charlie.address {
                1
            } else {
                panic!("invalid account")
            };
            assert_eq!(event.transfers.len(), expected_transfers);

            for transfer in event.transfers {
                assert_eq!(transfer.token_address, token_address);

                // the account is a party of the transfer, on the side given by the direction
                assert!(account == transfer.credit_party_address || account == transfer.debit_party_address);
                let expected_party = if transfer.direction.is_credit() {
                    transfer.credit_party_address
                } else {
                    transfer.debit_party_address
                };
                assert_eq!(account, expected_party);
                assert_eq!(transfer.amount, amount_u256);

                // the amount is serialized as a decimal string
                let transfer_json = serde_json::to_value(transfer).unwrap();
                assert_eq!(transfer_json.get("amount"), Some(&json!("65535")));
            }
        }
    }
}