Set up continuous collection of P2P VES/USDT market-history data, with normalization, validation, and date-partitioned Parquet storage.
240 lines · 10 KiB · Python
import os
|
||
import time
|
||
import signal
|
||
import logging
|
||
import traceback
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from binance_client import BinanceP2PClient, BinanceP2PError
|
||
from normalizer import normalize_ad
|
||
from validator import validate_row, validate_snapshot
|
||
from storage import store_parquet, read_checkpoint, write_checkpoint
|
||
from alert import write_alert_file
|
||
from utils import jitter, now_utc
|
||
|
||
# Module-level logger, named after this module per the standard
# logging.getLogger(__name__) convention.
logger: logging.Logger = logging.getLogger(name=__name__)
|
||
|
||
class P2PCollectorScheduler:
    """Periodically snapshot Binance P2P advertisements for configured pairs.

    Each cycle, for every configured asset/fiat pair, the scheduler:

    - fetches all BUY and SELL ads;
    - normalizes them and drops rows rejected by ``validate_row``;
    - runs ``validate_snapshot`` over the surviving rows;
    - stores each side with ``store_parquet`` under ``<output_dir>/buy_ads``
      and ``<output_dir>/sell_ads``;
    - logs a one-line summary.

    The continuous loop records progress in ``checkpoint.json``, a sibling of
    ``output_dir``, so a restart waits out the remainder of the interval.

    SIGINT and SIGTERM clear ``running``. The cycle in progress finishes, and
    the loop then exits.
    """

    def __init__(self, config: dict):
        """Read collection settings from ``config`` and install signal handlers.

        Args:
            config: Parsed configuration. Reads ``collection.output_dir``
                (default ``"./data/raw"``), ``collection.interval_seconds``
                (default 300) and ``collection.pairs`` (default a single
                USDT/VES pair). The whole dict is also handed to
                ``BinanceP2PClient``, ``validate_row`` and ``write_alert_file``.

        Note:
            ``signal.signal`` may only be called from the main thread, so the
            scheduler must be constructed there.
        """
        self.config = config
        self.running = True

        collection_cfg = config.get("collection", {})
        self.output_dir = collection_cfg.get("output_dir", "./data/raw")
        self.interval = collection_cfg.get("interval_seconds", 300)
        self.pairs = collection_cfg.get("pairs", [{"asset": "USDT", "fiat": "VES"}])

        # Expected layout is data/raw/... with data/checkpoint.json next to
        # raw/, so the checkpoint lives in the parent directory of output_dir.
        self.data_dir = str(Path(self.output_dir).parent)
        self.checkpoint_path = os.path.join(self.data_dir, "checkpoint.json")

        self.client = BinanceP2PClient(config)
        self.consecutive_failures = 0

        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, sig, frame):
        """Signal handler: request a graceful stop once the current cycle ends."""
        logger.info(f"Received signal {sig}. Initiating graceful shutdown after current snapshot completes...")
        self.running = False

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _resolve_pair(pair: dict) -> tuple:
        """Return ``(asset, fiat)`` for a configured pair, defaulting to USDT/VES."""
        return pair.get("asset", "USDT"), pair.get("fiat", "VES")

    @classmethod
    def _pair_key(cls, pair: dict) -> str:
        """Return the ``"<asset>_<fiat>"`` key used in cycle stats for ``pair``.

        Every key is built here, from the same defaults, so the stats written
        by ``run_single_cycle`` and the lookup in the checkpoint update always
        agree.
        """
        asset, fiat = cls._resolve_pair(pair)
        return f"{asset}_{fiat}"

    @staticmethod
    def _fmt_price(value) -> str:
        """Format a price with two decimals, or return ``"n/a"`` for ``None``.

        The contract of ``validate_snapshot`` for an empty BUY or SELL side is
        not visible here. If it reports ``None`` for min/max/spread, a plain
        ``:.2f`` would raise after the data was already stored, turning a
        completed snapshot into a failed cycle.
        """
        return "n/a" if value is None else f"{value:.2f}"

    @staticmethod
    def _seconds_since(timestamp_iso: str, now: datetime) -> float:
        """Return the seconds elapsed from ISO-8601 ``timestamp_iso`` to ``now``.

        A timestamp without a UTC offset is treated as UTC. The result is
        negative when the timestamp lies after ``now``.

        Raises:
            ValueError / TypeError: if the timestamp cannot be parsed, or if
                it cannot be compared with ``now``.
        """
        then = datetime.fromisoformat(timestamp_iso)
        if then.tzinfo is None:
            then = then.replace(tzinfo=timezone.utc)
        return (now - then).total_seconds()

    def _sleep_interruptibly(self, seconds: float, step: float = 1.0) -> None:
        """Sleep for up to ``seconds`` in ``step``-sized slices.

        Returns early as soon as ``self.running`` is cleared (e.g. by the
        signal handler), so shutdown is not delayed by a long wait.
        """
        remaining = seconds
        while remaining > 0 and self.running:
            time.sleep(min(step, remaining))
            remaining -= step

    def _normalize_and_filter(self, raw_ads, trade_type: str, snapshot_time, seen_adv_nos: set) -> list:
        """Normalize ``raw_ads`` and keep the rows accepted by ``validate_row``.

        ``seen_adv_nos`` is the same set object for both trade types within a
        pair and is handed to ``validate_row``. It is presumably used for
        de-duplication by ad number; confirm in validator. Input order is
        preserved.
        """
        normalized = (normalize_ad(ad, trade_type, snapshot_time) for ad in raw_ads)
        return [row for row in normalized if validate_row(row, self.config, seen_adv_nos)]

    # ------------------------------------------------------------- public API

    def verify_directories(self):
        """Ensure ``output_dir`` exists and is writable.

        Checks writability by creating and then deleting a ``.write_test`` file.

        Raises:
            OSError: if the directory cannot be created or written to.
        """
        test_file = os.path.join(self.output_dir, ".write_test")
        try:
            os.makedirs(self.output_dir, exist_ok=True)
            with open(test_file, "w") as f:
                f.write("test")
            os.remove(test_file)
        except OSError as e:
            logger.critical(f"Directory verification failed on {self.output_dir}: {e}")
            raise OSError(f"Output directory not writable: {e}") from e
        logger.info(f"Directory write verification passed for {self.output_dir}")

    def get_initial_wait_seconds(self) -> float:
        """Return how long to wait before the first cycle after a (re)start.

        The wait is based on ``last_completed_snapshot`` in the checkpoint, so
        a restart does not collect sooner than ``interval`` seconds after the
        previous snapshot.

        Returns 0.0 when there is no timestamp, when it cannot be
        parsed/compared, or when the interval has already elapsed. Otherwise
        the result is capped at ``interval``, so a timestamp in the future
        (clock skew, hand-edited checkpoint) cannot stall startup for longer
        than one interval.
        """
        checkpoint = read_checkpoint(self.checkpoint_path)
        last_completed = checkpoint.get("last_completed_snapshot")
        if not last_completed:
            return 0.0

        try:
            elapsed = self._seconds_since(last_completed, now_utc())
            wait_time = min(self.interval - elapsed, float(self.interval))
            if wait_time > 0:
                logger.info(f"Resuming. Last snapshot completed {elapsed:.1f}s ago. Initial wait time: {wait_time:.1f}s.")
                return wait_time
        except Exception as e:
            # Best effort: a bad checkpoint must not block startup.
            logger.warning(f"Error parsing last snapshot time from checkpoint: {e}. Starting immediately.")

        return 0.0

    def run_single_cycle(self) -> dict:
        """Collect, validate, store and log one snapshot for every configured pair.

        All pairs share one cycle timestamp taken at the start. Pairs are
        processed sequentially, and exceptions from fetching, validation or
        storage propagate to the caller. If a later pair fails, earlier pairs
        have already been stored.

        Returns:
            Dict keyed by ``"<asset>_<fiat>"``. Each value holds
            ``buy_count``, ``sell_count`` (from ``validate_snapshot``) and
            ``timestamp``, the cycle start as an ISO string.
        """
        cycle_start_time = now_utc()
        cycle_stats = {}

        for pair in self.pairs:
            asset, fiat = self._resolve_pair(pair)
            start_ts = time.time()

            # 1. Fetch raw ads, pausing 1 s between the BUY and SELL requests.
            buy_raw = self.client.fetch_all_ads("BUY", asset, fiat)
            time.sleep(1.0)
            sell_raw = self.client.fetch_all_ads("SELL", asset, fiat)

            # 2. Normalize and row-validate; one seen-set spans both sides.
            seen_adv_nos = set()
            flat_buy = self._normalize_and_filter(buy_raw, "BUY", cycle_start_time, seen_adv_nos)
            flat_sell = self._normalize_and_filter(sell_raw, "SELL", cycle_start_time, seen_adv_nos)

            # 3. Snapshot-level validation over both sides.
            summary = validate_snapshot(flat_buy + flat_sell, cycle_start_time)

            # 4. Persist each side to its own directory.
            store_parquet(flat_buy, os.path.join(self.output_dir, "buy_ads"), cycle_start_time)
            store_parquet(flat_sell, os.path.join(self.output_dir, "sell_ads"), cycle_start_time)

            elapsed = time.time() - start_ts

            # 5. Summary line, e.g.
            # 2026-06-05 13:30:00 UTC | BUY=47 ads [54.20-62.80] SELL=53 ads [58.00-68.50] | spread=-4.80 | took 3.2s | methods=[...]
            methods_str = ",".join(summary["methods"])
            fmt = self._fmt_price
            logger.info(
                f"{cycle_start_time.strftime('%Y-%m-%d %H:%M:%S')} UTC | "
                f"BUY={summary['buy_count']} ads [{fmt(summary['buy_min'])}-{fmt(summary['buy_max'])}] "
                f"SELL={summary['sell_count']} ads [{fmt(summary['sell_min'])}-{fmt(summary['sell_max'])}] | "
                f"spread={fmt(summary['spread'])} | took {elapsed:.1f}s | "
                f"methods=[{methods_str}]"
            )

            cycle_stats[self._pair_key(pair)] = {
                "buy_count": summary['buy_count'],
                "sell_count": summary['sell_count'],
                "timestamp": cycle_start_time.isoformat(),
            }

        return cycle_stats

    def _record_success(self, stats: dict, cycle_start: datetime) -> None:
        """Write the post-success checkpoint.

        The checkpoint holds the cycle time, the primary pair's ad counts,
        and running totals. Any other keys in the existing checkpoint are not
        carried over.
        """
        checkpoint = read_checkpoint(self.checkpoint_path)

        # The first configured pair is the "primary" one. Its key is resolved
        # with the same defaults run_single_cycle uses, so a pair that omits
        # "asset"/"fiat" cannot raise KeyError here after a successful
        # collection. An empty pair list yields zero counts instead of
        # IndexError.
        primary_key = self._pair_key(self.pairs[0]) if self.pairs else None
        pair_stats = stats.get(primary_key, {})

        checkpoint_data = {
            "last_completed_snapshot": cycle_start.isoformat(),
            "last_buy_ad_count": pair_stats.get("buy_count", 0),
            "last_sell_ad_count": pair_stats.get("sell_count", 0),
            "consecutive_failures": 0,
            "total_snapshots": checkpoint.get("total_snapshots", 0) + 1,
            "first_snapshot": checkpoint.get("first_snapshot", cycle_start.isoformat()),
            "version": "1.0",
        }
        write_checkpoint(self.checkpoint_path, checkpoint_data)

    def _handle_cycle_failure(self, error: Exception, cycle_start: datetime, tb_str: str) -> None:
        """Count a failed cycle, log it, alert past the threshold, and checkpoint it.

        The alert threshold is ``alerts.consecutive_failure_threshold``
        (default 5). Nothing here raises: this runs inside the loop's error
        handler, and an escaping exception would terminate ``run()``.
        """
        self.consecutive_failures += 1
        logger.error(f"Error during collection cycle (consecutive failures: {self.consecutive_failures}): {error}")
        logger.error(tb_str)

        alert_threshold = self.config.get("alerts", {}).get("consecutive_failure_threshold", 5)
        if self.consecutive_failures >= alert_threshold:
            try:
                write_alert_file(
                    self.config,
                    cycle_start,
                    str(error),
                    self.consecutive_failures,
                    tb_str,
                )
            except Exception as alert_error:
                logger.critical(f"Failed to write alert file: {alert_error}")

        # The same storage problem that failed the cycle may also break the
        # checkpoint; log it rather than letting it stop the collector.
        try:
            checkpoint = read_checkpoint(self.checkpoint_path)
            checkpoint["consecutive_failures"] = self.consecutive_failures
            write_checkpoint(self.checkpoint_path, checkpoint)
        except Exception as checkpoint_error:
            logger.critical(f"Failed to record failure in checkpoint: {checkpoint_error}")

    def run(self, once: bool = False):
        """Verify the output directory, then run one cycle or loop until signalled.

        Args:
            once: If true, run a single cycle and return. Any exception is
                logged and re-raised. Otherwise loop until ``running`` is
                cleared.

        In continuous mode, a failed cycle never stops the loop. On success
        the failure counter and the client's 429 backoff are reset. After
        each cycle the loop sleeps ``jitter(interval)`` seconds.
        """
        self.verify_directories()

        if once:
            logger.info("Executing a single collection cycle (--once)...")
            try:
                self.run_single_cycle()
            except Exception as e:
                logger.error(f"Error occurred during one-shot collection: {e}")
                logger.error(traceback.format_exc())
                raise
            logger.info("One-shot collection complete. Exiting.")
            return

        initial_wait = self.get_initial_wait_seconds()
        if initial_wait > 0 and self.running:
            logger.info(f"Sleeping for initial delay of {initial_wait:.1f}s...")
            self._sleep_interruptibly(initial_wait)

        logger.info(f"Starting P2P data collector. Interval: {self.interval}s. Pairs: {self.pairs}")

        while self.running:
            cycle_start = now_utc()
            try:
                stats = self.run_single_cycle()
                self.consecutive_failures = 0
                self.client.reset_429_backoff()
                self._record_success(stats, cycle_start)
            except Exception as e:
                self._handle_cycle_failure(e, cycle_start, traceback.format_exc())

            if self.running:
                sleep_sec = jitter(self.interval)
                logger.info(f"Sleeping for {sleep_sec:.1f}s before next cycle...")
                self._sleep_interruptibly(sleep_sec)

        logger.info("Collector has shut down gracefully.")
|