Set up continuous P2P VES/USDT market history data collection, normalization, validation, and date-partitioned Parquet storage.
144 lines
5.8 KiB
Python
import time
|
|
import logging
|
|
import httpx
|
|
|
|
# Module-level logger, named after this module, shared by every class below.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
class BinanceP2PError(Exception):
    """Base class for errors raised by the Binance P2P client.

    The more specific client errors (rate limiting, bad API responses)
    derive from this type, so catching it covers all of them.
    """
|
|
|
|
class RateLimitError(BinanceP2PError):
    """Signals that the Binance P2P API answered with HTTP 429 (Rate Limited)."""
|
|
|
|
class APIError(BinanceP2PError):
    """Signals an API response with success=false or an invalid structure."""
|
|
|
|
class BinanceP2PClient:
    """Synchronous client for the Binance P2P advertisement search endpoint.

    Settings are read from a config mapping; missing sections or keys fall
    back to the defaults shown in ``__init__``:

    * ``config["binance"]``: ``base_url``, ``user_agent``, ``timeout_seconds``,
      ``max_pages``, ``request_delay_seconds``, ``rows_per_page``.
    * ``config["collection"]``: ``retry_attempts``, ``retry_delay_base_seconds``,
      ``server_error_retry_delay_seconds``, ``connection_error_retry_delay_seconds``,
      ``rate_limit_backoff_initial_seconds``, ``rate_limit_backoff_max_seconds``.
    """

    def __init__(self, config: dict):
        binance_cfg = config.get("binance", {})

        self.base_url = binance_cfg.get("base_url", "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search")
        self.user_agent = binance_cfg.get("user_agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36")
        self.timeout = binance_cfg.get("timeout_seconds", 15)
        self.max_pages = binance_cfg.get("max_pages", 10)
        self.page_delay = binance_cfg.get("request_delay_seconds", 0.5)
        # Sent as the request's ``rows`` field and used to compute page offsets.
        self.rows_per_page = binance_cfg.get("rows_per_page", 20)

        # Sent with every request; Origin/Referer point at the P2P web site.
        self.headers = {
            "Content-Type": "application/json",
            "User-Agent": self.user_agent,
            "Accept": "*/*",
            "Origin": "https://p2p.binance.com",
            "Referer": "https://p2p.binance.com/",
        }

        # Retry config
        collection_cfg = config.get("collection", {})
        self.max_retries = collection_cfg.get("retry_attempts", 3)
        # NOTE(review): not referenced anywhere in this class; presumably read
        # by callers — verify before removing.
        self.retry_delay_base = collection_cfg.get("retry_delay_base_seconds", 10)
        # Fixed waits between attempts for 5xx responses and transport errors.
        self.server_error_delay = collection_cfg.get("server_error_retry_delay_seconds", 60)
        self.connection_error_delay = collection_cfg.get("connection_error_retry_delay_seconds", 30)

        # 429 rate limiting state: exponential backoff between these bounds.
        self.initial_429_backoff = collection_cfg.get("rate_limit_backoff_initial_seconds", 60)
        self.max_429_backoff = collection_cfg.get("rate_limit_backoff_max_seconds", 480)
        self.current_429_backoff = self.initial_429_backoff
        # Set by double_429_backoff(); this class never clears it, so the
        # caller is presumably expected to reset it per collection cycle.
        self.had_429_this_cycle = False

    def reset_429_backoff(self):
        """Reset the 429 backoff delay to its initial value (60s by default)."""
        if self.current_429_backoff != self.initial_429_backoff:
            logger.info(f"Resetting 429 rate limit backoff to {self.initial_429_backoff}s.")
        self.current_429_backoff = self.initial_429_backoff

    def double_429_backoff(self):
        """Sleep for the current 429 backoff, then double it (capped, 480s by default).

        Also sets ``had_429_this_cycle``. Blocks the calling thread for the
        whole backoff duration.
        """
        self.had_429_this_cycle = True
        delay = self.current_429_backoff
        next_delay = min(delay * 2, self.max_429_backoff)
        logger.warning(f"Rate limited (429). Backing off for {delay}s (next backoff: {next_delay}s).")
        time.sleep(delay)
        self.current_429_backoff = next_delay

    @staticmethod
    def _parse_response(resp) -> dict:
        """Decode a 2xx response body and verify the API's ``success`` flag.

        Raises:
            APIError: the body is not JSON, not a JSON object, or has a falsy
                ``success`` field.
        """
        try:
            data = resp.json()
        except ValueError as e:  # json.JSONDecodeError / UnicodeDecodeError
            raise APIError(f"API response is not valid JSON: {e}") from e

        if not isinstance(data, dict):
            raise APIError(f"API response has unexpected structure: {data!r}")
        if not data.get("success"):
            raise APIError(f"API response success=false: {data}")
        return data

    def _post_request_with_retries(self, body: dict) -> dict:
        """POST ``body`` to ``base_url`` and return the decoded JSON object.

        Transport failures (timeouts, network errors, dropped connections) and
        HTTP 5xx responses are retried, up to ``max_retries`` attempts in
        total. There is a fixed delay between attempts and none after the last.

        Raises:
            RateLimitError: on HTTP 429, after sleeping via double_429_backoff().
            APIError: the response body is not a JSON object or has success=false.
            BinanceP2PError: any other non-2xx status, or all attempts failed.
        """
        for attempt in range(1, self.max_retries + 1):
            has_retries_left = attempt < self.max_retries

            try:
                # Use httpx.Client for synchronous calls
                with httpx.Client(headers=self.headers, timeout=self.timeout) as client:
                    resp = client.post(self.base_url, json=body)
            except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e:
                if not has_retries_left:
                    logger.warning(f"Connection error occurred: {e} (attempt {attempt}/{self.max_retries}). Giving up.")
                    raise BinanceP2PError(f"Failed to connect after {self.max_retries} attempts: {e}") from e
                logger.warning(f"Connection error occurred: {e} (attempt {attempt}/{self.max_retries}). Retrying in {self.connection_error_delay}s...")
                time.sleep(self.connection_error_delay)
                continue

            # Handle 429 specifically: back off, then let the caller decide.
            if resp.status_code == 429:
                self.double_429_backoff()
                raise RateLimitError("HTTP 429 Rate Limited by Binance.")

            # Handle 5xx server errors; no pointless sleep after the last attempt.
            if 500 <= resp.status_code < 600:
                if has_retries_left:
                    logger.warning(f"Binance P2P API returned HTTP {resp.status_code} (attempt {attempt}/{self.max_retries}). Retrying in {self.server_error_delay}s...")
                    time.sleep(self.server_error_delay)
                else:
                    logger.warning(f"Binance P2P API returned HTTP {resp.status_code} (attempt {attempt}/{self.max_retries}). Giving up.")
                continue

            # Raise for other non-2xx statuses (4xx except 429).
            try:
                resp.raise_for_status()
            except httpx.HTTPStatusError as e:
                logger.error(f"HTTP Error {e.response.status_code}: {e.response.text}")
                raise BinanceP2PError(f"HTTP Status Error: {e}") from e

            return self._parse_response(resp)

        raise BinanceP2PError("Failed to fetch P2P ads after maximum retries.")

    def fetch_all_ads(self, trade_type: str, asset: str, fiat: str) -> list:
        """Fetch all P2P advertisements for ``trade_type`` on ``asset``/``fiat``.

        Requests up to ``max_pages`` pages of ``rows_per_page`` ads, sleeping
        ``page_delay`` seconds between pages. Stops early once the reported
        ``total`` has been collected, the next page would start past
        ``total``, or a page comes back empty.

        Returns:
            The concatenated ``data`` lists of every fetched page.

        Raises:
            Whatever ``_post_request_with_retries`` raises (BinanceP2PError or a
            subclass); ads from earlier pages are discarded in that case.
        """
        all_ads = []
        rows = self.rows_per_page

        for page in range(1, self.max_pages + 1):
            body = {
                "asset": asset,
                "fiat": fiat,
                "tradeType": trade_type,
                "page": page,
                "rows": rows,
                "payTypes": [],
                "countries": [],
                "publisherType": None,
                "classify": "personal",
                "filter": {},
            }

            logger.info(f"Fetching {trade_type} page {page}/{self.max_pages} for {asset}/{fiat}...")
            data = self._post_request_with_retries(body)

            # ``or`` also covers keys present with a JSON null value, which
            # ``.get(key, default)`` would pass through as None.
            ads = data.get("data") or []
            total = data.get("total") or 0

            all_ads.extend(ads)
            logger.info(f"Retrieved {len(ads)} ads. Total collected so far: {len(all_ads)}/{total}")

            # Stop if we've collected all available ads
            if len(all_ads) >= total:
                break

            # An empty page means there is nothing further to fetch.
            if not ads:
                break

            # Don't request a page that starts beyond total ads
            if page * rows >= total:
                break

            # Delay between pages
            if page < self.max_pages:
                time.sleep(self.page_delay)

        return all_ads
|