Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
99 lines
4.1 KiB
Python
99 lines
4.1 KiB
Python
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
logger = logging.getLogger(__name__)

# Keys of the "adv" structure whose absence is reported with a warning.
_REQUIRED_ADV_KEYS = ("advNo", "asset", "fiatUnit", "price")


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict.

    Guards against a key that is present but holds ``None`` (JSON null) or
    some other non-mapping value, which would break the ``.get`` lookups.
    """
    return value if isinstance(value, dict) else {}


def _safe_float(val, default=0.0, field_name=None):
    """Convert *val* to ``float``; return *default* if it is None or invalid.

    A warning is logged for an invalid value only when *field_name* is given.
    """
    if val is None:
        return default
    try:
        return float(val)
    except (ValueError, TypeError, OverflowError) as e:
        if field_name:
            logger.warning(
                "Could not convert field '%s' value %r to float: %s",
                field_name, val, e,
            )
        return default


def _safe_int(val, default=0, field_name=None):
    """Convert *val* to ``int``; return *default* if it is None or invalid.

    A warning is logged for an invalid value only when *field_name* is given.
    """
    if val is None:
        return default
    try:
        return int(val)
    except (ValueError, TypeError, OverflowError) as e:
        if field_name:
            logger.warning(
                "Could not convert field '%s' value %r to int: %s",
                field_name, val, e,
            )
        return default


def _parse_created_at(create_time_ms, fallback):
    """Convert a millisecond epoch timestamp to an aware UTC datetime.

    Returns *fallback* when the value is None, and logs a warning and returns
    *fallback* when the value cannot be converted.
    """
    if create_time_ms is None:
        return fallback
    try:
        # Convert with float() directly: a lenient helper that substitutes 0.0
        # for bad input would silently yield 1970-01-01 instead of the fallback.
        return datetime.fromtimestamp(float(create_time_ms) / 1000.0, tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError) as e:
        logger.warning("Could not parse ad createTime %s: %s", create_time_ms, e)
        return fallback


def normalize_ad(raw_ad: dict, trade_type: str, fetched_at: datetime) -> dict:
    """Flatten a single raw P2P ad into one normalized record.

    The function is deliberately tolerant: missing or malformed pieces are
    reported through ``logger.warning`` and replaced by defaults rather than
    raising, so one bad ad does not abort a whole snapshot.

    Args:
        raw_ad: Raw ad dictionary; the "adv" and "advertiser" entries are
            read. If either is missing, null, or not a dict it is treated as
            empty and a warning is logged.
        trade_type: Trade side label; stored as-is and appended to
            ``snapshot_id``.
        fetched_at: Time the snapshot was fetched. Used for ``snapshot_id``,
            ``fetched_at``, ``fetched_date`` and as the fallback for
            ``ad_created_at``.
            NOTE(review): ``snapshot_id`` is formatted with a literal "Z"
            suffix, so this presumably must be UTC - confirm with the caller.

    Returns:
        A flat dict with snapshot metadata, ad fields, advertiser fields,
        the lists ``payment_methods`` / ``payment_method_ids``, and
        ``ad_created_at``. Numeric fields default to 0 / 0.0 when absent or
        unparseable.
    """
    adv = _as_dict(raw_ad.get("adv"))
    adver = _as_dict(raw_ad.get("advertiser"))

    # Report missing critical structures, but keep going with defaults.
    if not adv:
        logger.warning("Ad structure 'adv' is missing in raw ad data.")
    if not adver:
        logger.warning("Advertiser structure 'advertiser' is missing in raw ad data.")

    # Collect payment methods; entries that are not dicts are ignored, and
    # the two lists are filtered independently (a method may have only one
    # of payType / identifier).
    methods = [m for m in (adv.get("tradeMethods") or []) if isinstance(m, dict)]
    payment_methods = [m["payType"] for m in methods if m.get("payType")]
    payment_method_ids = [m["identifier"] for m in methods if m.get("identifier")]

    # createTime is in milliseconds since the epoch.
    ad_created_at = _parse_created_at(adv.get("createTime"), fetched_at)

    # Warn about missing expected fields, but do not fail.
    for key in _REQUIRED_ADV_KEYS:
        if key not in adv:
            logger.warning(
                "Expected key '%s' not found in 'adv' structure of ad: %s",
                key, raw_ad,
            )
    if "userNo" not in adver:
        logger.warning(
            "Expected key '%s' not found in 'advertiser' structure of ad: %s",
            "userNo", raw_ad,
        )

    return {
        "snapshot_id": f"{fetched_at.strftime('%Y%m%dT%H%M%SZ')}_{trade_type}",
        "fetched_at": fetched_at,
        "fetched_date": fetched_at.strftime("%Y-%m-%d"),
        "trade_type": trade_type,
        "adv_no": adv.get("advNo", ""),
        "asset": adv.get("asset", "USDT"),
        "fiat": adv.get("fiatUnit", "VES"),
        "price": _safe_float(adv.get("price"), 0.0, "price"),
        "surplus_amount": _safe_float(adv.get("surplusAmount"), 0.0, "surplusAmount"),
        "min_amount": _safe_float(adv.get("minSingleTransAmount"), 0.0, "minSingleTransAmount"),
        "max_amount": _safe_float(adv.get("maxSingleTransAmount"), 0.0, "maxSingleTransAmount"),
        "tradable_quantity": _safe_float(adv.get("tradableQuantity"), 0.0, "tradableQuantity"),
        "advertiser_no": adver.get("userNo", ""),
        "advertiser_name": adver.get("nickName", ""),
        "advertiser_type": adver.get("userType", "user"),
        "month_order_count": _safe_int(adver.get("monthOrderCount"), 0, "monthOrderCount"),
        "month_finish_rate": _safe_float(adver.get("monthFinishRate"), 0.0, "monthFinishRate"),
        "positive_rate": _safe_float(adver.get("positiveRate"), 0.0, "positiveRate"),
        "user_positive_rate": _safe_float(adver.get("userPositiveRate"), 0.0, "userPositiveRate"),
        "payment_methods": payment_methods,
        "payment_method_ids": payment_method_ids,
        "ad_created_at": ad_created_at,
        "price_type": adv.get("priceType", "FIXED"),
    }