binance-p2p-market-history/p2p-collector/collect_p2p.py
Gabriel Ramos 2c41a7a6b3 feat: implement binance p2p collector daemon
Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
2026-06-05 14:40:05 -04:00

176 lines
5.7 KiB
Python

import os
import argparse
import sys
import logging
import yaml
from pathlib import Path
import pandas as pd
from alert import setup_logging
from scheduler import P2PCollectorScheduler
logger = logging.getLogger("collect_p2p")
def load_config(config_path: str) -> dict:
    """Load the YAML configuration file and return it as a dict.

    Args:
        config_path: Filesystem path of the YAML configuration file.

    Returns:
        The parsed top-level mapping. An empty (or comment-only) file yields
        an empty dict, so the declared ``dict`` return type always holds.

    Exits:
        Terminates the process with status 1, after printing a message to
        stderr, when the file is missing, cannot be read or parsed, or when
        its top-level YAML value is not a mapping.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except (FileNotFoundError, NotADirectoryError):
        # Opening directly (instead of checking existence first) avoids a
        # check-then-use race; both errors mean "no such file" to the user.
        print(f"Error: Config file not found at {config_path}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # CLI boundary: any read/decode/parse failure is reported and fatal.
        print(f"Error parsing config file {config_path}: {e}", file=sys.stderr)
        sys.exit(1)

    # safe_load returns None for an empty document; honour the dict contract.
    if config is None:
        return {}

    if not isinstance(config, dict):
        print(
            f"Error parsing config file {config_path}: "
            f"top-level YAML value must be a mapping, got {type(config).__name__}",
            file=sys.stderr,
        )
        sys.exit(1)

    return config
# File name that is skipped during validation unless it is the only target.
_SCHEMA_FILE_NAME = "_schema.parquet"

# Columns every validated file is expected to contain.
_EXPECTED_COLUMNS = frozenset({
    "snapshot_id", "fetched_at", "fetched_date", "trade_type", "adv_no",
    "asset", "fiat", "price", "surplus_amount", "min_amount", "max_amount",
    "tradable_quantity", "advertiser_no", "advertiser_name", "advertiser_type",
    "month_order_count", "month_finish_rate", "positive_rate", "user_positive_rate",
    "payment_methods", "payment_method_ids", "ad_created_at", "price_type",
})

# Columns in which any null value is reported as a critical error.
_CRITICAL_FIELDS = ("price", "adv_no", "advertiser_no")


def _collect_parquet_files(path: Path) -> list:
    """Return the sorted Parquet files designated by *path* (file or directory)."""
    if path.is_file():
        return [path] if path.suffix == ".parquet" else []
    if path.is_dir():
        return sorted(path.glob("**/*.parquet"))
    return []


def _distinct_payment_methods(column) -> list:
    """Return the distinct payment methods found in *column*, sorted for display."""
    methods = set()
    for item in column:
        # Each cell may be a single string or a list/array of strings.
        if isinstance(item, str):
            methods.add(item)
        elif hasattr(item, "__iter__"):
            methods.update(item)
    # key=str keeps the sort from raising if a cell holds a non-string entry.
    return sorted(methods, key=str)


def _check_file(f: Path, totals: dict, critical_errors: list, warnings: list) -> None:
    """Validate one Parquet file, print its details and record the findings.

    ``totals`` is updated in place as each figure becomes known, so counts
    gathered before a later failure in the same file are still included.
    """
    df = pd.read_parquet(f)
    rows = len(df)
    totals["rows"] += rows
    print(f"File: {f.name} ({rows} rows)")

    # Check columns (sorted so the report is identical from run to run)
    cols = set(df.columns)
    missing_cols = _EXPECTED_COLUMNS - cols
    extra_cols = cols - _EXPECTED_COLUMNS
    if missing_cols:
        critical_errors.append(
            f"{f.name}: Missing expected columns: {sorted(missing_cols, key=str)}"
        )
    if extra_cols:
        warnings.append(f"{f.name}: Has extra columns: {sorted(extra_cols, key=str)}")

    # Analyze trade types
    if "trade_type" in cols:
        buy_cnt = int((df["trade_type"] == "BUY").sum())
        sell_cnt = int((df["trade_type"] == "SELL").sum())
        totals["buy"] += buy_cnt
        totals["sell"] += sell_cnt
        print(f" Trade types: BUY={buy_cnt}, SELL={sell_cnt}")

    # Check critical nulls
    for col in _CRITICAL_FIELDS:
        if col in cols:
            null_cnt = int(df[col].isnull().sum())
            if null_cnt > 0:
                critical_errors.append(
                    f"{f.name}: Column '{col}' has {null_cnt} null values"
                )

    if "price" in cols:
        # Check types
        if not pd.api.types.is_float_dtype(df["price"]):
            critical_errors.append(f"{f.name}: Column 'price' is not float type")
        # Prices summary
        if rows > 0:
            print(f" Price range: [{df['price'].min():.2f} - {df['price'].max():.2f}]")

    # Payment methods
    if "payment_methods" in cols and rows > 0:
        methods = _distinct_payment_methods(df["payment_methods"])
        print(f" Payment methods ({len(methods)}): {methods}")


def validate_parquet_files(path_str: str) -> None:
    """Validate existing Parquet files and print a report.

    Args:
        path_str: Path to a single ``.parquet`` file, or to a directory that
            is searched recursively for ``*.parquet`` files.

    Behaviour:
        * Exits with status 1 if the path does not exist.
        * Returns normally (after a notice) if no Parquet file is found.
        * Otherwise prints per-file details and a summary, then exits with
          status 1 if any critical error was recorded and 0 if none was.

    ``_schema.parquet`` files are skipped unless one is the only file found;
    skipped files are not included in the reported file counts.
    """
    path = Path(path_str)
    if not path.exists():
        print(f"Error: Path does not exist: {path_str}", file=sys.stderr)
        sys.exit(1)

    files = _collect_parquet_files(path)
    if not files:
        print(f"No Parquet files found at {path_str}")
        return

    # Skip schema files unless they are specifically targeted. Filtering up
    # front keeps the reported counts equal to the files actually validated.
    if len(files) > 1:
        files = [f for f in files if f.name != _SCHEMA_FILE_NAME]

    print(f"Validating {len(files)} Parquet file(s)...")
    print("-" * 60)

    totals = {"rows": 0, "buy": 0, "sell": 0}
    critical_errors = []
    warnings = []

    for f in files:
        try:
            _check_file(f, totals, critical_errors, warnings)
        except Exception as e:
            # Per-file boundary: one unreadable file must not abort the run.
            critical_errors.append(f"{f.name}: Failed to read/validate: {e}")

    print("=" * 60)
    print("VALIDATION SUMMARY")
    print("=" * 60)
    print(f"Total files validated: {len(files)}")
    print(f"Total rows: {totals['rows']}")
    print(f"Total BUY ads: {totals['buy']}")
    print(f"Total SELL ads: {totals['sell']}")

    print("\nWarnings:")
    if warnings:
        for w in warnings:
            print(f" - {w}")
    else:
        print(" None")

    print("\nCritical Errors:")
    if critical_errors:
        for err in critical_errors:
            print(f" - [FAIL] {err}")
        sys.exit(1)
    else:
        print(" [PASS] No critical validation issues found!")
        sys.exit(0)
def main():
    """Command-line entry point for the Binance P2P collector.

    Parses ``--config``, ``--once`` and ``--validate``. When ``--validate``
    is supplied, only the Parquet validation report is run. Otherwise the
    configuration is loaded, logging is set up, and the scheduler is started
    (for a single cycle when ``--once`` is set).
    """
    cli = argparse.ArgumentParser(description="Binance P2P Data Collector")
    option_specs = (
        ("--config", {"default": "config.yaml", "help": "Path to config.yaml file"}),
        ("--once", {"action": "store_true", "help": "Run a single collection cycle and exit"}),
        ("--validate", {"help": "Path to Parquet file or directory to validate"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()

    if not options.validate:
        # Normal collection mode: config -> logging -> scheduler.
        settings = load_config(options.config)
        setup_logging(settings)
        collector = P2PCollectorScheduler(settings)
        collector.run(once=options.once)
        return

    # Validate-only mode: no config or scheduler is needed.
    validate_parquet_files(options.validate)


if __name__ == "__main__":
    main()