1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::collections::HashSet;
4use std::fmt::Debug;
5
6use alloy_primitives::B256;
7use alloy_primitives::U256;
8use chrono::DateTime;
9use chrono::Utc;
10use display_json::DebugAsJson;
11use hex_literal::hex;
12use itertools::Itertools;
13use serde::Deserialize;
14use serde::Serialize;
15use serde::ser::SerializeStruct;
16use uuid::Uuid;
17
18use crate::eth::primitives::Address;
19use crate::eth::primitives::BlockNumber;
20use crate::eth::primitives::Hash;
21use crate::eth::primitives::LogTopic;
22use crate::eth::primitives::TransactionMined;
23use crate::eth::primitives::UnixTime;
24use crate::if_else;
25
26#[derive(DebugAsJson)]
35pub struct AccountTransfers {
36 pub publication_id: Uuid,
42
43 pub publication_datetime: DateTime<Utc>,
47
48 pub account_address: Address,
52
53 pub transaction_hash: Hash,
57
58 pub transaction_index: u64,
64
65 pub contract_address: Address,
69
70 pub function_id: [u8; 4],
74
75 pub block_number: BlockNumber,
79
80 pub block_datetime: DateTime<Utc>,
84
85 pub transfers: Vec<AccountTransfer>,
87}
88
89impl AccountTransfers {
90 pub fn idempotency_key(&self) -> String {
96 format!("{}::{}", self.transaction_hash, self.account_address)
97 }
98}
99
100#[derive(DebugAsJson)]
102pub struct AccountTransfer {
103 pub token_address: Address,
109
110 pub debit_party_address: Address,
114
115 pub credit_party_address: Address,
119
120 pub direction: AccountTransferDirection,
124
125 pub amount: U256,
129}
130
131#[derive(DebugAsJson, strum::EnumIs, Serialize, Deserialize)]
133#[serde(rename_all = "snake_case")]
134pub enum AccountTransferDirection {
135 Credit,
137
138 Debit,
140}
141
142impl Serialize for AccountTransfers {
147 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
148 where
149 S: serde::Serializer,
150 {
151 let mut state = serializer.serialize_struct("AccountTransfersEvent", 11)?;
152
153 state.serialize_field("publication_id", &self.publication_id.to_string())?;
154 state.serialize_field("publication_datetime", &self.publication_datetime.to_rfc3339())?;
155 state.serialize_field("idempotency_key", &self.idempotency_key())?;
156 state.serialize_field("account_address", &self.account_address)?;
157 state.serialize_field("transaction_hash", &self.transaction_hash)?;
158 state.serialize_field("contract_address", &self.contract_address)?;
159 state.serialize_field("function_id", &const_hex::encode_prefixed(self.function_id))?;
160 state.serialize_field("block_number", &self.block_number.as_u64())?;
161 state.serialize_field("block_datetime", &self.block_datetime.to_rfc3339())?;
162 state.serialize_field("transaction_index", &self.transaction_index)?;
163 state.serialize_field("transfers", &self.transfers)?;
164 state.end()
165 }
166}
167
168impl Serialize for AccountTransfer {
169 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
170 where
171 S: serde::Serializer,
172 {
173 let mut state = serializer.serialize_struct("AccountTransfer", 5)?;
174 state.serialize_field("token_address", &self.token_address)?;
175 state.serialize_field("debit_party_address", &self.debit_party_address)?;
176 state.serialize_field("credit_party_address", &self.credit_party_address)?;
177 state.serialize_field("amount", &self.amount.to_string())?;
178 state.serialize_field("direction", &self.direction)?;
179 state.end()
180 }
181}
182
183pub trait Event: Serialize + Sized + Debug {
189 fn event_key(&self) -> anyhow::Result<String>;
191
192 fn event_headers(&self) -> anyhow::Result<HashMap<String, String>> {
196 Ok(HashMap::default())
197 }
198
199 fn event_payload(&self) -> anyhow::Result<String> {
203 Ok(serde_json::to_string(self)?)
204 }
205}
206
207impl Event for AccountTransfers {
208 fn event_key(&self) -> anyhow::Result<String> {
209 Ok(self.account_address.to_string())
210 }
211}
212
213const TRANSFER_EVENT: LogTopic = LogTopic(B256::new(hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")));
219
220pub fn transaction_to_events(block_timestamp: UnixTime, tx: Cow<TransactionMined>) -> Vec<AccountTransfers> {
222 let hash = tx.info.hash;
223 let transfers = tx
225 .as_ref()
226 .result
227 .execution
228 .logs
229 .iter()
230 .filter(|log| log.topic0.is_some_and(|topic0| topic0 == TRANSFER_EVENT))
231 .filter_map(|log| {
232 let amount_bytes: [u8; 32] = match log.data.0.clone().try_into() {
233 Ok(amount_bytes) => amount_bytes,
234 Err(_) => {
235 tracing::error!(
236 ?hash,
237 "bug: event identified as ERC-20 transfer should have the amount as 32 bytes in the data field"
238 );
239 return None;
240 }
241 };
242
243 let token = log.address;
244 let from: Address = log.topic1?.into();
245 let to: Address = log.topic2?.into();
246 let amount = U256::from_be_bytes(amount_bytes); Some((token, from, to, amount))
249 })
250 .collect_vec();
251
252 let mut accounts = HashSet::new();
254 for (_, from, to, _) in &transfers {
255 accounts.insert(from);
256 accounts.insert(to);
257 }
258
259 let mut events = Vec::with_capacity(accounts.len());
261 for account in accounts {
262 let mut event = AccountTransfers {
264 publication_id: Uuid::now_v7(),
265 publication_datetime: Utc::now(),
266 account_address: *account,
267 transaction_hash: tx.info.hash,
268 transaction_index: tx.mined_data.index.0,
269 contract_address: tx.evm_input.to.unwrap_or_else(|| {
270 tracing::error!(?tx.info.hash, "bug: transaction emitting transfers must have the contract address");
271 Address::ZERO
272 }),
273 function_id: tx.evm_input.data[0..4].try_into().unwrap_or_else(|_| {
274 tracing::error!(?tx.info.hash, "bug: transaction emitting transfers must have the 4-byte signature");
275 [0; 4]
276 }),
277 block_number: tx.evm_input.block_number,
278 block_datetime: block_timestamp.into(),
279 transfers: vec![],
280 };
281
282 for (token, from, to, amount) in &transfers {
284 if account != from && account != to {
285 continue;
286 }
287 let direction = if_else!(account == from, AccountTransferDirection::Debit, AccountTransferDirection::Credit);
288 let transfer = AccountTransfer {
289 token_address: *token,
290 debit_party_address: *from,
291 credit_party_address: *to,
292 direction,
293 amount: *amount,
294 };
295 event.transfers.push(transfer);
296 }
297 events.push(event);
298 }
299
300 events
301}
302
303#[cfg(test)]
308mod tests {
309 use std::borrow::Cow;
310
311 use alloy_primitives::U256;
312 use chrono::DateTime;
313 use chrono::Utc;
314 use fake::Fake;
315 use fake::Faker;
316 use serde_json::json;
317 use uuid::Uuid;
318
319 use crate::eth::primitives::Address;
320 use crate::eth::primitives::BlockNumber;
321 use crate::eth::primitives::Bytes;
322 use crate::eth::primitives::Hash;
323 use crate::eth::primitives::Log;
324 use crate::eth::primitives::TransactionMined;
325 use crate::eth::primitives::UnixTime;
326 use crate::eth::primitives::test_accounts;
327 use crate::ext::to_json_value;
328 use crate::ledger::events::AccountTransfer;
329 use crate::ledger::events::AccountTransferDirection;
330 use crate::ledger::events::AccountTransfers;
331 use crate::ledger::events::TRANSFER_EVENT;
332 use crate::ledger::events::transaction_to_events;
333
334 #[test]
335 fn ledger_events_serde_account_transfers() {
336 let event = AccountTransfers {
337 publication_id: Uuid::nil(),
338 publication_datetime: "2024-10-16T19:47:50Z".parse().unwrap(),
339 account_address: Address::ZERO,
340 transaction_hash: Hash::ZERO,
341 transaction_index: 123,
342 block_datetime: "2024-10-16T19:47:50Z".parse().unwrap(),
343 contract_address: Address::ZERO,
344 function_id: [0, 0, 0, 0],
345 block_number: BlockNumber::ZERO,
346 transfers: vec![AccountTransfer {
347 token_address: Address::ZERO,
348 debit_party_address: Address::ZERO,
349 credit_party_address: Address::ZERO,
350 amount: U256::MAX,
351 direction: AccountTransferDirection::Credit,
352 }],
353 };
354 let expected = json!(
355 {
356 "publication_id": "00000000-0000-0000-0000-000000000000",
357 "publication_datetime": "2024-10-16T19:47:50+00:00",
358 "idempotency_key": "0x0000000000000000000000000000000000000000000000000000000000000000::0x0000000000000000000000000000000000000000",
359 "account_address": "0x0000000000000000000000000000000000000000",
360 "transaction_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
361 "transaction_index": 123,
362 "contract_address":"0x0000000000000000000000000000000000000000",
363 "function_id": "0x00000000",
364 "block_number": 0,
365 "block_datetime": "2024-10-16T19:47:50+00:00",
366 "transfers": [{
367 "token_address": "0x0000000000000000000000000000000000000000",
368 "debit_party_address": "0x0000000000000000000000000000000000000000",
369 "credit_party_address": "0x0000000000000000000000000000000000000000",
370 "direction": "credit",
371 "amount": "115792089237316195423570985008687907853269984665640564039457584007913129639935"
372 }],
373 }
374 );
375 assert_eq!(to_json_value(&event), expected);
376 }
377
378 #[test]
379 fn ledger_events_serde_event_account_transfer_direction() {
380 assert_eq!(to_json_value(&AccountTransferDirection::Credit), json!("credit"));
381 assert_eq!(to_json_value(&AccountTransferDirection::Debit), json!("debit"));
382 }
383
384 #[test]
385 fn ledger_events_parse_transfer_events() {
386 let accounts = test_accounts();
388 let alice = &accounts[0];
389 let bob = &accounts[1];
390 let charlie = &accounts[2];
391 let token_address = Address::BRLC;
392 let amount_bytes = [
393 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255,
394 ];
395 let amount_u256 = U256::from_be_slice(&amount_bytes);
396
397 let block_timestamp: UnixTime = 1729108070.into();
399
400 let mut tx: TransactionMined = Fake::fake(&Faker);
402 tx.execution.evm_input.to = Some(Faker::fake(&Faker));
403 tx.execution.evm_input.data = Bytes(vec![1, 2, 3, 4, 5, 6, 7, 8]);
404
405 let mut log_transfer1: Log = Fake::fake(&Faker);
406 log_transfer1.address = token_address;
407 log_transfer1.topic0 = Some(TRANSFER_EVENT);
408 log_transfer1.topic1 = Some(alice.address.into());
409 log_transfer1.topic2 = Some(bob.address.into());
410 log_transfer1.data = Bytes(amount_bytes.to_vec());
411
412 let mut log_transfer2: Log = Fake::fake(&Faker);
413 log_transfer2.address = token_address;
414 log_transfer2.topic0 = Some(TRANSFER_EVENT);
415 log_transfer2.topic1 = Some(bob.address.into());
416 log_transfer2.topic2 = Some(charlie.address.into());
417 log_transfer2.data = Bytes(amount_bytes.to_vec());
418
419 let log_random: Log = Fake::fake(&Faker);
420
421 tx.execution.result.execution.logs.push(log_transfer1);
422 tx.execution.result.execution.logs.push(log_random);
423 tx.execution.result.execution.logs.push(log_transfer2);
424
425 let events = transaction_to_events(block_timestamp, Cow::Borrowed(&tx));
427
428 assert_eq!(events.len(), 3); for event in events {
431 assert_eq!(&event.transaction_hash, &tx.info.hash);
432 assert_eq!(&event.contract_address, &tx.evm_input.to.unwrap());
433 assert_eq!(&event.function_id[0..], &tx.evm_input.data.0[0..4]);
434 assert_eq!(&event.block_number, &tx.evm_input.block_number);
435 assert_eq!(&event.block_datetime, &DateTime::<Utc>::from(block_timestamp));
436
437 match event.account_address {
439 a if a == alice.address => assert_eq!(event.transfers.len(), 1),
440 a if a == bob.address => assert_eq!(event.transfers.len(), 2),
441 a if a == charlie.address => assert_eq!(event.transfers.len(), 1),
442 _ => panic!("invalid account"),
443 }
444 for transfer in event.transfers {
445 assert_eq!(transfer.token_address, token_address);
446
447 assert!(event.account_address == transfer.credit_party_address || event.account_address == transfer.debit_party_address);
448 if transfer.direction.is_credit() {
449 assert_eq!(event.account_address, transfer.credit_party_address);
450 } else {
451 assert_eq!(event.account_address, transfer.debit_party_address);
452 }
453 assert_eq!(transfer.amount, amount_u256);
454
455 let transfer_json = serde_json::to_value(transfer).unwrap();
457 assert_eq!(*transfer_json.get("amount").unwrap(), json!("65535"));
458 }
459 }
460 }
461}