binance-p2p-market-history/p2p-collector/normalizer.py
Gabriel Ramos 2c41a7a6b3 feat: implement binance p2p collector daemon
Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
2026-06-05 14:40:05 -04:00

99 lines
4.1 KiB
Python

import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)


def _safe_float(val, default=0.0, field_name=None) -> float:
    """Return ``float(val)``, or *default* when *val* is None or unconvertible.

    A warning is logged on conversion failure only when *field_name* is given.
    """
    if val is None:
        return default
    try:
        return float(val)
    except (ValueError, TypeError) as e:
        if field_name:
            logger.warning(
                "Could not convert field '%s' value %r to float: %s", field_name, val, e
            )
        return default


def _safe_int(val, default=0, field_name=None) -> int:
    """Return ``int(val)``, or *default* when *val* is None or unconvertible.

    A warning is logged on conversion failure only when *field_name* is given.
    """
    if val is None:
        return default
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        if field_name:
            logger.warning(
                "Could not convert field '%s' value %r to int: %s", field_name, val, e
            )
        return default


def _extract_payment_methods(trade_methods) -> tuple:
    """Split the ``tradeMethods`` entries into (payTypes, identifiers) lists.

    Entries that are not dicts, and falsy ``payType`` / ``identifier`` values,
    are skipped. Anything other than a list/tuple yields two empty lists.
    """
    payment_methods = []
    payment_method_ids = []
    if not isinstance(trade_methods, (list, tuple)):
        return payment_methods, payment_method_ids
    for method in trade_methods:
        if not isinstance(method, dict):
            continue
        pay_type = method.get("payType")
        identifier = method.get("identifier")
        if pay_type:
            payment_methods.append(pay_type)
        if identifier:
            payment_method_ids.append(identifier)
    return payment_methods, payment_method_ids


def _parse_created_at(create_time_ms, fallback: datetime) -> datetime:
    """Convert ``createTime`` (milliseconds since epoch) to an aware UTC datetime.

    Returns *fallback* when the value is None or cannot be parsed. The value is
    parsed with ``float()`` directly rather than through ``_safe_float``: that
    helper would turn garbage into 0.0 and silently produce 1970-01-01 instead
    of triggering the fallback.
    """
    if create_time_ms is None:
        return fallback
    try:
        return datetime.fromtimestamp(float(create_time_ms) / 1000.0, tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError) as e:
        logger.warning("Could not parse ad createTime %s: %s", create_time_ms, e)
        return fallback


def normalize_ad(raw_ad: dict, trade_type: str, fetched_at: datetime) -> dict:
    """
    Normalizes a single P2P ad dictionary from the Binance API response
    into a flattened dictionary schema matching the spec.

    Args:
        raw_ad: One ad entry; expected to hold an ``"adv"`` dict and an
            ``"advertiser"`` dict. Missing, null or non-dict sections are
            treated as empty and reported with a warning.
        trade_type: Label copied to ``trade_type`` and appended to
            ``snapshot_id``.
        fetched_at: Collection timestamp; also used as ``ad_created_at`` when
            ``createTime`` is absent or unparsable.

    Returns:
        A flat dict. Numeric fields default to 0 / 0.0 and text fields to the
        defaults shown below when the source value is missing. Missing or
        malformed fields are logged as warnings and never raise.
    """
    adv = raw_ad.get("adv")
    adver = raw_ad.get("advertiser")

    # A JSON null (or any non-dict) section must not break the .get() calls below.
    if not isinstance(adv, dict):
        adv = {}
    if not isinstance(adver, dict):
        adver = {}

    # Check if critical structures are missing
    if not adv:
        logger.warning("Ad structure 'adv' is missing in raw ad data.")
    if not adver:
        logger.warning("Advertiser structure 'advertiser' is missing in raw ad data.")

    payment_methods, payment_method_ids = _extract_payment_methods(adv.get("tradeMethods"))
    ad_created_at = _parse_created_at(adv.get("createTime"), fetched_at)

    # Check for missing expected fields to log warnings, but don't fail
    for key in ("advNo", "asset", "fiatUnit", "price"):
        if key not in adv:
            logger.warning(
                "Expected key '%s' not found in 'adv' structure of ad: %s", key, raw_ad
            )
    if "userNo" not in adver:
        logger.warning(
            "Expected key 'userNo' not found in 'advertiser' structure of ad: %s", raw_ad
        )

    return {
        # NOTE(review): the literal 'Z' suffix assumes fetched_at is already UTC;
        # no timezone conversion is done here — confirm with the caller.
        "snapshot_id": f"{fetched_at.strftime('%Y%m%dT%H%M%SZ')}_{trade_type}",
        "fetched_at": fetched_at,
        "fetched_date": fetched_at.strftime("%Y-%m-%d"),
        "trade_type": trade_type,
        "adv_no": adv.get("advNo", ""),
        "asset": adv.get("asset", "USDT"),
        "fiat": adv.get("fiatUnit", "VES"),
        "price": _safe_float(adv.get("price"), 0.0, "price"),
        "surplus_amount": _safe_float(adv.get("surplusAmount"), 0.0, "surplusAmount"),
        "min_amount": _safe_float(
            adv.get("minSingleTransAmount"), 0.0, "minSingleTransAmount"
        ),
        "max_amount": _safe_float(
            adv.get("maxSingleTransAmount"), 0.0, "maxSingleTransAmount"
        ),
        "tradable_quantity": _safe_float(
            adv.get("tradableQuantity"), 0.0, "tradableQuantity"
        ),
        "advertiser_no": adver.get("userNo", ""),
        "advertiser_name": adver.get("nickName", ""),
        "advertiser_type": adver.get("userType", "user"),
        "month_order_count": _safe_int(adver.get("monthOrderCount"), 0, "monthOrderCount"),
        "month_finish_rate": _safe_float(
            adver.get("monthFinishRate"), 0.0, "monthFinishRate"
        ),
        "positive_rate": _safe_float(adver.get("positiveRate"), 0.0, "positiveRate"),
        "user_positive_rate": _safe_float(
            adver.get("userPositiveRate"), 0.0, "userPositiveRate"
        ),
        "payment_methods": payment_methods,
        "payment_method_ids": payment_method_ids,
        "ad_created_at": ad_created_at,
        "price_type": adv.get("priceType", "FIXED"),
    }