binance-p2p-market-history/p2p-collector/binance_client.py
Gabriel Ramos 2c41a7a6b3 feat: implement binance p2p collector daemon
Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
2026-06-05 14:40:05 -04:00

144 lines
5.8 KiB
Python

import time
import logging
import httpx
# Module-level logger named after this module; every message the client
# below emits goes through it.
logger = logging.getLogger(__name__)
class BinanceP2PError(Exception):
    """Common base class for errors raised by the Binance P2P client.

    The more specific client errors (rate limiting, bad API replies) derive
    from it, so a single ``except BinanceP2PError`` covers all of them.
    """
class RateLimitError(BinanceP2PError):
    """Signals that the Binance P2P API answered with HTTP 429 (rate limited)."""
class APIError(BinanceP2PError):
    """Signals an API reply that reports ``success`` as false or whose
    payload does not have the expected structure."""
class BinanceP2PClient:
    """Synchronous client for the Binance P2P advertisement search endpoint.

    Settings come from a config dict with optional ``binance`` and
    ``collection`` sections; every key has a built-in default.
    """

    # Ads requested per page; also used to detect the last page.
    ROWS_PER_PAGE = 20
    # Initial and maximum sleep (seconds) applied after an HTTP 429.
    INITIAL_429_BACKOFF = 60
    MAX_429_BACKOFF = 480
    # Fixed waits (seconds) before retrying a 5xx response / a transport error.
    SERVER_ERROR_RETRY_DELAY = 60
    TRANSPORT_ERROR_RETRY_DELAY = 30

    def __init__(self, config: dict):
        """Build the client from ``config``.

        Args:
            config: Mapping with optional ``binance`` (endpoint, user agent,
                timeout, paging) and ``collection`` (retry) sections. A
                missing or ``None`` section falls back to the defaults.
        """
        binance_cfg = config.get("binance") or {}
        self.base_url = binance_cfg.get("base_url", "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search")
        self.user_agent = binance_cfg.get("user_agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36")
        self.timeout = binance_cfg.get("timeout_seconds", 15)
        self.max_pages = binance_cfg.get("max_pages", 10)
        self.page_delay = binance_cfg.get("request_delay_seconds", 0.5)
        self.headers = {
            "Content-Type": "application/json",
            "User-Agent": self.user_agent,
            "Accept": "*/*",
            "Origin": "https://p2p.binance.com",
            "Referer": "https://p2p.binance.com/",
        }
        # Retry config
        collection_cfg = config.get("collection") or {}
        self.max_retries = collection_cfg.get("retry_attempts", 3)
        # NOTE(review): read from config but not used by the retry logic,
        # which waits fixed SERVER_ERROR_/TRANSPORT_ERROR_RETRY_DELAY seconds.
        self.retry_delay_base = collection_cfg.get("retry_delay_base_seconds", 10)
        # 429 rate-limiting state
        self.current_429_backoff = self.INITIAL_429_BACKOFF
        self.had_429_this_cycle = False

    def reset_429_backoff(self):
        """Restore the 429 backoff delay to its initial value (60s)."""
        if self.current_429_backoff != self.INITIAL_429_BACKOFF:
            logger.info(f"Resetting 429 rate limit backoff to {self.INITIAL_429_BACKOFF}s.")
            self.current_429_backoff = self.INITIAL_429_BACKOFF

    def double_429_backoff(self):
        """Sleep for the current 429 backoff, then double it (capped at 480s).

        Also sets ``had_429_this_cycle``. Blocks the calling thread for the
        whole backoff period.
        """
        self.had_429_this_cycle = True
        delay = self.current_429_backoff
        logger.warning(f"Rate limited (429). Backing off for {delay}s.")
        time.sleep(delay)
        self.current_429_backoff = min(delay * 2, self.MAX_429_BACKOFF)

    def _post_request_with_retries(self, body: dict) -> dict:
        """POST ``body`` to the search endpoint and return the decoded JSON.

        Transport errors are retried after 30s and HTTP 5xx responses after
        60s, for at most ``max_retries`` attempts in total; no wait is taken
        after the final attempt.

        Raises:
            RateLimitError: on HTTP 429, after sleeping for the current backoff.
            APIError: if the reply is not a JSON object or has success=false.
            BinanceP2PError: on other non-2xx statuses, or once retries are
                exhausted.
        """
        last_failure = "no attempts were made"
        # One client (and connection pool) shared by all attempts of this request.
        with httpx.Client(headers=self.headers, timeout=self.timeout) as client:
            for attempt in range(1, self.max_retries + 1):
                is_last = attempt >= self.max_retries
                try:
                    resp = client.post(self.base_url, json=body)
                except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e:
                    if is_last:
                        logger.warning(f"Connection error occurred: {e} (attempt {attempt}/{self.max_retries}). Giving up.")
                        raise BinanceP2PError(f"Failed to connect after {self.max_retries} attempts: {e}") from e
                    logger.warning(f"Connection error occurred: {e} (attempt {attempt}/{self.max_retries}). Retrying in {self.TRANSPORT_ERROR_RETRY_DELAY}s...")
                    time.sleep(self.TRANSPORT_ERROR_RETRY_DELAY)
                    continue

                # 429 is not retried here: back off, then let the caller decide.
                if resp.status_code == 429:
                    self.double_429_backoff()
                    raise RateLimitError("HTTP 429 Rate Limited by Binance.")

                # 5xx server errors are retried after a fixed wait.
                if 500 <= resp.status_code < 600:
                    last_failure = f"last status HTTP {resp.status_code}"
                    if is_last:
                        logger.warning(f"Binance P2P API returned HTTP {resp.status_code} (attempt {attempt}/{self.max_retries}). Giving up.")
                        break
                    logger.warning(f"Binance P2P API returned HTTP {resp.status_code} (attempt {attempt}/{self.max_retries}). Retrying in {self.SERVER_ERROR_RETRY_DELAY}s...")
                    time.sleep(self.SERVER_ERROR_RETRY_DELAY)
                    continue

                return self._parse_response(resp)

        raise BinanceP2PError(f"Failed to fetch P2P ads after maximum retries ({last_failure}).")

    @staticmethod
    def _parse_response(resp: "httpx.Response") -> dict:
        """Validate a non-429, non-5xx response and return its JSON object.

        Raises:
            BinanceP2PError: for any remaining non-2xx status.
            APIError: if the body is not a JSON object or has success=false.
        """
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP Error {e.response.status_code}: {e.response.text}")
            raise BinanceP2PError(f"HTTP Status Error: {e}") from e
        try:
            data = resp.json()
        except ValueError as e:
            # e.g. an HTML challenge page instead of JSON.
            raise APIError(f"API response is not valid JSON: {resp.text[:200]!r}") from e
        if not isinstance(data, dict):
            raise APIError(f"API response has unexpected structure: {data!r}")
        if not data.get("success"):
            raise APIError(f"API response success=false: {data}")
        return data

    def fetch_all_ads(self, trade_type: str, asset: str, fiat: str) -> list:
        """Collect every ad for ``trade_type``/``asset``/``fiat`` across pages.

        Requests up to ``max_pages`` pages of ``ROWS_PER_PAGE`` ads, sleeping
        ``page_delay`` seconds between pages. Stops early when the reported
        ``total`` has been collected, when the next page would start past
        ``total``, or when a page comes back empty.

        Returns:
            The concatenated ``data`` lists of every fetched page.

        Raises:
            Propagates RateLimitError / APIError / BinanceP2PError from
            ``_post_request_with_retries``.
        """
        rows = self.ROWS_PER_PAGE
        all_ads = []
        for page in range(1, self.max_pages + 1):
            body = {
                "asset": asset,
                "fiat": fiat,
                "tradeType": trade_type,
                "page": page,
                "rows": rows,
                "payTypes": [],
                "countries": [],
                "publisherType": None,
                "classify": "personal",
                "filter": {},
            }
            logger.info(f"Fetching {trade_type} page {page}/{self.max_pages} for {asset}/{fiat}...")
            data = self._post_request_with_retries(body)
            # `or` also guards against the keys being present but null,
            # which a .get() default would not cover.
            ads = data.get("data") or []
            total = data.get("total") or 0
            all_ads.extend(ads)
            logger.info(f"Retrieved {len(ads)} ads. Total collected so far: {len(all_ads)}/{total}")
            # An empty page means there is nothing further to page through.
            if not ads:
                break
            # Stop if we've collected all available ads
            if len(all_ads) >= total:
                break
            # Don't request a page that starts beyond total ads
            if page * rows >= total:
                break
            # Delay between pages
            if page < self.max_pages:
                time.sleep(self.page_delay)
        return all_ads