import time
import logging

import httpx

logger = logging.getLogger(__name__)


class BinanceP2PError(Exception):
    """Base exception for Binance P2P client operations."""


class RateLimitError(BinanceP2PError):
    """Raised when Binance P2P API returns HTTP 429 (Rate Limited)."""


class APIError(BinanceP2PError):
    """Raised when the API returns a response with success=false or invalid structure."""


class BinanceP2PClient:
    """Synchronous client for the Binance P2P advertisement search endpoint.

    Configuration is read from two optional sections of ``config``:

    * ``config["binance"]``: ``base_url``, ``user_agent``,
      ``timeout_seconds``, ``max_pages``, ``request_delay_seconds``.
    * ``config["collection"]``: ``retry_attempts``,
      ``retry_delay_base_seconds``.

    Every key falls back to the default visible in ``__init__``.
    """

    # Ads requested per page; also used to decide whether a next page exists.
    ROWS_PER_PAGE = 20

    # Bounds (seconds) of the HTTP 429 backoff, which doubles on each hit.
    INITIAL_429_BACKOFF = 60
    MAX_429_BACKOFF = 480

    # Fixed waits (seconds) before retrying after a 5xx / a connection error.
    SERVER_ERROR_RETRY_DELAY = 60
    CONNECTION_RETRY_DELAY = 30

    def __init__(self, config: dict):
        """Build the client from a (possibly partial) configuration mapping."""
        binance_cfg = config.get("binance", {})
        self.base_url = binance_cfg.get(
            "base_url",
            "https://p2p.binance.com/bapi/c2c/v2/friendly/c2c/adv/search",
        )
        self.user_agent = binance_cfg.get(
            "user_agent",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
        )
        self.timeout = binance_cfg.get("timeout_seconds", 15)
        self.max_pages = binance_cfg.get("max_pages", 10)
        self.page_delay = binance_cfg.get("request_delay_seconds", 0.5)

        self.headers = {
            "Content-Type": "application/json",
            "User-Agent": self.user_agent,
            "Accept": "*/*",
            "Origin": "https://p2p.binance.com",
            "Referer": "https://p2p.binance.com/",
        }

        # Retry config
        collection_cfg = config.get("collection", {})
        self.max_retries = collection_cfg.get("retry_attempts", 3)
        # NOTE(review): stored but not read anywhere in this class; the retry
        # waits below use the fixed class-level delays instead.
        self.retry_delay_base = collection_cfg.get("retry_delay_base_seconds", 10)

        # 429 Rate Limiting State
        self.current_429_backoff = self.INITIAL_429_BACKOFF
        # Set by double_429_backoff(); never cleared inside this class.
        self.had_429_this_cycle = False

    def reset_429_backoff(self) -> None:
        """Resets the 429 backoff delay to its initial value (60s)."""
        if self.current_429_backoff != self.INITIAL_429_BACKOFF:
            logger.info(f"Resetting 429 rate limit backoff to {self.INITIAL_429_BACKOFF}s.")
            self.current_429_backoff = self.INITIAL_429_BACKOFF

    def double_429_backoff(self) -> None:
        """Sleep for the current 429 backoff, then double it (capped at 480s).

        Also flags ``had_429_this_cycle``. This call blocks for
        ``current_429_backoff`` seconds before returning.
        """
        self.had_429_this_cycle = True
        logger.warning(f"Rate limited (429). Setting next backoff delay to {self.current_429_backoff}s.")
        # The value just logged is the delay served now; the doubled value
        # applies to the next 429.
        time.sleep(self.current_429_backoff)
        self.current_429_backoff = min(self.current_429_backoff * 2, self.MAX_429_BACKOFF)

    def _post_request_with_retries(self, body: dict) -> dict:
        """POST ``body`` to the search endpoint and return the decoded JSON object.

        Connection errors/timeouts and HTTP 5xx responses are retried up to
        ``max_retries`` attempts, waiting only when another attempt follows.
        HTTP 429 is not retried here: the current backoff is served via
        ``double_429_backoff()`` and ``RateLimitError`` is raised.

        Raises:
            RateLimitError: the API answered HTTP 429.
            APIError: the body is not JSON, is not a JSON object, or has a
                falsy ``success`` field.
            BinanceP2PError: any other HTTP error status, or all attempts
                were used up.
        """
        for attempt in range(1, self.max_retries + 1):
            has_retry_left = attempt < self.max_retries
            try:
                # Use httpx.Client for synchronous calls
                with httpx.Client(headers=self.headers, timeout=self.timeout) as client:
                    resp = client.post(self.base_url, json=body)

                # Handle 429 specifically
                if resp.status_code == 429:
                    self.double_429_backoff()
                    raise RateLimitError("HTTP 429 Rate Limited by Binance.")

                # Handle 5xx server errors
                if 500 <= resp.status_code < 600:
                    if has_retry_left:
                        logger.warning(
                            f"Binance P2P API returned HTTP {resp.status_code} "
                            f"(attempt {attempt}/{self.max_retries}). "
                            f"Retrying in {self.SERVER_ERROR_RETRY_DELAY}s..."
                        )
                        time.sleep(self.SERVER_ERROR_RETRY_DELAY)
                    else:
                        # Nothing follows the last attempt, so do not wait.
                        logger.warning(
                            f"Binance P2P API returned HTTP {resp.status_code} "
                            f"(attempt {attempt}/{self.max_retries}). No retries left."
                        )
                    continue

                # Raise for other HTTP errors (4xx except 429)
                resp.raise_for_status()

                # Parse JSON; a non-JSON body is reported as an APIError
                # instead of leaking a bare ValueError to the caller.
                try:
                    data = resp.json()
                except ValueError as e:
                    raise APIError(f"API response is not valid JSON: {e}") from e

                if not isinstance(data, dict):
                    raise APIError(f"API response has invalid structure: {data!r}")
                if not data.get("success"):
                    raise APIError(f"API response success=false: {data}")
                return data

            except (httpx.ConnectError, httpx.ConnectTimeout, httpx.ReadTimeout) as e:
                if not has_retry_left:
                    logger.warning(
                        f"Connection error occurred: {e} "
                        f"(attempt {attempt}/{self.max_retries}). No retries left."
                    )
                    raise BinanceP2PError(
                        f"Failed to connect after {self.max_retries} attempts: {e}"
                    ) from e
                logger.warning(
                    f"Connection error occurred: {e} "
                    f"(attempt {attempt}/{self.max_retries}). "
                    f"Retrying in {self.CONNECTION_RETRY_DELAY}s..."
                )
                time.sleep(self.CONNECTION_RETRY_DELAY)
            except httpx.HTTPStatusError as e:
                logger.error(f"HTTP Error {e.response.status_code}: {e.response.text}")
                raise BinanceP2PError(f"HTTP Status Error: {e}") from e

        # Reached when every attempt ended in a 5xx (or max_retries < 1).
        raise BinanceP2PError("Failed to fetch P2P ads after maximum retries.")

    def fetch_all_ads(self, trade_type: str, asset: str, fiat: str) -> list:
        """Fetch P2P advertisements page by page and return them as one list.

        Requests up to ``max_pages`` pages of ``ROWS_PER_PAGE`` ads, sleeping
        ``page_delay`` seconds between pages. Paging stops early once the
        reported ``total`` has been collected, once the next page would start
        beyond ``total``, or when a page comes back empty.

        Args:
            trade_type: Sent as ``tradeType`` in the request body.
            asset: Sent as ``asset`` in the request body.
            fiat: Sent as ``fiat`` in the request body.

        Returns:
            The concatenated ``data`` entries of every page fetched.

        Raises:
            BinanceP2PError: propagated from ``_post_request_with_retries``
                (including ``RateLimitError`` and ``APIError``).
        """
        all_ads = []
        for page in range(1, self.max_pages + 1):
            body = {
                "asset": asset,
                "fiat": fiat,
                "tradeType": trade_type,
                "page": page,
                "rows": self.ROWS_PER_PAGE,
                "payTypes": [],
                "countries": [],
                "publisherType": None,
                "classify": "personal",
                "filter": {},
            }
            logger.info(f"Fetching {trade_type} page {page}/{self.max_pages} for {asset}/{fiat}...")
            data = self._post_request_with_retries(body)

            # `or` also covers explicit JSON nulls, which .get() defaults miss.
            ads = data.get("data") or []
            total = data.get("total") or 0
            all_ads.extend(ads)
            logger.info(f"Retrieved {len(ads)} ads. Total collected so far: {len(all_ads)}/{total}")

            # An empty page means the listing is exhausted, whatever `total` says.
            if not ads:
                break

            # Stop if we've collected all available ads
            if len(all_ads) >= total:
                break

            # Don't request a page that starts beyond total ads
            if page * self.ROWS_PER_PAGE >= total:
                break

            # Delay between pages
            if page < self.max_pages:
                time.sleep(self.page_delay)

        return all_ads