import os
import time
import signal
import logging
import traceback
from datetime import datetime, timezone
from pathlib import Path

from binance_client import BinanceP2PClient, BinanceP2PError
from normalizer import normalize_ad
from validator import validate_row, validate_snapshot
from storage import store_parquet, read_checkpoint, write_checkpoint
from alert import write_alert_file
from utils import jitter, now_utc

logger = logging.getLogger(__name__)


class P2PCollectorScheduler:
    """Periodically collects Binance P2P ads and stores them as snapshots.

    The scheduler reads its settings from ``config["collection"]``
    (``output_dir``, ``interval_seconds``, ``pairs``), keeps a checkpoint
    file next to the output directory, and stops after the current cycle
    when SIGINT or SIGTERM is received.
    """

    def __init__(self, config: dict):
        """Read the configuration, build the API client and install signal handlers.

        Args:
            config: Full application configuration. The ``"collection"``
                section is optional; missing keys fall back to defaults.
        """
        self.config = config
        self.running = True

        # Configure directories
        collection_cfg = config.get("collection", {})
        self.output_dir = collection_cfg.get("output_dir", "./data/raw")
        self.interval = collection_cfg.get("interval_seconds", 300)
        self.pairs = collection_cfg.get("pairs", [{"asset": "USDT", "fiat": "VES"}])

        # Checkpoint path: in the parent folder of output_dir.
        # Spec says:
        #   data/
        #   ├── raw/
        #   └── checkpoint.json
        # So we look at the parent of output_dir.
        self.data_dir = str(Path(self.output_dir).parent)
        self.checkpoint_path = os.path.join(self.data_dir, "checkpoint.json")

        self.client = BinanceP2PClient(config)
        self.consecutive_failures = 0

        # Signal handlers
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, sig, frame):
        """Request a graceful stop; the loops poll ``self.running``."""
        logger.info(f"Received signal {sig}. Initiating graceful shutdown after current snapshot completes...")
        self.running = False

    @staticmethod
    def _pair_symbols(pair: dict) -> tuple:
        """Return ``(asset, fiat)`` for a pair, applying the shared defaults."""
        return pair.get("asset", "USDT"), pair.get("fiat", "VES")

    def _sleep_while_running(self, seconds: float, step: float = 1.0) -> None:
        """Sleep up to ``seconds``, waking every ``step`` to honour a stop request."""
        remaining = seconds
        while remaining > 0 and self.running:
            time.sleep(min(step, remaining))
            remaining -= step

    def verify_directories(self):
        """Verify that the output directory is writable.

        Creates the directory if needed, then writes and deletes a test file.

        Raises:
            OSError: If the test file cannot be written or removed. The
                underlying error is attached as the cause.
        """
        os.makedirs(self.output_dir, exist_ok=True)
        test_file = os.path.join(self.output_dir, ".write_test")
        try:
            with open(test_file, "w") as f:
                f.write("test")
            os.remove(test_file)
            logger.info(f"Directory write verification passed for {self.output_dir}")
        except Exception as e:
            logger.critical(f"Directory verification failed on {self.output_dir}: {e}")
            raise OSError(f"Output directory not writable: {e}") from e

    def get_initial_wait_seconds(self) -> float:
        """Return how long to wait before the first cycle, based on the checkpoint.

        Returns:
            Seconds remaining until one full interval has passed since the
            last completed snapshot, or ``0.0`` when there is no usable
            checkpoint timestamp or the interval has already elapsed.
        """
        checkpoint = read_checkpoint(self.checkpoint_path)
        last_completed = checkpoint.get("last_completed_snapshot")

        if not last_completed:
            return 0.0

        try:
            last_dt = datetime.fromisoformat(last_completed)
            # Treat naive timestamps as UTC so the subtraction below is valid.
            if last_dt.tzinfo is None:
                last_dt = last_dt.replace(tzinfo=timezone.utc)

            elapsed = (now_utc() - last_dt).total_seconds()
            # A timestamp in the future (clock skew, corrupt file) would give
            # a wait longer than the interval; never wait more than one interval.
            wait_time = min(self.interval - elapsed, self.interval)
            if wait_time > 0:
                logger.info(f"Resuming. Last snapshot completed {elapsed:.1f}s ago. Initial wait time: {wait_time:.1f}s.")
                return wait_time
        except Exception as e:
            logger.warning(f"Error parsing last snapshot time from checkpoint: {e}. Starting immediately.")

        return 0.0

    def _normalize_valid(self, raw_ads, trade_type: str, snapshot_time, seen_adv_nos: set) -> list:
        """Normalize raw ads and keep only the rows that pass ``validate_row``."""
        rows = []
        for ad in raw_ads:
            norm = normalize_ad(ad, trade_type, snapshot_time)
            if validate_row(norm, self.config, seen_adv_nos):
                rows.append(norm)
        return rows

    def run_single_cycle(self) -> dict:
        """Run one snapshot collection cycle for every configured pair.

        For each pair: fetch BUY and SELL ads, normalize and validate rows,
        run snapshot-level validation, store both sides as parquet and log a
        one-line summary.

        Returns:
            Mapping of ``"<asset>_<fiat>"`` to a dict with ``buy_count``,
            ``sell_count`` and the ISO ``timestamp`` of the cycle start.
        """
        cycle_start_time = now_utc()
        cycle_stats = {}

        # We process each pair configured
        for pair in self.pairs:
            asset, fiat = self._pair_symbols(pair)

            start_ts = time.time()

            # 1. Fetch raw advertisements
            # Delay between trade types (snapshots) is 1 second
            buy_raw = self.client.fetch_all_ads("BUY", asset, fiat)
            time.sleep(1.0)
            sell_raw = self.client.fetch_all_ads("SELL", asset, fiat)

            # 2. Normalize and Filter individual rows
            # The same set is passed to validate_row for both sides.
            seen_adv_nos = set()
            flat_buy = self._normalize_valid(buy_raw, "BUY", cycle_start_time, seen_adv_nos)
            flat_sell = self._normalize_valid(sell_raw, "SELL", cycle_start_time, seen_adv_nos)

            # 3. Snapshot-level Validation
            combined_ads = flat_buy + flat_sell
            validation_summary = validate_snapshot(combined_ads, cycle_start_time)

            # 4. Storage (atomic writes to raw/buy_ads and raw/sell_ads)
            buy_path = store_parquet(flat_buy, os.path.join(self.output_dir, "buy_ads"), cycle_start_time)
            sell_path = store_parquet(flat_sell, os.path.join(self.output_dir, "sell_ads"), cycle_start_time)

            elapsed = time.time() - start_ts

            # 5. Snapshot Summary Log Line
            # Format: 2026-06-05 13:30:00 UTC | BUY=47 ads [54.20–62.80] SELL=53 ads [58.00–68.50] | spread= -4.80 | took 3.2s | methods=[BANESCO,PAGO_MOVIL,MERCANTIL,...]
            # NOTE(review): the ':.2f' formats require numeric min/max/spread
            # values from validate_snapshot; confirm it never returns None
            # for an empty side.
            methods_str = ",".join(validation_summary["methods"])
            logger.info(
                f"{cycle_start_time.strftime('%Y-%m-%d %H:%M:%S')} UTC | "
                f"BUY={validation_summary['buy_count']} ads [{validation_summary['buy_min']:.2f}-{validation_summary['buy_max']:.2f}] "
                f"SELL={validation_summary['sell_count']} ads [{validation_summary['sell_min']:.2f}-{validation_summary['sell_max']:.2f}] | "
                f"spread={validation_summary['spread']:.2f} | took {elapsed:.1f}s | "
                f"methods=[{methods_str}]"
            )

            cycle_stats[f"{asset}_{fiat}"] = {
                "buy_count": validation_summary['buy_count'],
                "sell_count": validation_summary['sell_count'],
                "timestamp": cycle_start_time.isoformat()
            }

        return cycle_stats

    def _record_success(self, cycle_start, stats: dict) -> None:
        """Write the checkpoint for a completed cycle, using the first pair's counts."""
        checkpoint = read_checkpoint(self.checkpoint_path)

        # Stats for the primary pair (the first configured one). The key is
        # built with the same defaults run_single_cycle uses, so a pair that
        # relies on those defaults is still found; no pairs means no stats.
        primary_pair = self.pairs[0] if self.pairs else {}
        asset, fiat = self._pair_symbols(primary_pair)
        pair_stats = stats.get(f"{asset}_{fiat}", {})

        total_snapshots = checkpoint.get("total_snapshots", 0) + 1
        first_snapshot = checkpoint.get("first_snapshot", cycle_start.isoformat())

        checkpoint_data = {
            "last_completed_snapshot": cycle_start.isoformat(),
            "last_buy_ad_count": pair_stats.get("buy_count", 0),
            "last_sell_ad_count": pair_stats.get("sell_count", 0),
            "consecutive_failures": 0,
            "total_snapshots": total_snapshots,
            "first_snapshot": first_snapshot,
            "version": "1.0"
        }
        write_checkpoint(self.checkpoint_path, checkpoint_data)

    def _record_failure(self, cycle_start, error: Exception, tb_str: str) -> None:
        """Log a failed cycle, raise an alert past the threshold, and update the checkpoint."""
        self.consecutive_failures += 1
        logger.error(f"Error during collection cycle (consecutive failures: {self.consecutive_failures}): {error}")
        logger.error(tb_str)

        # Write alert file after threshold
        alert_threshold = self.config.get("alerts", {}).get("consecutive_failure_threshold", 5)
        if self.consecutive_failures >= alert_threshold:
            try:
                write_alert_file(
                    self.config,
                    cycle_start,
                    str(error),
                    self.consecutive_failures,
                    tb_str
                )
            except Exception as alert_error:
                logger.critical(f"Failed to write alert file: {alert_error}")

        # Update checkpoint with failure status. This is best-effort: a
        # checkpoint that cannot be written must not stop the collector loop.
        try:
            checkpoint = read_checkpoint(self.checkpoint_path)
            checkpoint["consecutive_failures"] = self.consecutive_failures
            write_checkpoint(self.checkpoint_path, checkpoint)
        except Exception as checkpoint_error:
            logger.critical(f"Failed to update checkpoint after failure: {checkpoint_error}")

    def run(self, once: bool = False):
        """Start the collector.

        Args:
            once: When true, run a single cycle and return; any error is
                logged and re-raised. Otherwise loop until a stop signal
                is received, sleeping a jittered interval between cycles.

        Raises:
            OSError: If the output directory is not writable.
        """
        self.verify_directories()

        if once:
            logger.info("Executing a single collection cycle (--once)...")
            try:
                self.run_single_cycle()
                logger.info("One-shot collection complete. Exiting.")
            except Exception as e:
                logger.error(f"Error occurred during one-shot collection: {e}")
                logger.error(traceback.format_exc())
                raise
            return

        # Continuous loop startup
        initial_wait = self.get_initial_wait_seconds()
        if initial_wait > 0 and self.running:
            logger.info(f"Sleeping for initial delay of {initial_wait:.1f}s...")
            # Sleep in small steps to remain responsive to signals
            self._sleep_while_running(initial_wait)

        logger.info(f"Starting P2P data collector. Interval: {self.interval}s. Pairs: {self.pairs}")

        while self.running:
            cycle_start = now_utc()
            try:
                stats = self.run_single_cycle()

                # Reset failure stats on success
                self.consecutive_failures = 0
                self.client.reset_429_backoff()

                # A checkpoint that cannot be written is treated as a failed
                # cycle, so it is counted and can trigger an alert.
                self._record_success(cycle_start, stats)

            except Exception as e:
                self._record_failure(cycle_start, e, traceback.format_exc())

            # Determine sleep duration
            if self.running:
                sleep_sec = jitter(self.interval)
                logger.info(f"Sleeping for {sleep_sec:.1f}s before next cycle...")
                # Sleep in small steps to handle signal termination cleanly
                self._sleep_while_running(sleep_sec)

        logger.info("Collector has shut down gracefully.")