use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use chrono::DateTime;
use chrono::Utc;
use display_json::DebugAsJson;
use ethereum_types::H256;
use ethereum_types::U256;
use hex_literal::hex;
use itertools::Itertools;
use serde::ser::SerializeStruct;
use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogTopic;
use crate::eth::primitives::TransactionMined;
use crate::eth::primitives::UnixTime;
use crate::if_else;
#[derive(DebugAsJson)]
pub struct AccountTransfers {
pub publication_id: Uuid,
pub publication_datetime: DateTime<Utc>,
pub account_address: Address,
pub transaction_hash: Hash,
pub transaction_index: u64,
pub contract_address: Address,
pub function_id: [u8; 4],
pub block_number: BlockNumber,
pub block_datetime: DateTime<Utc>,
pub transfers: Vec<AccountTransfer>,
}
impl AccountTransfers {
pub fn idempotency_key(&self) -> String {
format!("{}::{}", self.transaction_hash, self.account_address)
}
}
#[derive(DebugAsJson)]
pub struct AccountTransfer {
pub token_address: Address,
pub debit_party_address: Address,
pub credit_party_address: Address,
pub direction: AccountTransferDirection,
pub amount: U256,
}
#[derive(DebugAsJson, strum::EnumIs, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AccountTransferDirection {
Credit,
Debit,
}
impl Serialize for AccountTransfers {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("AccountTransfersEvent", 11)?;
state.serialize_field("publication_id", &self.publication_id.to_string())?;
state.serialize_field("publication_datetime", &self.publication_datetime.to_rfc3339())?;
state.serialize_field("idempotency_key", &self.idempotency_key())?;
state.serialize_field("account_address", &self.account_address)?;
state.serialize_field("transaction_hash", &self.transaction_hash)?;
state.serialize_field("contract_address", &self.contract_address)?;
state.serialize_field("function_id", &const_hex::encode_prefixed(self.function_id))?;
state.serialize_field("block_number", &self.block_number.as_u64())?;
state.serialize_field("block_datetime", &self.block_datetime.to_rfc3339())?;
state.serialize_field("transaction_index", &self.transaction_index)?;
state.serialize_field("transfers", &self.transfers)?;
state.end()
}
}
impl Serialize for AccountTransfer {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("AccountTransfer", 5)?;
state.serialize_field("token_address", &self.token_address)?;
state.serialize_field("debit_party_address", &self.debit_party_address)?;
state.serialize_field("credit_party_address", &self.credit_party_address)?;
state.serialize_field("amount", &self.amount.to_string())?;
state.serialize_field("direction", &self.direction)?;
state.end()
}
}
pub trait Event: Serialize + Sized + Debug {
fn event_key(&self) -> anyhow::Result<String>;
fn event_headers(&self) -> anyhow::Result<HashMap<String, String>> {
Ok(HashMap::default())
}
fn event_payload(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string(self)?)
}
}
impl Event for AccountTransfers {
fn event_key(&self) -> anyhow::Result<String> {
Ok(self.account_address.to_string())
}
}
const TRANSFER_EVENT: LogTopic = LogTopic(H256(hex!("ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")));
pub fn transaction_to_events(block_timestamp: UnixTime, tx: Cow<TransactionMined>) -> Vec<AccountTransfers> {
let transfers = tx
.as_ref()
.logs
.iter()
.filter(|log| log.log.topic0.is_some_and(|topic0| topic0 == TRANSFER_EVENT))
.filter_map(|log| {
let amount_bytes: [u8; 32] = match log.log.data.0.clone().try_into() {
Ok(amount_bytes) => amount_bytes,
Err(_) => {
tracing::error!(?log.transaction_hash, "bug: event identified as ERC-20 transfer should have the amount as 32 bytes in the data field");
return None;
}
};
let token = log.log.address;
let from: Address = log.log.topic1?.into();
let to: Address = log.log.topic2?.into();
let amount = U256::from_big_endian(&amount_bytes); Some((token, from, to, amount))
})
.collect_vec();
let mut accounts = HashSet::new();
for (_, from, to, _) in &transfers {
accounts.insert(from);
accounts.insert(to);
}
let mut events = Vec::with_capacity(accounts.len());
for account in accounts {
let mut event = AccountTransfers {
publication_id: Uuid::now_v7(),
publication_datetime: Utc::now(),
account_address: *account,
transaction_hash: tx.input.hash,
transaction_index: tx.transaction_index.0,
contract_address: tx.input.to.unwrap_or_else(|| {
tracing::error!(?tx.input.hash, "bug: transaction emitting transfers must have the contract address");
Address::ZERO
}),
function_id: tx.input.input[0..4].try_into().unwrap_or_else(|_| {
tracing::error!(?tx.input.hash, "bug: transaction emitting transfers must have the 4-byte signature");
[0; 4]
}),
block_number: tx.block_number,
block_datetime: block_timestamp.into(),
transfers: vec![],
};
for (token, from, to, amount) in &transfers {
if account != from && account != to {
continue;
}
let direction = if_else!(account == from, AccountTransferDirection::Debit, AccountTransferDirection::Credit);
let transfer = AccountTransfer {
token_address: *token,
debit_party_address: *from,
credit_party_address: *to,
direction,
amount: *amount,
};
event.transfers.push(transfer);
}
events.push(event);
}
events
}
#[cfg(test)]
mod tests {
use std::borrow::Cow;
use chrono::DateTime;
use chrono::Utc;
use ethereum_types::U256;
use fake::Fake;
use fake::Faker;
use serde_json::json;
use uuid::Uuid;
use crate::eth::primitives::test_accounts;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::Bytes;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::TransactionMined;
use crate::eth::primitives::UnixTime;
use crate::ext::to_json_value;
use crate::ledger::events::transaction_to_events;
use crate::ledger::events::AccountTransfer;
use crate::ledger::events::AccountTransferDirection;
use crate::ledger::events::AccountTransfers;
use crate::ledger::events::TRANSFER_EVENT;
#[test]
fn ledger_events_serde_account_transfers() {
let event = AccountTransfers {
publication_id: Uuid::nil(),
publication_datetime: "2024-10-16T19:47:50Z".parse().unwrap(),
account_address: Address::ZERO,
transaction_hash: Hash::ZERO,
transaction_index: 123,
block_datetime: "2024-10-16T19:47:50Z".parse().unwrap(),
contract_address: Address::ZERO,
function_id: [0, 0, 0, 0],
block_number: BlockNumber::ZERO,
transfers: vec![AccountTransfer {
token_address: Address::ZERO,
debit_party_address: Address::ZERO,
credit_party_address: Address::ZERO,
amount: U256::max_value(),
direction: AccountTransferDirection::Credit,
}],
};
let expected = json!(
{
"publication_id": "00000000-0000-0000-0000-000000000000",
"publication_datetime": "2024-10-16T19:47:50+00:00",
"idempotency_key": "0x0000000000000000000000000000000000000000000000000000000000000000::0x0000000000000000000000000000000000000000",
"account_address": "0x0000000000000000000000000000000000000000",
"transaction_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"transaction_index": 123,
"contract_address":"0x0000000000000000000000000000000000000000",
"function_id": "0x00000000",
"block_number": 0,
"block_datetime": "2024-10-16T19:47:50+00:00",
"transfers": [{
"token_address": "0x0000000000000000000000000000000000000000",
"debit_party_address": "0x0000000000000000000000000000000000000000",
"credit_party_address": "0x0000000000000000000000000000000000000000",
"direction": "credit",
"amount": "115792089237316195423570985008687907853269984665640564039457584007913129639935"
}],
}
);
assert_eq!(to_json_value(&event), expected);
}
#[test]
fn ledger_events_serde_event_account_transfer_direction() {
assert_eq!(to_json_value(&AccountTransferDirection::Credit), json!("credit"));
assert_eq!(to_json_value(&AccountTransferDirection::Debit), json!("debit"));
}
#[test]
fn ledger_events_parse_transfer_events() {
let accounts = test_accounts();
let alice = &accounts[0];
let bob = &accounts[1];
let charlie = &accounts[2];
let token_address = Address::BRLC;
let amount_bytes = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255,
];
let amount_u256 = U256::from_big_endian(&amount_bytes);
let block_timestamp: UnixTime = 1729108070.into();
let mut tx: TransactionMined = Fake::fake(&Faker);
tx.input.input = Bytes(vec![1, 2, 3, 4, 5, 6, 7, 8]);
let mut log_transfer1: LogMined = Fake::fake(&Faker);
log_transfer1.log.address = token_address;
log_transfer1.log.topic0 = Some(TRANSFER_EVENT);
log_transfer1.log.topic1 = Some(alice.address.into());
log_transfer1.log.topic2 = Some(bob.address.into());
log_transfer1.log.data = Bytes(amount_bytes.to_vec());
let mut log_transfer2: LogMined = Fake::fake(&Faker);
log_transfer2.log.address = token_address;
log_transfer2.log.topic0 = Some(TRANSFER_EVENT);
log_transfer2.log.topic1 = Some(bob.address.into());
log_transfer2.log.topic2 = Some(charlie.address.into());
log_transfer2.log.data = Bytes(amount_bytes.to_vec());
let log_random: LogMined = Fake::fake(&Faker);
tx.logs.push(log_transfer1);
tx.logs.push(log_random);
tx.logs.push(log_transfer2);
let events = transaction_to_events(block_timestamp, Cow::Borrowed(&tx));
assert_eq!(events.len(), 3); for event in events {
assert_eq!(&event.transaction_hash, &tx.input.hash);
assert_eq!(&event.contract_address, &tx.input.to.unwrap());
assert_eq!(&event.function_id[0..], &tx.input.input.0[0..4]);
assert_eq!(&event.block_number, &tx.block_number);
assert_eq!(&event.block_datetime, &DateTime::<Utc>::from(block_timestamp));
match event.account_address {
a if a == alice.address => assert_eq!(event.transfers.len(), 1),
a if a == bob.address => assert_eq!(event.transfers.len(), 2),
a if a == charlie.address => assert_eq!(event.transfers.len(), 1),
_ => panic!("invalid account"),
}
for transfer in event.transfers {
assert_eq!(transfer.token_address, token_address);
assert!(event.account_address == transfer.credit_party_address || event.account_address == transfer.debit_party_address);
if transfer.direction.is_credit() {
assert_eq!(event.account_address, transfer.credit_party_address);
} else {
assert_eq!(event.account_address, transfer.debit_party_address);
}
assert_eq!(transfer.amount, amount_u256);
let transfer_json = serde_json::to_value(transfer).unwrap();
assert_eq!(*transfer_json.get("amount").unwrap(), json!("65535"));
}
}
}
}